Stage 07.3.5 — WebSocket Probe + REST Fallback
This commit is contained in:
52
app/src/integrations/exchange/market_cache.py
Normal file
52
app/src/integrations/exchange/market_cache.py
Normal file
@@ -0,0 +1,52 @@
|
||||
# app/src/integrations/exchange/market_cache.py
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from src.core.config import load_settings
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class MarketPriceSnapshot:
|
||||
symbol: str
|
||||
price: float
|
||||
bid_price: float | None
|
||||
ask_price: float | None
|
||||
updated_at: str
|
||||
source: str = "websocket"
|
||||
|
||||
|
||||
class MarketPriceCache:
|
||||
_prices: dict[str, MarketPriceSnapshot] = {}
|
||||
|
||||
# сохранить последнюю цену
|
||||
@classmethod
|
||||
def set_price(
|
||||
cls,
|
||||
*,
|
||||
symbol: str,
|
||||
price: float,
|
||||
bid_price: float | None = None,
|
||||
ask_price: float | None = None,
|
||||
updated_at: str | None = None,
|
||||
) -> None:
|
||||
settings = load_settings()
|
||||
|
||||
if updated_at is None:
|
||||
updated_at = datetime.now(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S")
|
||||
|
||||
cls._prices[symbol.upper()] = MarketPriceSnapshot(
|
||||
symbol=symbol.upper(),
|
||||
price=price,
|
||||
bid_price=bid_price,
|
||||
ask_price=ask_price,
|
||||
updated_at=updated_at,
|
||||
)
|
||||
|
||||
# получить последнюю цену
|
||||
@classmethod
|
||||
def get_price(cls, symbol: str) -> MarketPriceSnapshot | None:
|
||||
return cls._prices.get(symbol.upper())
|
||||
109
app/src/integrations/exchange/market_stream.py
Normal file
109
app/src/integrations/exchange/market_stream.py
Normal file
@@ -0,0 +1,109 @@
|
||||
# app/src/integrations/exchange/market_stream.py
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from src.core.config import load_settings
|
||||
from src.integrations.exchange.market_cache import MarketPriceCache
|
||||
from src.integrations.exchange.service import ExchangeService
|
||||
from src.integrations.exchange.ws_client import ExchangeWebSocketClient
|
||||
from src.trading.journal.service import JournalService
|
||||
|
||||
|
||||
def _format_timestamp(raw_timestamp: object) -> str | None:
|
||||
if raw_timestamp is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
settings = load_settings()
|
||||
dt_utc = datetime.fromtimestamp(int(raw_timestamp) / 1000, tz=ZoneInfo("UTC"))
|
||||
return dt_utc.astimezone(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _extract_market_event(payload: dict) -> dict | None:
|
||||
event = payload.get("Payload") or payload.get("payload")
|
||||
|
||||
if isinstance(event, dict) and "Payload" in event:
|
||||
event = event.get("Payload")
|
||||
|
||||
if not isinstance(event, dict):
|
||||
return None
|
||||
|
||||
symbol = event.get("symbolName") or event.get("symbol")
|
||||
bid = event.get("bid")
|
||||
ask = event.get("ofr") or event.get("ask")
|
||||
timestamp = event.get("timestamp")
|
||||
|
||||
if symbol is None or bid is None or ask is None:
|
||||
return None
|
||||
|
||||
bid_price = float(bid)
|
||||
ask_price = float(ask)
|
||||
price = (bid_price + ask_price) / 2
|
||||
|
||||
return {
|
||||
"symbol": str(symbol).upper(),
|
||||
"price": price,
|
||||
"bid_price": bid_price,
|
||||
"ask_price": ask_price,
|
||||
"updated_at": _format_timestamp(timestamp),
|
||||
}
|
||||
|
||||
|
||||
async def start_market_stream() -> None:
|
||||
settings = load_settings()
|
||||
journal = JournalService()
|
||||
|
||||
if not settings.exchange_enabled:
|
||||
return
|
||||
|
||||
while True:
|
||||
try:
|
||||
service = ExchangeService()
|
||||
validation = service.validate_symbol(settings.default_symbol)
|
||||
|
||||
if not validation.is_valid:
|
||||
await asyncio.sleep(10)
|
||||
continue
|
||||
|
||||
symbol = validation.normalized_symbol
|
||||
client = ExchangeWebSocketClient()
|
||||
|
||||
journal.log_info(
|
||||
"market_ws_started",
|
||||
"WebSocket market stream запущен.",
|
||||
{"symbol": symbol},
|
||||
)
|
||||
|
||||
async for message in client.stream_depth(symbol):
|
||||
event = _extract_market_event(message)
|
||||
|
||||
if event is None:
|
||||
continue
|
||||
|
||||
MarketPriceCache.set_price(
|
||||
symbol=symbol,
|
||||
price=event["price"],
|
||||
bid_price=event["bid_price"],
|
||||
ask_price=event["ask_price"],
|
||||
updated_at=event["updated_at"],
|
||||
)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
try:
|
||||
journal.log_warning(
|
||||
"market_ws_reconnect",
|
||||
f"WebSocket market stream будет переподключен: {exc}",
|
||||
{"raw_error": str(exc)},
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await asyncio.sleep(5)
|
||||
@@ -1,3 +1,5 @@
|
||||
# app/src/integrations/exchange/private_client.py
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from src.core.config import load_settings
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# app/src/integrations/exchange/rest_client.py
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
@@ -25,6 +25,7 @@ from src.integrations.exchange.private_client import ExchangePrivateClient
|
||||
from src.integrations.exchange.rest_client import ExchangeRestClient
|
||||
from src.integrations.exchange.symbol_utils import normalize_symbol, symbol_candidates
|
||||
from src.trading.journal.service import JournalService
|
||||
from src.integrations.exchange.market_cache import MarketPriceCache
|
||||
|
||||
|
||||
class ExchangeService:
|
||||
@@ -191,6 +192,7 @@ class ExchangeService:
|
||||
message=f"Private API OK. Балансов получено: {len(balances)}",
|
||||
)
|
||||
|
||||
# получить цену инструмента: сначала WebSocket cache, потом REST fallback
|
||||
def get_price(self, symbol: str | None = None) -> TickerPrice:
|
||||
symbol_to_use = symbol or self.settings.default_symbol
|
||||
|
||||
@@ -201,8 +203,19 @@ class ExchangeService:
|
||||
if not validation.is_valid:
|
||||
raise ExchangeError(validation.message)
|
||||
|
||||
cached_price = MarketPriceCache.get_price(validation.normalized_symbol)
|
||||
|
||||
if cached_price is not None:
|
||||
return TickerPrice(
|
||||
symbol=cached_price.symbol,
|
||||
price=cached_price.price,
|
||||
source=cached_price.source,
|
||||
updated_at=cached_price.updated_at,
|
||||
)
|
||||
|
||||
return self._get_real_price(validation.normalized_symbol)
|
||||
|
||||
# получить market snapshot: сначала WebSocket cache, потом REST fallback
|
||||
def get_market_snapshot(self, symbol: str | None = None) -> dict[str, object]:
|
||||
symbol_to_use = symbol or self.settings.default_symbol
|
||||
|
||||
@@ -220,6 +233,17 @@ class ExchangeService:
|
||||
if not validation.is_valid:
|
||||
raise ExchangeError(validation.message)
|
||||
|
||||
cached_price = MarketPriceCache.get_price(validation.normalized_symbol)
|
||||
|
||||
if cached_price is not None:
|
||||
return {
|
||||
"symbol": cached_price.symbol,
|
||||
"last_price": cached_price.price,
|
||||
"bid_price": cached_price.bid_price or cached_price.price,
|
||||
"ask_price": cached_price.ask_price or cached_price.price,
|
||||
"updated_at": cached_price.updated_at,
|
||||
}
|
||||
|
||||
client = ExchangeRestClient()
|
||||
|
||||
try:
|
||||
|
||||
64
app/src/integrations/exchange/ws_client.py
Normal file
64
app/src/integrations/exchange/ws_client.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# app/src/integrations/exchange/ws_client.py
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import AsyncIterator
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import websockets
|
||||
|
||||
from src.core.config import load_settings
|
||||
|
||||
|
||||
class ExchangeWebSocketClient:
|
||||
def __init__(self) -> None:
|
||||
self.settings = load_settings()
|
||||
self.base_url = self._build_ws_base_url()
|
||||
|
||||
# собрать базовый websocket url
|
||||
def _build_ws_base_url(self) -> str:
|
||||
raw_url = self.settings.exchange_ws_url or self.settings.exchange_base_url
|
||||
|
||||
if raw_url.startswith("https://"):
|
||||
return raw_url.replace("https://", "wss://", 1).rstrip("/")
|
||||
|
||||
if raw_url.startswith("http://"):
|
||||
return raw_url.replace("http://", "ws://", 1).rstrip("/")
|
||||
|
||||
return raw_url.rstrip("/")
|
||||
|
||||
# читать стакан по websocket
|
||||
async def stream_depth(self, symbol: str) -> AsyncIterator[dict]:
|
||||
url = f"{self.base_url}/api/v2/depth"
|
||||
|
||||
headers = {
|
||||
"Origin": self.settings.exchange_base_url.rstrip("/"),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
if self.settings.exchange_api_key:
|
||||
headers["X-MBX-APIKEY"] = self.settings.exchange_api_key
|
||||
|
||||
request = {
|
||||
"limit": 5,
|
||||
"symbol": symbol,
|
||||
}
|
||||
|
||||
async with websockets.connect(
|
||||
url,
|
||||
extra_headers=headers,
|
||||
subprotocols=["json"],
|
||||
ping_interval=20,
|
||||
open_timeout=self.settings.exchange_timeout_sec,
|
||||
) as websocket:
|
||||
await websocket.send(json.dumps(request))
|
||||
|
||||
async for raw_message in websocket:
|
||||
try:
|
||||
payload = json.loads(raw_message)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
if isinstance(payload, dict):
|
||||
yield payload
|
||||
Reference in New Issue
Block a user