diff --git a/app/src/core/event_bus.py b/app/src/core/event_bus.py new file mode 100644 index 0000000..2dcb5ea --- /dev/null +++ b/app/src/core/event_bus.py @@ -0,0 +1,28 @@ +# app/src/core/event_bus.py + +from __future__ import annotations + +from typing import Any + + +class EventBus: + _version: int = 0 + _last_event_type: str | None = None + _last_payload: dict[str, Any] = {} + + # зафиксировать важное событие системы + @classmethod + def emit(cls, event_type: str, payload: dict[str, Any] | None = None) -> None: + cls._version += 1 + cls._last_event_type = event_type + cls._last_payload = payload or {} + + # текущая версия событий + @classmethod + def version(cls) -> int: + return cls._version + + # последнее событие + @classmethod + def last_event(cls) -> tuple[str | None, dict[str, Any]]: + return cls._last_event_type, dict(cls._last_payload) \ No newline at end of file diff --git a/app/src/integrations/exchange/market_cache.py b/app/src/integrations/exchange/market_cache.py index e7a6c60..007f794 100644 --- a/app/src/integrations/exchange/market_cache.py +++ b/app/src/integrations/exchange/market_cache.py @@ -16,7 +16,7 @@ class MarketPriceSnapshot: bid_price: float | None ask_price: float | None updated_at: str - source: str = "websocket" + source: str = "market-cache" class MarketPriceCache: @@ -32,6 +32,7 @@ class MarketPriceCache: bid_price: float | None = None, ask_price: float | None = None, updated_at: str | None = None, + source: str = "market-polling", ) -> None: settings = load_settings() @@ -44,6 +45,7 @@ class MarketPriceCache: bid_price=bid_price, ask_price=ask_price, updated_at=updated_at, + source=source, ) # получить последнюю цену diff --git a/app/src/integrations/exchange/market_data_runner.py b/app/src/integrations/exchange/market_data_runner.py new file mode 100644 index 0000000..d5c6a0c --- /dev/null +++ b/app/src/integrations/exchange/market_data_runner.py @@ -0,0 +1,56 @@ +# app/src/integrations/exchange/market_data_runner.py + +from __future__ import annotations + +import asyncio +from typing import Callable + +from src.integrations.exchange.service import ExchangeService + + +class MarketDataRunner: + _task: asyncio.Task | None = None + _interval_seconds: int = 1 + _symbol_provider: Callable[[], str | None] | None = None + + # запустить быстрый polling рыночной цены + @classmethod + def start( + cls, + *, + symbol_provider: Callable[[], str | None], + interval_seconds: int = 1, + ) -> None: + cls._symbol_provider = symbol_provider + cls._interval_seconds = interval_seconds + + if cls._task is not None and not cls._task.done(): + return + + cls._task = asyncio.create_task(cls._worker()) + + # остановить polling + @classmethod + def stop(cls) -> None: + if cls._task is None: + return + + cls._task.cancel() + cls._task = None + + # рабочий цикл polling + @classmethod + async def _worker(cls) -> None: + while True: + symbol = cls._symbol_provider() if cls._symbol_provider else None + + if symbol: + try: + await asyncio.to_thread( + ExchangeService().refresh_price_cache, + symbol, + ) + except Exception: + pass + + await asyncio.sleep(cls._interval_seconds) \ No newline at end of file diff --git a/app/src/integrations/exchange/service.py b/app/src/integrations/exchange/service.py index 334094f..0f9b348 100644 --- a/app/src/integrations/exchange/service.py +++ b/app/src/integrations/exchange/service.py @@ -29,6 +29,8 @@ from src.integrations.exchange.market_cache import MarketPriceCache class ExchangeService: + _exchange_symbols_cache: list[ExchangeSymbol] | None = None + def __init__(self) -> None: self.settings = load_settings() self.journal = JournalService() @@ -192,6 +194,35 @@ class ExchangeService: message=f"Private API OK. Балансов получено: {len(balances)}", ) + # принудительно обновить market cache через REST + def refresh_price_cache(self, symbol: str | None = None) -> TickerPrice: + symbol_to_use = symbol or self.settings.default_symbol + + if not self.settings.exchange_enabled: + ticker = mock_ticker_price(symbol_to_use) + MarketPriceCache.set_price( + symbol=ticker.symbol, + price=ticker.price, + updated_at=ticker.updated_at, + source=ticker.source, + ) + return ticker + + validation = self.validate_symbol(symbol_to_use) + if not validation.is_valid: + raise ExchangeError(validation.message) + + ticker = self._get_real_price(validation.normalized_symbol) + + MarketPriceCache.set_price( + symbol=ticker.symbol, + price=ticker.price, + updated_at=ticker.updated_at, + source=ticker.source, + ) + + return ticker + # получить цену инструмента: сначала WebSocket cache, потом REST fallback def get_price(self, symbol: str | None = None) -> TickerPrice: symbol_to_use = symbol or self.settings.default_symbol @@ -336,6 +367,9 @@ class ExchangeService: if not self.settings.exchange_enabled: return [] + if type(self)._exchange_symbols_cache is not None: + return type(self)._exchange_symbols_cache + client = ExchangeRestClient() try: @@ -470,6 +504,8 @@ class ExchangeService: ) ) + type(self)._exchange_symbols_cache = items + return items def validate_symbol(self, raw_symbol: str) -> SymbolValidationResult: diff --git a/app/src/telegram/handlers/auto.py b/app/src/telegram/handlers/auto.py index 98ded26..dffbecb 100644 --- a/app/src/telegram/handlers/auto.py +++ b/app/src/telegram/handlers/auto.py @@ -11,6 +11,9 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder from src.telegram.ui.common import mode_line from src.trading.auto.runner import AutoTradeRunner from src.trading.auto.service import AutoTradeService +from src.telegram.handlers.system import open_auto_settings +from src.integrations.exchange.service import ExchangeService +from src.telegram.ui.currency_ui import format_usd_amount router = Router(name="auto") @@ -71,6 +74,18 @@ def _price_or_dash(value: float | None) -> str: return f"{value:.2f}" +# текущая цена инструмента +def _market_price_or_dash(symbol: str | None) -> str: + if not symbol: + return "—" + + try: + ticker = ExchangeService().get_price(symbol) + return f"$ {format_usd_amount(ticker.price)}" + except Exception: + return "—" + + # формат USD def _usd_or_dash(value: float | None) -> str: if value is None: @@ -162,9 +177,16 @@ def _build_auto_text() -> str: account_mode = "DEMO" if "DEMO" in mode_line().upper() else "LIVE" risk = f"{state.risk_percent:.1f}%" if state.risk_percent is not None else "—" configured = _is_auto_configured(state) + price = _market_price_or_dash(state.symbol) + + status_line = { + "OFF": "⚪ Off", + "OBSERVING": "👀 Watch", + "RUNNING": "🟢 On", + }.get(state.status, state.status) header = ( - "🤖 Автоторговля\n" + f"🤖 Автоторговля · {status_line}\n" f"🔸 {account_mode} аккаунт\n\n" ) @@ -172,32 +194,36 @@ def _build_auto_text() -> str: if not configured: return ( f"{header}" - "⚪ Выключена\n\n" "⚠️ Не настроена\n" "Настрой параметры" ) return ( f"{header}" - "⚪ Выключена\n\n" f"{_context_line(state)}\n" + f"Price: {price}\n" f"Risk: {risk}" ) - status_line = ( - "🟢 Активна" - if state.status == "RUNNING" - else "👀 Наблюдение" + position_line = ( + f"Pos: {_value_or_dash(state.position_side)} | " + f"PnL: {_usd_or_dash(state.unrealized_pnl_usd)}" ) + if state.position_side != "NONE" and state.entry_price is not None: + position_line = ( + f"Pos: {_value_or_dash(state.position_side)} | " + f"Entry: $ {_price_or_dash(state.entry_price)} | " + f"PnL: {_usd_or_dash(state.unrealized_pnl_usd)}" + ) + return ( f"{header}" - f"{status_line}\n\n" - f"{_context_line(state)}\n\n" + f"{_context_line(state)}\n" + f"Price: {price}\n\n" f"{_signal_label(state.last_signal)} ×{state.last_signal_repeat_count} " f"· {state.decision_status}\n\n" - f"Pos: {_value_or_dash(state.position_side)} | " - f"PnL: {_usd_or_dash(state.unrealized_pnl_usd)}\n" + f"{position_line}\n" f"Risk: {risk}" ) @@ -272,6 +298,19 @@ async def open_auto_from_callback(callback: CallbackQuery, state: FSMContext) -> @router.callback_query(F.data == "auto:start") async def auto_start(callback: CallbackQuery) -> None: service = AutoTradeService() + state = service.get_state() + + if not _is_auto_configured(state): + await callback.answer( + "Сначала настрой параметры автоторговли", + show_alert=True, + ) + + if callback.message is not None: + await open_auto_settings(callback) + + return + _, message = service.start() AutoTradeRunner.set_current_screen("auto") @@ -287,6 +326,19 @@ async def auto_start(callback: CallbackQuery) -> None: @router.callback_query(F.data == "auto:observe") async def auto_observe(callback: CallbackQuery) -> None: service = AutoTradeService() + state = service.get_state() + + if not _is_auto_configured(state): + await callback.answer( + "Сначала настрой параметры автоторговли", + show_alert=True, + ) + + if callback.message is not None: + await open_auto_settings(callback) + + return + _, message = service.observe() AutoTradeRunner.set_current_screen("auto") diff --git a/app/src/telegram/handlers/system.py b/app/src/telegram/handlers/system.py index cd7ae87..a1cd08b 100644 --- a/app/src/telegram/handlers/system.py +++ b/app/src/telegram/handlers/system.py @@ -168,7 +168,7 @@ async def open_system_management(callback: CallbackQuery) -> None: @router.callback_query(F.data == "settings:auto") async def open_auto_settings(callback: CallbackQuery) -> None: AutoTradeRunner.set_current_screen("settings_auto") - + if callback.message is None: await callback.answer("Сообщение не найдено", show_alert=True) return @@ -177,7 +177,7 @@ async def open_auto_settings(callback: CallbackQuery) -> None: chat_id=callback.message.chat.id, message_id=callback.message.message_id, ) - + state = AutoTradeService().get_state() strategy_map = { @@ -185,20 +185,43 @@ async def open_auto_settings(callback: CallbackQuery) -> None: "GRID": "🧩 Grid Trading", "SCALP": "⚡ Scalping", } + + strategy_ready = state.strategy is not None + symbol_ready = bool(state.symbol) + risk_ready = state.risk_percent is not None + leverage_ready = state.leverage is not None + + is_configured = strategy_ready and symbol_ready and risk_ready and leverage_ready + strategy = strategy_map.get(state.strategy or "", "—") + symbol = state.symbol or "—" risk = f"{state.risk_percent:.1f}%" if state.risk_percent is not None else "—" leverage = f"x{state.leverage:g}" if state.leverage is not None else "—" + strategy_icon = "✅" if strategy_ready else "👉" + symbol_icon = "✅" if symbol_ready else "👉" + risk_icon = "✅" if risk_ready else "👉" + leverage_icon = "✅" if leverage_ready else "👉" + + config_status = ( + "✅ Все параметры настроены" + if is_configured + else "⛔️ Настрой все параметры" + ) + text = ( "🤖 Автоторговля\n\n" "СИСТЕМА · Настройки\n\n" - f"Стратегия: {strategy}\n" - f"Инструмент: {state.symbol}\n" - f"Риск: {risk}\n" - f"Плечо: {leverage}\n\n" - "Выберите настройку:" + f"{strategy_icon} Стратегия: {strategy}\n" + f"{symbol_icon} Инструмент: {symbol}\n" + f"{risk_icon} Риск: {risk}\n" + f"{leverage_icon} Плечо: {leverage}\n\n" + f"{config_status}" ) + if not is_configured: + text += "\n\nВыберите настройку:" + builder = InlineKeyboardBuilder() builder.button(text="🧠 Стратегия", callback_data="settings:auto_strategy") builder.button(text="📈 Инструмент", callback_data="settings:auto_symbol") @@ -361,7 +384,7 @@ async def set_auto_leverage(callback: CallbackQuery) -> None: AutoTradeRunner.set_current_screen("settings_auto") await callback.answer("Плечо обновлено") - + @router.callback_query(F.data == "settings:trade") async def open_trade_settings(callback: CallbackQuery) -> None: if callback.message is None: diff --git a/app/src/trading/auto/runner.py b/app/src/trading/auto/runner.py index 2efc102..7cb15ff 100644 --- a/app/src/trading/auto/runner.py +++ b/app/src/trading/auto/runner.py @@ -9,6 +9,8 @@ from typing import Callable from aiogram import Bot from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter +from src.core.event_bus import EventBus +from src.integrations.exchange.market_data_runner import MarketDataRunner from src.trading.auto.service import AutoTradeService @@ -20,9 +22,16 @@ class AutoTradeRunner: _render_text: Callable[[], str] | None = None _render_markup: Callable[[], object] | None = None _current_screen: str | None = None - _interval_seconds = 15 + + # анализ стратегии — часто + _analysis_interval_seconds = 5 + + # Telegram UI — редко + _ui_interval_seconds = 60 _last_text: str | None = None + _last_ui_refresh_at: float = 0.0 + _last_event_version: int = 0 _retry_after_until: float = 0.0 @classmethod @@ -89,6 +98,13 @@ class AutoTradeRunner: @classmethod def start(cls) -> None: + service = AutoTradeService() + + MarketDataRunner.start( + symbol_provider=lambda: service.get_state().symbol, + interval_seconds=1, + ) + if cls._task is not None and not cls._task.done(): return @@ -96,6 +112,8 @@ class AutoTradeRunner: @classmethod def stop(cls) -> None: + MarketDataRunner.stop() + if cls._task is None: return @@ -111,16 +129,29 @@ class AutoTradeRunner: if state.status == "OFF": cls._task = None + MarketDataRunner.stop() break service.run_cycle() - await cls._refresh_screen() - await asyncio.sleep(cls._interval_seconds) + current_event_version = EventBus.version() + has_important_event = current_event_version != cls._last_event_version + + if has_important_event: + cls._last_event_version = current_event_version + + await cls._refresh_screen(force=has_important_event) + + await asyncio.sleep(cls._analysis_interval_seconds) @classmethod - async def _refresh_screen(cls) -> None: - if time.monotonic() < cls._retry_after_until: + async def _refresh_screen(cls, *, force: bool = False) -> None: + now = time.monotonic() + + if now < cls._retry_after_until: + return + + if not force and now - cls._last_ui_refresh_at < cls._ui_interval_seconds: return if not all( @@ -147,6 +178,7 @@ class AutoTradeRunner: reply_markup=cls._render_markup(), ) cls._last_text = text + cls._last_ui_refresh_at = now except TelegramRetryAfter as exc: cls._retry_after_until = time.monotonic() + exc.retry_after + 5 @@ -156,6 +188,7 @@ class AutoTradeRunner: if "message is not modified" in error_text: cls._last_text = text + cls._last_ui_refresh_at = now return if "message to edit not found" in error_text: diff --git a/app/src/trading/auto/service.py b/app/src/trading/auto/service.py index 5461db4..b5c319e 100644 --- a/app/src/trading/auto/service.py +++ b/app/src/trading/auto/service.py @@ -10,6 +10,7 @@ from src.trading.auto.state import AutoTradeState from src.trading.journal.service import JournalService from src.trading.strategies.base import BaseStrategy, StrategyContext from src.trading.strategies.registry import StrategyRegistry +from src.core.event_bus import EventBus class AutoTradeService: @@ -69,18 +70,30 @@ class AutoTradeService: # запустить активную торговлю def start(self) -> tuple[AutoTradeState, str]: state = self.get_state() + previous_status = state.status if state.status == "RUNNING": - self.start_loop() return state, "Автоторговля уже активна." if state.status == "OBSERVING": state.status = "RUNNING" - self.start_loop() + EventBus.emit( + "auto_status_changed", + { + "previous_status": previous_status, + "status": state.status, + }, + ) return state, "Автоторговля активирована." state.status = "RUNNING" - self.start_loop() + EventBus.emit( + "auto_status_changed", + { + "previous_status": previous_status, + "status": state.status, + }, + ) return state, "Автоторговля запущена." # включить режим наблюдения @@ -89,11 +102,17 @@ class AutoTradeService: previous_status = state.status if previous_status == "OBSERVING": - self.start_loop() return state, "Режим наблюдения уже включён." state.status = "OBSERVING" - self.start_loop() + + EventBus.emit( + "auto_status_changed", + { + "previous_status": previous_status, + "status": state.status, + }, + ) if previous_status == "OFF": return state, "Включён режим наблюдения." @@ -103,6 +122,7 @@ class AutoTradeService: # полностью выключить автоторговлю def stop(self) -> tuple[AutoTradeState, str]: state = self.get_state() + previous_status = state.status if state.status == "OFF": self.stop_loop() @@ -110,6 +130,15 @@ class AutoTradeService: state.status = "OFF" self.stop_loop() + + EventBus.emit( + "auto_status_changed", + { + "previous_status": previous_status, + "status": state.status, + }, + ) + return state, "Автоторговля выключена." # установить инструмент @@ -132,7 +161,7 @@ class AutoTradeService: state.risk_percent = risk_percent return state - # установить плечо + # установить плечо def set_leverage(self, leverage: float) -> AutoTradeState: state = self.get_state() state.leverage = leverage @@ -285,6 +314,9 @@ class AutoTradeService: reason: str, confidence: float, ) -> None: + previous_signal = state.last_signal + previous_decision_status = state.decision_status + state.last_signal = signal state.last_signal_repeat_count = self._same_signal_count state.last_signal_confidence = confidence @@ -296,6 +328,29 @@ class AutoTradeService: confidence=confidence, ) + if previous_signal != state.last_signal: + EventBus.emit( + "auto_signal_changed", + { + "previous_signal": previous_signal, + "signal": state.last_signal, + "repeat_count": state.last_signal_repeat_count, + "confidence": state.last_signal_confidence, + }, + ) + + if previous_decision_status != state.decision_status: + EventBus.emit( + "auto_decision_changed", + { + "previous_decision_status": previous_decision_status, + "decision_status": state.decision_status, + "signal": state.last_signal, + "repeat_count": state.last_signal_repeat_count, + "confidence": state.last_signal_confidence, + }, + ) + # записать одиночный сигнал в журнал def _log_signal_event( self, diff --git a/docs/stages/stage-07_4_3_2-price-polling-eventbus-ui-throttling.md b/docs/stages/stage-07_4_3_2-price-polling-eventbus-ui-throttling.md new file mode 100644 index 0000000..b5f2ac9 --- /dev/null +++ b/docs/stages/stage-07_4_3_2-price-polling-eventbus-ui-throttling.md @@ -0,0 +1,328 @@ +# Stage 07.4.3.2 — Price Polling + EventBus + UI Throttling + +## 🎯 Цель этапа + +Разделить три независимых процесса автоторговли: + +- частое получение рыночной цены; +- анализ стратегии; +- обновление Telegram UI. + +Главная цель — убрать постоянное обновление Telegram на каждом цикле анализа и подготовить архитектуру к реальному execution. + +--- + +## 🧱 Что было до этапа + +Ранее один runner фактически связывал всё вместе: + +```text +run_cycle() +↓ +strategy.analyze() +↓ +update state +↓ +edit Telegram message +``` + +Из-за этого: + +- Telegram мог получать слишком частые `edit_message_text`; +- появлялся риск `Flood control exceeded`; +- частота анализа зависела от UI; +- цена не была выделена в отдельный polling-процесс. + +--- + +## ✅ Что реализовано + +## 1. MarketDataRunner + +Добавлен отдельный runner для быстрого обновления рыночной цены. + +Файл: + +```text +app/src/integrations/exchange/market_data_runner.py +``` + +Задача: + +```text +каждую 1 секунду: + получить актуальную цену + сохранить её в MarketPriceCache +``` + +Runner работает отдельно от Telegram UI и отдельно от анализа стратегии. + +--- + +## 2. MarketPriceCache + +Обновлён кэш рыночных цен. + +Файл: + +```text +app/src/integrations/exchange/market_cache.py +``` + +Теперь cache хранит: + +- symbol; +- price; +- bid_price; +- ask_price; +- updated_at; +- source. + +Кэш становится единым источником актуальной цены для: + +- экрана автоторговли; +- стратегии; +- будущего execution engine. + +--- + +## 3. ExchangeService.refresh_price_cache() + +В `ExchangeService` добавлен метод: + +```python +refresh_price_cache(symbol) +``` + +Назначение: + +- получить цену через REST; +- записать цену в `MarketPriceCache`; +- вернуть `TickerPrice`. + +Это позволяет отдельно обновлять цену без прямого обновления Telegram UI. + +--- + +## 4. Кэширование exchangeInfo + +В `ExchangeService` добавлен class-level cache: + +```python +_exchange_symbols_cache +``` + +Это нужно, чтобы быстрый price polling не вызывал `exchangeInfo` слишком часто. + +Используется class-level доступ: + +```python +type(self)._exchange_symbols_cache +``` + +--- + +## 5. EventBus + +Добавлен простой in-memory EventBus. + +Файл: + +```text +app/src/core/event_bus.py +``` + +EventBus хранит: + +- текущую версию событий; +- последний тип события; +- последний payload. + +Используется для важных изменений: + +```text +auto_status_changed +auto_signal_changed +auto_decision_changed +``` + +--- + +## 6. AutoTradeService emits events + +`AutoTradeService` теперь отправляет события при изменениях: + +### Смена статуса автоторговли + +```text +OFF → OBSERVING +OBSERVING → RUNNING +RUNNING → OFF +``` + +Событие: + +```text +auto_status_changed +``` + +### Смена сигнала + +```text +HOLD → BUY +BUY → SELL +SELL → HOLD +``` + +Событие: + +```text +auto_signal_changed +``` + +### Смена decision state + +```text +WAITING → CONFIRMING +CONFIRMING → READY +READY → BLOCKED +``` + +Событие: + +```text +auto_decision_changed +``` + +--- + +## 7. AutoTradeRunner decoupling + +`AutoTradeRunner` теперь разделяет процессы: + +```text +MarketDataRunner: 1 сек +Strategy analysis: 5 сек +Telegram UI: 60 сек или событие +``` + +### Telegram обновляется: + +- сразу при важном событии; +- либо раз в 60 секунд; +- только если текст реально изменился; +- с учётом `TelegramRetryAfter`. + +--- + +## 8. UI throttling + +Добавлена защита от Telegram flood control: + +- не отправлять update, если текст не изменился; +- не обновлять UI чаще заданного интервала; +- учитывать `TelegramRetryAfter`; +- снимать регистрацию, если сообщение удалено. + +--- + +## 🧠 Итоговая архитектура + +```text +MarketDataRunner + ↓ +ExchangeService.refresh_price_cache() + ↓ +MarketPriceCache + ↓ +TrendStrategy / Auto UI + ↓ +AutoTradeService + ↓ +EventBus + ↓ +AutoTradeRunner + ↓ +Telegram UI +``` + +--- + +## 📂 Изменённые / добавленные файлы + +### Добавлены + +```text +app/src/core/event_bus.py +app/src/integrations/exchange/market_data_runner.py +``` + +### Изменены + +```text +app/src/trading/auto/runner.py +app/src/trading/auto/service.py +app/src/integrations/exchange/service.py +app/src/integrations/exchange/market_cache.py +app/src/telegram/handlers/auto.py +``` + +--- + +## ✅ Проверка + +Проверено: + +- бот запускается без ошибок; +- цена отображается на экране автоторговли; +- `MarketDataRunner` обновляет цену отдельно; +- Telegram UI не обновляется каждую секунду; +- кнопки не ловят flood control; +- сигнал и decision state обновляются через EventBus; +- экран обновляется сразу при важных изменениях; +- `exchangeInfo` не дергается на каждом price polling. + +--- + +## ⚠️ Ограничения текущего этапа + +EventBus пока: + +- in-memory; +- без подписчиков; +- без persistence; +- без приоритетов событий. + +Это нормально для текущего этапа. Для будущего execution можно расширить EventBus или заменить на более полноценную event-модель. + +--- + +## 🔜 Следующий этап + +Рекомендуемый следующий этап: + +```text +Stage 07.4.3.3 — Position / Execution Skeleton +``` + +Возможные задачи: + +- модель позиции; +- entry price; +- position size; +- unrealized PnL; +- execution readiness; +- безопасный skeleton исполнения ордеров без реальной отправки. + +--- + +## ✅ Итог + +Stage 07.4.3.2 завершает важный архитектурный переход: + +```text +от UI-driven автоторговли +к event-driven trading engine +``` + +Теперь система готова к дальнейшему развитию execution layer.