Stage 07.4.3.2 — price polling, event bus and UI throttling

This commit is contained in:
2026-05-02 14:57:43 +03:00
parent 38c8686a9b
commit bd6b40fcb2
9 changed files with 644 additions and 31 deletions

28
app/src/core/event_bus.py Normal file
View File

@@ -0,0 +1,28 @@
# app/src/core/event_bus.py
from __future__ import annotations
from typing import Any
class EventBus:
_version: int = 0
_last_event_type: str | None = None
_last_payload: dict[str, Any] = {}
# зафиксировать важное событие системы
@classmethod
def emit(cls, event_type: str, payload: dict[str, Any] | None = None) -> None:
cls._version += 1
cls._last_event_type = event_type
cls._last_payload = payload or {}
# текущая версия событий
@classmethod
def version(cls) -> int:
return cls._version
# последнее событие
@classmethod
def last_event(cls) -> tuple[str | None, dict[str, Any]]:
return cls._last_event_type, dict(cls._last_payload)

View File

@@ -16,7 +16,7 @@ class MarketPriceSnapshot:
bid_price: float | None
ask_price: float | None
updated_at: str
source: str = "websocket"
source: str = "market-cache"
class MarketPriceCache:
@@ -32,6 +32,7 @@ class MarketPriceCache:
bid_price: float | None = None,
ask_price: float | None = None,
updated_at: str | None = None,
source: str = "market-polling",
) -> None:
settings = load_settings()
@@ -44,6 +45,7 @@ class MarketPriceCache:
bid_price=bid_price,
ask_price=ask_price,
updated_at=updated_at,
source=source,
)
# получить последнюю цену

View File

@@ -0,0 +1,56 @@
# app/src/integrations/exchange/market_data_runner.py
from __future__ import annotations
import asyncio
from typing import Callable
from src.integrations.exchange.service import ExchangeService
class MarketDataRunner:
_task: asyncio.Task | None = None
_interval_seconds: int = 1
_symbol_provider: Callable[[], str | None] | None = None
# запустить быстрый polling рыночной цены
@classmethod
def start(
cls,
*,
symbol_provider: Callable[[], str | None],
interval_seconds: int = 1,
) -> None:
cls._symbol_provider = symbol_provider
cls._interval_seconds = interval_seconds
if cls._task is not None and not cls._task.done():
return
cls._task = asyncio.create_task(cls._worker())
# остановить polling
@classmethod
def stop(cls) -> None:
if cls._task is None:
return
cls._task.cancel()
cls._task = None
# рабочий цикл polling
@classmethod
async def _worker(cls) -> None:
while True:
symbol = cls._symbol_provider() if cls._symbol_provider else None
if symbol:
try:
await asyncio.to_thread(
ExchangeService().refresh_price_cache,
symbol,
)
except Exception:
pass
await asyncio.sleep(cls._interval_seconds)

View File

@@ -29,6 +29,8 @@ from src.integrations.exchange.market_cache import MarketPriceCache
class ExchangeService:
_exchange_symbols_cache: list[ExchangeSymbol] | None = None
def __init__(self) -> None:
self.settings = load_settings()
self.journal = JournalService()
@@ -192,6 +194,35 @@ class ExchangeService:
message=f"Private API OK. Балансов получено: {len(balances)}",
)
# принудительно обновить market cache через REST
def refresh_price_cache(self, symbol: str | None = None) -> TickerPrice:
symbol_to_use = symbol or self.settings.default_symbol
if not self.settings.exchange_enabled:
ticker = mock_ticker_price(symbol_to_use)
MarketPriceCache.set_price(
symbol=ticker.symbol,
price=ticker.price,
updated_at=ticker.updated_at,
source=ticker.source,
)
return ticker
validation = self.validate_symbol(symbol_to_use)
if not validation.is_valid:
raise ExchangeError(validation.message)
ticker = self._get_real_price(validation.normalized_symbol)
MarketPriceCache.set_price(
symbol=ticker.symbol,
price=ticker.price,
updated_at=ticker.updated_at,
source=ticker.source,
)
return ticker
# получить цену инструмента: сначала WebSocket cache, потом REST fallback
def get_price(self, symbol: str | None = None) -> TickerPrice:
symbol_to_use = symbol or self.settings.default_symbol
@@ -336,6 +367,9 @@ class ExchangeService:
if not self.settings.exchange_enabled:
return []
if type(self)._exchange_symbols_cache is not None:
return type(self)._exchange_symbols_cache
client = ExchangeRestClient()
try:
@@ -470,6 +504,8 @@ class ExchangeService:
)
)
type(self)._exchange_symbols_cache = items
return items
def validate_symbol(self, raw_symbol: str) -> SymbolValidationResult:

