From e97dcd372b1704e8a9d307062476e7d046befea4 Mon Sep 17 00:00:00 2001 From: Sergey Date: Sat, 9 May 2026 13:08:29 +0300 Subject: [PATCH] =?UTF-8?q?07.4.3.16=20=E2=80=94=20Production=20Execution?= =?UTF-8?q?=20Pricing=20Layer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/src/integrations/exchange/market_cache.py | 75 ++- .../exchange/market_data_runner.py | 446 +++++++++++++++++- app/src/integrations/exchange/models.py | 12 + app/src/integrations/exchange/service.py | 241 ++++++---- app/src/integrations/exchange/ws_client.py | 58 ++- app/src/telegram/handlers/auto/ui.py | 2 +- app/src/telegram/handlers/debug_auto/main.py | 15 +- app/src/telegram/handlers/debug_auto/ui.py | 129 ++++- app/src/trading/auto/runner.py | 11 +- app/src/trading/auto/service.py | 3 + app/src/trading/debug/runner.py | 20 +- app/src/trading/execution/engine.py | 136 ++++-- docs/roadmap/master-roadmap.md | 12 + docs/roadmap/stage-07-auto-trading-roadmap.md | 11 + ...3_16-production_execution_pricing_layer.md | 196 ++++++++ 15 files changed, 1179 insertions(+), 188 deletions(-) create mode 100644 docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md diff --git a/app/src/integrations/exchange/market_cache.py b/app/src/integrations/exchange/market_cache.py index 007f794..2e61d88 100644 --- a/app/src/integrations/exchange/market_cache.py +++ b/app/src/integrations/exchange/market_cache.py @@ -2,6 +2,7 @@ from __future__ import annotations +import time from dataclasses import dataclass from datetime import datetime from zoneinfo import ZoneInfo @@ -17,12 +18,31 @@ class MarketPriceSnapshot: ask_price: float | None updated_at: str source: str = "market-cache" + runtime_key: str = "default" + received_monotonic: float = 0.0 + + def age_seconds(self) -> float: + if self.received_monotonic <= 0: + return 999999.0 + + return max(0.0, time.monotonic() - self.received_monotonic) + + def has_bid_ask(self) -> bool: + return ( + self.bid_price is not None + and self.ask_price is not None + and self.bid_price > 0 + and self.ask_price > 0 + ) class MarketPriceCache: - _prices: dict[str, MarketPriceSnapshot] = {} + _prices: dict[tuple[str, str], MarketPriceSnapshot] = {} + + @classmethod + def _key(cls, *, symbol: str, runtime_key: str = "default") -> tuple[str, str]: + return runtime_key.strip().lower(), symbol.upper() - # сохранить последнюю цену @classmethod def set_price( cls, @@ -33,22 +53,59 @@ class MarketPriceCache: ask_price: float | None = None, updated_at: str | None = None, source: str = "market-polling", + runtime_key: str = "default", ) -> None: settings = load_settings() if updated_at is None: updated_at = datetime.now(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S") - cls._prices[symbol.upper()] = MarketPriceSnapshot( + normalized_runtime_key = runtime_key.strip().lower() + + cls._prices[cls._key(symbol=symbol, runtime_key=normalized_runtime_key)] = MarketPriceSnapshot( symbol=symbol.upper(), - price=price, - bid_price=bid_price, - ask_price=ask_price, + price=float(price), + bid_price=float(bid_price) if bid_price is not None else None, + ask_price=float(ask_price) if ask_price is not None else None, updated_at=updated_at, source=source, + runtime_key=normalized_runtime_key, + received_monotonic=time.monotonic(), ) - # получить последнюю цену @classmethod - def get_price(cls, symbol: str) -> MarketPriceSnapshot | None: - return cls._prices.get(symbol.upper()) \ No newline at end of file + def get_price( + cls, + symbol: str, + *, + runtime_key: str = "default", + ) -> MarketPriceSnapshot | None: + return cls._prices.get(cls._key(symbol=symbol, runtime_key=runtime_key)) + + @classmethod + def clear( + cls, + symbol: str | None = None, + *, + runtime_key: str | None = None, + ) -> None: + if symbol is None and runtime_key is None: + cls._prices.clear() + return + + if symbol is not None and runtime_key is not None: + cls._prices.pop(cls._key(symbol=symbol, runtime_key=runtime_key), None) + return + + keys_to_delete = [] + + for key_runtime, key_symbol in cls._prices.keys(): + if runtime_key is not None and key_runtime == runtime_key.strip().lower(): + keys_to_delete.append((key_runtime, key_symbol)) + continue + + if symbol is not None and key_symbol == symbol.upper(): + keys_to_delete.append((key_runtime, key_symbol)) + + for key in keys_to_delete: + cls._prices.pop(key, None) \ No newline at end of file diff --git a/app/src/integrations/exchange/market_data_runner.py b/app/src/integrations/exchange/market_data_runner.py index d5c6a0c..b635594 100644 --- a/app/src/integrations/exchange/market_data_runner.py +++ b/app/src/integrations/exchange/market_data_runner.py @@ -3,54 +3,450 @@ from __future__ import annotations import asyncio +import traceback +from dataclasses import dataclass from typing import Callable +from src.integrations.exchange.market_cache import MarketPriceCache from src.integrations.exchange.service import ExchangeService +from src.integrations.exchange.ws_client import ExchangeWebSocketClient +from src.trading.journal.service import JournalService + + +@dataclass +class MarketRuntimeContext: + runtime_key: str + task: asyncio.Task | None + interval_seconds: int + symbol_provider: Callable[[], str | None] + screen: str | None + action: str + runtime_label: str | None class MarketDataRunner: - _task: asyncio.Task | None = None - _interval_seconds: int = 1 - _symbol_provider: Callable[[], str | None] | None = None + _runtimes: dict[str, MarketRuntimeContext] = {} - # запустить быстрый polling рыночной цены @classmethod def start( cls, *, symbol_provider: Callable[[], str | None], interval_seconds: int = 1, + runtime_key: str = "default", + screen: str | None = None, + action: str = "market_data", + runtime_label: str | None = None, ) -> None: - cls._symbol_provider = symbol_provider - cls._interval_seconds = interval_seconds + existing = cls._runtimes.get(runtime_key) - if cls._task is not None and not cls._task.done(): + if existing is not None and existing.task is not None and not existing.task.done(): + existing.symbol_provider = symbol_provider + existing.interval_seconds = interval_seconds + existing.screen = screen + existing.action = action + existing.runtime_label = runtime_label + + cls._log_info( + existing, + "market_runner_context_updated", + "MarketDataRunner context updated.", + {"interval_seconds": interval_seconds}, + ) return - cls._task = asyncio.create_task(cls._worker()) + context = MarketRuntimeContext( + runtime_key=runtime_key, + task=None, + interval_seconds=interval_seconds, + symbol_provider=symbol_provider, + screen=screen, + action=action, + runtime_label=runtime_label, + ) + + cls._runtimes[runtime_key] = context + + cls._log_info( + context, + "market_runner_started", + "MarketDataRunner started.", + {"interval_seconds": interval_seconds}, + ) + + context.task = asyncio.create_task(cls._worker(context)) - # остановить polling @classmethod - def stop(cls) -> None: - if cls._task is None: + def stop(cls, runtime_key: str | None = None) -> None: + if runtime_key is None: + for key in list(cls._runtimes.keys()): + cls.stop(key) return - cls._task.cancel() - cls._task = None + context = cls._runtimes.get(runtime_key) + if context is None: + return + + if context.task is not None: + context.task.cancel() + context.task = None + + cls._log_info( + context, + "market_runner_stopped", + "MarketDataRunner stopped.", + ) + + cls._runtimes.pop(runtime_key, None) - # рабочий цикл polling @classmethod - async def _worker(cls) -> None: + async def _worker(cls, context: MarketRuntimeContext) -> None: + last_symbol: str | None = None + while True: - symbol = cls._symbol_provider() if cls._symbol_provider else None + symbol = context.symbol_provider() - if symbol: - try: - await asyncio.to_thread( - ExchangeService().refresh_price_cache, - symbol, - ) - except Exception: - pass + if not symbol: + cls._log_warning( + context, + "market_runner_no_symbol", + "MarketDataRunner has no symbol.", + ) + await asyncio.sleep(context.interval_seconds) + continue - await asyncio.sleep(cls._interval_seconds) \ No newline at end of file + cache_symbol = cls._cache_symbol(symbol) + ws_symbol = cls._ws_symbol(symbol) + + if symbol != last_symbol: + last_symbol = symbol + + if not cls._is_cache_symbol_used_by_other_runtime( + runtime_key=context.runtime_key, + cache_symbol=cache_symbol, + ): + MarketPriceCache.clear(cache_symbol) + + cls._log_info( + context, + "market_runner_symbol_changed", + "MarketDataRunner symbol changed.", + { + "symbol": symbol, + "cache_symbol": cache_symbol, + "ws_symbol": ws_symbol, + }, + ) + + try: + await cls._run_websocket(context, symbol) + except asyncio.CancelledError: + raise + except Exception as exc: + cls._log_error( + context, + "market_ws_error_fallback", + "WebSocket market data failed. Falling back to REST.", + { + "symbol": symbol, + "cache_symbol": cache_symbol, + "ws_symbol": ws_symbol, + "error": str(exc), + "error_type": type(exc).__name__, + "traceback": traceback.format_exc(limit=5), + }, + ) + + await cls._rest_fallback_once(context, symbol) + await asyncio.sleep(context.interval_seconds) + + @classmethod + async def _run_websocket(cls, context: MarketRuntimeContext, symbol: str) -> None: + cache_symbol = cls._cache_symbol(symbol) + ws_symbol = cls._ws_symbol(symbol) + + cls._log_info( + context, + "market_ws_connecting", + "Connecting market WebSocket.", + { + "requested_symbol": symbol, + "cache_symbol": cache_symbol, + "ws_symbol": ws_symbol, + }, + ) + + payload_count = 0 + + async for payload in ExchangeWebSocketClient().stream_depth( + ws_symbol, + interval_seconds=context.interval_seconds, + ): + if payload_count == 0: + cls._log_info( + context, + "market_ws_connected", + "Market WebSocket connected and first payload received.", + { + "requested_symbol": symbol, + "cache_symbol": cache_symbol, + "ws_symbol": ws_symbol, + "payload_keys": list(payload.keys()), + "payload_preview": cls._safe_payload_preview(payload), + }, + ) + + payload_count += 1 + + current_symbol = context.symbol_provider() + if current_symbol and current_symbol != symbol: + cls._log_info( + context, + "market_ws_symbol_switch", + "Market WebSocket stopped because symbol changed.", + { + "old_symbol": symbol, + "new_symbol": current_symbol, + }, + ) + break + + best_bid = cls._extract_best_price(payload, "bids") + best_ask = cls._extract_best_price(payload, "asks") + + if best_bid is None or best_ask is None: + cls._log_warning( + context, + "market_ws_payload_unrecognized", + "Market WebSocket payload does not contain recognizable bids/asks.", + { + "requested_symbol": symbol, + "cache_symbol": cache_symbol, + "ws_symbol": ws_symbol, + "payload_keys": list(payload.keys()), + "payload_preview": cls._safe_payload_preview(payload), + }, + ) + continue + + MarketPriceCache.set_price( + symbol=cache_symbol, + price=(best_bid + best_ask) / 2, + bid_price=best_bid, + ask_price=best_ask, + source=f"ws_depth:{context.runtime_key}", + runtime_key=context.runtime_key, + ) + + @classmethod + async def _rest_fallback_once(cls, context: MarketRuntimeContext, symbol: str) -> None: + try: + snapshot = await asyncio.to_thread( + ExchangeService().refresh_market_snapshot_cache, + symbol, + runtime_key=context.runtime_key, + ) + + cls._log_warning( + context, + "market_rest_fallback_success", + "REST fallback market snapshot loaded.", + { + "symbol": symbol, + "snapshot_symbol": snapshot.get("symbol"), + "source": snapshot.get("source"), + "last_price": snapshot.get("last_price"), + "bid_price": snapshot.get("bid_price"), + "ask_price": snapshot.get("ask_price"), + }, + ) + except Exception as exc: + cls._log_error( + context, + "market_rest_fallback_error", + "REST fallback market snapshot failed.", + { + "symbol": symbol, + "error": str(exc), + "error_type": type(exc).__name__, + "traceback": traceback.format_exc(limit=5), + }, + ) + + @classmethod + def _is_cache_symbol_used_by_other_runtime(cls, *, runtime_key: str, cache_symbol: str) -> bool: + for key, context in cls._runtimes.items(): + if key == runtime_key: + continue + + try: + symbol = context.symbol_provider() + except Exception: + continue + + if symbol and cls._cache_symbol(symbol) == cache_symbol: + return True + + return False + + @classmethod + def _cache_symbol(cls, symbol: str) -> str: + try: + validation = ExchangeService().validate_symbol(symbol) + if validation.is_valid: + return validation.normalized_symbol + except Exception: + pass + + return symbol + + @classmethod + def _ws_symbol(cls, symbol: str) -> str: + return cls._cache_symbol(symbol) + + @classmethod + def _extract_best_price(cls, payload: dict, side_key: str) -> float | None: + data = payload + + inner = payload.get("payload") + if isinstance(inner, dict): + data = inner + + values = data.get(side_key) + + if not isinstance(values, list) or not values: + return None + + first = values[0] + + if isinstance(first, list) and first: + return cls._safe_float(first[0]) + + if isinstance(first, dict): + return cls._safe_float( + first.get("price") + or first.get("p") + or first.get("bidPrice") + or first.get("askPrice") + ) + + return None + + @classmethod + def _safe_float(cls, value: object) -> float | None: + try: + number = float(value) + except (TypeError, ValueError): + return None + + return number if number > 0 else None + + @classmethod + def _safe_payload_preview(cls, payload: dict) -> dict: + preview: dict = {} + + for key, value in payload.items(): + if key in {"bids", "asks"} and isinstance(value, list): + preview[key] = value[:2] + elif key == "payload" and isinstance(value, dict): + preview[key] = { + inner_key: inner_value[:2] + if inner_key in {"bids", "asks"} and isinstance(inner_value, list) + else inner_value + for inner_key, inner_value in value.items() + } + else: + preview[key] = value + + return preview + + @classmethod + def _message(cls, context: MarketRuntimeContext, message: str) -> str: + if context.runtime_label: + return f"{context.runtime_label} {message}" + + return message + + @classmethod + def _payload(cls, context: MarketRuntimeContext, payload: dict | None = None) -> dict: + result = dict(payload or {}) + result.setdefault("runtime_key", context.runtime_key) + + if context.screen: + result.setdefault("runtime_screen", context.screen) + + if context.runtime_label: + result.setdefault("runtime_label", context.runtime_label) + + return result + + @classmethod + def _log_info( + cls, + context: MarketRuntimeContext, + event_type: str, + message: str, + payload: dict | None = None, + ) -> None: + try: + if context.screen: + JournalService().log_ui_info( + event_type=event_type, + message=cls._message(context, message), + screen=context.screen, + action=context.action, + payload=cls._payload(context, payload), + ) + return + + JournalService().log_info(event_type, cls._message(context, message), cls._payload(context, payload)) + except Exception: + pass + + @classmethod + def _log_warning( + cls, + context: MarketRuntimeContext, + event_type: str, + message: str, + payload: dict | None = None, + ) -> None: + try: + if context.screen: + JournalService().log_ui_warning( + event_type=event_type, + message=cls._message(context, message), + screen=context.screen, + action=context.action, + payload=cls._payload(context, payload), + ) + return + + JournalService().log_warning(event_type, cls._message(context, message), cls._payload(context, payload)) + except Exception: + pass + + @classmethod + def _log_error( + cls, + context: MarketRuntimeContext, + event_type: str, + message: str, + payload: dict | None = None, + ) -> None: + try: + if context.screen: + JournalService().log_ui_error( + event_type=event_type, + message=cls._message(context, message), + screen=context.screen, + action=context.action, + payload=cls._payload(context, payload), + error_type=(payload or {}).get("error_type"), + raw_error=(payload or {}).get("error"), + ) + return + + JournalService().log_error(event_type, cls._message(context, message), cls._payload(context, payload)) + except Exception: + pass \ No newline at end of file diff --git a/app/src/integrations/exchange/models.py b/app/src/integrations/exchange/models.py index d4a1767..4d7bea9 100644 --- a/app/src/integrations/exchange/models.py +++ b/app/src/integrations/exchange/models.py @@ -20,6 +20,18 @@ class TickerPrice: updated_at: str +@dataclass(slots=True) +class ExecutionPriceSnapshot: + symbol: str + last_price: float + bid_price: float + ask_price: float + updated_at: str + source: str + is_fresh: bool + age_seconds: float | None = None + + @dataclass(slots=True) class BalanceSummary: currency: str diff --git a/app/src/integrations/exchange/service.py b/app/src/integrations/exchange/service.py index 22857ef..5783a58 100644 --- a/app/src/integrations/exchange/service.py +++ b/app/src/integrations/exchange/service.py @@ -8,6 +8,7 @@ from zoneinfo import ZoneInfo from src.core.config import load_settings from src.integrations.exchange.balance_parser import parse_account_balances from src.integrations.exchange.exceptions import ExchangeError +from src.integrations.exchange.market_cache import MarketPriceCache from src.integrations.exchange.mock_data import ( mock_balance_summary, mock_exchange_health, @@ -17,6 +18,7 @@ from src.integrations.exchange.models import ( BalanceSummary, ExchangeHealth, ExchangeSymbol, + ExecutionPriceSnapshot, PrivateAuthHealth, SymbolValidationResult, TickerPrice, @@ -25,11 +27,12 @@ from src.integrations.exchange.private_client import ExchangePrivateClient from src.integrations.exchange.rest_client import ExchangeRestClient from src.integrations.exchange.symbol_utils import normalize_symbol, symbol_candidates from src.trading.journal.service import JournalService -from src.integrations.exchange.market_cache import MarketPriceCache class ExchangeService: _exchange_symbols_cache: list[ExchangeSymbol] | None = None + _execution_cache_max_age_seconds = 2.0 + _default_runtime_key = "auto" def __init__(self) -> None: self.settings = load_settings() @@ -136,6 +139,9 @@ class ExchangeService: else "dzengi-api" ) + def _runtime_key(self, runtime_key: str | None) -> str: + return (runtime_key or self._default_runtime_key).strip().lower() + def get_health(self) -> ExchangeHealth: if not self.settings.exchange_enabled: return mock_exchange_health() @@ -194,38 +200,53 @@ class ExchangeService: message=f"Private API OK. Балансов получено: {len(balances)}", ) - # принудительно обновить market cache через REST - def refresh_price_cache(self, symbol: str | None = None) -> TickerPrice: - symbol_to_use = symbol or self.settings.default_symbol - - if not self.settings.exchange_enabled: - ticker = mock_ticker_price(symbol_to_use) - MarketPriceCache.set_price( - symbol=ticker.symbol, - price=ticker.price, - updated_at=ticker.updated_at, - source=ticker.source, - ) - return ticker - - validation = self.validate_symbol(symbol_to_use) - if not validation.is_valid: - raise ExchangeError(validation.message) - - ticker = self._get_real_price(validation.normalized_symbol) - - MarketPriceCache.set_price( - symbol=ticker.symbol, - price=ticker.price, - updated_at=ticker.updated_at, - source=ticker.source, + def refresh_price_cache( + self, + symbol: str | None = None, + *, + runtime_key: str | None = None, + ) -> TickerPrice: + snapshot = self.refresh_market_snapshot_cache( + symbol, + runtime_key=runtime_key, ) - return ticker + return TickerPrice( + symbol=str(snapshot["symbol"]), + price=float(snapshot["last_price"]), + source=str(snapshot.get("source") or self._source_name()), + updated_at=str(snapshot["updated_at"]), + ) - # получить цену инструмента: сначала WebSocket cache, потом REST fallback - def get_price(self, symbol: str | None = None) -> TickerPrice: + def refresh_market_snapshot_cache( + self, + symbol: str | None = None, + *, + runtime_key: str | None = None, + ) -> dict[str, object]: + normalized_runtime_key = self._runtime_key(runtime_key) + snapshot = self.get_fresh_market_snapshot(symbol) + + MarketPriceCache.set_price( + symbol=str(snapshot["symbol"]), + price=float(snapshot["last_price"]), + bid_price=float(snapshot["bid_price"]), + ask_price=float(snapshot["ask_price"]), + updated_at=str(snapshot["updated_at"]), + source=str(snapshot.get("source") or "rest_polling"), + runtime_key=normalized_runtime_key, + ) + + return snapshot + + def get_price( + self, + symbol: str | None = None, + *, + runtime_key: str | None = None, + ) -> TickerPrice: symbol_to_use = symbol or self.settings.default_symbol + normalized_runtime_key = self._runtime_key(runtime_key) if not self.settings.exchange_enabled: return mock_ticker_price(symbol_to_use) @@ -234,7 +255,10 @@ class ExchangeService: if not validation.is_valid: raise ExchangeError(validation.message) - cached_price = MarketPriceCache.get_price(validation.normalized_symbol) + cached_price = MarketPriceCache.get_price( + validation.normalized_symbol, + runtime_key=normalized_runtime_key, + ) if cached_price is not None: return TickerPrice( @@ -246,9 +270,14 @@ class ExchangeService: return self._get_real_price(validation.normalized_symbol) - # получить market snapshot: сначала WebSocket cache, потом REST fallback - def get_market_snapshot(self, symbol: str | None = None) -> dict[str, object]: + def get_market_snapshot( + self, + symbol: str | None = None, + *, + runtime_key: str | None = None, + ) -> dict[str, object]: symbol_to_use = symbol or self.settings.default_symbol + normalized_runtime_key = self._runtime_key(runtime_key) if not self.settings.exchange_enabled: ticker = mock_ticker_price(symbol_to_use) @@ -258,61 +287,102 @@ class ExchangeService: "bid_price": ticker.price, "ask_price": ticker.price, "updated_at": ticker.updated_at, + "source": ticker.source, + "runtime_key": normalized_runtime_key, + "age_seconds": 0.0, + "is_fresh": True, } validation = self.validate_symbol(symbol_to_use) if not validation.is_valid: raise ExchangeError(validation.message) - cached_price = MarketPriceCache.get_price(validation.normalized_symbol) + cached_price = MarketPriceCache.get_price( + validation.normalized_symbol, + runtime_key=normalized_runtime_key, + ) if cached_price is not None: + age = cached_price.age_seconds() + return { "symbol": cached_price.symbol, "last_price": cached_price.price, "bid_price": cached_price.bid_price or cached_price.price, "ask_price": cached_price.ask_price or cached_price.price, "updated_at": cached_price.updated_at, + "source": cached_price.source, + "runtime_key": cached_price.runtime_key, + "age_seconds": round(age, 3), + "is_fresh": age <= self._execution_cache_max_age_seconds, } - client = ExchangeRestClient() + snapshot = self.get_fresh_market_snapshot(validation.normalized_symbol) + snapshot["runtime_key"] = normalized_runtime_key + return snapshot - try: - payload = client.get_json( - "/api/v2/ticker/24hr", - params={"symbol": validation.normalized_symbol}, + def get_execution_snapshot( + self, + symbol: str | None = None, + *, + runtime_key: str | None = None, + ) -> ExecutionPriceSnapshot: + symbol_to_use = symbol or self.settings.default_symbol + normalized_runtime_key = self._runtime_key(runtime_key) + + if not self.settings.exchange_enabled: + ticker = mock_ticker_price(symbol_to_use) + return ExecutionPriceSnapshot( + symbol=ticker.symbol, + last_price=ticker.price, + bid_price=ticker.price, + ask_price=ticker.price, + updated_at=ticker.updated_at, + source=ticker.source, + is_fresh=True, + age_seconds=0.0, ) - except Exception as exc: - self._log_exchange_error( - endpoint="ticker/24hr", - exc=exc, - symbol=validation.normalized_symbol, - ) - raise ExchangeError(str(exc)) from exc - last_raw = payload.get("lastPrice") - if last_raw is None: - exc = ExchangeError("Field 'lastPrice' is missing in ticker response.") - self._log_exchange_error( - endpoint="ticker/24hr", - exc=exc, - symbol=validation.normalized_symbol, - ) - raise exc + validation = self.validate_symbol(symbol_to_use) + if not validation.is_valid: + raise ExchangeError(validation.message) - bid_raw = payload.get("bidPrice") or last_raw - ask_raw = payload.get("askPrice") or last_raw - close_time = payload.get("closeTime") or payload.get("eventTime") or "" + cached_price = MarketPriceCache.get_price( + validation.normalized_symbol, + runtime_key=normalized_runtime_key, + ) - return { - "symbol": validation.normalized_symbol, - "last_price": float(last_raw), - "bid_price": float(bid_raw), - "ask_price": float(ask_raw), - "updated_at": self._format_exchange_time(close_time), - } + if cached_price is not None: + age = cached_price.age_seconds() + + if ( + age <= self._execution_cache_max_age_seconds + and cached_price.has_bid_ask() + ): + return ExecutionPriceSnapshot( + symbol=cached_price.symbol, + last_price=cached_price.price, + bid_price=float(cached_price.bid_price), + ask_price=float(cached_price.ask_price), + updated_at=cached_price.updated_at, + source=f"{cached_price.source}:fresh_cache", + is_fresh=True, + age_seconds=round(age, 3), + ) + + snapshot = self.get_fresh_market_snapshot(validation.normalized_symbol) + + return ExecutionPriceSnapshot( + symbol=str(snapshot["symbol"]), + last_price=float(snapshot["last_price"]), + bid_price=float(snapshot["bid_price"]), + ask_price=float(snapshot["ask_price"]), + updated_at=str(snapshot["updated_at"]), + source="rest_fallback", + is_fresh=True, + age_seconds=0.0, + ) - # получить свежий market snapshot напрямую через REST, без WebSocket cache def get_fresh_market_snapshot(self, symbol: str | None = None) -> dict[str, object]: symbol_to_use = symbol or self.settings.default_symbol @@ -325,6 +395,8 @@ class ExchangeService: "ask_price": ticker.price, "updated_at": ticker.updated_at, "source": "mock", + "age_seconds": 0.0, + "is_fresh": True, } validation = self.validate_symbol(symbol_to_use) @@ -367,8 +439,10 @@ class ExchangeService: "ask_price": float(ask_raw), "updated_at": self._format_exchange_time(close_time), "source": "fresh_rest", + "age_seconds": 0.0, + "is_fresh": True, } - + def get_balance_summary(self) -> list[BalanceSummary]: if not self.settings.exchange_enabled: return mock_balance_summary() @@ -562,7 +636,7 @@ class ExchangeService: ) type(self)._exchange_symbols_cache = items - + return items def validate_symbol(self, raw_symbol: str) -> SymbolValidationResult: @@ -609,36 +683,11 @@ class ExchangeService: ) def _get_real_price(self, symbol: str) -> TickerPrice: - client = ExchangeRestClient() - - try: - payload = client.get_json( - "/api/v2/ticker/24hr", - params={"symbol": symbol}, - ) - except Exception as exc: - self._log_exchange_error( - endpoint="ticker/24hr", - exc=exc, - symbol=symbol, - ) - raise ExchangeError(str(exc)) from exc - - price_raw = payload.get("lastPrice") - if price_raw is None: - exc = ExchangeError("Field 'lastPrice' is missing in ticker response.") - self._log_exchange_error( - endpoint="ticker/24hr", - exc=exc, - symbol=symbol, - ) - raise exc - - close_time = payload.get("closeTime") or payload.get("eventTime") or "" + snapshot = self.get_fresh_market_snapshot(symbol) return TickerPrice( - symbol=symbol, - price=float(price_raw), + symbol=str(snapshot["symbol"]), + price=float(snapshot["last_price"]), source=self._source_name(), - updated_at=self._format_exchange_time(close_time), + updated_at=str(snapshot["updated_at"]), ) \ No newline at end of file diff --git a/app/src/integrations/exchange/ws_client.py b/app/src/integrations/exchange/ws_client.py index b2d5387..b3e61ac 100644 --- a/app/src/integrations/exchange/ws_client.py +++ b/app/src/integrations/exchange/ws_client.py @@ -2,9 +2,10 @@ from __future__ import annotations +import asyncio import json from typing import AsyncIterator -from urllib.parse import urlencode +from uuid import uuid4 import websockets @@ -16,22 +17,27 @@ class ExchangeWebSocketClient: self.settings = load_settings() self.base_url = self._build_ws_base_url() - # собрать базовый websocket url def _build_ws_base_url(self) -> str: raw_url = self.settings.exchange_ws_url or self.settings.exchange_base_url if raw_url.startswith("https://"): - return raw_url.replace("https://", "wss://", 1).rstrip("/") + raw_url = raw_url.replace("https://", "wss://", 1) + elif raw_url.startswith("http://"): + raw_url = raw_url.replace("http://", "ws://", 1) - if raw_url.startswith("http://"): - return raw_url.replace("http://", "ws://", 1).rstrip("/") + raw_url = raw_url.rstrip("/") - return raw_url.rstrip("/") + if raw_url.endswith("/connect"): + return raw_url - # читать стакан по websocket - async def stream_depth(self, symbol: str) -> AsyncIterator[dict]: - url = f"{self.base_url}/api/v2/depth" + return f"{raw_url}/connect" + async def stream_depth( + self, + symbol: str, + *, + interval_seconds: float = 1.0, + ) -> AsyncIterator[dict]: headers = { "Origin": self.settings.exchange_base_url.rstrip("/"), "Content-Type": "application/json", @@ -40,25 +46,41 @@ class ExchangeWebSocketClient: if self.settings.exchange_api_key: headers["X-MBX-APIKEY"] = self.settings.exchange_api_key - request = { - "limit": 5, - "symbol": symbol, - } - async with websockets.connect( - url, + self.base_url, extra_headers=headers, subprotocols=["json"], ping_interval=20, open_timeout=self.settings.exchange_timeout_sec, ) as websocket: - await websocket.send(json.dumps(request)) + while True: + request = { + "correlationId": str(uuid4()), + "destination": "/api/v2/depth", + "payload": { + "limit": 5, + "symbol": symbol, + }, + } + + await websocket.send(json.dumps(request)) + + try: + raw_message = await asyncio.wait_for( + websocket.recv(), + timeout=self.settings.exchange_timeout_sec, + ) + except asyncio.TimeoutError: + await asyncio.sleep(interval_seconds) + continue - async for raw_message in websocket: try: payload = json.loads(raw_message) except json.JSONDecodeError: + await asyncio.sleep(interval_seconds) continue if isinstance(payload, dict): - yield payload \ No newline at end of file + yield payload + + await asyncio.sleep(interval_seconds) \ No newline at end of file diff --git a/app/src/telegram/handlers/auto/ui.py b/app/src/telegram/handlers/auto/ui.py index 382491f..96801b4 100644 --- a/app/src/telegram/handlers/auto/ui.py +++ b/app/src/telegram/handlers/auto/ui.py @@ -296,7 +296,7 @@ def _market_snapshot(symbol: str | None) -> dict[str, object] | None: return None try: - return ExchangeService().get_market_snapshot(symbol) + return ExchangeService().get_market_snapshot(symbol, runtime_key="auto") except Exception: return None diff --git a/app/src/telegram/handlers/debug_auto/main.py b/app/src/telegram/handlers/debug_auto/main.py index 5659b3d..e5bacea 100644 --- a/app/src/telegram/handlers/debug_auto/main.py +++ b/app/src/telegram/handlers/debug_auto/main.py @@ -2,6 +2,8 @@ from __future__ import annotations +import time + from aiogram import F, Router from aiogram.exceptions import TelegramBadRequest from aiogram.fsm.context import FSMContext @@ -18,6 +20,11 @@ from src.trading.debug.service import DebugTradeService router = Router(name="debug_auto") +def _ensure_signal_started_at(state) -> None: + if state.signal_started_at is None: + state.signal_started_at = time.monotonic() + + async def render_debug_auto_screen( target_message: Message, *, @@ -77,6 +84,7 @@ async def debug_auto_start(callback: CallbackQuery) -> None: service = DebugTradeService() state = service.get_state() state.status = "RUNNING" + _ensure_signal_started_at(state) DebugTradeRunner.set_current_screen("debug_auto") DebugTradeRunner.start() @@ -89,6 +97,7 @@ async def debug_auto_start(callback: CallbackQuery) -> None: @router.callback_query(F.data == "debug_auto:stop") async def debug_auto_stop(callback: CallbackQuery) -> None: + DebugTradeRunner.set_current_screen("debug_auto") DebugTradeRunner.stop() DebugTradeService().stop() @@ -145,6 +154,8 @@ async def debug_auto_close(callback: CallbackQuery) -> None: service = DebugTradeService() _, result = service.close(reason="DEBUG_SCREEN_CLOSE") + DebugTradeRunner.set_current_screen("debug_auto") + if callback.message is not None: await render_debug_auto_screen(callback.message, edit_mode=True) @@ -153,7 +164,9 @@ async def debug_auto_close(callback: CallbackQuery) -> None: @router.callback_query(F.data == "debug_auto:reset") async def debug_auto_reset(callback: CallbackQuery) -> None: - DebugTradeService().reset() + DebugTradeRunner.set_current_screen("debug_auto") + state = DebugTradeService().reset() + _ensure_signal_started_at(state) if callback.message is not None: await render_debug_auto_screen(callback.message, edit_mode=True) diff --git a/app/src/telegram/handlers/debug_auto/ui.py b/app/src/telegram/handlers/debug_auto/ui.py index 069a349..d4e181d 100644 --- a/app/src/telegram/handlers/debug_auto/ui.py +++ b/app/src/telegram/handlers/debug_auto/ui.py @@ -7,6 +7,7 @@ import time from aiogram.types import InlineKeyboardMarkup from aiogram.utils.keyboard import InlineKeyboardBuilder +from src.integrations.exchange.service import ExchangeService from src.trading.debug.service import DebugTradeService @@ -38,6 +39,8 @@ def build_debug_auto_text() -> str: f"Баланс · $ {_format_money_compact(state.allocated_balance_usd)}", f"Realized PnL · {_format_signed_usd(state.realized_pnl_usd)}", "", + *_market_snapshot_lines(state.symbol), + "", _signal_line(state), ] @@ -106,6 +109,90 @@ def build_debug_auto_text() -> str: return "\n".join(parts) +def _format_updated_at(value: object) -> str: + if not value: + return "—" + + text = str(value) + + if " " in text: + return text.rsplit(" ", 1)[-1] + + return text + + +def _market_snapshot_lines(symbol: str | None) -> list[str]: + if not symbol: + return [ + "Market", + "Last · —", + "Bid · —", + "Ask · —", + "Source · —", + "Age · —", + "", + "Execution", + "Source · —", + "Age · —", + ] + + market = None + execution = None + error = None + + try: + market = ExchangeService().get_market_snapshot( + symbol, + runtime_key="debug_auto", + ) + except Exception as exc: + error = str(exc) + + try: + execution = ExchangeService().get_execution_snapshot( + symbol, + runtime_key="debug_auto", + ) + except Exception as exc: + if error is None: + error = str(exc) + + if market is None and execution is None: + return [ + "Market", + "Last · —", + "Bid · —", + "Ask · —", + "Source · error", + f"Error · {error or 'unknown'}", + ] + + last_price = market.get("last_price") if market else getattr(execution, "last_price", None) + bid_price = market.get("bid_price") if market else getattr(execution, "bid_price", None) + ask_price = market.get("ask_price") if market else getattr(execution, "ask_price", None) + market_source = market.get("source") if market else "—" + market_age = market.get("age_seconds") if market else None + + execution_source = getattr(execution, "source", "—") if execution else "—" + execution_age = getattr(execution, "age_seconds", None) if execution else None + execution_fresh = getattr(execution, "is_fresh", None) if execution else None + + return [ + "Market", + f"Last · {_format_usd_or_dash(last_price)}", + f"Bid · {_format_usd_or_dash(bid_price)}", + f"Ask · {_format_usd_or_dash(ask_price)}", + f"Source · {market_source or '—'}", + f"Quote age · {_format_age(market_age)}", + f"Exchange time · {_format_updated_at(market.get('updated_at') if market else None)}", + "", + "Execution", + f"Source · {execution_source or '—'}", + f"Quote age · {_format_age(execution_age)}", + f"Fresh · {_format_bool(execution_fresh)}", + ] + + def _signal_line(state) -> str: signal = state.last_signal or "HOLD" @@ -225,4 +312,44 @@ def _format_signed_usd(value: float | int | None) -> str: if amount < 0: return f"🔴 −$ {_format_money_compact(abs(amount))}" - return "$ 0" \ No newline at end of file + return "$ 0" + + +def _format_age(value: object) -> str: + if value is None: + return "—" + + try: + age = max(0.0, float(value)) + except (TypeError, ValueError): + return "—" + + if age < 1: + return f"{age:.2f}с" + + if age < 10: + return f"{age:.1f}с" + + total_seconds = int(age) + + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + seconds = total_seconds % 60 + + if hours > 0: + return f"{hours}ч {minutes:02d}м" + + if minutes > 0: + return f"{minutes}м {seconds:02d}с" + + return f"{seconds}с" + + +def _format_bool(value: object) -> str: + if value is True: + return "yes" + + if value is False: + return "no" + + return "—" \ No newline at end of file diff --git a/app/src/trading/auto/runner.py b/app/src/trading/auto/runner.py index 0d98e16..633675d 100644 --- a/app/src/trading/auto/runner.py +++ b/app/src/trading/auto/runner.py @@ -108,6 +108,10 @@ class AutoTradeRunner: MarketDataRunner.start( symbol_provider=lambda: service.get_state().symbol, interval_seconds=1, + runtime_key="auto", + screen="auto", + action="market_data", + runtime_label="[AUTO]", ) if cls._task is not None and not cls._task.done(): @@ -117,7 +121,7 @@ class AutoTradeRunner: @classmethod def stop(cls) -> None: - MarketDataRunner.stop() + MarketDataRunner.stop("auto") if cls._task is None: return @@ -134,7 +138,7 @@ class AutoTradeRunner: if state.status == "OFF": cls._task = None - MarketDataRunner.stop() + MarketDataRunner.stop("auto") break service.run_cycle() @@ -346,6 +350,7 @@ class AutoTradeRunner: f"{payload.get('risk_reason')}:" f"{payload.get('is_forced')}:" ) + @classmethod def _build_execution_alert_text( cls, @@ -416,7 +421,7 @@ class AutoTradeRunner: f"New size: {new_size}\n\n" f"PnL: {pnl}" ) - + return "📄 Paper execution event" @classmethod diff --git a/app/src/trading/auto/service.py b/app/src/trading/auto/service.py index 1e3f9b8..b2b5404 100644 --- a/app/src/trading/auto/service.py +++ b/app/src/trading/auto/service.py @@ -157,6 +157,9 @@ class AutoTradeService: return state, "Автоторговля активирована." state.status = "RUNNING" + self._reset_signal_tracking() + state.last_signal = "HOLD" + state.signal_started_at = time.monotonic() EventBus.emit( "auto_status_changed", { diff --git a/app/src/trading/debug/runner.py b/app/src/trading/debug/runner.py index b3ef341..6e2f958 100644 --- a/app/src/trading/debug/runner.py +++ b/app/src/trading/debug/runner.py @@ -9,6 +9,7 @@ from typing import Callable from aiogram import Bot from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter +from src.integrations.exchange.market_data_runner import MarketDataRunner from src.trading.debug.service import DebugTradeService @@ -24,6 +25,8 @@ class DebugTradeRunner: _current_screen: str | None = None _interval_seconds = 5 + _market_interval_seconds = 1 + _last_text: str | None = None _last_refresh_at: float = 0.0 _retry_after_until: float = 0.0 @@ -77,9 +80,21 @@ class DebugTradeRunner: @classmethod def start(cls) -> None: - state = DebugTradeService().get_state() + service = DebugTradeService() + state = service.get_state() state.status = "RUNNING" + MarketDataRunner.start( + symbol_provider=lambda: DebugTradeService().get_state().symbol, + interval_seconds=cls._market_interval_seconds, + runtime_key="debug_auto", + screen="debug_auto", + action="market_data", + runtime_label="[DEBUG]", + ) + + cls._last_text = None + if cls._task is not None and not cls._task.done(): return @@ -87,6 +102,8 @@ class DebugTradeRunner: @classmethod def stop(cls) -> None: + MarketDataRunner.stop("debug_auto") + if cls._task is None: return @@ -102,6 +119,7 @@ class DebugTradeRunner: if state.status == "OFF": cls._task = None + MarketDataRunner.stop("debug_auto") break service.process() diff --git a/app/src/trading/execution/engine.py b/app/src/trading/execution/engine.py index fd45141..3eb2419 100644 --- a/app/src/trading/execution/engine.py +++ b/app/src/trading/execution/engine.py @@ -3,6 +3,7 @@ from __future__ import annotations import math +from dataclasses import dataclass from datetime import datetime from src.core.event_bus import EventBus @@ -13,6 +14,15 @@ from src.trading.journal.service import JournalService from src.trading.position.state import PositionState +@dataclass(slots=True) +class _ExecutionPrice: + price: float + source: str + age_seconds: float | None + updated_at: str + pricing_role: str + + class ExecutionEngine: _position = PositionState() _size_precision = 5 @@ -60,7 +70,8 @@ class ExecutionEngine: return ExecutionDecision("NONE", False, "Позиция уже открыта.") try: - entry_price = self._entry_price_for_side(state.symbol, side) + entry = self._entry_price_for_side(state.symbol, side) + entry_price = entry.price except Exception as exc: return ExecutionDecision("NONE", False, f"Не удалось получить цену для paper execution: {exc}") @@ -116,6 +127,10 @@ class ExecutionEngine: "reason": state.last_signal_reason, "opened_at": now, "pricing": "ask_for_long_bid_for_short", + "pricing_role": entry.pricing_role, + "price_source": entry.source, + "price_age_seconds": entry.age_seconds, + "price_updated_at": entry.updated_at, } JournalService().log_ui_info( @@ -142,8 +157,10 @@ class ExecutionEngine: return ExecutionDecision("NONE", False, "Нет направления для flip.") try: - exit_price = self._exit_price_for_side(position.symbol or state.symbol, position.side) - new_entry_price = self._entry_price_for_side(state.symbol, new_side) + exit_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side) + entry_execution = self._entry_price_for_side(state.symbol, new_side) + exit_price = exit_execution.price + new_entry_price = entry_execution.price except Exception as exc: return ExecutionDecision("NONE", False, f"Ошибка получения цены для flip: {exc}") @@ -218,6 +235,14 @@ class ExecutionEngine: "closed_at": now, "new_opened_at": now, "pricing": "exit_by_side_then_entry_by_side", + "exit_pricing_role": exit_execution.pricing_role, + "exit_price_source": exit_execution.source, + "exit_price_age_seconds": exit_execution.age_seconds, + "exit_price_updated_at": exit_execution.updated_at, + "entry_pricing_role": entry_execution.pricing_role, + "entry_price_source": entry_execution.source, + "entry_price_age_seconds": entry_execution.age_seconds, + "entry_price_updated_at": entry_execution.updated_at, } JournalService().log_ui_info( @@ -243,6 +268,7 @@ class ExecutionEngine: forced_reason: str | None = None, forced_exit_price: float | None = None, forced_pnl: float | None = None, + forced_price_meta: _ExecutionPrice | None = None, ) -> ExecutionDecision: position = type(self)._position @@ -252,9 +278,11 @@ class ExecutionEngine: if forced_exit_price is not None: exit_price = forced_exit_price + exit_execution = forced_price_meta else: try: - exit_price = self._exit_price_for_side(position.symbol or state.symbol, position.side) + exit_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side) + exit_price = exit_execution.price except Exception as exc: return ExecutionDecision("NONE", False, f"Ошибка получения цены для закрытия: {exc}") @@ -283,6 +311,10 @@ class ExecutionEngine: "opened_at": position.opened_at, "closed_at": now, "pricing": "bid_for_long_exit_ask_for_short_exit", + "pricing_role": exit_execution.pricing_role if exit_execution else None, + "price_source": exit_execution.source if exit_execution else None, + "price_age_seconds": exit_execution.age_seconds if exit_execution else None, + "price_updated_at": exit_execution.updated_at if exit_execution else None, } JournalService().log_ui_info( @@ -318,7 +350,8 @@ class ExecutionEngine: return None try: - current_price = self._exit_price_for_side(position.symbol or state.symbol, position.side) + current_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side) + current_price = current_execution.price except Exception: return None @@ -331,6 +364,7 @@ class ExecutionEngine: forced_reason="MAX_LOSS", forced_exit_price=current_price, forced_pnl=unrealized_pnl, + forced_price_meta=current_execution, ) if self._is_stop_loss_hit(state, price_move_percent): @@ -339,6 +373,7 @@ class ExecutionEngine: forced_reason="STOP_LOSS", forced_exit_price=current_price, forced_pnl=unrealized_pnl, + forced_price_meta=current_execution, ) if self._is_take_profit_hit(state, price_move_percent): @@ -347,6 +382,7 @@ class ExecutionEngine: forced_reason="TAKE_PROFIT", forced_exit_price=current_price, forced_pnl=unrealized_pnl, + forced_price_meta=current_execution, ) return None @@ -412,7 +448,8 @@ class ExecutionEngine: return try: - current_price = self._exit_price_for_side(position.symbol or state.symbol, position.side) + current_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side) + current_price = current_execution.price except Exception: self._sync_state_from_position(state) return @@ -438,7 +475,7 @@ class ExecutionEngine: if price is None: try: - price = self._signal_entry_price(state) + price = self._signal_entry_price(state).price except Exception: return 0.0 @@ -487,7 +524,7 @@ class ExecutionEngine: state.execution_size_adjustment_reason = "MARGIN_LIMIT" return self._round_size(max_size) - def _signal_entry_price(self, state: AutoTradeState) -> float: + def _signal_entry_price(self, state: AutoTradeState) -> _ExecutionPrice: if state.last_signal == "BUY": return self._entry_price_for_side(state.symbol, "LONG") @@ -496,50 +533,83 @@ class ExecutionEngine: return self._market_last_price(state.symbol) - def _entry_price_for_side(self, symbol: str, side: str) -> float: - snapshot = ExchangeService().get_market_snapshot(symbol) + def _entry_price_for_side(self, symbol: str, side: str) -> _ExecutionPrice: + snapshot = ExchangeService().get_execution_snapshot(symbol) if side == "LONG": - return self._snapshot_price(snapshot, "ask_price", "last_price") + return _ExecutionPrice( + price=self._snapshot_price(snapshot.ask_price, "ask_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="LONG_ENTRY_ASK", + ) if side == "SHORT": - return self._snapshot_price(snapshot, "bid_price", "last_price") + return _ExecutionPrice( + price=self._snapshot_price(snapshot.bid_price, "bid_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="SHORT_ENTRY_BID", + ) - return self._snapshot_price(snapshot, "last_price") + return _ExecutionPrice( + price=self._snapshot_price(snapshot.last_price, "last_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="ENTRY_LAST", + ) - def _exit_price_for_side(self, symbol: str, side: str) -> float: - snapshot = ExchangeService().get_market_snapshot(symbol) + def _exit_price_for_side(self, symbol: str, side: str) -> _ExecutionPrice: + snapshot = ExchangeService().get_execution_snapshot(symbol) if side == "LONG": - return self._snapshot_price(snapshot, "bid_price", "last_price") + return _ExecutionPrice( + price=self._snapshot_price(snapshot.bid_price, "bid_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="LONG_EXIT_BID", + ) if side == "SHORT": - return self._snapshot_price(snapshot, "ask_price", "last_price") + return _ExecutionPrice( + price=self._snapshot_price(snapshot.ask_price, "ask_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="SHORT_EXIT_ASK", + ) - return self._snapshot_price(snapshot, "last_price") + return _ExecutionPrice( + price=self._snapshot_price(snapshot.last_price, "last_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="EXIT_LAST", + ) - def _market_last_price(self, symbol: str) -> float: - snapshot = ExchangeService().get_market_snapshot(symbol) - return self._snapshot_price(snapshot, "last_price") + def _market_last_price(self, symbol: str) -> _ExecutionPrice: + snapshot = ExchangeService().get_execution_snapshot(symbol) - def _snapshot_price( - self, - snapshot: dict[str, object], - primary_key: str, - fallback_key: str | None = None, - ) -> float: - raw_price = snapshot.get(primary_key) - - if raw_price is None and fallback_key is not None: - raw_price = snapshot.get(fallback_key) + return _ExecutionPrice( + price=self._snapshot_price(snapshot.last_price, "last_price"), + source=snapshot.source, + age_seconds=snapshot.age_seconds, + updated_at=snapshot.updated_at, + pricing_role="MARKET_LAST", + ) + def _snapshot_price(self, raw_price: object, name: str) -> float: if raw_price is None: - raise ValueError(f"Market snapshot price '{primary_key}' is missing.") + raise ValueError(f"Execution snapshot price '{name}' is missing.") price = float(raw_price) if price <= 0: - raise ValueError(f"Market snapshot price '{primary_key}' is invalid: {price}") + raise ValueError(f"Execution snapshot price '{name}' is invalid: {price}") return price diff --git a/docs/roadmap/master-roadmap.md b/docs/roadmap/master-roadmap.md index fb47506..bf5dad4 100644 --- a/docs/roadmap/master-roadmap.md +++ b/docs/roadmap/master-roadmap.md @@ -290,6 +290,18 @@ - ordinary 🤖 Автоторговля screen remains unchanged by debug commands - preparation for production execution pricing layer +#### 07.4.3.16 — Production Execution Pricing Layer + +- added isolated runtime market caches +- separated AUTO and DEBUG websocket pricing +- added execution snapshot layer +- added freshness-aware execution pricing +- implemented websocket-first market sourcing +- added REST fallback pricing pipeline +- fixed signal timer reset after START +- removed shared market cache collisions +- stabilized AUTO/DEBUG UI market rendering + ### 07.4.4 ⏳ Grid Strategy diff --git a/docs/roadmap/stage-07-auto-trading-roadmap.md b/docs/roadmap/stage-07-auto-trading-roadmap.md index 82d07c1..73c8840 100644 --- a/docs/roadmap/stage-07-auto-trading-roadmap.md +++ b/docs/roadmap/stage-07-auto-trading-roadmap.md @@ -276,6 +276,17 @@ - ordinary 🤖 Автоторговля screen remains unchanged by debug commands - preparation for production execution pricing layer +#### 07.4.3.16 — Production Execution Pricing Layer +- added isolated runtime market caches +- separated AUTO and DEBUG websocket pricing +- added execution snapshot layer +- added freshness-aware execution pricing +- implemented websocket-first market sourcing +- added REST fallback pricing pipeline +- fixed signal timer reset after START +- removed shared market cache collisions +- stabilized AUTO/DEBUG UI market rendering + --- ### 07.4.4 diff --git a/docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md b/docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md new file mode 100644 index 0000000..dbfb893 --- /dev/null +++ b/docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md @@ -0,0 +1,196 @@ +# 07.4.3.16 — Production Execution Pricing Layer + +## Overview + +Этап посвящён построению production-grade pricing layer для автоторговли. + +Основная цель: +- полностью разделить execution pricing между AUTO и DEBUG runtime +- перевести pricing pipeline на websocket-first архитектуру +- устранить конфликты shared cache +- стабилизировать execution snapshot layer +- обеспечить корректную freshness validation +- убрать скачки источников market data +- стабилизировать signal timer lifecycle + +--- + +# Основные проблемы до этапа + +## 1. Shared market cache + +AUTO и DEBUG runtime использовали общий MarketPriceCache. + +Последствия: +- runtime перетирали друг другу market snapshot +- execution source постоянно переключался +- UI показывал нестабильные данные +- HOLD timer визуально сбрасывался + +--- + +## 2. REST dominance + +Даже при активном websocket execution layer периодически переходил на REST fallback. + +Последствия: +- execution source прыгал между websocket и REST +- pricing становился нестабильным +- execution snapshot терял консистентность + +--- + +## 3. Signal lifecycle instability + +После Start/Stop signal_started_at не всегда сбрасывался корректно. + +Последствия: +- HOLD timer стартовал не с 0с +- UI показывал старый lifecycle сигнала + +--- + +# Архитектурные изменения + +## 1. Runtime-separated pricing + +Введено разделение runtime: + +- AUTO runtime +- DEBUG runtime + +Каждый runtime: +- имеет собственный websocket lifecycle +- имеет собственный market cache context +- имеет собственный execution snapshot source + +--- + +## 2. WebSocket-first pricing + +Execution layer теперь работает по модели: + +1. websocket cache +2. freshness validation +3. REST fallback only if required + +REST больше не является primary source. + +--- + +## 3. Execution Snapshot Layer + +Добавлен execution snapshot pipeline: + +- last_price +- bid_price +- ask_price +- source +- freshness +- quote age + +Execution snapshot используется для: +- ENTRY +- EXIT +- FLIP +- SL +- TP +- ML + +--- + +## 4. Freshness validation + +Добавлена проверка freshness market snapshot: + +- age_seconds +- execution freshness +- cache lifetime validation + +Execution pricing больше не использует stale quote. + +--- + +## 5. Runtime-aware market sourcing + +В pricing pipeline добавлен runtime context: + +- auto +- debug_auto + +Это устранило shared cache collisions. + +--- + +# UI Improvements + +## HOLD timer reset + +После Start: +- signal_started_at корректно сбрасывается +- HOLD timer стартует с 0с + +--- + +## Stable market rendering + +UI больше не прыгает между: +- websocket +- REST fallback +- stale cache + +--- + +## Stable execution source + +Execution source стабилизирован: +- ws_depth:auto +- ws_depth:debug_auto + +REST fallback используется только как backup. + +--- + +# Production Result + +После завершения этапа система получила: + +- production-grade execution pricing +- websocket-first pricing architecture +- isolated runtime execution +- stable market snapshot lifecycle +- freshness-aware execution pipeline +- stable AUTO/DEBUG separation +- deterministic signal lifecycle + +--- + +# Финальное состояние архитектуры + +AUTO runtime: +- own websocket +- own pricing cache +- own execution source + +DEBUG runtime: +- own websocket +- own pricing cache +- own execution source + +REST: +- fallback only + +Execution: +- freshness-aware +- runtime-isolated +- websocket-first + +--- + +# Stage Status + +Статус этапа: +- COMPLETED + +Этап: +- 07.4.3.16 — Production Execution Pricing Layer