# Source: dzentra_bot/app/src/integrations/exchange/service.py
# (file-viewer header as captured: 232 lines, 7.5 KiB, Python)

from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
from src.core.config import load_settings
from src.integrations.exchange.exceptions import ExchangeError
from src.integrations.exchange.mock_data import (
mock_balance_summary,
mock_exchange_health,
mock_ticker_price,
)
from src.integrations.exchange.models import (
BalanceSummary,
ExchangeHealth,
ExchangeSymbol,
SymbolValidationResult,
TickerPrice,
)
from src.integrations.exchange.rest_client import ExchangeRestClient
from src.integrations.exchange.symbol_utils import normalize_symbol, symbol_candidates
class ExchangeService:
    """Facade over the exchange REST API with a mock-data fallback.

    Settings are loaded once in the constructor via ``load_settings()``.
    While ``settings.exchange_enabled`` is false the public methods return
    mock data (or an empty symbol list) instead of creating a REST client.
    """

    def __init__(self) -> None:
        self.settings = load_settings()

    # =========================
    # PUBLIC API
    # =========================
    def get_health(self) -> ExchangeHealth:
        """Report whether the exchange API is reachable and usable.

        In real mode the default symbol is validated against exchangeInfo
        and its ticker price is fetched; any ``ExchangeError`` raised on the
        way is converted into a failed ``ExchangeHealth`` instead of being
        propagated.

        Returns:
            The mock health object when the exchange is disabled, otherwise
            an ``ExchangeHealth`` whose ``mode`` is ``"real_public_api"``,
            ``"real_symbol_error"`` or ``"real_error"``.
        """
        if not self.settings.exchange_enabled:
            return mock_exchange_health()

        try:
            validation = self.validate_symbol(self.settings.default_symbol)
            if not validation.is_valid:
                return ExchangeHealth(
                    ok=False,
                    mode="real_symbol_error",
                    message=validation.message,
                )
            ticker = self._get_real_price(validation.normalized_symbol)
        except ExchangeError as exc:
            return ExchangeHealth(
                ok=False,
                mode="real_error",
                message=f"Ошибка подключения к API: {exc}",
            )

        return ExchangeHealth(
            ok=True,
            mode="real_public_api",
            message=f"Public API OK. Цена {ticker.symbol}: {ticker.price:.2f}",
        )

    def get_price(self, symbol: str | None = None) -> TickerPrice:
        """Return the latest price for ``symbol``.

        Args:
            symbol: Symbol to query; a falsy value selects
                ``settings.default_symbol``.

        Returns:
            A mock ticker when the exchange is disabled, otherwise the
            ticker fetched for the validated, normalized symbol.

        Raises:
            ExchangeError: If the symbol fails validation or the ticker
                request/response is unusable.
        """
        symbol_to_use = symbol or self.settings.default_symbol
        if not self.settings.exchange_enabled:
            return mock_ticker_price(symbol_to_use)

        validation = self.validate_symbol(symbol_to_use)
        if not validation.is_valid:
            raise ExchangeError(validation.message)
        return self._get_real_price(validation.normalized_symbol)

    def get_balance_summary(self) -> list[BalanceSummary]:
        """Return the balance summary.

        Always returns mock data, regardless of ``exchange_enabled``.
        NOTE(review): presumably a placeholder until a real balance
        endpoint is wired in - confirm.
        """
        return mock_balance_summary()

    # =========================
    # EXCHANGE INFO
    # =========================
    def get_exchange_symbols(self) -> list[ExchangeSymbol]:
        """Fetch and parse the symbol list from ``/api/v2/exchangeInfo``.

        Entries that are not dicts are skipped.

        Returns:
            Parsed symbols, or an empty list when the exchange is disabled.

        Raises:
            ExchangeError: If the response carries no ``symbols`` list,
                either at the top level or nested under ``payload``.
        """
        if not self.settings.exchange_enabled:
            return []

        payload = ExchangeRestClient().get_json("/api/v2/exchangeInfo")
        symbols_raw = self._extract_symbols_raw(payload)
        return [
            self._build_symbol(item)
            for item in symbols_raw
            if isinstance(item, dict)
        ]

    @staticmethod
    def _extract_symbols_raw(payload: object) -> list:
        """Locate the raw ``symbols`` list inside an exchangeInfo response."""
        # Guard against differing response formats: the list may sit at the
        # top level or be wrapped in a "payload" object. A non-dict response
        # is reported as ExchangeError rather than leaking AttributeError.
        if isinstance(payload, dict):
            if isinstance(payload.get("symbols"), list):
                return payload["symbols"]
            inner = payload.get("payload")
            if isinstance(inner, dict) and isinstance(inner.get("symbols"), list):
                return inner["symbols"]
        raise ExchangeError("Field 'symbols' is missing in exchangeInfo response.")

    @staticmethod
    def _safe_str(value: object, default: str = "") -> str:
        """Return ``default`` for ``None``, else the stripped ``str(value)``."""
        if value is None:
            return default
        return str(value).strip()

    @staticmethod
    def _parse_tick_size(raw: object) -> float | None:
        """Convert a raw ``tickSize`` to float; ``None`` if absent or invalid."""
        if raw in (None, ""):
            return None
        try:
            return float(str(raw))
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _parse_market_modes(raw: object) -> list[str]:
        """Normalize ``marketModes`` (list or single string) to a clean list."""
        if isinstance(raw, list):
            stripped = (str(entry).strip() for entry in raw)
            return [mode for mode in stripped if mode]
        if isinstance(raw, str) and raw.strip():
            return [raw.strip()]
        return []

    @classmethod
    def _build_symbol(cls, item: dict) -> ExchangeSymbol:
        """Build an ``ExchangeSymbol`` from one raw exchangeInfo entry."""
        return ExchangeSymbol(
            symbol=cls._safe_str(item.get("symbol")),
            name=cls._safe_str(item.get("name")),
            status=cls._safe_str(item.get("status"), "unknown"),
            base_asset=cls._safe_str(item.get("baseAsset")),
            quote_asset=cls._safe_str(item.get("quoteAsset")),
            market_modes=cls._parse_market_modes(item.get("marketModes")),
            market_type=cls._safe_str(item.get("marketType"), "unknown"),
            tick_size=cls._parse_tick_size(item.get("tickSize")),
        )

    # =========================
    # SYMBOL VALIDATION
    # =========================
    def validate_symbol(self, raw_symbol: str) -> SymbolValidationResult:
        """Check ``raw_symbol`` against the symbols listed by the exchange.

        The symbol is normalized, expanded with ``symbol_candidates`` and the
        candidates are tried in order; the first one that equals a
        normalized exchange symbol wins.

        Args:
            raw_symbol: Symbol as entered by the caller.

        Returns:
            A ``SymbolValidationResult``. In mock mode every non-empty
            symbol is reported valid without contacting the exchange.

        Raises:
            ExchangeError: Propagated from ``get_exchange_symbols``.
        """
        requested = normalize_symbol(raw_symbol)
        if not requested:
            return SymbolValidationResult(
                requested_symbol=requested,
                normalized_symbol="",
                is_valid=False,
                message="Символ пустой.",
                symbol_info=None,
            )

        if not self.settings.exchange_enabled:
            return SymbolValidationResult(
                requested_symbol=requested,
                normalized_symbol=requested,
                is_valid=True,
                message="Mock mode active.",
                symbol_info=None,
            )

        # Normalize every exchange symbol once instead of once per candidate.
        # setdefault keeps the first entry in list order for a given key,
        # which is the entry a linear scan would have returned.
        by_normalized: dict[str, ExchangeSymbol] = {}
        for symbol_info in self.get_exchange_symbols():
            by_normalized.setdefault(normalize_symbol(symbol_info.symbol), symbol_info)

        for candidate in symbol_candidates(requested):
            symbol_info = by_normalized.get(candidate)
            if symbol_info is not None:
                return SymbolValidationResult(
                    requested_symbol=requested,
                    normalized_symbol=normalize_symbol(symbol_info.symbol),
                    is_valid=True,
                    message="Символ найден в exchangeInfo.",
                    symbol_info=symbol_info,
                )

        return SymbolValidationResult(
            requested_symbol=requested,
            normalized_symbol=requested,
            is_valid=False,
            message=f"Символ '{requested}' не найден в exchangeInfo.",
            symbol_info=None,
        )

    # =========================
    # INTERNAL
    # =========================
    def _get_real_price(self, symbol: str) -> TickerPrice:
        """Fetch the last price of ``symbol`` from ``/api/v2/ticker/24hr``.

        Args:
            symbol: Already validated, normalized symbol.

        Returns:
            A ``TickerPrice`` whose ``updated_at`` is the response's
            ``closeTime`` (or ``eventTime``) rendered in ``settings.tz``,
            or ``"n/a"`` when no usable timestamp is present.

        Raises:
            ExchangeError: If the response is not an object, lacks
                ``lastPrice``, or ``lastPrice`` is not numeric.
        """
        payload = ExchangeRestClient().get_json(
            "/api/v2/ticker/24hr",
            params={"symbol": symbol},
        )
        if not isinstance(payload, dict):
            raise ExchangeError("Unexpected ticker response format.")

        price_raw = payload.get("lastPrice")
        if price_raw is None:
            raise ExchangeError("Field 'lastPrice' is missing in ticker response.")
        try:
            price = float(price_raw)
        except (TypeError, ValueError) as exc:
            # Surface malformed data as ExchangeError so callers that only
            # handle ExchangeError (e.g. get_health) do not crash.
            raise ExchangeError(
                f"Field 'lastPrice' is not a number: {price_raw!r}"
            ) from exc

        close_time = payload.get("closeTime") or payload.get("eventTime") or ""
        return TickerPrice(
            symbol=symbol,
            price=price,
            source="dzengi-demo-api",
            updated_at=self._format_timestamp(close_time, self.settings.tz),
        )

    @staticmethod
    def _format_timestamp(raw_ms: object, tz_name: str) -> str:
        """Render a millisecond epoch timestamp as ``DD.MM.YYYY HH:MM:SS``.

        Returns ``"n/a"`` when the value is empty or cannot be interpreted
        as a timestamp; the timestamp is display metadata, so a bad value
        must not invalidate an otherwise good price.
        """
        if not raw_ms:
            return "n/a"
        try:
            seconds = int(raw_ms) / 1000
            dt_utc = datetime.fromtimestamp(seconds, tz=ZoneInfo("UTC"))
        except (TypeError, ValueError, OverflowError, OSError):
            return "n/a"
        return dt_utc.astimezone(ZoneInfo(tz_name)).strftime("%d.%m.%Y %H:%M:%S")