From 7c8895c3a53132fd1d4fb7d8f794cdf5682e3bfb Mon Sep 17 00:00:00 2001 From: Sergey Date: Wed, 29 Apr 2026 21:40:25 +0300 Subject: [PATCH] =?UTF-8?q?Stage=2007.3.5=20=E2=80=94=20WebSocket=20Probe?= =?UTF-8?q?=20+=20REST=20Fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/requirements.txt | 3 +- app/src/bootstrap/app_factory.py | 18 + app/src/core/config.py | 51 ++- app/src/integrations/exchange/market_cache.py | 52 +++ .../integrations/exchange/market_stream.py | 109 ++++++ .../integrations/exchange/private_client.py | 2 + app/src/integrations/exchange/rest_client.py | 2 + app/src/integrations/exchange/service.py | 24 ++ app/src/integrations/exchange/ws_client.py | 64 ++++ app/src/main.py | 17 +- app/src/telegram/handlers/journal.py | 24 +- app/src/telegram/handlers/market.py | 15 +- app/src/telegram/handlers/monitoring.py | 40 ++- app/src/telegram/handlers/portfolio.py | 11 +- app/src/telegram/live/runner.py | 52 +++ app/tools/ws_probe.py | 149 ++++++++ ...7_3_5-websocket-probe-and-rest-fallback.md | 319 ++++++++++++++++++ 17 files changed, 934 insertions(+), 18 deletions(-) create mode 100644 app/src/integrations/exchange/market_cache.py create mode 100644 app/src/integrations/exchange/market_stream.py create mode 100644 app/src/integrations/exchange/ws_client.py create mode 100644 app/tools/ws_probe.py create mode 100644 docs/stages/stage-07_3_5-websocket-probe-and-rest-fallback.md diff --git a/app/requirements.txt b/app/requirements.txt index 7c70f68..73bf5f6 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -3,4 +3,5 @@ aiogram==3.13.1 python-dotenv==1.0.1 psycopg[binary]==3.2.9 -openpyxl==3.1.5 \ No newline at end of file +openpyxl==3.1.5 +websockets==13.1 \ No newline at end of file diff --git a/app/src/bootstrap/app_factory.py b/app/src/bootstrap/app_factory.py index 24bc11d..bbe0d7b 100644 --- a/app/src/bootstrap/app_factory.py +++ b/app/src/bootstrap/app_factory.py @@ -1,3 +1,5 @@ +# app/src/bootstrap/app_factory.py + from __future__ import annotations from aiogram import Bot, Dispatcher @@ -11,11 +13,16 @@ from src.trading.journal.service import JournalService def create_app() -> tuple[Bot, Dispatcher]: + # загружаем настройки приложения settings = load_settings() + + # настраиваем logging setup_logging(settings.log_level) + # сервис журнала journal = JournalService() + # инициализация схемы БД try: init_schema() except Exception as exc: @@ -33,6 +40,7 @@ def create_app() -> tuple[Bot, Dispatcher]: pass raise + # лог старта приложения try: journal.log_info( "app_start", @@ -47,11 +55,21 @@ def create_app() -> tuple[Bot, Dispatcher]: # журнал не должен ломать запуск приложения pass + # здесь позже можно инициализировать stream/cache сервисы: + # init_market_cache() + # init_market_stream() + # init_auto_trade_runner() + + # создаем Telegram Bot bot = Bot( token=settings.bot_token, default=DefaultBotProperties(parse_mode=settings.bot_parse_mode), ) + + # создаем Dispatcher dispatcher = Dispatcher() + + # подключаем routers setup_routers(dispatcher) return bot, dispatcher \ No newline at end of file diff --git a/app/src/core/config.py b/app/src/core/config.py index 97d4598..e87a28f 100644 --- a/app/src/core/config.py +++ b/app/src/core/config.py @@ -1,61 +1,106 @@ +# app/src/core/config.py + from __future__ import annotations + import os from dataclasses import dataclass from pathlib import Path + from dotenv import load_dotenv + + +# корень проекта BASE_DIR = Path(__file__).resolve().parents[2] + +# .env файл ENV_FILE = BASE_DIR / ".env" + +# загружаем переменные окружения load_dotenv(ENV_FILE) + + @dataclass(slots=True) class Settings: + # Telegram bot_token: str bot_parse_mode: str + + # App app_env: str log_level: str tz: str + + # Exchange exchange_enabled: bool exchange_name: str exchange_base_url: str + exchange_ws_url: str exchange_api_key: str exchange_api_secret: str exchange_timeout_sec: int exchange_testnet: bool default_symbol: str + + # Database db_host: str db_port: int db_name: str db_user: str db_password: str -def is_demo_mode(self) -> bool: - return "demo" in self.exchange_base_url.lower() + + # helper: demo/live mode + def is_demo_mode(self) -> bool: + return "demo" in self.exchange_base_url.lower() + + +# parse bool def _parse_bool(raw_value: str, default: bool = False) -> bool: value = (raw_value or "").strip().lower() if not value: return default + return value in {"1", "true", "yes", "on"} + + +# parse int def _parse_int(raw_value: str, default: int) -> int: value = (raw_value or "").strip() if not value: return default + return int(value) + + +# load all settings def load_settings() -> Settings: bot_token = os.getenv("BOT_TOKEN", "").strip() + if not bot_token: raise RuntimeError("BOT_TOKEN is not set in app/.env") + return Settings( + # Telegram bot_token=bot_token, bot_parse_mode=os.getenv("BOT_PARSE_MODE", "HTML").strip() or "HTML", + + # App app_env=os.getenv("APP_ENV", "dev").strip() or "dev", log_level=os.getenv("LOG_LEVEL", "INFO").strip().upper() or "INFO", tz=os.getenv("TZ", "Europe/Minsk").strip() or "Europe/Minsk", + + # Exchange exchange_enabled=_parse_bool(os.getenv("EXCHANGE_ENABLED", "false")), exchange_name=os.getenv("EXCHANGE_NAME", "dzengi").strip() or "dzengi", exchange_base_url=os.getenv("EXCHANGE_BASE_URL", "").strip(), + exchange_ws_url=os.getenv("EXCHANGE_WS_URL", "").strip(), exchange_api_key=os.getenv("EXCHANGE_API_KEY", "").strip(), exchange_api_secret=os.getenv("EXCHANGE_API_SECRET", "").strip(), exchange_timeout_sec=_parse_int(os.getenv("EXCHANGE_TIMEOUT_SEC", "10"), 10), exchange_testnet=_parse_bool(os.getenv("EXCHANGE_TESTNET", "false")), - default_symbol=os.getenv("DEFAULT_SYMBOL", "BTC/USD_LEVERAGE").strip() or "BTC/USD_LEVERAGE", + default_symbol=os.getenv("DEFAULT_SYMBOL", "BTC/USD_LEVERAGE").strip() + or "BTC/USD_LEVERAGE", + + # Database db_host=os.getenv("DB_HOST", "localhost").strip() or "localhost", db_port=_parse_int(os.getenv("DB_PORT", "5432"), 5432), db_name=os.getenv("DB_NAME", "dzentra_bot").strip() or "dzentra_bot", diff --git a/app/src/integrations/exchange/market_cache.py b/app/src/integrations/exchange/market_cache.py new file mode 100644 index 0000000..e7a6c60 --- /dev/null +++ b/app/src/integrations/exchange/market_cache.py @@ -0,0 +1,52 @@ +# app/src/integrations/exchange/market_cache.py + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from zoneinfo import ZoneInfo + +from src.core.config import load_settings + + +@dataclass(slots=True) +class MarketPriceSnapshot: + symbol: str + price: float + bid_price: float | None + ask_price: float | None + updated_at: str + source: str = "websocket" + + +class MarketPriceCache: + _prices: dict[str, MarketPriceSnapshot] = {} + + # сохранить последнюю цену + @classmethod + def set_price( + cls, + *, + symbol: str, + price: float, + bid_price: float | None = None, + ask_price: float | None = None, + updated_at: str | None = None, + ) -> None: + settings = load_settings() + + if updated_at is None: + updated_at = datetime.now(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S") + + cls._prices[symbol.upper()] = MarketPriceSnapshot( + symbol=symbol.upper(), + price=price, + bid_price=bid_price, + ask_price=ask_price, + updated_at=updated_at, + ) + + # получить последнюю цену + @classmethod + def get_price(cls, symbol: str) -> MarketPriceSnapshot | None: + return cls._prices.get(symbol.upper()) \ No newline at end of file diff --git a/app/src/integrations/exchange/market_stream.py b/app/src/integrations/exchange/market_stream.py new file mode 100644 index 0000000..33fdbbd --- /dev/null +++ b/app/src/integrations/exchange/market_stream.py @@ -0,0 +1,109 @@ +# app/src/integrations/exchange/market_stream.py + +from __future__ import annotations + +import asyncio +from datetime import datetime +from zoneinfo import ZoneInfo + +from src.core.config import load_settings +from src.integrations.exchange.market_cache import MarketPriceCache +from src.integrations.exchange.service import ExchangeService +from src.integrations.exchange.ws_client import ExchangeWebSocketClient +from src.trading.journal.service import JournalService + + +def _format_timestamp(raw_timestamp: object) -> str | None: + if raw_timestamp is None: + return None + + try: + settings = load_settings() + dt_utc = datetime.fromtimestamp(int(raw_timestamp) / 1000, tz=ZoneInfo("UTC")) + return dt_utc.astimezone(ZoneInfo(settings.tz)).strftime("%d.%m.%Y %H:%M:%S") + except Exception: + return None + + +def _extract_market_event(payload: dict) -> dict | None: + event = payload.get("Payload") or payload.get("payload") + + if isinstance(event, dict) and "Payload" in event: + event = event.get("Payload") + + if not isinstance(event, dict): + return None + + symbol = event.get("symbolName") or event.get("symbol") + bid = event.get("bid") + ask = event.get("ofr") or event.get("ask") + timestamp = event.get("timestamp") + + if symbol is None or bid is None or ask is None: + return None + + bid_price = float(bid) + ask_price = float(ask) + price = (bid_price + ask_price) / 2 + + return { + "symbol": str(symbol).upper(), + "price": price, + "bid_price": bid_price, + "ask_price": ask_price, + "updated_at": _format_timestamp(timestamp), + } + + +async def start_market_stream() -> None: + settings = load_settings() + journal = JournalService() + + if not settings.exchange_enabled: + return + + while True: + try: + service = ExchangeService() + validation = service.validate_symbol(settings.default_symbol) + + if not validation.is_valid: + await asyncio.sleep(10) + continue + + symbol = validation.normalized_symbol + client = ExchangeWebSocketClient() + + journal.log_info( + "market_ws_started", + "WebSocket market stream запущен.", + {"symbol": symbol}, + ) + + async for message in client.stream_depth(symbol): + event = _extract_market_event(message) + + if event is None: + continue + + MarketPriceCache.set_price( + symbol=symbol, + price=event["price"], + bid_price=event["bid_price"], + ask_price=event["ask_price"], + updated_at=event["updated_at"], + ) + + except asyncio.CancelledError: + raise + except Exception as exc: + try: + journal.log_warning( + "market_ws_reconnect", + f"WebSocket market stream будет переподключен: {exc}", + {"raw_error": str(exc)}, + ) + except Exception: + pass + + await asyncio.sleep(5) \ No newline at end of file diff --git a/app/src/integrations/exchange/private_client.py b/app/src/integrations/exchange/private_client.py index b67a3ea..33985d7 100644 --- a/app/src/integrations/exchange/private_client.py +++ b/app/src/integrations/exchange/private_client.py @@ -1,3 +1,5 @@ +# app/src/integrations/exchange/private_client.py + from __future__ import annotations from src.core.config import load_settings diff --git a/app/src/integrations/exchange/rest_client.py b/app/src/integrations/exchange/rest_client.py index bea9d9e..eba5261 100644 --- a/app/src/integrations/exchange/rest_client.py +++ b/app/src/integrations/exchange/rest_client.py @@ -1,3 +1,5 @@ +# app/src/integrations/exchange/rest_client.py + from __future__ import annotations import json diff --git a/app/src/integrations/exchange/service.py b/app/src/integrations/exchange/service.py index 6c68d54..334094f 100644 --- a/app/src/integrations/exchange/service.py +++ b/app/src/integrations/exchange/service.py @@ -25,6 +25,7 @@ from src.integrations.exchange.private_client import ExchangePrivateClient from src.integrations.exchange.rest_client import ExchangeRestClient from src.integrations.exchange.symbol_utils import normalize_symbol, symbol_candidates from src.trading.journal.service import JournalService +from src.integrations.exchange.market_cache import MarketPriceCache class ExchangeService: @@ -191,6 +192,7 @@ class ExchangeService: message=f"Private API OK. Балансов получено: {len(balances)}", ) + # получить цену инструмента: сначала WebSocket cache, потом REST fallback def get_price(self, symbol: str | None = None) -> TickerPrice: symbol_to_use = symbol or self.settings.default_symbol @@ -201,8 +203,19 @@ class ExchangeService: if not validation.is_valid: raise ExchangeError(validation.message) + cached_price = MarketPriceCache.get_price(validation.normalized_symbol) + + if cached_price is not None: + return TickerPrice( + symbol=cached_price.symbol, + price=cached_price.price, + source=cached_price.source, + updated_at=cached_price.updated_at, + ) + return self._get_real_price(validation.normalized_symbol) + # получить market snapshot: сначала WebSocket cache, потом REST fallback def get_market_snapshot(self, symbol: str | None = None) -> dict[str, object]: symbol_to_use = symbol or self.settings.default_symbol @@ -220,6 +233,17 @@ class ExchangeService: if not validation.is_valid: raise ExchangeError(validation.message) + cached_price = MarketPriceCache.get_price(validation.normalized_symbol) + + if cached_price is not None: + return { + "symbol": cached_price.symbol, + "last_price": cached_price.price, + "bid_price": cached_price.bid_price or cached_price.price, + "ask_price": cached_price.ask_price or cached_price.price, + "updated_at": cached_price.updated_at, + } + client = ExchangeRestClient() try: diff --git a/app/src/integrations/exchange/ws_client.py b/app/src/integrations/exchange/ws_client.py new file mode 100644 index 0000000..b2d5387 --- /dev/null +++ b/app/src/integrations/exchange/ws_client.py @@ -0,0 +1,64 @@ +# app/src/integrations/exchange/ws_client.py + +from __future__ import annotations + +import json +from typing import AsyncIterator +from urllib.parse import urlencode + +import websockets + +from src.core.config import load_settings + + +class ExchangeWebSocketClient: + def __init__(self) -> None: + self.settings = load_settings() + self.base_url = self._build_ws_base_url() + + # собрать базовый websocket url + def _build_ws_base_url(self) -> str: + raw_url = self.settings.exchange_ws_url or self.settings.exchange_base_url + + if raw_url.startswith("https://"): + return raw_url.replace("https://", "wss://", 1).rstrip("/") + + if raw_url.startswith("http://"): + return raw_url.replace("http://", "ws://", 1).rstrip("/") + + return raw_url.rstrip("/") + + # читать стакан по websocket + async def stream_depth(self, symbol: str) -> AsyncIterator[dict]: + url = f"{self.base_url}/api/v2/depth" + + headers = { + "Origin": self.settings.exchange_base_url.rstrip("/"), + "Content-Type": "application/json", + } + + if self.settings.exchange_api_key: + headers["X-MBX-APIKEY"] = self.settings.exchange_api_key + + request = { + "limit": 5, + "symbol": symbol, + } + + async with websockets.connect( + url, + extra_headers=headers, + subprotocols=["json"], + ping_interval=20, + open_timeout=self.settings.exchange_timeout_sec, + ) as websocket: + await websocket.send(json.dumps(request)) + + async for raw_message in websocket: + try: + payload = json.loads(raw_message) + except json.JSONDecodeError: + continue + + if isinstance(payload, dict): + yield payload \ No newline at end of file diff --git a/app/src/main.py b/app/src/main.py index 8c41cb3..aecec1b 100644 --- a/app/src/main.py +++ b/app/src/main.py @@ -1,12 +1,27 @@ +# app/src/main.py + import asyncio from src.bootstrap.app_factory import create_app async def main() -> None: + # создаём bot + dispatcher bot, dispatcher = create_app() + + # WebSocket stream временно отключён. + # Причина: Dzengi Swagger содержит wss:/api/v2/* endpoints, + # но runtime probe не нашёл endpoint с WebSocket Upgrade 101. + # + # Когда Dzengi подтвердит рабочий WS endpoint, + # можно будет вернуть запуск: + # + # from src.integrations.exchange.market_stream import start_market_stream + # market_stream_task = asyncio.create_task(start_market_stream()) + + # запускаем Telegram polling await dispatcher.start_polling(bot) if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/app/src/telegram/handlers/journal.py b/app/src/telegram/handlers/journal.py index 0ac1ca1..f280952 100644 --- a/app/src/telegram/handlers/journal.py +++ b/app/src/telegram/handlers/journal.py @@ -15,7 +15,7 @@ from src.telegram.handlers.journal_ui import ( build_actions_keyboard, render_actions, ) -from src.telegram.live.runner import ScreenRegistry, StaticScreen +from src.telegram.live.runner import LiveScreenRunner, ScreenRegistry, StaticScreen from src.trading.journal.service import JournalService from src.trading.auto.runner import AutoTradeRunner @@ -61,6 +61,16 @@ async def _show_journal_page( if edit_mode: await target_message.edit_text(text, reply_markup=kb) + + LiveScreenRunner.unregister_message( + chat_id=target_message.chat.id, + message_id=target_message.message_id, + ) + ScreenRegistry.unregister_message( + chat_id=target_message.chat.id, + message_id=target_message.message_id, + ) + ScreenRegistry.register_screen( StaticScreen( screen="journal", @@ -71,6 +81,16 @@ async def _show_journal_page( ) else: sent_message = await target_message.answer(text, reply_markup=kb) + + LiveScreenRunner.unregister_message( + chat_id=sent_message.chat.id, + message_id=sent_message.message_id, + ) + ScreenRegistry.unregister_message( + chat_id=sent_message.chat.id, + message_id=sent_message.message_id, + ) + ScreenRegistry.register_screen( StaticScreen( screen="journal", @@ -151,7 +171,7 @@ async def open_journal_from_monitoring(callback: CallbackQuery, state: FSMContex ) await callback.answer() - + @router.callback_query(F.data == "journal:noop") async def journal_noop(callback: CallbackQuery) -> None: await callback.answer() diff --git a/app/src/telegram/handlers/market.py b/app/src/telegram/handlers/market.py index f45d7ab..e2c2679 100644 --- a/app/src/telegram/handlers/market.py +++ b/app/src/telegram/handlers/market.py @@ -9,7 +9,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder from src.integrations.exchange.exceptions import ExchangeError from src.integrations.exchange.service import ExchangeService -from src.telegram.live.runner import LiveScreen, LiveScreenRunner +from src.telegram.live.runner import LiveScreen, LiveScreenRunner, ScreenRegistry from src.telegram.ui.common import mode_line, now_line from src.telegram.ui.currency_ui import format_usd_amount from src.telegram.ui.exchange_error import ( @@ -48,9 +48,9 @@ def _build_market_text( if previous_price is not None: if ticker_price > previous_price: - price_direction = "▲" + price_direction = "🔺" elif ticker_price < previous_price: - price_direction = "▼" + price_direction = "🔻" _last_market_prices[name] = ticker_price _last_market_directions[name] = price_direction @@ -105,6 +105,15 @@ def _build_market_live_text() -> str: # зарегистрировать сообщение как live-экран рынка def _register_market_live_screen(message: Message) -> None: + LiveScreenRunner.unregister_message( + chat_id=message.chat.id, + message_id=message.message_id, + ) + ScreenRegistry.unregister_message( + chat_id=message.chat.id, + message_id=message.message_id, + ) + LiveScreenRunner.register_screen( LiveScreen( screen="market", diff --git a/app/src/telegram/handlers/monitoring.py b/app/src/telegram/handlers/monitoring.py index 784465d..df835f8 100644 --- a/app/src/telegram/handlers/monitoring.py +++ b/app/src/telegram/handlers/monitoring.py @@ -7,6 +7,7 @@ from aiogram.fsm.context import FSMContext from aiogram.types import CallbackQuery, InlineKeyboardMarkup, Message from aiogram.utils.keyboard import InlineKeyboardBuilder +from src.telegram.live.runner import LiveScreenRunner, ScreenRegistry, StaticScreen from src.trading.auto.runner import AutoTradeRunner @@ -31,17 +32,46 @@ def _monitoring_text() -> str: ) +# зарегистрировать сообщение как статичный экран мониторинга +def _register_monitoring_screen(message: Message) -> None: + LiveScreenRunner.unregister_message( + chat_id=message.chat.id, + message_id=message.message_id, + ) + ScreenRegistry.unregister_message( + chat_id=message.chat.id, + message_id=message.message_id, + ) + + ScreenRegistry.register_screen( + StaticScreen( + screen="monitoring", + bot=message.bot, + chat_id=message.chat.id, + message_id=message.message_id, + ) + ) + + # открыть мониторинг из главного меню @router.message(F.text == "📊 Мониторинг") async def open_monitoring(message: Message, state: FSMContext) -> None: await state.clear() AutoTradeRunner.set_current_screen("monitoring") - await message.answer( + await ScreenRegistry.delete_screen( + screen="monitoring", + bot=message.bot, + chat_id=message.chat.id, + ) + + sent_message = await message.answer( _monitoring_text(), reply_markup=_monitoring_keyboard(), ) + _register_monitoring_screen(sent_message) + # вернуться на экран мониторинга из callback @router.callback_query(F.data == "monitoring:home") @@ -57,10 +87,6 @@ async def open_monitoring_callback(callback: CallbackQuery, state: FSMContext) - _monitoring_text(), reply_markup=_monitoring_keyboard(), ) - await callback.answer() - -# переход к портфелю из мониторинга - - -# переход к рынку из мониторинга \ No newline at end of file + _register_monitoring_screen(callback.message) + await callback.answer() \ No newline at end of file diff --git a/app/src/telegram/handlers/portfolio.py b/app/src/telegram/handlers/portfolio.py index e52bb3b..a2a7fd2 100644 --- a/app/src/telegram/handlers/portfolio.py +++ b/app/src/telegram/handlers/portfolio.py @@ -10,7 +10,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder from src.integrations.exchange.exceptions import ExchangeError from src.integrations.exchange.models import BalanceSummary from src.integrations.exchange.service import ExchangeService -from src.telegram.live.runner import LiveScreen, LiveScreenRunner +from src.telegram.live.runner import LiveScreen, LiveScreenRunner, ScreenRegistry from src.telegram.ui.common import mode_line, now_line from src.telegram.ui.currency_ui import format_usd_amount from src.telegram.ui.currency_ui import ( @@ -197,6 +197,15 @@ def _portfolio_live_markup() -> InlineKeyboardMarkup: # зарегистрировать сообщение как live-экран портфеля def _register_portfolio_live_screen(message: Message) -> None: + LiveScreenRunner.unregister_message( + chat_id=message.chat.id, + message_id=message.message_id, + ) + ScreenRegistry.unregister_message( + chat_id=message.chat.id, + message_id=message.message_id, + ) + LiveScreenRunner.register_screen( LiveScreen( screen="portfolio", diff --git a/app/src/telegram/live/runner.py b/app/src/telegram/live/runner.py index d0814bb..97ff485 100644 --- a/app/src/telegram/live/runner.py +++ b/app/src/telegram/live/runner.py @@ -67,6 +67,32 @@ class ScreenRegistry: ] screens.append(static_screen) + + # удалить конкретное сообщение из всех статичных экранов без удаления из Telegram + @classmethod + def unregister_message( + cls, + *, + chat_id: int, + message_id: int, + ) -> None: + empty_screens: list[str] = [] + + for screen, screens in cls._screens.items(): + screens[:] = [ + item + for item in screens + if not ( + item.chat_id == chat_id + and item.message_id == message_id + ) + ] + + if not screens: + empty_screens.append(screen) + + for screen in empty_screens: + cls._screens.pop(screen, None) # удалить старые статичные экраны указанного типа @classmethod @@ -118,6 +144,32 @@ class LiveScreenRunner: ] screens.append(live_screen) + + # удалить конкретное сообщение из всех live-экранов без удаления из Telegram + @classmethod + def unregister_message( + cls, + *, + chat_id: int, + message_id: int, + ) -> None: + empty_screens: list[str] = [] + + for screen, screens in cls._screens.items(): + screens[:] = [ + item + for item in screens + if not ( + item.chat_id == chat_id + and item.message_id == message_id + ) + ] + + if not screens: + empty_screens.append(screen) + + for screen in empty_screens: + cls._screens.pop(screen, None) # удалить все live-экраны указанного типа из Telegram @classmethod diff --git a/app/tools/ws_probe.py b/app/tools/ws_probe.py new file mode 100644 index 0000000..0a021f9 --- /dev/null +++ b/app/tools/ws_probe.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +import asyncio +import json +import os +import sys +from pathlib import Path + +import websockets +from dotenv import load_dotenv + + +BASE_DIR = Path(__file__).resolve().parents[1] +ENV_FILE = BASE_DIR / ".env" +load_dotenv(ENV_FILE) + +BASE_HTTP_URL = os.getenv("EXCHANGE_BASE_URL", "").strip().rstrip("/") +API_KEY = os.getenv("EXCHANGE_API_KEY", "").strip() +SYMBOL = os.getenv("DEFAULT_SYMBOL", "BTC/USD_LEVERAGE").strip() +TIMEOUT = int(os.getenv("EXCHANGE_TIMEOUT_SEC", "10")) + +FOUND = False + + +def to_ws_url(raw_url: str) -> str: + if raw_url.startswith("https://"): + return raw_url.replace("https://", "wss://", 1) + if raw_url.startswith("http://"): + return raw_url.replace("http://", "ws://", 1) + return raw_url + + +async def try_connect( + *, + label: str, + url: str, + headers: dict[str, str] | None = None, + subprotocols: list[str] | None = None, + send_payload: dict | None = None, +) -> None: + global FOUND + + if FOUND: + return + + print(f"\n=== {label} ===") + print(f"URL: {url}") + + try: + async with websockets.connect( + url, + extra_headers=headers or {}, + subprotocols=subprotocols, + ping_interval=20, + open_timeout=TIMEOUT, + ) as websocket: + print("CONNECTED: 101 Switching Protocols") + FOUND = True + + if send_payload is not None: + raw_payload = json.dumps(send_payload) + print(f"SEND: {raw_payload}") + await websocket.send(raw_payload) + + try: + message = await asyncio.wait_for(websocket.recv(), timeout=5) + print(f"RECV: {message}") + except asyncio.TimeoutError: + print("RECV: timeout after 5s") + + except Exception as exc: + print(f"FAILED: {type(exc).__name__}: {exc}") + + +async def main() -> None: + global FOUND + + if not BASE_HTTP_URL: + print("EXCHANGE_BASE_URL is empty") + sys.exit(1) + + base_ws = to_ws_url(BASE_HTTP_URL) + + payload = { + "limit": 5, + "symbol": SYMBOL, + } + + header_sets = [{}] + + if API_KEY: + header_sets.append({"X-MBX-APIKEY": API_KEY}) + + paths = [ + "/api/v2/depth", + "/api/v1/depth", + "/ws", + "/websocket", + ] + + query_variants = [ + "", + f"?symbol={SYMBOL}&limit=5", + ] + + subprotocol_variants = [ + None, + ["json"], + ] + + count = 0 + + for path in paths: + if FOUND: + break + + for query in query_variants: + if FOUND: + break + + url = f"{base_ws}{path}{query}" + + for headers in header_sets: + if FOUND: + break + + for subprotocols in subprotocol_variants: + if FOUND: + break + + send_payload = None if query else payload + count += 1 + + await try_connect( + label=f"probe #{count}", + url=url, + headers=headers, + subprotocols=subprotocols, + send_payload=send_payload, + ) + + if FOUND: + print("\nSUCCESS: working WebSocket endpoint found") + else: + print("\nFAILED: no WebSocket endpoint found") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/docs/stages/stage-07_3_5-websocket-probe-and-rest-fallback.md b/docs/stages/stage-07_3_5-websocket-probe-and-rest-fallback.md new file mode 100644 index 0000000..7f5d111 --- /dev/null +++ b/docs/stages/stage-07_3_5-websocket-probe-and-rest-fallback.md @@ -0,0 +1,319 @@ +# Stage 07.3.5 — WebSocket Probe + REST Fallback + +## Статус этапа + +Этап не переводит рынок на WebSocket в production-режим. + +В рамках этапа была выполнена проверка WebSocket API Dzengi и подготовлена архитектурная заготовка: + +- WebSocket client; +- market cache; +- market stream task; +- fallback через REST. + +Но реальный WebSocket endpoint не удалось подтвердить на runtime-уровне. + +--- + +## Что проверялось + +В Swagger есть группа `websocket-api`, где endpoints отображаются в формате: + +```text +wss:/api/v2/depth +wss:/api/v2/account +wss:/api/v2/aggTrades +wss:/api/v2/exchangeInfo +... +``` + +Для рынка был выбран endpoint: + +```text +wss:/api/v2/depth +``` + +Он описан как `orderBook`. + +Параметры запроса: + +```json +{ + "limit": 0, + "symbol": "string" +} +``` + +Для нашего случая: + +```json +{ + "limit": 5, + "symbol": "BTC/USD_LEVERAGE" +} +``` + +--- + +## Что означают параметры + +### symbol + +Инструмент, по которому нужно получить данные стакана. + +Пример: + +```text +BTC/USD_LEVERAGE +``` + +### limit + +Количество уровней стакана. + +Например: + +```json +"limit": 5 +``` + +означает получить 5 лучших уровней ask и 5 лучших уровней bid. + +--- + +## Проверенные варианты подключения + +Были проверены варианты: + +```text +/api/v2/depth +/api/v1/depth +/ws/api/v2/depth +/ws/api/v1/depth +/ws +/websocket +/stream +/api/v2/ws +/api/v1/ws +``` + +Также проверялись варианты: + +```text +без query parameters +?symbol=BTC/USD_LEVERAGE&limit=5 +?symbolName=BTC/USD_LEVERAGE&limit=5 +``` + +И варианты headers: + +```text +без headers +X-MBX-APIKEY +Origin +Content-Type: application/json +subprotocol: json +``` + +--- + +## Фактический результат + +Ни один вариант не вернул: + +```text +101 Switching Protocols +``` + +А именно `101 Switching Protocols` является признаком успешного WebSocket Upgrade. + +Фактические ответы: + +```text +HTTP 404 +HTTP 400 +HTTP 200 +``` + +--- + +## Интерпретация ошибок + +### HTTP 404 + +Endpoint не найден как WebSocket route. + +### HTTP 400 + +Сервер получил WebSocket handshake, но отклонил запрос как некорректный. + +### HTTP 200 + +Endpoint существует как обычный HTTP endpoint, но не выполняет WebSocket Upgrade. + +Это значит, что сервер отвечает как REST API, а не как WebSocket. + +--- + +## Вывод + +На текущих URL и по текущей Swagger-документации WebSocket endpoint Dzengi не подтверждён. + +Swagger показывает `wss:/api/v2/*`, но runtime-проверка не нашла endpoint, который реально открывает WebSocket-соединение. + +Поэтому рынок временно остаётся на REST polling через существующий `LiveScreenRunner`. + +--- + +## Что оставить в коде + +Можно оставить заготовки: + +```text +app/src/integrations/exchange/ws_client.py +app/src/integrations/exchange/market_cache.py +app/src/integrations/exchange/market_stream.py +app/tools/ws_probe.py +``` + +Они пригодятся, если Dzengi подтвердит настоящий WebSocket endpoint. + +Также можно оставить зависимость: + +```text +websockets==13.1 +``` + +--- + +## Что отключить сейчас + +Нужно отключить автозапуск WebSocket stream в `app/src/main.py`. + +### Было + +```python +import asyncio +from contextlib import suppress + +from src.bootstrap.app_factory import create_app +from src.integrations.exchange.market_stream import start_market_stream + + +async def main() -> None: + bot, dispatcher = create_app() + + market_stream_task = asyncio.create_task(start_market_stream()) + + try: + await dispatcher.start_polling(bot) + finally: + market_stream_task.cancel() + + with suppress(asyncio.CancelledError): + await market_stream_task + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +### Должно стать временно + +```python +import asyncio + +from src.bootstrap.app_factory import create_app + + +async def main() -> None: + # создаём bot + dispatcher + bot, dispatcher = create_app() + + # WebSocket stream временно отключён. + # Причина: Dzengi Swagger содержит wss:/api/v2/* endpoints, + # но runtime probe не нашёл endpoint с WebSocket Upgrade 101. + # + # Когда Dzengi подтвердит рабочий WS endpoint, + # можно будет вернуть запуск: + # + # from src.integrations.exchange.market_stream import start_market_stream + # market_stream_task = asyncio.create_task(start_market_stream()) + + # запускаем Telegram polling + await dispatcher.start_polling(bot) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +--- + +## Что НЕ нужно отключать + +Не нужно откатывать: + +- live-экран рынка; +- live-экран портфеля; +- `LiveScreenRunner`; +- REST polling; +- `ExchangeService.get_price()`; +- `ExchangeService.get_market_snapshot()`. + +REST fallback должен остаться рабочим. + +--- + +## Текущее поведение после отключения stream + +После отключения WebSocket task: + +- бот больше не спамит `market_ws_reconnect`; +- экран 📈 Рынок продолжает обновляться через REST polling; +- экран 💼 Портфель продолжает работать; +- архитектурная заготовка WebSocket остаётся в проекте. + +--- + +## Что нужно запросить у Dzengi / брокера + +Для продолжения WebSocket-интеграции нужен один из вариантов: + +1. настоящий WebSocket base URL; +2. пример рабочего подключения; +3. required headers; +4. required subprotocol; +5. пример handshake; +6. пример Python/JavaScript клиента; +7. подтверждение, что `wss:/api/v2/depth` действительно поддерживает WebSocket Upgrade. + +Ключевой вопрос: + +```text +Какой полный URL должен вернуть 101 Switching Protocols для market depth stream? +``` + +--- + +## Commit + +Рекомендуемый commit message: + +```bash +git add . +git commit -m "Stage 07.3.5 - websocket probe with REST fallback" +git push +``` + +--- + +## Следующий этап + +После фиксации 07.3.5 можно перейти к: + +```text +Stage 07.4 — Strategy Plugins +``` + +Потому что UI, мониторинг и REST fallback уже стабильны.