diff --git a/app/src/integrations/exchange/market_cache.py b/app/src/integrations/exchange/market_cache.py
index 007f794..2e61d88 100644
--- a/app/src/integrations/exchange/market_cache.py
+++ b/app/src/integrations/exchange/market_cache.py
@@ -2,6 +2,7 @@
from __future__ import annotations
+import time
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
@@ -17,12 +18,31 @@ class MarketPriceSnapshot:
ask_price: float | None
updated_at: str
source: str = "market-cache"
+ runtime_key: str = "default"
+ received_monotonic: float = 0.0
+
+ def age_seconds(self) -> float:
+ if self.received_monotonic <= 0:
+ return 999999.0
+
+ return max(0.0, time.monotonic() - self.received_monotonic)
+
+ def has_bid_ask(self) -> bool:
+ return (
+ self.bid_price is not None
+ and self.ask_price is not None
+ and self.bid_price > 0
+ and self.ask_price > 0
+ )
class MarketPriceCache:
- _prices: dict[str, MarketPriceSnapshot] = {}
+ _prices: dict[tuple[str, str], MarketPriceSnapshot] = {}
+
+ @classmethod
+ def _key(cls, *, symbol: str, runtime_key: str = "default") -> tuple[str, str]:
+ return runtime_key.strip().lower(), symbol.upper()
- # сохранить последнюю цену
@classmethod
def set_price(
cls,
@@ -33,22 +53,59 @@ class MarketPriceCache:
ask_price: float | None = None,
updated_at: str | None = None,
source: str = "market-polling",
+ runtime_key: str = "default",
) -> None:
settings = load_settings()
if updated_at is None:
updated_at = datetime.now(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S")
- cls._prices[symbol.upper()] = MarketPriceSnapshot(
+ normalized_runtime_key = runtime_key.strip().lower()
+
+ cls._prices[cls._key(symbol=symbol, runtime_key=normalized_runtime_key)] = MarketPriceSnapshot(
symbol=symbol.upper(),
- price=price,
- bid_price=bid_price,
- ask_price=ask_price,
+ price=float(price),
+ bid_price=float(bid_price) if bid_price is not None else None,
+ ask_price=float(ask_price) if ask_price is not None else None,
updated_at=updated_at,
source=source,
+ runtime_key=normalized_runtime_key,
+ received_monotonic=time.monotonic(),
)
- # получить последнюю цену
@classmethod
- def get_price(cls, symbol: str) -> MarketPriceSnapshot | None:
- return cls._prices.get(symbol.upper())
\ No newline at end of file
+ def get_price(
+ cls,
+ symbol: str,
+ *,
+ runtime_key: str = "default",
+ ) -> MarketPriceSnapshot | None:
+ return cls._prices.get(cls._key(symbol=symbol, runtime_key=runtime_key))
+
+ @classmethod
+ def clear(
+ cls,
+ symbol: str | None = None,
+ *,
+ runtime_key: str | None = None,
+ ) -> None:
+ if symbol is None and runtime_key is None:
+ cls._prices.clear()
+ return
+
+ if symbol is not None and runtime_key is not None:
+ cls._prices.pop(cls._key(symbol=symbol, runtime_key=runtime_key), None)
+ return
+
+ keys_to_delete = []
+
+ for key_runtime, key_symbol in cls._prices.keys():
+ if runtime_key is not None and key_runtime == runtime_key.strip().lower():
+ keys_to_delete.append((key_runtime, key_symbol))
+ continue
+
+ if symbol is not None and key_symbol == symbol.upper():
+ keys_to_delete.append((key_runtime, key_symbol))
+
+ for key in keys_to_delete:
+ cls._prices.pop(key, None)
\ No newline at end of file
diff --git a/app/src/integrations/exchange/market_data_runner.py b/app/src/integrations/exchange/market_data_runner.py
index d5c6a0c..b635594 100644
--- a/app/src/integrations/exchange/market_data_runner.py
+++ b/app/src/integrations/exchange/market_data_runner.py
@@ -3,54 +3,450 @@
from __future__ import annotations
import asyncio
+import traceback
+from dataclasses import dataclass
from typing import Callable
+from src.integrations.exchange.market_cache import MarketPriceCache
from src.integrations.exchange.service import ExchangeService
+from src.integrations.exchange.ws_client import ExchangeWebSocketClient
+from src.trading.journal.service import JournalService
+
+
+@dataclass
+class MarketRuntimeContext:
+ runtime_key: str
+ task: asyncio.Task | None
+ interval_seconds: int
+ symbol_provider: Callable[[], str | None]
+ screen: str | None
+ action: str
+ runtime_label: str | None
class MarketDataRunner:
- _task: asyncio.Task | None = None
- _interval_seconds: int = 1
- _symbol_provider: Callable[[], str | None] | None = None
+ _runtimes: dict[str, MarketRuntimeContext] = {}
- # запустить быстрый polling рыночной цены
@classmethod
def start(
cls,
*,
symbol_provider: Callable[[], str | None],
interval_seconds: int = 1,
+ runtime_key: str = "default",
+ screen: str | None = None,
+ action: str = "market_data",
+ runtime_label: str | None = None,
) -> None:
- cls._symbol_provider = symbol_provider
- cls._interval_seconds = interval_seconds
+ existing = cls._runtimes.get(runtime_key)
- if cls._task is not None and not cls._task.done():
+ if existing is not None and existing.task is not None and not existing.task.done():
+ existing.symbol_provider = symbol_provider
+ existing.interval_seconds = interval_seconds
+ existing.screen = screen
+ existing.action = action
+ existing.runtime_label = runtime_label
+
+ cls._log_info(
+ existing,
+ "market_runner_context_updated",
+ "MarketDataRunner context updated.",
+ {"interval_seconds": interval_seconds},
+ )
return
- cls._task = asyncio.create_task(cls._worker())
+ context = MarketRuntimeContext(
+ runtime_key=runtime_key,
+ task=None,
+ interval_seconds=interval_seconds,
+ symbol_provider=symbol_provider,
+ screen=screen,
+ action=action,
+ runtime_label=runtime_label,
+ )
+
+ cls._runtimes[runtime_key] = context
+
+ cls._log_info(
+ context,
+ "market_runner_started",
+ "MarketDataRunner started.",
+ {"interval_seconds": interval_seconds},
+ )
+
+ context.task = asyncio.create_task(cls._worker(context))
- # остановить polling
@classmethod
- def stop(cls) -> None:
- if cls._task is None:
+ def stop(cls, runtime_key: str | None = None) -> None:
+ if runtime_key is None:
+ for key in list(cls._runtimes.keys()):
+ cls.stop(key)
return
- cls._task.cancel()
- cls._task = None
+ context = cls._runtimes.get(runtime_key)
+ if context is None:
+ return
+
+ if context.task is not None:
+ context.task.cancel()
+ context.task = None
+
+ cls._log_info(
+ context,
+ "market_runner_stopped",
+ "MarketDataRunner stopped.",
+ )
+
+ cls._runtimes.pop(runtime_key, None)
- # рабочий цикл polling
@classmethod
- async def _worker(cls) -> None:
+ async def _worker(cls, context: MarketRuntimeContext) -> None:
+ last_symbol: str | None = None
+
while True:
- symbol = cls._symbol_provider() if cls._symbol_provider else None
+ symbol = context.symbol_provider()
- if symbol:
- try:
- await asyncio.to_thread(
- ExchangeService().refresh_price_cache,
- symbol,
- )
- except Exception:
- pass
+ if not symbol:
+ cls._log_warning(
+ context,
+ "market_runner_no_symbol",
+ "MarketDataRunner has no symbol.",
+ )
+ await asyncio.sleep(context.interval_seconds)
+ continue
- await asyncio.sleep(cls._interval_seconds)
\ No newline at end of file
+ cache_symbol = cls._cache_symbol(symbol)
+ ws_symbol = cls._ws_symbol(symbol)
+
+ if symbol != last_symbol:
+ last_symbol = symbol
+
+ if not cls._is_cache_symbol_used_by_other_runtime(
+ runtime_key=context.runtime_key,
+ cache_symbol=cache_symbol,
+ ):
+ MarketPriceCache.clear(cache_symbol)
+
+ cls._log_info(
+ context,
+ "market_runner_symbol_changed",
+ "MarketDataRunner symbol changed.",
+ {
+ "symbol": symbol,
+ "cache_symbol": cache_symbol,
+ "ws_symbol": ws_symbol,
+ },
+ )
+
+ try:
+ await cls._run_websocket(context, symbol)
+ except asyncio.CancelledError:
+ raise
+ except Exception as exc:
+ cls._log_error(
+ context,
+ "market_ws_error_fallback",
+ "WebSocket market data failed. Falling back to REST.",
+ {
+ "symbol": symbol,
+ "cache_symbol": cache_symbol,
+ "ws_symbol": ws_symbol,
+ "error": str(exc),
+ "error_type": type(exc).__name__,
+ "traceback": traceback.format_exc(limit=5),
+ },
+ )
+
+ await cls._rest_fallback_once(context, symbol)
+ await asyncio.sleep(context.interval_seconds)
+
+ @classmethod
+ async def _run_websocket(cls, context: MarketRuntimeContext, symbol: str) -> None:
+ cache_symbol = cls._cache_symbol(symbol)
+ ws_symbol = cls._ws_symbol(symbol)
+
+ cls._log_info(
+ context,
+ "market_ws_connecting",
+ "Connecting market WebSocket.",
+ {
+ "requested_symbol": symbol,
+ "cache_symbol": cache_symbol,
+ "ws_symbol": ws_symbol,
+ },
+ )
+
+ payload_count = 0
+
+ async for payload in ExchangeWebSocketClient().stream_depth(
+ ws_symbol,
+ interval_seconds=context.interval_seconds,
+ ):
+ if payload_count == 0:
+ cls._log_info(
+ context,
+ "market_ws_connected",
+ "Market WebSocket connected and first payload received.",
+ {
+ "requested_symbol": symbol,
+ "cache_symbol": cache_symbol,
+ "ws_symbol": ws_symbol,
+ "payload_keys": list(payload.keys()),
+ "payload_preview": cls._safe_payload_preview(payload),
+ },
+ )
+
+ payload_count += 1
+
+ current_symbol = context.symbol_provider()
+ if current_symbol and current_symbol != symbol:
+ cls._log_info(
+ context,
+ "market_ws_symbol_switch",
+ "Market WebSocket stopped because symbol changed.",
+ {
+ "old_symbol": symbol,
+ "new_symbol": current_symbol,
+ },
+ )
+ break
+
+ best_bid = cls._extract_best_price(payload, "bids")
+ best_ask = cls._extract_best_price(payload, "asks")
+
+ if best_bid is None or best_ask is None:
+ cls._log_warning(
+ context,
+ "market_ws_payload_unrecognized",
+ "Market WebSocket payload does not contain recognizable bids/asks.",
+ {
+ "requested_symbol": symbol,
+ "cache_symbol": cache_symbol,
+ "ws_symbol": ws_symbol,
+ "payload_keys": list(payload.keys()),
+ "payload_preview": cls._safe_payload_preview(payload),
+ },
+ )
+ continue
+
+ MarketPriceCache.set_price(
+ symbol=cache_symbol,
+ price=(best_bid + best_ask) / 2,
+ bid_price=best_bid,
+ ask_price=best_ask,
+ source=f"ws_depth:{context.runtime_key}",
+ runtime_key=context.runtime_key,
+ )
+
+ @classmethod
+ async def _rest_fallback_once(cls, context: MarketRuntimeContext, symbol: str) -> None:
+ try:
+ snapshot = await asyncio.to_thread(
+ ExchangeService().refresh_market_snapshot_cache,
+ symbol,
+ runtime_key=context.runtime_key,
+ )
+
+ cls._log_warning(
+ context,
+ "market_rest_fallback_success",
+ "REST fallback market snapshot loaded.",
+ {
+ "symbol": symbol,
+ "snapshot_symbol": snapshot.get("symbol"),
+ "source": snapshot.get("source"),
+ "last_price": snapshot.get("last_price"),
+ "bid_price": snapshot.get("bid_price"),
+ "ask_price": snapshot.get("ask_price"),
+ },
+ )
+ except Exception as exc:
+ cls._log_error(
+ context,
+ "market_rest_fallback_error",
+ "REST fallback market snapshot failed.",
+ {
+ "symbol": symbol,
+ "error": str(exc),
+ "error_type": type(exc).__name__,
+ "traceback": traceback.format_exc(limit=5),
+ },
+ )
+
+ @classmethod
+ def _is_cache_symbol_used_by_other_runtime(cls, *, runtime_key: str, cache_symbol: str) -> bool:
+ for key, context in cls._runtimes.items():
+ if key == runtime_key:
+ continue
+
+ try:
+ symbol = context.symbol_provider()
+ except Exception:
+ continue
+
+ if symbol and cls._cache_symbol(symbol) == cache_symbol:
+ return True
+
+ return False
+
+ @classmethod
+ def _cache_symbol(cls, symbol: str) -> str:
+ try:
+ validation = ExchangeService().validate_symbol(symbol)
+ if validation.is_valid:
+ return validation.normalized_symbol
+ except Exception:
+ pass
+
+ return symbol
+
+ @classmethod
+ def _ws_symbol(cls, symbol: str) -> str:
+ return cls._cache_symbol(symbol)
+
+ @classmethod
+ def _extract_best_price(cls, payload: dict, side_key: str) -> float | None:
+ data = payload
+
+ inner = payload.get("payload")
+ if isinstance(inner, dict):
+ data = inner
+
+ values = data.get(side_key)
+
+ if not isinstance(values, list) or not values:
+ return None
+
+ first = values[0]
+
+ if isinstance(first, list) and first:
+ return cls._safe_float(first[0])
+
+ if isinstance(first, dict):
+ return cls._safe_float(
+ first.get("price")
+ or first.get("p")
+ or first.get("bidPrice")
+ or first.get("askPrice")
+ )
+
+ return None
+
+ @classmethod
+ def _safe_float(cls, value: object) -> float | None:
+ try:
+ number = float(value)
+ except (TypeError, ValueError):
+ return None
+
+ return number if number > 0 else None
+
+ @classmethod
+ def _safe_payload_preview(cls, payload: dict) -> dict:
+ preview: dict = {}
+
+ for key, value in payload.items():
+ if key in {"bids", "asks"} and isinstance(value, list):
+ preview[key] = value[:2]
+ elif key == "payload" and isinstance(value, dict):
+ preview[key] = {
+ inner_key: inner_value[:2]
+ if inner_key in {"bids", "asks"} and isinstance(inner_value, list)
+ else inner_value
+ for inner_key, inner_value in value.items()
+ }
+ else:
+ preview[key] = value
+
+ return preview
+
+ @classmethod
+ def _message(cls, context: MarketRuntimeContext, message: str) -> str:
+ if context.runtime_label:
+ return f"{context.runtime_label} {message}"
+
+ return message
+
+ @classmethod
+ def _payload(cls, context: MarketRuntimeContext, payload: dict | None = None) -> dict:
+ result = dict(payload or {})
+ result.setdefault("runtime_key", context.runtime_key)
+
+ if context.screen:
+ result.setdefault("runtime_screen", context.screen)
+
+ if context.runtime_label:
+ result.setdefault("runtime_label", context.runtime_label)
+
+ return result
+
+ @classmethod
+ def _log_info(
+ cls,
+ context: MarketRuntimeContext,
+ event_type: str,
+ message: str,
+ payload: dict | None = None,
+ ) -> None:
+ try:
+ if context.screen:
+ JournalService().log_ui_info(
+ event_type=event_type,
+ message=cls._message(context, message),
+ screen=context.screen,
+ action=context.action,
+ payload=cls._payload(context, payload),
+ )
+ return
+
+ JournalService().log_info(event_type, cls._message(context, message), cls._payload(context, payload))
+ except Exception:
+ pass
+
+ @classmethod
+ def _log_warning(
+ cls,
+ context: MarketRuntimeContext,
+ event_type: str,
+ message: str,
+ payload: dict | None = None,
+ ) -> None:
+ try:
+ if context.screen:
+ JournalService().log_ui_warning(
+ event_type=event_type,
+ message=cls._message(context, message),
+ screen=context.screen,
+ action=context.action,
+ payload=cls._payload(context, payload),
+ )
+ return
+
+ JournalService().log_warning(event_type, cls._message(context, message), cls._payload(context, payload))
+ except Exception:
+ pass
+
+ @classmethod
+ def _log_error(
+ cls,
+ context: MarketRuntimeContext,
+ event_type: str,
+ message: str,
+ payload: dict | None = None,
+ ) -> None:
+ try:
+ if context.screen:
+ JournalService().log_ui_error(
+ event_type=event_type,
+ message=cls._message(context, message),
+ screen=context.screen,
+ action=context.action,
+ payload=cls._payload(context, payload),
+ error_type=(payload or {}).get("error_type"),
+ raw_error=(payload or {}).get("error"),
+ )
+ return
+
+ JournalService().log_error(event_type, cls._message(context, message), cls._payload(context, payload))
+ except Exception:
+ pass
\ No newline at end of file
diff --git a/app/src/integrations/exchange/models.py b/app/src/integrations/exchange/models.py
index d4a1767..4d7bea9 100644
--- a/app/src/integrations/exchange/models.py
+++ b/app/src/integrations/exchange/models.py
@@ -20,6 +20,18 @@ class TickerPrice:
updated_at: str
+@dataclass(slots=True)
+class ExecutionPriceSnapshot:
+ symbol: str
+ last_price: float
+ bid_price: float
+ ask_price: float
+ updated_at: str
+ source: str
+ is_fresh: bool
+ age_seconds: float | None = None
+
+
@dataclass(slots=True)
class BalanceSummary:
currency: str
diff --git a/app/src/integrations/exchange/service.py b/app/src/integrations/exchange/service.py
index 22857ef..5783a58 100644
--- a/app/src/integrations/exchange/service.py
+++ b/app/src/integrations/exchange/service.py
@@ -8,6 +8,7 @@ from zoneinfo import ZoneInfo
from src.core.config import load_settings
from src.integrations.exchange.balance_parser import parse_account_balances
from src.integrations.exchange.exceptions import ExchangeError
+from src.integrations.exchange.market_cache import MarketPriceCache
from src.integrations.exchange.mock_data import (
mock_balance_summary,
mock_exchange_health,
@@ -17,6 +18,7 @@ from src.integrations.exchange.models import (
BalanceSummary,
ExchangeHealth,
ExchangeSymbol,
+ ExecutionPriceSnapshot,
PrivateAuthHealth,
SymbolValidationResult,
TickerPrice,
@@ -25,11 +27,12 @@ from src.integrations.exchange.private_client import ExchangePrivateClient
from src.integrations.exchange.rest_client import ExchangeRestClient
from src.integrations.exchange.symbol_utils import normalize_symbol, symbol_candidates
from src.trading.journal.service import JournalService
-from src.integrations.exchange.market_cache import MarketPriceCache
class ExchangeService:
_exchange_symbols_cache: list[ExchangeSymbol] | None = None
+ _execution_cache_max_age_seconds = 2.0
+ _default_runtime_key = "auto"
def __init__(self) -> None:
self.settings = load_settings()
@@ -136,6 +139,9 @@ class ExchangeService:
else "dzengi-api"
)
+ def _runtime_key(self, runtime_key: str | None) -> str:
+ return (runtime_key or self._default_runtime_key).strip().lower()
+
def get_health(self) -> ExchangeHealth:
if not self.settings.exchange_enabled:
return mock_exchange_health()
@@ -194,38 +200,53 @@ class ExchangeService:
message=f"Private API OK. Балансов получено: {len(balances)}",
)
- # принудительно обновить market cache через REST
- def refresh_price_cache(self, symbol: str | None = None) -> TickerPrice:
- symbol_to_use = symbol or self.settings.default_symbol
-
- if not self.settings.exchange_enabled:
- ticker = mock_ticker_price(symbol_to_use)
- MarketPriceCache.set_price(
- symbol=ticker.symbol,
- price=ticker.price,
- updated_at=ticker.updated_at,
- source=ticker.source,
- )
- return ticker
-
- validation = self.validate_symbol(symbol_to_use)
- if not validation.is_valid:
- raise ExchangeError(validation.message)
-
- ticker = self._get_real_price(validation.normalized_symbol)
-
- MarketPriceCache.set_price(
- symbol=ticker.symbol,
- price=ticker.price,
- updated_at=ticker.updated_at,
- source=ticker.source,
+ def refresh_price_cache(
+ self,
+ symbol: str | None = None,
+ *,
+ runtime_key: str | None = None,
+ ) -> TickerPrice:
+ snapshot = self.refresh_market_snapshot_cache(
+ symbol,
+ runtime_key=runtime_key,
)
- return ticker
+ return TickerPrice(
+ symbol=str(snapshot["symbol"]),
+ price=float(snapshot["last_price"]),
+ source=str(snapshot.get("source") or self._source_name()),
+ updated_at=str(snapshot["updated_at"]),
+ )
- # получить цену инструмента: сначала WebSocket cache, потом REST fallback
- def get_price(self, symbol: str | None = None) -> TickerPrice:
+ def refresh_market_snapshot_cache(
+ self,
+ symbol: str | None = None,
+ *,
+ runtime_key: str | None = None,
+ ) -> dict[str, object]:
+ normalized_runtime_key = self._runtime_key(runtime_key)
+ snapshot = self.get_fresh_market_snapshot(symbol)
+
+ MarketPriceCache.set_price(
+ symbol=str(snapshot["symbol"]),
+ price=float(snapshot["last_price"]),
+ bid_price=float(snapshot["bid_price"]),
+ ask_price=float(snapshot["ask_price"]),
+ updated_at=str(snapshot["updated_at"]),
+ source=str(snapshot.get("source") or "rest_polling"),
+ runtime_key=normalized_runtime_key,
+ )
+
+ return snapshot
+
+ def get_price(
+ self,
+ symbol: str | None = None,
+ *,
+ runtime_key: str | None = None,
+ ) -> TickerPrice:
symbol_to_use = symbol or self.settings.default_symbol
+ normalized_runtime_key = self._runtime_key(runtime_key)
if not self.settings.exchange_enabled:
return mock_ticker_price(symbol_to_use)
@@ -234,7 +255,10 @@ class ExchangeService:
if not validation.is_valid:
raise ExchangeError(validation.message)
- cached_price = MarketPriceCache.get_price(validation.normalized_symbol)
+ cached_price = MarketPriceCache.get_price(
+ validation.normalized_symbol,
+ runtime_key=normalized_runtime_key,
+ )
if cached_price is not None:
return TickerPrice(
@@ -246,9 +270,14 @@ class ExchangeService:
return self._get_real_price(validation.normalized_symbol)
- # получить market snapshot: сначала WebSocket cache, потом REST fallback
- def get_market_snapshot(self, symbol: str | None = None) -> dict[str, object]:
+ def get_market_snapshot(
+ self,
+ symbol: str | None = None,
+ *,
+ runtime_key: str | None = None,
+ ) -> dict[str, object]:
symbol_to_use = symbol or self.settings.default_symbol
+ normalized_runtime_key = self._runtime_key(runtime_key)
if not self.settings.exchange_enabled:
ticker = mock_ticker_price(symbol_to_use)
@@ -258,61 +287,102 @@ class ExchangeService:
"bid_price": ticker.price,
"ask_price": ticker.price,
"updated_at": ticker.updated_at,
+ "source": ticker.source,
+ "runtime_key": normalized_runtime_key,
+ "age_seconds": 0.0,
+ "is_fresh": True,
}
validation = self.validate_symbol(symbol_to_use)
if not validation.is_valid:
raise ExchangeError(validation.message)
- cached_price = MarketPriceCache.get_price(validation.normalized_symbol)
+ cached_price = MarketPriceCache.get_price(
+ validation.normalized_symbol,
+ runtime_key=normalized_runtime_key,
+ )
if cached_price is not None:
+ age = cached_price.age_seconds()
+
return {
"symbol": cached_price.symbol,
"last_price": cached_price.price,
"bid_price": cached_price.bid_price or cached_price.price,
"ask_price": cached_price.ask_price or cached_price.price,
"updated_at": cached_price.updated_at,
+ "source": cached_price.source,
+ "runtime_key": cached_price.runtime_key,
+ "age_seconds": round(age, 3),
+ "is_fresh": age <= self._execution_cache_max_age_seconds,
}
- client = ExchangeRestClient()
+ snapshot = self.get_fresh_market_snapshot(validation.normalized_symbol)
+ snapshot["runtime_key"] = normalized_runtime_key
+ return snapshot
- try:
- payload = client.get_json(
- "/api/v2/ticker/24hr",
- params={"symbol": validation.normalized_symbol},
+ def get_execution_snapshot(
+ self,
+ symbol: str | None = None,
+ *,
+ runtime_key: str | None = None,
+ ) -> ExecutionPriceSnapshot:
+ symbol_to_use = symbol or self.settings.default_symbol
+ normalized_runtime_key = self._runtime_key(runtime_key)
+
+ if not self.settings.exchange_enabled:
+ ticker = mock_ticker_price(symbol_to_use)
+ return ExecutionPriceSnapshot(
+ symbol=ticker.symbol,
+ last_price=ticker.price,
+ bid_price=ticker.price,
+ ask_price=ticker.price,
+ updated_at=ticker.updated_at,
+ source=ticker.source,
+ is_fresh=True,
+ age_seconds=0.0,
)
- except Exception as exc:
- self._log_exchange_error(
- endpoint="ticker/24hr",
- exc=exc,
- symbol=validation.normalized_symbol,
- )
- raise ExchangeError(str(exc)) from exc
- last_raw = payload.get("lastPrice")
- if last_raw is None:
- exc = ExchangeError("Field 'lastPrice' is missing in ticker response.")
- self._log_exchange_error(
- endpoint="ticker/24hr",
- exc=exc,
- symbol=validation.normalized_symbol,
- )
- raise exc
+ validation = self.validate_symbol(symbol_to_use)
+ if not validation.is_valid:
+ raise ExchangeError(validation.message)
- bid_raw = payload.get("bidPrice") or last_raw
- ask_raw = payload.get("askPrice") or last_raw
- close_time = payload.get("closeTime") or payload.get("eventTime") or ""
+ cached_price = MarketPriceCache.get_price(
+ validation.normalized_symbol,
+ runtime_key=normalized_runtime_key,
+ )
- return {
- "symbol": validation.normalized_symbol,
- "last_price": float(last_raw),
- "bid_price": float(bid_raw),
- "ask_price": float(ask_raw),
- "updated_at": self._format_exchange_time(close_time),
- }
+ if cached_price is not None:
+ age = cached_price.age_seconds()
+
+ if (
+ age <= self._execution_cache_max_age_seconds
+ and cached_price.has_bid_ask()
+ ):
+ return ExecutionPriceSnapshot(
+ symbol=cached_price.symbol,
+ last_price=cached_price.price,
+ bid_price=float(cached_price.bid_price),
+ ask_price=float(cached_price.ask_price),
+ updated_at=cached_price.updated_at,
+ source=f"{cached_price.source}:fresh_cache",
+ is_fresh=True,
+ age_seconds=round(age, 3),
+ )
+
+ snapshot = self.get_fresh_market_snapshot(validation.normalized_symbol)
+
+ return ExecutionPriceSnapshot(
+ symbol=str(snapshot["symbol"]),
+ last_price=float(snapshot["last_price"]),
+ bid_price=float(snapshot["bid_price"]),
+ ask_price=float(snapshot["ask_price"]),
+ updated_at=str(snapshot["updated_at"]),
+ source="rest_fallback",
+ is_fresh=True,
+ age_seconds=0.0,
+ )
- # получить свежий market snapshot напрямую через REST, без WebSocket cache
def get_fresh_market_snapshot(self, symbol: str | None = None) -> dict[str, object]:
symbol_to_use = symbol or self.settings.default_symbol
@@ -325,6 +395,8 @@ class ExchangeService:
"ask_price": ticker.price,
"updated_at": ticker.updated_at,
"source": "mock",
+ "age_seconds": 0.0,
+ "is_fresh": True,
}
validation = self.validate_symbol(symbol_to_use)
@@ -367,8 +439,10 @@ class ExchangeService:
"ask_price": float(ask_raw),
"updated_at": self._format_exchange_time(close_time),
"source": "fresh_rest",
+ "age_seconds": 0.0,
+ "is_fresh": True,
}
-
+
def get_balance_summary(self) -> list[BalanceSummary]:
if not self.settings.exchange_enabled:
return mock_balance_summary()
@@ -562,7 +636,7 @@ class ExchangeService:
)
type(self)._exchange_symbols_cache = items
-
+
return items
def validate_symbol(self, raw_symbol: str) -> SymbolValidationResult:
@@ -609,36 +683,11 @@ class ExchangeService:
)
def _get_real_price(self, symbol: str) -> TickerPrice:
- client = ExchangeRestClient()
-
- try:
- payload = client.get_json(
- "/api/v2/ticker/24hr",
- params={"symbol": symbol},
- )
- except Exception as exc:
- self._log_exchange_error(
- endpoint="ticker/24hr",
- exc=exc,
- symbol=symbol,
- )
- raise ExchangeError(str(exc)) from exc
-
- price_raw = payload.get("lastPrice")
- if price_raw is None:
- exc = ExchangeError("Field 'lastPrice' is missing in ticker response.")
- self._log_exchange_error(
- endpoint="ticker/24hr",
- exc=exc,
- symbol=symbol,
- )
- raise exc
-
- close_time = payload.get("closeTime") or payload.get("eventTime") or ""
+ snapshot = self.get_fresh_market_snapshot(symbol)
return TickerPrice(
- symbol=symbol,
- price=float(price_raw),
+ symbol=str(snapshot["symbol"]),
+ price=float(snapshot["last_price"]),
source=self._source_name(),
- updated_at=self._format_exchange_time(close_time),
+ updated_at=str(snapshot["updated_at"]),
)
\ No newline at end of file
diff --git a/app/src/integrations/exchange/ws_client.py b/app/src/integrations/exchange/ws_client.py
index b2d5387..b3e61ac 100644
--- a/app/src/integrations/exchange/ws_client.py
+++ b/app/src/integrations/exchange/ws_client.py
@@ -2,9 +2,10 @@
from __future__ import annotations
+import asyncio
import json
from typing import AsyncIterator
-from urllib.parse import urlencode
+from uuid import uuid4
import websockets
@@ -16,22 +17,27 @@ class ExchangeWebSocketClient:
self.settings = load_settings()
self.base_url = self._build_ws_base_url()
- # собрать базовый websocket url
def _build_ws_base_url(self) -> str:
raw_url = self.settings.exchange_ws_url or self.settings.exchange_base_url
if raw_url.startswith("https://"):
- return raw_url.replace("https://", "wss://", 1).rstrip("/")
+ raw_url = raw_url.replace("https://", "wss://", 1)
+ elif raw_url.startswith("http://"):
+ raw_url = raw_url.replace("http://", "ws://", 1)
- if raw_url.startswith("http://"):
- return raw_url.replace("http://", "ws://", 1).rstrip("/")
+ raw_url = raw_url.rstrip("/")
- return raw_url.rstrip("/")
+ if raw_url.endswith("/connect"):
+ return raw_url
- # читать стакан по websocket
- async def stream_depth(self, symbol: str) -> AsyncIterator[dict]:
- url = f"{self.base_url}/api/v2/depth"
+ return f"{raw_url}/connect"
+ async def stream_depth(
+ self,
+ symbol: str,
+ *,
+ interval_seconds: float = 1.0,
+ ) -> AsyncIterator[dict]:
headers = {
"Origin": self.settings.exchange_base_url.rstrip("/"),
"Content-Type": "application/json",
@@ -40,25 +46,41 @@ class ExchangeWebSocketClient:
if self.settings.exchange_api_key:
headers["X-MBX-APIKEY"] = self.settings.exchange_api_key
- request = {
- "limit": 5,
- "symbol": symbol,
- }
-
async with websockets.connect(
- url,
+ self.base_url,
extra_headers=headers,
subprotocols=["json"],
ping_interval=20,
open_timeout=self.settings.exchange_timeout_sec,
) as websocket:
- await websocket.send(json.dumps(request))
+ while True:
+ request = {
+ "correlationId": str(uuid4()),
+ "destination": "/api/v2/depth",
+ "payload": {
+ "limit": 5,
+ "symbol": symbol,
+ },
+ }
+
+ await websocket.send(json.dumps(request))
+
+ try:
+ raw_message = await asyncio.wait_for(
+ websocket.recv(),
+ timeout=self.settings.exchange_timeout_sec,
+ )
+ except asyncio.TimeoutError:
+ await asyncio.sleep(interval_seconds)
+ continue
- async for raw_message in websocket:
try:
payload = json.loads(raw_message)
except json.JSONDecodeError:
+ await asyncio.sleep(interval_seconds)
continue
if isinstance(payload, dict):
- yield payload
\ No newline at end of file
+ yield payload
+
+ await asyncio.sleep(interval_seconds)
\ No newline at end of file
diff --git a/app/src/telegram/handlers/auto/ui.py b/app/src/telegram/handlers/auto/ui.py
index 382491f..96801b4 100644
--- a/app/src/telegram/handlers/auto/ui.py
+++ b/app/src/telegram/handlers/auto/ui.py
@@ -296,7 +296,7 @@ def _market_snapshot(symbol: str | None) -> dict[str, object] | None:
return None
try:
- return ExchangeService().get_market_snapshot(symbol)
+ return ExchangeService().get_market_snapshot(symbol, runtime_key="auto")
except Exception:
return None
diff --git a/app/src/telegram/handlers/debug_auto/main.py b/app/src/telegram/handlers/debug_auto/main.py
index 5659b3d..e5bacea 100644
--- a/app/src/telegram/handlers/debug_auto/main.py
+++ b/app/src/telegram/handlers/debug_auto/main.py
@@ -2,6 +2,8 @@
from __future__ import annotations
+import time
+
from aiogram import F, Router
from aiogram.exceptions import TelegramBadRequest
from aiogram.fsm.context import FSMContext
@@ -18,6 +20,11 @@ from src.trading.debug.service import DebugTradeService
router = Router(name="debug_auto")
+def _ensure_signal_started_at(state) -> None:
+ if state.signal_started_at is None:
+ state.signal_started_at = time.monotonic()
+
+
async def render_debug_auto_screen(
target_message: Message,
*,
@@ -77,6 +84,7 @@ async def debug_auto_start(callback: CallbackQuery) -> None:
service = DebugTradeService()
state = service.get_state()
state.status = "RUNNING"
+ _ensure_signal_started_at(state)
DebugTradeRunner.set_current_screen("debug_auto")
DebugTradeRunner.start()
@@ -89,6 +97,7 @@ async def debug_auto_start(callback: CallbackQuery) -> None:
@router.callback_query(F.data == "debug_auto:stop")
async def debug_auto_stop(callback: CallbackQuery) -> None:
+ DebugTradeRunner.set_current_screen("debug_auto")
DebugTradeRunner.stop()
DebugTradeService().stop()
@@ -145,6 +154,8 @@ async def debug_auto_close(callback: CallbackQuery) -> None:
service = DebugTradeService()
_, result = service.close(reason="DEBUG_SCREEN_CLOSE")
+ DebugTradeRunner.set_current_screen("debug_auto")
+
if callback.message is not None:
await render_debug_auto_screen(callback.message, edit_mode=True)
@@ -153,7 +164,9 @@ async def debug_auto_close(callback: CallbackQuery) -> None:
@router.callback_query(F.data == "debug_auto:reset")
async def debug_auto_reset(callback: CallbackQuery) -> None:
- DebugTradeService().reset()
+ DebugTradeRunner.set_current_screen("debug_auto")
+ state = DebugTradeService().reset()
+ _ensure_signal_started_at(state)
if callback.message is not None:
await render_debug_auto_screen(callback.message, edit_mode=True)
diff --git a/app/src/telegram/handlers/debug_auto/ui.py b/app/src/telegram/handlers/debug_auto/ui.py
index 069a349..d4e181d 100644
--- a/app/src/telegram/handlers/debug_auto/ui.py
+++ b/app/src/telegram/handlers/debug_auto/ui.py
@@ -7,6 +7,7 @@ import time
from aiogram.types import InlineKeyboardMarkup
from aiogram.utils.keyboard import InlineKeyboardBuilder
+from src.integrations.exchange.service import ExchangeService
from src.trading.debug.service import DebugTradeService
@@ -38,6 +39,8 @@ def build_debug_auto_text() -> str:
f"Баланс · $ {_format_money_compact(state.allocated_balance_usd)}",
f"Realized PnL · {_format_signed_usd(state.realized_pnl_usd)}",
"",
+ *_market_snapshot_lines(state.symbol),
+ "",
_signal_line(state),
]
@@ -106,6 +109,90 @@ def build_debug_auto_text() -> str:
return "\n".join(parts)
+def _format_updated_at(value: object) -> str:
+ if not value:
+ return "—"
+
+ text = str(value)
+
+ if " " in text:
+ return text.rsplit(" ", 1)[-1]
+
+ return text
+
+
+def _market_snapshot_lines(symbol: str | None) -> list[str]:
+ if not symbol:
+ return [
+ "Market",
+ "Last · —",
+ "Bid · —",
+ "Ask · —",
+ "Source · —",
+ "Age · —",
+ "",
+ "Execution",
+ "Source · —",
+ "Age · —",
+ ]
+
+ market = None
+ execution = None
+ error = None
+
+ try:
+ market = ExchangeService().get_market_snapshot(
+ symbol,
+ runtime_key="debug_auto",
+ )
+ except Exception as exc:
+ error = str(exc)
+
+ try:
+ execution = ExchangeService().get_execution_snapshot(
+ symbol,
+ runtime_key="debug_auto",
+ )
+ except Exception as exc:
+ if error is None:
+ error = str(exc)
+
+ if market is None and execution is None:
+ return [
+ "Market",
+ "Last · —",
+ "Bid · —",
+ "Ask · —",
+ "Source · error",
+ f"Error · {error or 'unknown'}",
+ ]
+
+ last_price = market.get("last_price") if market else getattr(execution, "last_price", None)
+ bid_price = market.get("bid_price") if market else getattr(execution, "bid_price", None)
+ ask_price = market.get("ask_price") if market else getattr(execution, "ask_price", None)
+ market_source = market.get("source") if market else "—"
+ market_age = market.get("age_seconds") if market else None
+
+ execution_source = getattr(execution, "source", "—") if execution else "—"
+ execution_age = getattr(execution, "age_seconds", None) if execution else None
+ execution_fresh = getattr(execution, "is_fresh", None) if execution else None
+
+ return [
+ "Market",
+ f"Last · {_format_usd_or_dash(last_price)}",
+ f"Bid · {_format_usd_or_dash(bid_price)}",
+ f"Ask · {_format_usd_or_dash(ask_price)}",
+ f"Source · {market_source or '—'}",
+ f"Quote age · {_format_age(market_age)}",
+ f"Exchange time · {_format_updated_at(market.get('updated_at') if market else None)}",
+ "",
+ "Execution",
+ f"Source · {execution_source or '—'}",
+ f"Quote age · {_format_age(execution_age)}",
+ f"Fresh · {_format_bool(execution_fresh)}",
+ ]
+
+
def _signal_line(state) -> str:
signal = state.last_signal or "HOLD"
@@ -225,4 +312,44 @@ def _format_signed_usd(value: float | int | None) -> str:
if amount < 0:
return f"🔴 −$ {_format_money_compact(abs(amount))}"
- return "$ 0"
\ No newline at end of file
+ return "$ 0"
+
+
+def _format_age(value: object) -> str:
+ if value is None:
+ return "—"
+
+ try:
+ age = max(0.0, float(value))
+ except (TypeError, ValueError):
+ return "—"
+
+ if age < 1:
+ return f"{age:.2f}с"
+
+ if age < 10:
+ return f"{age:.1f}с"
+
+ total_seconds = int(age)
+
+ hours = total_seconds // 3600
+ minutes = (total_seconds % 3600) // 60
+ seconds = total_seconds % 60
+
+ if hours > 0:
+ return f"{hours}ч {minutes:02d}м"
+
+ if minutes > 0:
+ return f"{minutes}м {seconds:02d}с"
+
+ return f"{seconds}с"
+
+
+def _format_bool(value: object) -> str:
+ if value is True:
+ return "yes"
+
+ if value is False:
+ return "no"
+
+ return "—"
\ No newline at end of file
diff --git a/app/src/trading/auto/runner.py b/app/src/trading/auto/runner.py
index 0d98e16..633675d 100644
--- a/app/src/trading/auto/runner.py
+++ b/app/src/trading/auto/runner.py
@@ -108,6 +108,10 @@ class AutoTradeRunner:
MarketDataRunner.start(
symbol_provider=lambda: service.get_state().symbol,
interval_seconds=1,
+ runtime_key="auto",
+ screen="auto",
+ action="market_data",
+ runtime_label="[AUTO]",
)
if cls._task is not None and not cls._task.done():
@@ -117,7 +121,7 @@ class AutoTradeRunner:
@classmethod
def stop(cls) -> None:
- MarketDataRunner.stop()
+ MarketDataRunner.stop("auto")
if cls._task is None:
return
@@ -134,7 +138,7 @@ class AutoTradeRunner:
if state.status == "OFF":
cls._task = None
- MarketDataRunner.stop()
+ MarketDataRunner.stop("auto")
break
service.run_cycle()
@@ -346,6 +350,7 @@ class AutoTradeRunner:
f"{payload.get('risk_reason')}:"
f"{payload.get('is_forced')}:"
)
+
@classmethod
def _build_execution_alert_text(
cls,
@@ -416,7 +421,7 @@ class AutoTradeRunner:
f"New size: {new_size}\n\n"
f"PnL: {pnl}"
)
-
+
return "📄 Paper execution event"
@classmethod
diff --git a/app/src/trading/auto/service.py b/app/src/trading/auto/service.py
index 1e3f9b8..b2b5404 100644
--- a/app/src/trading/auto/service.py
+++ b/app/src/trading/auto/service.py
@@ -157,6 +157,9 @@ class AutoTradeService:
return state, "Автоторговля активирована."
state.status = "RUNNING"
+ self._reset_signal_tracking()
+ state.last_signal = "HOLD"
+ state.signal_started_at = time.monotonic()
EventBus.emit(
"auto_status_changed",
{
diff --git a/app/src/trading/debug/runner.py b/app/src/trading/debug/runner.py
index b3ef341..6e2f958 100644
--- a/app/src/trading/debug/runner.py
+++ b/app/src/trading/debug/runner.py
@@ -9,6 +9,7 @@ from typing import Callable
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter
+from src.integrations.exchange.market_data_runner import MarketDataRunner
from src.trading.debug.service import DebugTradeService
@@ -24,6 +25,8 @@ class DebugTradeRunner:
_current_screen: str | None = None
_interval_seconds = 5
+ _market_interval_seconds = 1
+
_last_text: str | None = None
_last_refresh_at: float = 0.0
_retry_after_until: float = 0.0
@@ -77,9 +80,21 @@ class DebugTradeRunner:
@classmethod
def start(cls) -> None:
- state = DebugTradeService().get_state()
+ service = DebugTradeService()
+ state = service.get_state()
state.status = "RUNNING"
+ MarketDataRunner.start(
+ symbol_provider=lambda: DebugTradeService().get_state().symbol,
+ interval_seconds=cls._market_interval_seconds,
+ runtime_key="debug_auto",
+ screen="debug_auto",
+ action="market_data",
+ runtime_label="[DEBUG]",
+ )
+
+ cls._last_text = None
+
if cls._task is not None and not cls._task.done():
return
@@ -87,6 +102,8 @@ class DebugTradeRunner:
@classmethod
def stop(cls) -> None:
+ MarketDataRunner.stop("debug_auto")
+
if cls._task is None:
return
@@ -102,6 +119,7 @@ class DebugTradeRunner:
if state.status == "OFF":
cls._task = None
+ MarketDataRunner.stop("debug_auto")
break
service.process()
diff --git a/app/src/trading/execution/engine.py b/app/src/trading/execution/engine.py
index fd45141..3eb2419 100644
--- a/app/src/trading/execution/engine.py
+++ b/app/src/trading/execution/engine.py
@@ -3,6 +3,7 @@
from __future__ import annotations
import math
+from dataclasses import dataclass
from datetime import datetime
from src.core.event_bus import EventBus
@@ -13,6 +14,15 @@ from src.trading.journal.service import JournalService
from src.trading.position.state import PositionState
+@dataclass(slots=True)
+class _ExecutionPrice:
+ price: float
+ source: str
+ age_seconds: float | None
+ updated_at: str
+ pricing_role: str
+
+
class ExecutionEngine:
_position = PositionState()
_size_precision = 5
@@ -60,7 +70,8 @@ class ExecutionEngine:
return ExecutionDecision("NONE", False, "Позиция уже открыта.")
try:
- entry_price = self._entry_price_for_side(state.symbol, side)
+ entry = self._entry_price_for_side(state.symbol, side)
+ entry_price = entry.price
except Exception as exc:
return ExecutionDecision("NONE", False, f"Не удалось получить цену для paper execution: {exc}")
@@ -116,6 +127,10 @@ class ExecutionEngine:
"reason": state.last_signal_reason,
"opened_at": now,
"pricing": "ask_for_long_bid_for_short",
+ "pricing_role": entry.pricing_role,
+ "price_source": entry.source,
+ "price_age_seconds": entry.age_seconds,
+ "price_updated_at": entry.updated_at,
}
JournalService().log_ui_info(
@@ -142,8 +157,10 @@ class ExecutionEngine:
return ExecutionDecision("NONE", False, "Нет направления для flip.")
try:
- exit_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
- new_entry_price = self._entry_price_for_side(state.symbol, new_side)
+ exit_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ entry_execution = self._entry_price_for_side(state.symbol, new_side)
+ exit_price = exit_execution.price
+ new_entry_price = entry_execution.price
except Exception as exc:
return ExecutionDecision("NONE", False, f"Ошибка получения цены для flip: {exc}")
@@ -218,6 +235,14 @@ class ExecutionEngine:
"closed_at": now,
"new_opened_at": now,
"pricing": "exit_by_side_then_entry_by_side",
+ "exit_pricing_role": exit_execution.pricing_role,
+ "exit_price_source": exit_execution.source,
+ "exit_price_age_seconds": exit_execution.age_seconds,
+ "exit_price_updated_at": exit_execution.updated_at,
+ "entry_pricing_role": entry_execution.pricing_role,
+ "entry_price_source": entry_execution.source,
+ "entry_price_age_seconds": entry_execution.age_seconds,
+ "entry_price_updated_at": entry_execution.updated_at,
}
JournalService().log_ui_info(
@@ -243,6 +268,7 @@ class ExecutionEngine:
forced_reason: str | None = None,
forced_exit_price: float | None = None,
forced_pnl: float | None = None,
+ forced_price_meta: _ExecutionPrice | None = None,
) -> ExecutionDecision:
position = type(self)._position
@@ -252,9 +278,11 @@ class ExecutionEngine:
if forced_exit_price is not None:
exit_price = forced_exit_price
+ exit_execution = forced_price_meta
else:
try:
- exit_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ exit_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ exit_price = exit_execution.price
except Exception as exc:
return ExecutionDecision("NONE", False, f"Ошибка получения цены для закрытия: {exc}")
@@ -283,6 +311,10 @@ class ExecutionEngine:
"opened_at": position.opened_at,
"closed_at": now,
"pricing": "bid_for_long_exit_ask_for_short_exit",
+ "pricing_role": exit_execution.pricing_role if exit_execution else None,
+ "price_source": exit_execution.source if exit_execution else None,
+ "price_age_seconds": exit_execution.age_seconds if exit_execution else None,
+ "price_updated_at": exit_execution.updated_at if exit_execution else None,
}
JournalService().log_ui_info(
@@ -318,7 +350,8 @@ class ExecutionEngine:
return None
try:
- current_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ current_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ current_price = current_execution.price
except Exception:
return None
@@ -331,6 +364,7 @@ class ExecutionEngine:
forced_reason="MAX_LOSS",
forced_exit_price=current_price,
forced_pnl=unrealized_pnl,
+ forced_price_meta=current_execution,
)
if self._is_stop_loss_hit(state, price_move_percent):
@@ -339,6 +373,7 @@ class ExecutionEngine:
forced_reason="STOP_LOSS",
forced_exit_price=current_price,
forced_pnl=unrealized_pnl,
+ forced_price_meta=current_execution,
)
if self._is_take_profit_hit(state, price_move_percent):
@@ -347,6 +382,7 @@ class ExecutionEngine:
forced_reason="TAKE_PROFIT",
forced_exit_price=current_price,
forced_pnl=unrealized_pnl,
+ forced_price_meta=current_execution,
)
return None
@@ -412,7 +448,8 @@ class ExecutionEngine:
return
try:
- current_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ current_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
+ current_price = current_execution.price
except Exception:
self._sync_state_from_position(state)
return
@@ -438,7 +475,7 @@ class ExecutionEngine:
if price is None:
try:
- price = self._signal_entry_price(state)
+ price = self._signal_entry_price(state).price
except Exception:
return 0.0
@@ -487,7 +524,7 @@ class ExecutionEngine:
state.execution_size_adjustment_reason = "MARGIN_LIMIT"
return self._round_size(max_size)
- def _signal_entry_price(self, state: AutoTradeState) -> float:
+ def _signal_entry_price(self, state: AutoTradeState) -> _ExecutionPrice:
if state.last_signal == "BUY":
return self._entry_price_for_side(state.symbol, "LONG")
@@ -496,50 +533,83 @@ class ExecutionEngine:
return self._market_last_price(state.symbol)
- def _entry_price_for_side(self, symbol: str, side: str) -> float:
- snapshot = ExchangeService().get_market_snapshot(symbol)
+ def _entry_price_for_side(self, symbol: str, side: str) -> _ExecutionPrice:
+ snapshot = ExchangeService().get_execution_snapshot(symbol)
if side == "LONG":
- return self._snapshot_price(snapshot, "ask_price", "last_price")
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.ask_price, "ask_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="LONG_ENTRY_ASK",
+ )
if side == "SHORT":
- return self._snapshot_price(snapshot, "bid_price", "last_price")
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.bid_price, "bid_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="SHORT_ENTRY_BID",
+ )
- return self._snapshot_price(snapshot, "last_price")
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.last_price, "last_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="ENTRY_LAST",
+ )
- def _exit_price_for_side(self, symbol: str, side: str) -> float:
- snapshot = ExchangeService().get_market_snapshot(symbol)
+ def _exit_price_for_side(self, symbol: str, side: str) -> _ExecutionPrice:
+ snapshot = ExchangeService().get_execution_snapshot(symbol)
if side == "LONG":
- return self._snapshot_price(snapshot, "bid_price", "last_price")
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.bid_price, "bid_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="LONG_EXIT_BID",
+ )
if side == "SHORT":
- return self._snapshot_price(snapshot, "ask_price", "last_price")
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.ask_price, "ask_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="SHORT_EXIT_ASK",
+ )
- return self._snapshot_price(snapshot, "last_price")
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.last_price, "last_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="EXIT_LAST",
+ )
- def _market_last_price(self, symbol: str) -> float:
- snapshot = ExchangeService().get_market_snapshot(symbol)
- return self._snapshot_price(snapshot, "last_price")
+ def _market_last_price(self, symbol: str) -> _ExecutionPrice:
+ snapshot = ExchangeService().get_execution_snapshot(symbol)
- def _snapshot_price(
- self,
- snapshot: dict[str, object],
- primary_key: str,
- fallback_key: str | None = None,
- ) -> float:
- raw_price = snapshot.get(primary_key)
-
- if raw_price is None and fallback_key is not None:
- raw_price = snapshot.get(fallback_key)
+ return _ExecutionPrice(
+ price=self._snapshot_price(snapshot.last_price, "last_price"),
+ source=snapshot.source,
+ age_seconds=snapshot.age_seconds,
+ updated_at=snapshot.updated_at,
+ pricing_role="MARKET_LAST",
+ )
+ def _snapshot_price(self, raw_price: object, name: str) -> float:
if raw_price is None:
- raise ValueError(f"Market snapshot price '{primary_key}' is missing.")
+ raise ValueError(f"Execution snapshot price '{name}' is missing.")
price = float(raw_price)
if price <= 0:
- raise ValueError(f"Market snapshot price '{primary_key}' is invalid: {price}")
+ raise ValueError(f"Execution snapshot price '{name}' is invalid: {price}")
return price
diff --git a/docs/roadmap/master-roadmap.md b/docs/roadmap/master-roadmap.md
index fb47506..bf5dad4 100644
--- a/docs/roadmap/master-roadmap.md
+++ b/docs/roadmap/master-roadmap.md
@@ -290,6 +290,18 @@
- ordinary 🤖 Автоторговля screen remains unchanged by debug commands
- preparation for production execution pricing layer
+#### 07.4.3.16 — Production Execution Pricing Layer
+
+- added isolated runtime market caches
+- separated AUTO and DEBUG websocket pricing
+- added execution snapshot layer
+- added freshness-aware execution pricing
+- implemented websocket-first market sourcing
+- added REST fallback pricing pipeline
+- fixed signal timer reset after START
+- removed shared market cache collisions
+- stabilized AUTO/DEBUG UI market rendering
+
### 07.4.4
⏳ Grid Strategy
diff --git a/docs/roadmap/stage-07-auto-trading-roadmap.md b/docs/roadmap/stage-07-auto-trading-roadmap.md
index 82d07c1..73c8840 100644
--- a/docs/roadmap/stage-07-auto-trading-roadmap.md
+++ b/docs/roadmap/stage-07-auto-trading-roadmap.md
@@ -276,6 +276,17 @@
- ordinary 🤖 Автоторговля screen remains unchanged by debug commands
- preparation for production execution pricing layer
+#### 07.4.3.16 — Production Execution Pricing Layer
+- added isolated runtime market caches
+- separated AUTO and DEBUG websocket pricing
+- added execution snapshot layer
+- added freshness-aware execution pricing
+- implemented websocket-first market sourcing
+- added REST fallback pricing pipeline
+- fixed signal timer reset after START
+- removed shared market cache collisions
+- stabilized AUTO/DEBUG UI market rendering
+
---
### 07.4.4
diff --git a/docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md b/docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md
new file mode 100644
index 0000000..dbfb893
--- /dev/null
+++ b/docs/stages/stage-07_4_3_16-production_execution_pricing_layer.md
@@ -0,0 +1,196 @@
+# 07.4.3.16 — Production Execution Pricing Layer
+
+## Overview
+
+Этап посвящён построению production-grade pricing layer для автоторговли.
+
+Основная цель:
+- полностью разделить execution pricing между AUTO и DEBUG runtime
+- перевести pricing pipeline на websocket-first архитектуру
+- устранить конфликты shared cache
+- стабилизировать execution snapshot layer
+- обеспечить корректную freshness validation
+- убрать скачки источников market data
+- стабилизировать signal timer lifecycle
+
+---
+
+# Основные проблемы до этапа
+
+## 1. Shared market cache
+
+AUTO и DEBUG runtime использовали общий MarketPriceCache.
+
+Последствия:
+- runtime перетирали друг другу market snapshot
+- execution source постоянно переключался
+- UI показывал нестабильные данные
+- HOLD timer визуально сбрасывался
+
+---
+
+## 2. REST dominance
+
+Даже при активном websocket execution layer периодически переходил на REST fallback.
+
+Последствия:
+- execution source прыгал между websocket и REST
+- pricing становился нестабильным
+- execution snapshot терял консистентность
+
+---
+
+## 3. Signal lifecycle instability
+
+После Start/Stop signal_started_at не всегда сбрасывался корректно.
+
+Последствия:
+- HOLD timer стартовал не с 0с
+- UI показывал старый lifecycle сигнала
+
+---
+
+# Архитектурные изменения
+
+## 1. Runtime-separated pricing
+
+Введено разделение runtime:
+
+- AUTO runtime
+- DEBUG runtime
+
+Каждый runtime:
+- имеет собственный websocket lifecycle
+- имеет собственный market cache context
+- имеет собственный execution snapshot source
+
+---
+
+## 2. WebSocket-first pricing
+
+Execution layer теперь работает по модели:
+
+1. websocket cache
+2. freshness validation
+3. REST fallback only if required
+
+REST больше не является primary source.
+
+---
+
+## 3. Execution Snapshot Layer
+
+Добавлен execution snapshot pipeline:
+
+- last_price
+- bid_price
+- ask_price
+- source
+- freshness
+- quote age
+
+Execution snapshot используется для:
+- ENTRY
+- EXIT
+- FLIP
+- SL
+- TP
+- ML
+
+---
+
+## 4. Freshness validation
+
+Добавлена проверка freshness market snapshot:
+
+- age_seconds
+- execution freshness
+- cache lifetime validation
+
+Execution pricing больше не использует stale quote.
+
+---
+
+## 5. Runtime-aware market sourcing
+
+В pricing pipeline добавлен runtime context:
+
+- auto
+- debug_auto
+
+Это устранило shared cache collisions.
+
+---
+
+# UI Improvements
+
+## HOLD timer reset
+
+После Start:
+- signal_started_at корректно сбрасывается
+- HOLD timer стартует с 0с
+
+---
+
+## Stable market rendering
+
+UI больше не прыгает между:
+- websocket
+- REST fallback
+- stale cache
+
+---
+
+## Stable execution source
+
+Execution source стабилизирован:
+- ws_depth:auto
+- ws_depth:debug_auto
+
+REST fallback используется только как backup.
+
+---
+
+# Production Result
+
+После завершения этапа система получила:
+
+- production-grade execution pricing
+- websocket-first pricing architecture
+- isolated runtime execution
+- stable market snapshot lifecycle
+- freshness-aware execution pipeline
+- stable AUTO/DEBUG separation
+- deterministic signal lifecycle
+
+---
+
+# Финальное состояние архитектуры
+
+AUTO runtime:
+- own websocket
+- own pricing cache
+- own execution source
+
+DEBUG runtime:
+- own websocket
+- own pricing cache
+- own execution source
+
+REST:
+- fallback only
+
+Execution:
+- freshness-aware
+- runtime-isolated
+- websocket-first
+
+---
+
+# Stage Status
+
+Статус этапа:
+- COMPLETED
+
+Этап:
+- 07.4.3.16 — Production Execution Pricing Layer