07.4.3.16 — Production Execution Pricing Layer

This commit is contained in:
2026-05-09 13:08:29 +03:00
parent 71cf206e32
commit e97dcd372b
15 changed files with 1179 additions and 188 deletions

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import time
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
@@ -17,12 +18,31 @@ class MarketPriceSnapshot:
ask_price: float | None
updated_at: str
source: str = "market-cache"
runtime_key: str = "default"
received_monotonic: float = 0.0
def age_seconds(self) -> float:
if self.received_monotonic <= 0:
return 999999.0
return max(0.0, time.monotonic() - self.received_monotonic)
def has_bid_ask(self) -> bool:
return (
self.bid_price is not None
and self.ask_price is not None
and self.bid_price > 0
and self.ask_price > 0
)
class MarketPriceCache:
_prices: dict[str, MarketPriceSnapshot] = {}
_prices: dict[tuple[str, str], MarketPriceSnapshot] = {}
@classmethod
def _key(cls, *, symbol: str, runtime_key: str = "default") -> tuple[str, str]:
return runtime_key.strip().lower(), symbol.upper()
# сохранить последнюю цену
@classmethod
def set_price(
cls,
@@ -33,22 +53,59 @@ class MarketPriceCache:
ask_price: float | None = None,
updated_at: str | None = None,
source: str = "market-polling",
runtime_key: str = "default",
) -> None:
settings = load_settings()
if updated_at is None:
updated_at = datetime.now(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S")
cls._prices[symbol.upper()] = MarketPriceSnapshot(
normalized_runtime_key = runtime_key.strip().lower()
cls._prices[cls._key(symbol=symbol, runtime_key=normalized_runtime_key)] = MarketPriceSnapshot(
symbol=symbol.upper(),
price=price,
bid_price=bid_price,
ask_price=ask_price,
price=float(price),
bid_price=float(bid_price) if bid_price is not None else None,
ask_price=float(ask_price) if ask_price is not None else None,
updated_at=updated_at,
source=source,
runtime_key=normalized_runtime_key,
received_monotonic=time.monotonic(),
)
# получить последнюю цену
@classmethod
def get_price(cls, symbol: str) -> MarketPriceSnapshot | None:
return cls._prices.get(symbol.upper())
def get_price(
cls,
symbol: str,
*,
runtime_key: str = "default",
) -> MarketPriceSnapshot | None:
return cls._prices.get(cls._key(symbol=symbol, runtime_key=runtime_key))
@classmethod
def clear(
cls,
symbol: str | None = None,
*,
runtime_key: str | None = None,
) -> None:
if symbol is None and runtime_key is None:
cls._prices.clear()
return
if symbol is not None and runtime_key is not None:
cls._prices.pop(cls._key(symbol=symbol, runtime_key=runtime_key), None)
return
keys_to_delete = []
for key_runtime, key_symbol in cls._prices.keys():
if runtime_key is not None and key_runtime == runtime_key.strip().lower():
keys_to_delete.append((key_runtime, key_symbol))
continue
if symbol is not None and key_symbol == symbol.upper():
keys_to_delete.append((key_runtime, key_symbol))
for key in keys_to_delete:
cls._prices.pop(key, None)

View File

@@ -3,54 +3,450 @@
from __future__ import annotations
import asyncio
import traceback
from dataclasses import dataclass
from typing import Callable
from src.integrations.exchange.market_cache import MarketPriceCache
from src.integrations.exchange.service import ExchangeService
from src.integrations.exchange.ws_client import ExchangeWebSocketClient
from src.trading.journal.service import JournalService
@dataclass
class MarketRuntimeContext:
runtime_key: str
task: asyncio.Task | None
interval_seconds: int
symbol_provider: Callable[[], str | None]
screen: str | None
action: str
runtime_label: str | None
class MarketDataRunner:
_task: asyncio.Task | None = None
_interval_seconds: int = 1
_symbol_provider: Callable[[], str | None] | None = None
_runtimes: dict[str, MarketRuntimeContext] = {}
# запустить быстрый polling рыночной цены
@classmethod
def start(
cls,
*,
symbol_provider: Callable[[], str | None],
interval_seconds: int = 1,
runtime_key: str = "default",
screen: str | None = None,
action: str = "market_data",
runtime_label: str | None = None,
) -> None:
cls._symbol_provider = symbol_provider
cls._interval_seconds = interval_seconds
existing = cls._runtimes.get(runtime_key)
if cls._task is not None and not cls._task.done():
if existing is not None and existing.task is not None and not existing.task.done():
existing.symbol_provider = symbol_provider
existing.interval_seconds = interval_seconds
existing.screen = screen
existing.action = action
existing.runtime_label = runtime_label
cls._log_info(
existing,
"market_runner_context_updated",
"MarketDataRunner context updated.",
{"interval_seconds": interval_seconds},
)
return
cls._task = asyncio.create_task(cls._worker())
context = MarketRuntimeContext(
runtime_key=runtime_key,
task=None,
interval_seconds=interval_seconds,
symbol_provider=symbol_provider,
screen=screen,
action=action,
runtime_label=runtime_label,
)
cls._runtimes[runtime_key] = context
cls._log_info(
context,
"market_runner_started",
"MarketDataRunner started.",
{"interval_seconds": interval_seconds},
)
context.task = asyncio.create_task(cls._worker(context))
# остановить polling
@classmethod
def stop(cls) -> None:
if cls._task is None:
def stop(cls, runtime_key: str | None = None) -> None:
if runtime_key is None:
for key in list(cls._runtimes.keys()):
cls.stop(key)
return
cls._task.cancel()
cls._task = None
context = cls._runtimes.get(runtime_key)
if context is None:
return
if context.task is not None:
context.task.cancel()
context.task = None
cls._log_info(
context,
"market_runner_stopped",
"MarketDataRunner stopped.",
)
cls._runtimes.pop(runtime_key, None)
# рабочий цикл polling
@classmethod
async def _worker(cls) -> None:
async def _worker(cls, context: MarketRuntimeContext) -> None:
last_symbol: str | None = None
while True:
symbol = cls._symbol_provider() if cls._symbol_provider else None
symbol = context.symbol_provider()
if symbol:
try:
await asyncio.to_thread(
ExchangeService().refresh_price_cache,
symbol,
)
except Exception:
pass
if not symbol:
cls._log_warning(
context,
"market_runner_no_symbol",
"MarketDataRunner has no symbol.",
)
await asyncio.sleep(context.interval_seconds)
continue
await asyncio.sleep(cls._interval_seconds)
cache_symbol = cls._cache_symbol(symbol)
ws_symbol = cls._ws_symbol(symbol)
if symbol != last_symbol:
last_symbol = symbol
if not cls._is_cache_symbol_used_by_other_runtime(
runtime_key=context.runtime_key,
cache_symbol=cache_symbol,
):
MarketPriceCache.clear(cache_symbol)
cls._log_info(
context,
"market_runner_symbol_changed",
"MarketDataRunner symbol changed.",
{
"symbol": symbol,
"cache_symbol": cache_symbol,
"ws_symbol": ws_symbol,
},
)
try:
await cls._run_websocket(context, symbol)
except asyncio.CancelledError:
raise
except Exception as exc:
cls._log_error(
context,
"market_ws_error_fallback",
"WebSocket market data failed. Falling back to REST.",
{
"symbol": symbol,
"cache_symbol": cache_symbol,
"ws_symbol": ws_symbol,
"error": str(exc),
"error_type": type(exc).__name__,
"traceback": traceback.format_exc(limit=5),
},
)
await cls._rest_fallback_once(context, symbol)
await asyncio.sleep(context.interval_seconds)
@classmethod
async def _run_websocket(cls, context: MarketRuntimeContext, symbol: str) -> None:
cache_symbol = cls._cache_symbol(symbol)
ws_symbol = cls._ws_symbol(symbol)
cls._log_info(
context,
"market_ws_connecting",
"Connecting market WebSocket.",
{
"requested_symbol": symbol,
"cache_symbol": cache_symbol,
"ws_symbol": ws_symbol,
},
)
payload_count = 0
async for payload in ExchangeWebSocketClient().stream_depth(
ws_symbol,
interval_seconds=context.interval_seconds,
):
if payload_count == 0:
cls._log_info(
context,
"market_ws_connected",
"Market WebSocket connected and first payload received.",
{
"requested_symbol": symbol,
"cache_symbol": cache_symbol,
"ws_symbol": ws_symbol,
"payload_keys": list(payload.keys()),
"payload_preview": cls._safe_payload_preview(payload),
},
)
payload_count += 1
current_symbol = context.symbol_provider()
if current_symbol and current_symbol != symbol:
cls._log_info(
context,
"market_ws_symbol_switch",
"Market WebSocket stopped because symbol changed.",
{
"old_symbol": symbol,
"new_symbol": current_symbol,
},
)
break
best_bid = cls._extract_best_price(payload, "bids")
best_ask = cls._extract_best_price(payload, "asks")
if best_bid is None or best_ask is None:
cls._log_warning(
context,
"market_ws_payload_unrecognized",
"Market WebSocket payload does not contain recognizable bids/asks.",
{
"requested_symbol": symbol,
"cache_symbol": cache_symbol,
"ws_symbol": ws_symbol,
"payload_keys": list(payload.keys()),
"payload_preview": cls._safe_payload_preview(payload),
},
)
continue
MarketPriceCache.set_price(
symbol=cache_symbol,
price=(best_bid + best_ask) / 2,
bid_price=best_bid,
ask_price=best_ask,
source=f"ws_depth:{context.runtime_key}",
runtime_key=context.runtime_key,
)
@classmethod
async def _rest_fallback_once(cls, context: MarketRuntimeContext, symbol: str) -> None:
try:
snapshot = await asyncio.to_thread(
ExchangeService().refresh_market_snapshot_cache,
symbol,
runtime_key=context.runtime_key,
)
cls._log_warning(
context,
"market_rest_fallback_success",
"REST fallback market snapshot loaded.",
{
"symbol": symbol,
"snapshot_symbol": snapshot.get("symbol"),
"source": snapshot.get("source"),
"last_price": snapshot.get("last_price"),
"bid_price": snapshot.get("bid_price"),
"ask_price": snapshot.get("ask_price"),
},
)
except Exception as exc:
cls._log_error(
context,
"market_rest_fallback_error",
"REST fallback market snapshot failed.",
{
"symbol": symbol,
"error": str(exc),
"error_type": type(exc).__name__,
"traceback": traceback.format_exc(limit=5),
},
)
@classmethod
def _is_cache_symbol_used_by_other_runtime(cls, *, runtime_key: str, cache_symbol: str) -> bool:
for key, context in cls._runtimes.items():
if key == runtime_key:
continue
try:
symbol = context.symbol_provider()
except Exception:
continue
if symbol and cls._cache_symbol(symbol) == cache_symbol:
return True
return False
@classmethod
def _cache_symbol(cls, symbol: str) -> str:
try:
validation = ExchangeService().validate_symbol(symbol)
if validation.is_valid:
return validation.normalized_symbol
except Exception:
pass
return symbol
@classmethod
def _ws_symbol(cls, symbol: str) -> str:
return cls._cache_symbol(symbol)
@classmethod
def _extract_best_price(cls, payload: dict, side_key: str) -> float | None:
data = payload
inner = payload.get("payload")
if isinstance(inner, dict):
data = inner
values = data.get(side_key)
if not isinstance(values, list) or not values:
return None
first = values[0]
if isinstance(first, list) and first:
return cls._safe_float(first[0])
if isinstance(first, dict):
return cls._safe_float(
first.get("price")
or first.get("p")
or first.get("bidPrice")
or first.get("askPrice")
)
return None
@classmethod
def _safe_float(cls, value: object) -> float | None:
try:
number = float(value)
except (TypeError, ValueError):
return None
return number if number > 0 else None
@classmethod
def _safe_payload_preview(cls, payload: dict) -> dict:
preview: dict = {}
for key, value in payload.items():
if key in {"bids", "asks"} and isinstance(value, list):
preview[key] = value[:2]
elif key == "payload" and isinstance(value, dict):
preview[key] = {
inner_key: inner_value[:2]
if inner_key in {"bids", "asks"} and isinstance(inner_value, list)
else inner_value
for inner_key, inner_value in value.items()
}
else:
preview[key] = value
return preview
@classmethod
def _message(cls, context: MarketRuntimeContext, message: str) -> str:
if context.runtime_label:
return f"{context.runtime_label} {message}"
return message
@classmethod
def _payload(cls, context: MarketRuntimeContext, payload: dict | None = None) -> dict:
result = dict(payload or {})
result.setdefault("runtime_key", context.runtime_key)
if context.screen:
result.setdefault("runtime_screen", context.screen)
if context.runtime_label:
result.setdefault("runtime_label", context.runtime_label)
return result
@classmethod
def _log_info(
cls,
context: MarketRuntimeContext,
event_type: str,
message: str,
payload: dict | None = None,
) -> None:
try:
if context.screen:
JournalService().log_ui_info(
event_type=event_type,
message=cls._message(context, message),
screen=context.screen,
action=context.action,
payload=cls._payload(context, payload),
)
return
JournalService().log_info(event_type, cls._message(context, message), cls._payload(context, payload))
except Exception:
pass
@classmethod
def _log_warning(
cls,
context: MarketRuntimeContext,
event_type: str,
message: str,
payload: dict | None = None,
) -> None:
try:
if context.screen:
JournalService().log_ui_warning(
event_type=event_type,
message=cls._message(context, message),
screen=context.screen,
action=context.action,
payload=cls._payload(context, payload),
)
return
JournalService().log_warning(event_type, cls._message(context, message), cls._payload(context, payload))
except Exception:
pass
@classmethod
def _log_error(
cls,
context: MarketRuntimeContext,
event_type: str,
message: str,
payload: dict | None = None,
) -> None:
try:
if context.screen:
JournalService().log_ui_error(
event_type=event_type,
message=cls._message(context, message),
screen=context.screen,
action=context.action,
payload=cls._payload(context, payload),
error_type=(payload or {}).get("error_type"),
raw_error=(payload or {}).get("error"),
)
return
JournalService().log_error(event_type, cls._message(context, message), cls._payload(context, payload))
except Exception:
pass

View File

@@ -20,6 +20,18 @@ class TickerPrice:
updated_at: str
@dataclass(slots=True)
class ExecutionPriceSnapshot:
symbol: str
last_price: float
bid_price: float
ask_price: float
updated_at: str
source: str
is_fresh: bool
age_seconds: float | None = None
@dataclass(slots=True)
class BalanceSummary:
currency: str

View File

@@ -8,6 +8,7 @@ from zoneinfo import ZoneInfo
from src.core.config import load_settings
from src.integrations.exchange.balance_parser import parse_account_balances
from src.integrations.exchange.exceptions import ExchangeError
from src.integrations.exchange.market_cache import MarketPriceCache
from src.integrations.exchange.mock_data import (
mock_balance_summary,
mock_exchange_health,
@@ -17,6 +18,7 @@ from src.integrations.exchange.models import (
BalanceSummary,
ExchangeHealth,
ExchangeSymbol,
ExecutionPriceSnapshot,
PrivateAuthHealth,
SymbolValidationResult,
TickerPrice,
@@ -25,11 +27,12 @@ from src.integrations.exchange.private_client import ExchangePrivateClient
from src.integrations.exchange.rest_client import ExchangeRestClient
from src.integrations.exchange.symbol_utils import normalize_symbol, symbol_candidates
from src.trading.journal.service import JournalService
from src.integrations.exchange.market_cache import MarketPriceCache
class ExchangeService:
_exchange_symbols_cache: list[ExchangeSymbol] | None = None
_execution_cache_max_age_seconds = 2.0
_default_runtime_key = "auto"
def __init__(self) -> None:
self.settings = load_settings()
@@ -136,6 +139,9 @@ class ExchangeService:
else "dzengi-api"
)
def _runtime_key(self, runtime_key: str | None) -> str:
return (runtime_key or self._default_runtime_key).strip().lower()
def get_health(self) -> ExchangeHealth:
if not self.settings.exchange_enabled:
return mock_exchange_health()
@@ -194,38 +200,53 @@ class ExchangeService:
message=f"Private API OK. Балансов получено: {len(balances)}",
)
# принудительно обновить market cache через REST
def refresh_price_cache(self, symbol: str | None = None) -> TickerPrice:
symbol_to_use = symbol or self.settings.default_symbol
if not self.settings.exchange_enabled:
ticker = mock_ticker_price(symbol_to_use)
MarketPriceCache.set_price(
symbol=ticker.symbol,
price=ticker.price,
updated_at=ticker.updated_at,
source=ticker.source,
)
return ticker
validation = self.validate_symbol(symbol_to_use)
if not validation.is_valid:
raise ExchangeError(validation.message)
ticker = self._get_real_price(validation.normalized_symbol)
MarketPriceCache.set_price(
symbol=ticker.symbol,
price=ticker.price,
updated_at=ticker.updated_at,
source=ticker.source,
def refresh_price_cache(
self,
symbol: str | None = None,
*,
runtime_key: str | None = None,
) -> TickerPrice:
snapshot = self.refresh_market_snapshot_cache(
symbol,
runtime_key=runtime_key,
)
return ticker
return TickerPrice(
symbol=str(snapshot["symbol"]),
price=float(snapshot["last_price"]),
source=str(snapshot.get("source") or self._source_name()),
updated_at=str(snapshot["updated_at"]),
)
# получить цену инструмента: сначала WebSocket cache, потом REST fallback
def get_price(self, symbol: str | None = None) -> TickerPrice:
def refresh_market_snapshot_cache(
self,
symbol: str | None = None,
*,
runtime_key: str | None = None,
) -> dict[str, object]:
normalized_runtime_key = self._runtime_key(runtime_key)
snapshot = self.get_fresh_market_snapshot(symbol)
MarketPriceCache.set_price(
symbol=str(snapshot["symbol"]),
price=float(snapshot["last_price"]),
bid_price=float(snapshot["bid_price"]),
ask_price=float(snapshot["ask_price"]),
updated_at=str(snapshot["updated_at"]),
source=str(snapshot.get("source") or "rest_polling"),
runtime_key=normalized_runtime_key,
)
return snapshot
def get_price(
self,
symbol: str | None = None,
*,
runtime_key: str | None = None,
) -> TickerPrice:
symbol_to_use = symbol or self.settings.default_symbol
normalized_runtime_key = self._runtime_key(runtime_key)
if not self.settings.exchange_enabled:
return mock_ticker_price(symbol_to_use)
@@ -234,7 +255,10 @@ class ExchangeService:
if not validation.is_valid:
raise ExchangeError(validation.message)
cached_price = MarketPriceCache.get_price(validation.normalized_symbol)
cached_price = MarketPriceCache.get_price(
validation.normalized_symbol,
runtime_key=normalized_runtime_key,
)
if cached_price is not None:
return TickerPrice(
@@ -246,9 +270,14 @@ class ExchangeService:
return self._get_real_price(validation.normalized_symbol)
# получить market snapshot: сначала WebSocket cache, потом REST fallback
def get_market_snapshot(self, symbol: str | None = None) -> dict[str, object]:
def get_market_snapshot(
self,
symbol: str | None = None,
*,
runtime_key: str | None = None,
) -> dict[str, object]:
symbol_to_use = symbol or self.settings.default_symbol
normalized_runtime_key = self._runtime_key(runtime_key)
if not self.settings.exchange_enabled:
ticker = mock_ticker_price(symbol_to_use)
@@ -258,61 +287,102 @@ class ExchangeService:
"bid_price": ticker.price,
"ask_price": ticker.price,
"updated_at": ticker.updated_at,
"source": ticker.source,
"runtime_key": normalized_runtime_key,
"age_seconds": 0.0,
"is_fresh": True,
}
validation = self.validate_symbol(symbol_to_use)
if not validation.is_valid:
raise ExchangeError(validation.message)
cached_price = MarketPriceCache.get_price(validation.normalized_symbol)
cached_price = MarketPriceCache.get_price(
validation.normalized_symbol,
runtime_key=normalized_runtime_key,
)
if cached_price is not None:
age = cached_price.age_seconds()
return {
"symbol": cached_price.symbol,
"last_price": cached_price.price,
"bid_price": cached_price.bid_price or cached_price.price,
"ask_price": cached_price.ask_price or cached_price.price,
"updated_at": cached_price.updated_at,
"source": cached_price.source,
"runtime_key": cached_price.runtime_key,
"age_seconds": round(age, 3),
"is_fresh": age <= self._execution_cache_max_age_seconds,
}
client = ExchangeRestClient()
snapshot = self.get_fresh_market_snapshot(validation.normalized_symbol)
snapshot["runtime_key"] = normalized_runtime_key
return snapshot
try:
payload = client.get_json(
"/api/v2/ticker/24hr",
params={"symbol": validation.normalized_symbol},
def get_execution_snapshot(
self,
symbol: str | None = None,
*,
runtime_key: str | None = None,
) -> ExecutionPriceSnapshot:
symbol_to_use = symbol or self.settings.default_symbol
normalized_runtime_key = self._runtime_key(runtime_key)
if not self.settings.exchange_enabled:
ticker = mock_ticker_price(symbol_to_use)
return ExecutionPriceSnapshot(
symbol=ticker.symbol,
last_price=ticker.price,
bid_price=ticker.price,
ask_price=ticker.price,
updated_at=ticker.updated_at,
source=ticker.source,
is_fresh=True,
age_seconds=0.0,
)
except Exception as exc:
self._log_exchange_error(
endpoint="ticker/24hr",
exc=exc,
symbol=validation.normalized_symbol,
)
raise ExchangeError(str(exc)) from exc
last_raw = payload.get("lastPrice")
if last_raw is None:
exc = ExchangeError("Field 'lastPrice' is missing in ticker response.")
self._log_exchange_error(
endpoint="ticker/24hr",
exc=exc,
symbol=validation.normalized_symbol,
)
raise exc
validation = self.validate_symbol(symbol_to_use)
if not validation.is_valid:
raise ExchangeError(validation.message)
bid_raw = payload.get("bidPrice") or last_raw
ask_raw = payload.get("askPrice") or last_raw
close_time = payload.get("closeTime") or payload.get("eventTime") or ""
cached_price = MarketPriceCache.get_price(
validation.normalized_symbol,
runtime_key=normalized_runtime_key,
)
return {
"symbol": validation.normalized_symbol,
"last_price": float(last_raw),
"bid_price": float(bid_raw),
"ask_price": float(ask_raw),
"updated_at": self._format_exchange_time(close_time),
}
if cached_price is not None:
age = cached_price.age_seconds()
if (
age <= self._execution_cache_max_age_seconds
and cached_price.has_bid_ask()
):
return ExecutionPriceSnapshot(
symbol=cached_price.symbol,
last_price=cached_price.price,
bid_price=float(cached_price.bid_price),
ask_price=float(cached_price.ask_price),
updated_at=cached_price.updated_at,
source=f"{cached_price.source}:fresh_cache",
is_fresh=True,
age_seconds=round(age, 3),
)
snapshot = self.get_fresh_market_snapshot(validation.normalized_symbol)
return ExecutionPriceSnapshot(
symbol=str(snapshot["symbol"]),
last_price=float(snapshot["last_price"]),
bid_price=float(snapshot["bid_price"]),
ask_price=float(snapshot["ask_price"]),
updated_at=str(snapshot["updated_at"]),
source="rest_fallback",
is_fresh=True,
age_seconds=0.0,
)
# получить свежий market snapshot напрямую через REST, без WebSocket cache
def get_fresh_market_snapshot(self, symbol: str | None = None) -> dict[str, object]:
symbol_to_use = symbol or self.settings.default_symbol
@@ -325,6 +395,8 @@ class ExchangeService:
"ask_price": ticker.price,
"updated_at": ticker.updated_at,
"source": "mock",
"age_seconds": 0.0,
"is_fresh": True,
}
validation = self.validate_symbol(symbol_to_use)
@@ -367,8 +439,10 @@ class ExchangeService:
"ask_price": float(ask_raw),
"updated_at": self._format_exchange_time(close_time),
"source": "fresh_rest",
"age_seconds": 0.0,
"is_fresh": True,
}
def get_balance_summary(self) -> list[BalanceSummary]:
if not self.settings.exchange_enabled:
return mock_balance_summary()
@@ -562,7 +636,7 @@ class ExchangeService:
)
type(self)._exchange_symbols_cache = items
return items
def validate_symbol(self, raw_symbol: str) -> SymbolValidationResult:
@@ -609,36 +683,11 @@ class ExchangeService:
)
def _get_real_price(self, symbol: str) -> TickerPrice:
client = ExchangeRestClient()
try:
payload = client.get_json(
"/api/v2/ticker/24hr",
params={"symbol": symbol},
)
except Exception as exc:
self._log_exchange_error(
endpoint="ticker/24hr",
exc=exc,
symbol=symbol,
)
raise ExchangeError(str(exc)) from exc
price_raw = payload.get("lastPrice")
if price_raw is None:
exc = ExchangeError("Field 'lastPrice' is missing in ticker response.")
self._log_exchange_error(
endpoint="ticker/24hr",
exc=exc,
symbol=symbol,
)
raise exc
close_time = payload.get("closeTime") or payload.get("eventTime") or ""
snapshot = self.get_fresh_market_snapshot(symbol)
return TickerPrice(
symbol=symbol,
price=float(price_raw),
symbol=str(snapshot["symbol"]),
price=float(snapshot["last_price"]),
source=self._source_name(),
updated_at=self._format_exchange_time(close_time),
updated_at=str(snapshot["updated_at"]),
)

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
import asyncio
import json
from typing import AsyncIterator
from urllib.parse import urlencode
from uuid import uuid4
import websockets
@@ -16,22 +17,27 @@ class ExchangeWebSocketClient:
self.settings = load_settings()
self.base_url = self._build_ws_base_url()
# собрать базовый websocket url
def _build_ws_base_url(self) -> str:
raw_url = self.settings.exchange_ws_url or self.settings.exchange_base_url
if raw_url.startswith("https://"):
return raw_url.replace("https://", "wss://", 1).rstrip("/")
raw_url = raw_url.replace("https://", "wss://", 1)
elif raw_url.startswith("http://"):
raw_url = raw_url.replace("http://", "ws://", 1)
if raw_url.startswith("http://"):
return raw_url.replace("http://", "ws://", 1).rstrip("/")
raw_url = raw_url.rstrip("/")
return raw_url.rstrip("/")
if raw_url.endswith("/connect"):
return raw_url
# читать стакан по websocket
async def stream_depth(self, symbol: str) -> AsyncIterator[dict]:
url = f"{self.base_url}/api/v2/depth"
return f"{raw_url}/connect"
async def stream_depth(
self,
symbol: str,
*,
interval_seconds: float = 1.0,
) -> AsyncIterator[dict]:
headers = {
"Origin": self.settings.exchange_base_url.rstrip("/"),
"Content-Type": "application/json",
@@ -40,25 +46,41 @@ class ExchangeWebSocketClient:
if self.settings.exchange_api_key:
headers["X-MBX-APIKEY"] = self.settings.exchange_api_key
request = {
"limit": 5,
"symbol": symbol,
}
async with websockets.connect(
url,
self.base_url,
extra_headers=headers,
subprotocols=["json"],
ping_interval=20,
open_timeout=self.settings.exchange_timeout_sec,
) as websocket:
await websocket.send(json.dumps(request))
while True:
request = {
"correlationId": str(uuid4()),
"destination": "/api/v2/depth",
"payload": {
"limit": 5,
"symbol": symbol,
},
}
await websocket.send(json.dumps(request))
try:
raw_message = await asyncio.wait_for(
websocket.recv(),
timeout=self.settings.exchange_timeout_sec,
)
except asyncio.TimeoutError:
await asyncio.sleep(interval_seconds)
continue
async for raw_message in websocket:
try:
payload = json.loads(raw_message)
except json.JSONDecodeError:
await asyncio.sleep(interval_seconds)
continue
if isinstance(payload, dict):
yield payload
yield payload
await asyncio.sleep(interval_seconds)