# app/src/trading/auto/runner.py
from __future__ import annotations

import asyncio
import logging
import time
from typing import Callable

from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter

from src.core.event_bus import EventBus
from src.integrations.exchange.market_data_runner import MarketDataRunner
from src.trading.auto.service import AutoTradeService
from src.trading.journal.service import JournalService
class AutoTradeRunner:
    """Background runner for the auto-trade cycle and its Telegram screen.

    All state is stored on the class and every method is a classmethod, so
    the runner behaves as a single shared instance. It periodically calls
    ``AutoTradeService.run_cycle()``, sends Telegram alerts for the latest
    ``EventBus`` event, and keeps one registered Telegram message up to date
    by editing it in place.
    """

    _logger = logging.getLogger(__name__)

    # Background worker task; ``None`` when no worker has been started.
    _task: asyncio.Task | None = None

    # Registered screen: the Telegram message that ``_refresh_screen`` edits,
    # plus the callables that produce its text and reply markup.
    _bot: Bot | None = None
    _chat_id: int | None = None
    _message_id: int | None = None
    _render_text: Callable[[], str] | None = None
    _render_markup: Callable[[], object] | None = None

    # Stored by ``set_current_screen``; not read anywhere inside this class.
    _current_screen: str | None = None

    # Seconds between worker cycles / minimum seconds between unforced edits.
    _analysis_interval_seconds = 5
    _ui_interval_seconds = 60

    # Last text successfully shown on the registered screen, and when
    # (``time.monotonic()`` clock).
    _last_text: str | None = None
    _last_ui_refresh_at: float = 0.0

    # Last ``EventBus.version()`` value the worker has already handled.
    _last_event_version: int = 0

    # Monotonic deadline before which screen edits are skipped after a
    # ``TelegramRetryAfter`` response.
    _retry_after_until: float = 0.0

    # Strong-signal alert de-duplication: alert key -> monotonic send time.
    _last_strong_alert_key: str | None = None
    _strong_alert_cooldown_seconds = 120
    _last_strong_alert_at_by_key: dict[str, float] = {}

    # Key of the most recent execution alert, used to skip exact repeats.
    _last_execution_alert_key: str | None = None

    @classmethod
    def register_screen(
        cls,
        *,
        bot: Bot,
        chat_id: int,
        message_id: int,
        render_text: Callable[[], str],
        render_markup: Callable[[], object],
    ) -> None:
        """Remember the Telegram message that should be kept up to date.

        Args:
            bot: Bot used for edits and alerts.
            chat_id: Chat that owns the message; alerts are sent here too.
            message_id: Message to edit on refresh.
            render_text: Callable returning the current screen text.
            render_markup: Callable returning the current reply markup.
        """
        cls._bot = bot
        cls._chat_id = chat_id
        cls._message_id = message_id
        cls._render_text = render_text
        cls._render_markup = render_markup
        # Forget the cached text so the next refresh is not skipped as
        # "unchanged" for the newly registered message.
        cls._last_text = None

    @classmethod
    async def delete_registered_screen(
        cls,
        *,
        bot: Bot,
        chat_id: int,
    ) -> None:
        """Delete the registered screen message if it belongs to ``chat_id``.

        Does nothing when no screen is registered or it belongs to another
        chat. Deletion is best-effort: a failure is logged and the screen is
        unregistered anyway. ``_bot`` and ``_chat_id`` are kept.
        """
        if cls._chat_id is None or cls._message_id is None:
            return
        if cls._chat_id != chat_id:
            return
        try:
            await bot.delete_message(
                chat_id=cls._chat_id,
                message_id=cls._message_id,
            )
        except Exception:
            # Best-effort: the failure must not block unregistering.
            cls._logger.debug(
                "Failed to delete registered auto-trade screen message",
                exc_info=True,
            )
        cls._message_id = None
        cls._render_text = None
        cls._render_markup = None
        cls._last_text = None

    @classmethod
    def unregister_screen(
        cls,
        *,
        chat_id: int,
        message_id: int,
    ) -> None:
        """Forget the registered screen if it is exactly this message.

        A different ``chat_id`` or ``message_id`` leaves the registration
        untouched. ``_bot`` and ``_chat_id`` are kept either way.
        """
        if cls._chat_id != chat_id or cls._message_id != message_id:
            return
        cls._message_id = None
        cls._render_text = None
        cls._render_markup = None
        cls._last_text = None

    @classmethod
    def set_current_screen(cls, screen: str) -> None:
        """Store the name of the screen currently shown to the user."""
        cls._current_screen = screen

    @classmethod
    def start(cls) -> None:
        """Start market data polling and, if needed, the background worker.

        ``MarketDataRunner.start`` is called on every invocation; the worker
        task is only created when none is currently running. Creating the
        task uses ``asyncio.create_task``, which needs a running event loop.
        """
        service = AutoTradeService()
        MarketDataRunner.start(
            symbol_provider=lambda: service.get_state().symbol,
            interval_seconds=1,
        )
        if cls._task is not None and not cls._task.done():
            return
        cls._task = asyncio.create_task(cls._worker())

    @classmethod
    def stop(cls) -> None:
        """Stop market data polling and cancel the worker task, if any."""
        MarketDataRunner.stop()
        if cls._task is None:
            return
        cls._task.cancel()
        cls._task = None

    @classmethod
    async def _worker(cls) -> None:
        """Run trading cycles until the service state reports ``"OFF"``.

        Each iteration runs one service cycle, handles a new ``EventBus``
        event if the version changed, refreshes the screen, and then sleeps
        for ``_analysis_interval_seconds``.

        A failure in ``run_cycle`` is logged and re-raised, so the worker
        still stops on it. Failures in alerting or screen refresh are logged
        and do not stop the loop. Task cancellation is never swallowed
        (``CancelledError`` is not an ``Exception`` subclass).
        """
        service = AutoTradeService()
        while True:
            state = service.get_state()
            if state.status == "OFF":
                cls._task = None
                MarketDataRunner.stop()
                break

            try:
                service.run_cycle()
            except Exception:
                # The task object is retained in ``cls._task``, so without
                # this log the failure would never be reported.
                cls._logger.exception("Auto-trade cycle failed; worker is stopping")
                raise

            current_event_version = EventBus.version()
            has_important_event = current_event_version != cls._last_event_version
            if has_important_event:
                # Recorded before handling so a failing event is not retried
                # on every subsequent cycle.
                cls._last_event_version = current_event_version
                try:
                    # Re-read the state: the snapshot above predates run_cycle().
                    await cls._handle_important_event(service.get_state())
                except Exception:
                    cls._logger.exception("Failed to handle auto-trade event")

            try:
                await cls._refresh_screen(force=has_important_event)
            except Exception:
                cls._logger.exception("Failed to refresh auto-trade screen")

            await asyncio.sleep(cls._analysis_interval_seconds)

    @classmethod
    async def process_last_event_now(cls) -> None:
        """Handle the latest ``EventBus`` event immediately, outside the worker loop."""
        state = AutoTradeService().get_state()
        await cls._handle_important_event(state)

    @classmethod
    async def _handle_important_event(cls, state) -> None:
        """Send the alert that corresponds to the latest ``EventBus`` event.

        ``auto_decision_changed`` produces a strong-signal alert only when
        the decision status is ``READY`` and the signal is BUY or SELL.
        ``paper_position_opened`` / ``paper_position_closed`` produce an
        execution alert. Every other event type is ignored.
        """
        event_type, payload = EventBus.last_event()
        # Treat a missing payload as empty so ``.get`` lookups stay safe.
        payload = payload or {}

        if event_type == "auto_decision_changed":
            if payload.get("decision_status") != "READY":
                return
            signal = str(payload.get("signal", "")).upper()
            if signal not in {"BUY", "SELL"}:
                return
            await cls._send_strong_signal_alert(state=state, payload=payload)
            return

        if event_type in {"paper_position_opened", "paper_position_closed"}:
            await cls._send_execution_alert(
                state=state,
                event_type=event_type,
                payload=payload,
            )

    @classmethod
    def _prune_strong_alert_cooldowns(cls, *, now: float) -> None:
        """Drop cooldown entries that can no longer suppress an alert.

        An entry suppresses only while ``now - sent_at`` is below the
        cooldown, so removing older ones does not change behaviour; it only
        keeps the dictionary from growing indefinitely.
        """
        cooldown = cls._strong_alert_cooldown_seconds
        expired_keys = [
            key
            for key, sent_at in cls._last_strong_alert_at_by_key.items()
            if now - sent_at >= cooldown
        ]
        for key in expired_keys:
            del cls._last_strong_alert_at_by_key[key]

    @classmethod
    async def _send_strong_signal_alert(cls, *, state, payload: dict) -> None:
        """Send a strong-signal alert unless an identical one is on cooldown.

        Field values come from ``payload`` and fall back to ``state``. An
        alert with the same key sent less than
        ``_strong_alert_cooldown_seconds`` ago is journaled as suppressed
        instead of being sent. Send and journal failures are logged, not
        raised.
        """
        if cls._bot is None or cls._chat_id is None:
            return

        signal = str(payload.get("signal", "")).upper()
        symbol = str(payload.get("symbol") or state.symbol or "—")
        strategy = str(payload.get("strategy") or state.strategy or "—")
        repeat_count = int(payload.get("repeat_count") or state.last_signal_repeat_count or 0)
        confidence = float(payload.get("confidence") or state.last_signal_confidence or 0.0)
        payload_leverage = payload.get("leverage")
        leverage = payload_leverage if payload_leverage is not None else state.leverage
        reason = str(payload.get("reason") or state.last_signal_reason or "—")
        position_context = str(getattr(state, "position_side", "NONE") or "NONE")
        priority = cls._alert_priority(
            confidence=confidence,
            repeat_count=repeat_count,
        )

        alert_key = (
            f"signal:{position_context}:{symbol}:{strategy}:{signal}:"
            f"{repeat_count}:{confidence:.2f}:"
            f"{state.decision_status}:{reason}"
        )
        now = time.monotonic()
        last_alert_at = cls._last_strong_alert_at_by_key.get(alert_key)
        if last_alert_at is not None:
            elapsed = now - last_alert_at
            if elapsed < cls._strong_alert_cooldown_seconds:
                cls._log_suppressed_strong_alert(
                    signal=signal,
                    symbol=symbol,
                    strategy=strategy,
                    repeat_count=repeat_count,
                    confidence=confidence,
                    leverage=leverage,
                    reason=reason,
                    cooldown_left=round(cls._strong_alert_cooldown_seconds - elapsed, 2),
                    position_context=position_context,
                )
                return

        cls._prune_strong_alert_cooldowns(now=now)
        # The key is recorded before the send is awaited, so a concurrent
        # call with the same key is suppressed while this one is in flight.
        cls._last_strong_alert_key = alert_key
        cls._last_strong_alert_at_by_key[alert_key] = now

        text = cls._build_strong_signal_alert_text(
            signal=signal,
            symbol=symbol,
            strategy=strategy,
            repeat_count=repeat_count,
            confidence=confidence,
            leverage=leverage,
            reason=reason,
            priority=priority,
            position_context=position_context,
        )
        try:
            await cls._bot.send_message(
                chat_id=cls._chat_id,
                text=text,
            )
            JournalService().log_ui_info(
                event_type="auto_strong_signal_alert_sent",
                message=f"Отправлено уведомление о сильном сигнале {signal}.",
                screen="auto",
                action="strong_signal_alert",
                payload={
                    "symbol": symbol,
                    "strategy": strategy,
                    "signal": signal,
                    "repeat_count": repeat_count,
                    "confidence": confidence,
                    "leverage": leverage,
                    "reason": reason,
                    "priority": priority,
                    "position_context": position_context,
                },
            )
        except TelegramRetryAfter as exc:
            cls._retry_after_until = time.monotonic() + exc.retry_after + 5
        except Exception:
            cls._logger.exception("Failed to send or journal strong signal alert")

    @classmethod
    async def _send_execution_alert(
        cls,
        *,
        state,
        event_type: str,
        payload: dict,
    ) -> None:
        """Send an alert for a paper position being opened or closed.

        An alert whose key equals the previously recorded one is skipped.
        Send and journal failures are logged, not raised.
        """
        if cls._bot is None or cls._chat_id is None:
            return

        alert_key = cls._execution_alert_key(
            event_type=event_type,
            payload=payload,
        )
        if alert_key == cls._last_execution_alert_key:
            return
        cls._last_execution_alert_key = alert_key

        text = cls._build_execution_alert_text(
            state=state,
            event_type=event_type,
            payload=payload,
        )
        try:
            await cls._bot.send_message(
                chat_id=cls._chat_id,
                text=text,
            )
            JournalService().log_ui_info(
                event_type="auto_execution_alert_sent",
                message="Отправлено Telegram-уведомление по paper execution.",
                screen="auto",
                action="execution_alert",
                payload={
                    "source_event_type": event_type,
                    **payload,
                },
            )
        except TelegramRetryAfter as exc:
            cls._retry_after_until = time.monotonic() + exc.retry_after + 5
        except Exception:
            cls._logger.exception("Failed to send or journal execution alert")

    @classmethod
    def _execution_alert_key(
        cls,
        *,
        event_type: str,
        payload: dict,
    ) -> str:
        """Build the de-duplication key for an execution alert."""
        return (
            f"{event_type}:"
            f"{payload.get('symbol')}:"
            f"{payload.get('side')}:"
            f"{payload.get('entry_price')}:"
            f"{payload.get('exit_price')}:"
            f"{payload.get('size')}:"
            f"{payload.get('pnl')}"
        )

    @classmethod
    def _build_execution_alert_text(
        cls,
        *,
        state,
        event_type: str,
        payload: dict,
    ) -> str:
        """Render the message text for an execution alert.

        Symbol, side and leverage come from ``payload`` and fall back to
        ``state``. Event types other than opened/closed get a generic text.
        """
        symbol = str(payload.get("symbol") or state.symbol or "—")
        side = str(payload.get("side") or state.position_side or "—")
        payload_leverage = payload.get("leverage")
        leverage = payload_leverage if payload_leverage is not None else state.leverage
        symbol_text = cls._format_alert_symbol(symbol)
        leverage_text = cls._format_alert_leverage(leverage)

        if event_type == "paper_position_opened":
            entry_price = cls._format_price(payload.get("entry_price"))
            size = cls._format_size(payload.get("size"))
            side_icon = "🟢" if side == "LONG" else "🔴"
            return (
                f"📄 Paper position opened {side_icon} {side}\n\n"
                f"{symbol_text} · {leverage_text}\n"
                f"Entry: $ {entry_price}\n"
                f"Size: {size}"
            )

        if event_type == "paper_position_closed":
            entry_price = cls._format_price(payload.get("entry_price"))
            exit_price = cls._format_price(payload.get("exit_price"))
            size = cls._format_size(payload.get("size"))
            pnl = cls._format_pnl(payload.get("pnl"))
            return (
                f"✅ Paper position closed\n\n"
                f"{side} · {symbol_text} · {leverage_text}\n"
                f"Entry: $ {entry_price}\n"
                f"Exit: $ {exit_price}\n"
                f"Size: {size}\n\n"
                f"PnL: {pnl}"
            )

        return "📄 Paper execution event"

    @classmethod
    def _log_suppressed_strong_alert(
        cls,
        *,
        signal: str,
        symbol: str,
        strategy: str,
        repeat_count: int,
        confidence: float,
        leverage: object,
        reason: str,
        cooldown_left: float,
        position_context: str,
    ) -> None:
        """Journal that a strong-signal alert was suppressed by the cooldown.

        Journaling is best-effort: a failure is logged and not raised.
        """
        try:
            JournalService().log_ui_info(
                event_type="auto_strong_signal_alert_suppressed",
                message=f"Повторное уведомление о сильном сигнале {signal} подавлено.",
                screen="auto",
                action="strong_signal_alert",
                payload={
                    "symbol": symbol,
                    "strategy": strategy,
                    "signal": signal,
                    "repeat_count": repeat_count,
                    "confidence": confidence,
                    "leverage": leverage,
                    "reason": reason,
                    "cooldown_left": cooldown_left,
                    "position_context": position_context,
                },
            )
        except Exception:
            cls._logger.exception("Failed to journal suppressed strong signal alert")

    @classmethod
    def _alert_priority(
        cls,
        *,
        confidence: float,
        repeat_count: int,
    ) -> str:
        """Classify an alert as ``HIGH``, ``MEDIUM`` or ``LOW``.

        HIGH needs both confidence >= 0.8 and at least 3 repeats; MEDIUM
        needs confidence >= 0.6 or at least 2 repeats.
        """
        if confidence >= 0.8 and repeat_count >= 3:
            return "HIGH"
        if confidence >= 0.6 or repeat_count >= 2:
            return "MEDIUM"
        return "LOW"

    @classmethod
    def _priority_label(cls, priority: str) -> str:
        """Return the display label for a priority; unknown values pass through."""
        mapping = {
            "HIGH": "🚨 HIGH",
            "MEDIUM": "⚡ MEDIUM",
            "LOW": "ℹ️ LOW",
        }
        return mapping.get(priority, priority)

    @classmethod
    def _format_alert_symbol(cls, symbol: str) -> str:
        """Format a symbol for display.

        Everything from the first ``_`` onward is dropped, and a ``BASE/QUOTE``
        pair is rendered as ``BASE / QUOTE``. Empty input yields a dash.
        """
        if not symbol or symbol == "—":
            return "—"
        base_symbol = symbol.split("_", 1)[0]
        parts = base_symbol.split("/", 1)
        if len(parts) == 2:
            return f"{parts[0]} / {parts[1]}"
        return base_symbol

    @classmethod
    def _format_alert_leverage(cls, leverage: object) -> str:
        """Format numeric leverage as ``x<value>``; anything else as a dash."""
        if isinstance(leverage, (int, float)):
            return f"x{leverage:g}"
        return "—"

    @classmethod
    def _signal_icon(cls, signal: str) -> str:
        """Return the icon for BUY/SELL, or a neutral icon for other signals."""
        mapping = {
            "BUY": "🟢",
            "SELL": "🔴",
        }
        return mapping.get(signal, "⚪")

    @classmethod
    def _build_strong_signal_alert_text(
        cls,
        *,
        signal: str,
        symbol: str,
        strategy: str,
        repeat_count: int,
        confidence: float,
        leverage: object,
        reason: str,
        priority: str,
        position_context: str,
    ) -> str:
        """Render the message text for a strong-signal alert."""
        icon = cls._signal_icon(signal)
        symbol_text = cls._format_alert_symbol(symbol)
        leverage_text = cls._format_alert_leverage(leverage)
        priority_text = cls._priority_label(priority)
        return (
            f"{priority_text} · {icon} {signal}\n\n"
            f"{symbol_text} · {strategy} · {leverage_text}\n"
            f"Position: {position_context}\n\n"
            f"🧠 Confidence: {confidence:.2f}\n"
            f"🔁 Repeats: {repeat_count}\n\n"
            f"💡 Причина:\n"
            f"{reason}"
        )

    @classmethod
    def _format_price(cls, value: object) -> str:
        """Format a price with two decimals; a dash if it is not numeric."""
        try:
            return f"{float(value):.2f}"
        except (TypeError, ValueError):
            return "—"

    @classmethod
    def _format_size(cls, value: object) -> str:
        """Format a size with up to eight decimals, trailing zeros removed."""
        try:
            return f"{float(value):.8f}".rstrip("0").rstrip(".")
        except (TypeError, ValueError):
            return "—"

    @classmethod
    def _format_pnl(cls, value: object) -> str:
        """Format PnL with four decimals and an explicit ``+`` for gains."""
        try:
            pnl = float(value)
        except (TypeError, ValueError):
            return "—"
        prefix = "+" if pnl > 0 else ""
        return f"{prefix}{pnl:.4f} USD"

    @classmethod
    def _mark_screen_rendered(
        cls,
        *,
        chat_id: int,
        message_id: int,
        text: str,
        rendered_at: float,
    ) -> None:
        """Record ``text`` as shown, but only if this screen is still registered.

        A different screen may have been registered while the edit request
        was awaited; its cached text must not be overwritten.
        """
        if cls._chat_id != chat_id or cls._message_id != message_id:
            return
        cls._last_text = text
        cls._last_ui_refresh_at = rendered_at

    @classmethod
    async def _refresh_screen(cls, *, force: bool = False) -> None:
        """Edit the registered message so it shows the current screen text.

        The edit is skipped while a Telegram retry-after window is active,
        when the last refresh was less than ``_ui_interval_seconds`` ago
        (unless ``force``), when no screen is fully registered, or when the
        rendered text equals the last text shown. Telegram errors are handled
        or logged here and are not raised.
        """
        now = time.monotonic()
        if now < cls._retry_after_until:
            return
        if not force and now - cls._last_ui_refresh_at < cls._ui_interval_seconds:
            return

        # Snapshot the registration: it can change while the edit is awaited.
        bot = cls._bot
        chat_id = cls._chat_id
        message_id = cls._message_id
        render_text = cls._render_text
        render_markup = cls._render_markup
        if not (bot and chat_id and message_id and render_text and render_markup):
            return

        text = render_text()
        if text == cls._last_text:
            return

        try:
            await bot.edit_message_text(
                chat_id=chat_id,
                message_id=message_id,
                text=text,
                reply_markup=render_markup(),
            )
        except TelegramRetryAfter as exc:
            cls._retry_after_until = time.monotonic() + exc.retry_after + 5
        except TelegramBadRequest as exc:
            error_text = str(exc).lower()
            if "message is not modified" in error_text:
                # Telegram already shows this text; treat it as rendered.
                cls._mark_screen_rendered(
                    chat_id=chat_id,
                    message_id=message_id,
                    text=text,
                    rendered_at=now,
                )
            elif "message to edit not found" in error_text:
                # Clears the registration only if it still points at the
                # message that was just reported missing.
                cls.unregister_screen(chat_id=chat_id, message_id=message_id)
            else:
                cls._logger.warning("Telegram rejected auto-trade screen edit: %s", exc)
        except Exception:
            cls._logger.exception("Failed to edit auto-trade screen message")
        else:
            cls._mark_screen_rendered(
                chat_id=chat_id,
                message_id=message_id,
                text=text,
                rendered_at=now,
            )