View File

@@ -11,6 +11,9 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from src.telegram.ui.common import mode_line
from src.trading.auto.runner import AutoTradeRunner
from src.trading.auto.service import AutoTradeService
from src.telegram.handlers.system import open_auto_settings
from src.integrations.exchange.service import ExchangeService
from src.telegram.ui.currency_ui import format_usd_amount
router = Router(name="auto")
@@ -71,6 +74,18 @@ def _price_or_dash(value: float | None) -> str:
return f"{value:.2f}"
# текущая цена инструмента
def _market_price_or_dash(symbol: str | None) -> str:
if not symbol:
return ""
try:
ticker = ExchangeService().get_price(symbol)
return f"$ {format_usd_amount(ticker.price)}"
except Exception:
return ""
# формат USD
def _usd_or_dash(value: float | None) -> str:
if value is None:
@@ -162,9 +177,16 @@ def _build_auto_text() -> str:
account_mode = "DEMO" if "DEMO" in mode_line().upper() else "LIVE"
risk = f"{state.risk_percent:.1f}%" if state.risk_percent is not None else ""
configured = _is_auto_configured(state)
price = _market_price_or_dash(state.symbol)
status_line = {
"OFF": "⚪ Off",
"OBSERVING": "👀 Watch",
"RUNNING": "🟢 On",
}.get(state.status, state.status)
header = (
"<b>🤖 Автоторговля</b>\n"
f"<b>🤖 Автоторговля · {status_line}</b>\n"
f"🔸 {account_mode} аккаунт\n\n"
)
@@ -172,32 +194,36 @@ def _build_auto_text() -> str:
if not configured:
return (
f"{header}"
"⚪ Выключена\n\n"
"⚠️ Не настроена\n"
"Настрой параметры"
)
return (
f"{header}"
"⚪ Выключена\n\n"
f"{_context_line(state)}\n"
f"Price: {price}\n"
f"Risk: {risk}"
)
status_line = (
"🟢 Активна"
if state.status == "RUNNING"
else "👀 Наблюдение"
position_line = (
f"Pos: {_value_or_dash(state.position_side)} | "
f"PnL: {_usd_or_dash(state.unrealized_pnl_usd)}"
)
if state.position_side != "NONE" and state.entry_price is not None:
position_line = (
f"Pos: {_value_or_dash(state.position_side)} | "
f"Entry: $ {_price_or_dash(state.entry_price)} | "
f"PnL: {_usd_or_dash(state.unrealized_pnl_usd)}"
)
return (
f"{header}"
f"{status_line}\n\n"
f"{_context_line(state)}\n\n"
f"{_context_line(state)}\n"
f"Price: {price}\n\n"
f"{_signal_label(state.last_signal)} ×{state.last_signal_repeat_count} "
f"· {state.decision_status}\n\n"
f"Pos: {_value_or_dash(state.position_side)} | "
f"PnL: {_usd_or_dash(state.unrealized_pnl_usd)}\n"
f"{position_line}\n"
f"Risk: {risk}"
)
@@ -272,6 +298,19 @@ async def open_auto_from_callback(callback: CallbackQuery, state: FSMContext) ->
@router.callback_query(F.data == "auto:start")
async def auto_start(callback: CallbackQuery) -> None:
service = AutoTradeService()
state = service.get_state()
if not _is_auto_configured(state):
await callback.answer(
"Сначала настрой параметры автоторговли",
show_alert=True,
)
if callback.message is not None:
await open_auto_settings(callback)
return
_, message = service.start()
AutoTradeRunner.set_current_screen("auto")
@@ -287,6 +326,19 @@ async def auto_start(callback: CallbackQuery) -> None:
@router.callback_query(F.data == "auto:observe")
async def auto_observe(callback: CallbackQuery) -> None:
service = AutoTradeService()
state = service.get_state()
if not _is_auto_configured(state):
await callback.answer(
"Сначала настрой параметры автоторговли",
show_alert=True,
)
if callback.message is not None:
await open_auto_settings(callback)
return
_, message = service.observe()
AutoTradeRunner.set_current_screen("auto")

View File

@@ -168,7 +168,7 @@ async def open_system_management(callback: CallbackQuery) -> None:
@router.callback_query(F.data == "settings:auto")
async def open_auto_settings(callback: CallbackQuery) -> None:
AutoTradeRunner.set_current_screen("settings_auto")
if callback.message is None:
await callback.answer("Сообщение не найдено", show_alert=True)
return
@@ -177,7 +177,7 @@ async def open_auto_settings(callback: CallbackQuery) -> None:
chat_id=callback.message.chat.id,
message_id=callback.message.message_id,
)
state = AutoTradeService().get_state()
strategy_map = {
@@ -185,20 +185,43 @@ async def open_auto_settings(callback: CallbackQuery) -> None:
"GRID": "🧩 Grid Trading",
"SCALP": "⚡ Scalping",
}
strategy_ready = state.strategy is not None
symbol_ready = bool(state.symbol)
risk_ready = state.risk_percent is not None
leverage_ready = state.leverage is not None
is_configured = strategy_ready and symbol_ready and risk_ready and leverage_ready
strategy = strategy_map.get(state.strategy or "", "")
symbol = state.symbol or ""
risk = f"{state.risk_percent:.1f}%" if state.risk_percent is not None else ""
leverage = f"x{state.leverage:g}" if state.leverage is not None else ""
strategy_icon = "" if strategy_ready else "👉"
symbol_icon = "" if symbol_ready else "👉"
risk_icon = "" if risk_ready else "👉"
leverage_icon = "" if leverage_ready else "👉"
config_status = (
"Все параметры настроены"
if is_configured
else "⛔️ Настрой все параметры"
)
text = (
"<b>🤖 Автоторговля</b>\n\n"
"<b>СИСТЕМА</b> · Настройки\n\n"
f"Стратегия: {strategy}\n"
f"Инструмент: {state.symbol}\n"
f"Риск: {risk}\n"
f"Плечо: {leverage}\n\n"
"Выберите настройку:"
f"{strategy_icon} Стратегия: {strategy}\n"
f"{symbol_icon} Инструмент: {symbol}\n"
f"{risk_icon} Риск: {risk}\n"
f"{leverage_icon} Плечо: {leverage}\n\n"
f"{config_status}"
)
if not is_configured:
text += "\n\nВыберите настройку:"
builder = InlineKeyboardBuilder()
builder.button(text="🧠 Стратегия", callback_data="settings:auto_strategy")
builder.button(text="📈 Инструмент", callback_data="settings:auto_symbol")
@@ -361,7 +384,7 @@ async def set_auto_leverage(callback: CallbackQuery) -> None:
AutoTradeRunner.set_current_screen("settings_auto")
await callback.answer("Плечо обновлено")
@router.callback_query(F.data == "settings:trade")
async def open_trade_settings(callback: CallbackQuery) -> None:
if callback.message is None:

View File

@@ -9,6 +9,8 @@ from typing import Callable
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter
from src.core.event_bus import EventBus
from src.integrations.exchange.market_data_runner import MarketDataRunner
from src.trading.auto.service import AutoTradeService
@@ -20,9 +22,16 @@ class AutoTradeRunner:
_render_text: Callable[[], str] | None = None
_render_markup: Callable[[], object] | None = None
_current_screen: str | None = None
_interval_seconds = 15
# анализ стратегии — часто
_analysis_interval_seconds = 5
# Telegram UI — редко
_ui_interval_seconds = 60
_last_text: str | None = None
_last_ui_refresh_at: float = 0.0
_last_event_version: int = 0
_retry_after_until: float = 0.0
@classmethod
@@ -89,6 +98,13 @@ class AutoTradeRunner:
@classmethod
def start(cls) -> None:
service = AutoTradeService()
MarketDataRunner.start(
symbol_provider=lambda: service.get_state().symbol,
interval_seconds=1,
)
if cls._task is not None and not cls._task.done():
return
@@ -96,6 +112,8 @@ class AutoTradeRunner:
@classmethod
def stop(cls) -> None:
MarketDataRunner.stop()
if cls._task is None:
return
@@ -111,16 +129,29 @@ class AutoTradeRunner:
if state.status == "OFF":
cls._task = None
MarketDataRunner.stop()
break
service.run_cycle()
await cls._refresh_screen()
await asyncio.sleep(cls._interval_seconds)
current_event_version = EventBus.version()
has_important_event = current_event_version != cls._last_event_version
if has_important_event:
cls._last_event_version = current_event_version
await cls._refresh_screen(force=has_important_event)
await asyncio.sleep(cls._analysis_interval_seconds)
@classmethod
async def _refresh_screen(cls) -> None:
if time.monotonic() < cls._retry_after_until:
async def _refresh_screen(cls, *, force: bool = False) -> None:
now = time.monotonic()
if now < cls._retry_after_until:
return
if not force and now - cls._last_ui_refresh_at < cls._ui_interval_seconds:
return
if not all(
@@ -147,6 +178,7 @@ class AutoTradeRunner:
reply_markup=cls._render_markup(),
)
cls._last_text = text
cls._last_ui_refresh_at = now
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
@@ -156,6 +188,7 @@ class AutoTradeRunner:
if "message is not modified" in error_text:
cls._last_text = text
cls._last_ui_refresh_at = now
return
if "message to edit not found" in error_text:

View File

@@ -10,6 +10,7 @@ from src.trading.auto.state import AutoTradeState
from src.trading.journal.service import JournalService
from src.trading.strategies.base import BaseStrategy, StrategyContext
from src.trading.strategies.registry import StrategyRegistry
from src.core.event_bus import EventBus
class AutoTradeService:
@@ -69,18 +70,30 @@ class AutoTradeService:
# запустить активную торговлю
def start(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if state.status == "RUNNING":
self.start_loop()
return state, "Автоторговля уже активна."
if state.status == "OBSERVING":
state.status = "RUNNING"
self.start_loop()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля активирована."
state.status = "RUNNING"
self.start_loop()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля запущена."
# включить режим наблюдения
@@ -89,11 +102,17 @@ class AutoTradeService:
previous_status = state.status
if previous_status == "OBSERVING":
self.start_loop()
return state, "Режим наблюдения уже включён."
state.status = "OBSERVING"
self.start_loop()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
if previous_status == "OFF":
return state, "Включён режим наблюдения."
@@ -103,6 +122,7 @@ class AutoTradeService:
# полностью выключить автоторговлю
def stop(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if state.status == "OFF":
self.stop_loop()
@@ -110,6 +130,15 @@ class AutoTradeService:
state.status = "OFF"
self.stop_loop()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля выключена."
# установить инструмент
@@ -132,7 +161,7 @@ class AutoTradeService:
state.risk_percent = risk_percent
return state
# установить плечо
# установить плечо
def set_leverage(self, leverage: float) -> AutoTradeState:
state = self.get_state()
state.leverage = leverage
@@ -285,6 +314,9 @@ class AutoTradeService:
reason: str,
confidence: float,
) -> None:
previous_signal = state.last_signal
previous_decision_status = state.decision_status
state.last_signal = signal
state.last_signal_repeat_count = self._same_signal_count
state.last_signal_confidence = confidence
@@ -296,6 +328,29 @@ class AutoTradeService:
confidence=confidence,
)
if previous_signal != state.last_signal:
EventBus.emit(
"auto_signal_changed",
{
"previous_signal": previous_signal,
"signal": state.last_signal,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
},
)
if previous_decision_status != state.decision_status:
EventBus.emit(
"auto_decision_changed",
{
"previous_decision_status": previous_decision_status,
"decision_status": state.decision_status,
"signal": state.last_signal,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
},
)
# записать одиночный сигнал в журнал
def _log_signal_event(
self,

View File

@@ -0,0 +1,328 @@
# Stage 07.4.3.2 — Price Polling + EventBus + UI Throttling
## 🎯 Цель этапа
Разделить три независимых процесса автоторговли:
- частое получение рыночной цены;
- анализ стратегии;
- обновление Telegram UI.
Главная цель — убрать постоянное обновление Telegram на каждом цикле анализа и подготовить архитектуру к реальному execution.
---
## 🧱 Что было до этапа
Ранее один runner фактически связывал всё вместе:
```text
run_cycle()
strategy.analyze()
update state
edit Telegram message
```
Из-за этого:
- Telegram мог получать слишком частые `edit_message_text`;
- появлялся риск `Flood control exceeded`;
- частота анализа зависела от UI;
- цена не была выделена в отдельный polling-процесс.
---
## ✅ Что реализовано
## 1. MarketDataRunner
Добавлен отдельный runner для быстрого обновления рыночной цены.
Файл:
```text
app/src/integrations/exchange/market_data_runner.py
```
Задача:
```text
каждую 1 секунду:
получить актуальную цену
сохранить её в MarketPriceCache
```
Runner работает отдельно от Telegram UI и отдельно от анализа стратегии.
---
## 2. MarketPriceCache
Обновлён кэш рыночных цен.
Файл:
```text
app/src/integrations/exchange/market_cache.py
```
Теперь cache хранит:
- symbol;
- price;
- bid_price;
- ask_price;
- updated_at;
- source.
Кэш становится единым источником актуальной цены для:
- экрана автоторговли;
- стратегии;
- будущего execution engine.
---
## 3. ExchangeService.refresh_price_cache()
В `ExchangeService` добавлен метод:
```python
refresh_price_cache(symbol)
```
Назначение:
- получить цену через REST;
- записать цену в `MarketPriceCache`;
- вернуть `TickerPrice`.
Это позволяет отдельно обновлять цену без прямого обновления Telegram UI.
---
## 4. Кэширование exchangeInfo
В `ExchangeService` добавлен class-level cache:
```python
_exchange_symbols_cache
```
Это нужно, чтобы быстрый price polling не вызывал `exchangeInfo` слишком часто.
Используется class-level доступ:
```python
type(self)._exchange_symbols_cache
```
---
## 5. EventBus
Добавлен простой in-memory EventBus.
Файл:
```text
app/src/core/event_bus.py
```
EventBus хранит:
- текущую версию событий;
- последний тип события;
- последний payload.
Используется для важных изменений:
```text
auto_status_changed
auto_signal_changed
auto_decision_changed
```
---
## 6. AutoTradeService emits events
`AutoTradeService` теперь отправляет события при изменениях:
### Смена статуса автоторговли
```text
OFF → OBSERVING
OBSERVING → RUNNING
RUNNING → OFF
```
Событие:
```text
auto_status_changed
```
### Смена сигнала
```text
HOLD → BUY
BUY → SELL
SELL → HOLD
```
Событие:
```text
auto_signal_changed
```
### Смена decision state
```text
WAITING → CONFIRMING
CONFIRMING → READY
READY → BLOCKED
```
Событие:
```text
auto_decision_changed
```
---
## 7. AutoTradeRunner decoupling
`AutoTradeRunner` теперь разделяет процессы:
```text
MarketDataRunner: 1 сек
Strategy analysis: 5 сек
Telegram UI: 60 сек или событие
```
### Telegram обновляется:
- сразу при важном событии;
- либо раз в 60 секунд;
- только если текст реально изменился;
- с учётом `TelegramRetryAfter`.
---
## 8. UI throttling
Добавлена защита от Telegram flood control:
- не отправлять update, если текст не изменился;
- не обновлять UI чаще заданного интервала;
- учитывать `TelegramRetryAfter`;
- снимать регистрацию, если сообщение удалено.
---
## 🧠 Итоговая архитектура
```text
MarketDataRunner
ExchangeService.refresh_price_cache()
MarketPriceCache
TrendStrategy / Auto UI
AutoTradeService
EventBus
AutoTradeRunner
Telegram UI
```
---
## 📂 Изменённые / добавленные файлы
### Добавлены
```text
app/src/core/event_bus.py
app/src/integrations/exchange/market_data_runner.py
```
### Изменены
```text
app/src/trading/auto/runner.py
app/src/trading/auto/service.py
app/src/integrations/exchange/service.py
app/src/integrations/exchange/market_cache.py
app/src/telegram/handlers/auto.py
```
---
## ✅ Проверка
Проверено:
- бот запускается без ошибок;
- цена отображается на экране автоторговли;
- `MarketDataRunner` обновляет цену отдельно;
- Telegram UI не обновляется каждую секунду;
- кнопки не ловят flood control;
- сигнал и decision state обновляются через EventBus;
- экран обновляется сразу при важных изменениях;
- `exchangeInfo` не дергается на каждом price polling.
---
## ⚠️ Ограничения текущего этапа
EventBus пока:
- in-memory;
- без подписчиков;
- без persistence;
- без приоритетов событий.
Это нормально для текущего этапа. Для будущего execution можно расширить EventBus или заменить на более полноценную event-модель.
---
## 🔜 Следующий этап
Рекомендуемый следующий этап:
```text
Stage 07.4.3.3 — Position / Execution Skeleton
```
Возможные задачи:
- модель позиции;
- entry price;
- position size;
- unrealized PnL;
- execution readiness;
- безопасный skeleton исполнения ордеров без реальной отправки.
---
## ✅ Итог
Stage 07.4.3.2 завершает важный архитектурный переход:
```text
от UI-driven автоторговли
к event-driven trading engine
```
Теперь система готова к дальнейшему развитию execution layer.