Files
dzentra_bot/app/src/trading/auto/runner.py

569 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/auto/runner.py
from __future__ import annotations
import asyncio
import time
from typing import Callable
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter
from src.core.event_bus import EventBus
from src.integrations.exchange.market_data_runner import MarketDataRunner
from src.trading.auto.service import AutoTradeService
from src.trading.journal.service import JournalService
from src.telegram.ui.currency_ui import format_usd_pnl, format_usd_price
class AutoTradeRunner:
_task: asyncio.Task | None = None
_bot: Bot | None = None
_chat_id: int | None = None
_message_id: int | None = None
_render_text: Callable[[], str] | None = None
_render_markup: Callable[[], object] | None = None
_current_screen: str | None = None
_analysis_interval_seconds = 5
_ui_interval_seconds = 60
_last_text: str | None = None
_last_ui_refresh_at: float = 0.0
_last_event_version: int = 0
_retry_after_until: float = 0.0
_last_strong_alert_key: str | None = None
_strong_alert_cooldown_seconds = 120
_last_strong_alert_at_by_key: dict[str, float] = {}
_last_execution_alert_key: str | None = None
@classmethod
def register_screen(
cls,
*,
bot: Bot,
chat_id: int,
message_id: int,
render_text: Callable[[], str],
render_markup: Callable[[], object],
) -> None:
cls._bot = bot
cls._chat_id = chat_id
cls._message_id = message_id
cls._render_text = render_text
cls._render_markup = render_markup
cls._last_text = None
@classmethod
async def delete_registered_screen(
cls,
*,
bot: Bot,
chat_id: int,
) -> None:
if cls._chat_id is None or cls._message_id is None:
return
if cls._chat_id != chat_id:
return
try:
await bot.delete_message(
chat_id=cls._chat_id,
message_id=cls._message_id,
)
except Exception:
pass
cls._message_id = None
cls._render_text = None
cls._render_markup = None
cls._last_text = None
@classmethod
def unregister_screen(
cls,
*,
chat_id: int,
message_id: int,
) -> None:
if cls._chat_id != chat_id or cls._message_id != message_id:
return
cls._message_id = None
cls._render_text = None
cls._render_markup = None
cls._last_text = None
@classmethod
def set_current_screen(cls, screen: str) -> None:
cls._current_screen = screen
@classmethod
def start(cls) -> None:
service = AutoTradeService()
MarketDataRunner.start(
symbol_provider=lambda: service.get_state().symbol,
interval_seconds=1,
)
if cls._task is not None and not cls._task.done():
return
cls._task = asyncio.create_task(cls._worker())
@classmethod
def stop(cls) -> None:
MarketDataRunner.stop()
if cls._task is None:
return
cls._task.cancel()
cls._task = None
@classmethod
async def _worker(cls) -> None:
service = AutoTradeService()
while True:
state = service.get_state()
if state.status == "OFF":
cls._task = None
MarketDataRunner.stop()
break
service.run_cycle()
current_event_version = EventBus.version()
has_important_event = current_event_version != cls._last_event_version
if has_important_event:
cls._last_event_version = current_event_version
await cls._handle_important_event(state)
await cls._refresh_screen(force=has_important_event)
await asyncio.sleep(cls._analysis_interval_seconds)
@classmethod
async def process_last_event_now(cls) -> None:
state = AutoTradeService().get_state()
await cls._handle_important_event(state)
@classmethod
async def _handle_important_event(cls, state) -> None:
event_type, payload = EventBus.last_event()
if event_type == "auto_decision_changed":
if payload.get("decision_status") != "READY":
return
signal = str(payload.get("signal", "")).upper()
if signal not in {"BUY", "SELL"}:
return
await cls._send_strong_signal_alert(state=state, payload=payload)
return
if event_type in {"paper_position_opened", "paper_position_closed"}:
await cls._send_execution_alert(
state=state,
event_type=event_type,
payload=payload,
)
return
@classmethod
async def _send_strong_signal_alert(cls, *, state, payload: dict) -> None:
if cls._bot is None or cls._chat_id is None:
return
signal = str(payload.get("signal", "")).upper()
symbol = str(payload.get("symbol") or state.symbol or "")
strategy = str(payload.get("strategy") or state.strategy or "")
repeat_count = int(payload.get("repeat_count") or state.last_signal_repeat_count or 0)
confidence = float(payload.get("confidence") or state.last_signal_confidence or 0.0)
leverage = payload.get("leverage") if payload.get("leverage") is not None else state.leverage
reason = str(payload.get("reason") or state.last_signal_reason or "")
position_context = str(getattr(state, "position_side", "NONE") or "NONE")
priority = cls._alert_priority(
confidence=confidence,
repeat_count=repeat_count,
)
alert_key = (
f"signal:{position_context}:{symbol}:{strategy}:{signal}:"
f"{repeat_count}:{confidence:.2f}:"
f"{state.decision_status}:{reason}"
)
now = time.monotonic()
last_alert_at = cls._last_strong_alert_at_by_key.get(alert_key)
if last_alert_at is not None:
elapsed = now - last_alert_at
if elapsed < cls._strong_alert_cooldown_seconds:
cls._log_suppressed_strong_alert(
signal=signal,
symbol=symbol,
strategy=strategy,
repeat_count=repeat_count,
confidence=confidence,
leverage=leverage,
reason=reason,
cooldown_left=round(cls._strong_alert_cooldown_seconds - elapsed, 2),
position_context=position_context,
)
return
cls._last_strong_alert_key = alert_key
cls._last_strong_alert_at_by_key[alert_key] = now
text = cls._build_strong_signal_alert_text(
signal=signal,
symbol=symbol,
strategy=strategy,
repeat_count=repeat_count,
confidence=confidence,
leverage=leverage,
reason=reason,
priority=priority,
position_context=position_context,
)
try:
await cls._bot.send_message(
chat_id=cls._chat_id,
text=text,
)
JournalService().log_ui_info(
event_type="auto_strong_signal_alert_sent",
message=f"Отправлено уведомление о сильном сигнале {signal}.",
screen="auto",
action="strong_signal_alert",
payload={
"symbol": symbol,
"strategy": strategy,
"signal": signal,
"repeat_count": repeat_count,
"confidence": confidence,
"leverage": leverage,
"reason": reason,
"priority": priority,
"position_context": position_context,
},
)
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
except Exception:
pass
@classmethod
async def _send_execution_alert(
cls,
*,
state,
event_type: str,
payload: dict,
) -> None:
if cls._bot is None or cls._chat_id is None:
return
alert_key = cls._execution_alert_key(
event_type=event_type,
payload=payload,
)
if alert_key == cls._last_execution_alert_key:
return
cls._last_execution_alert_key = alert_key
text = cls._build_execution_alert_text(
state=state,
event_type=event_type,
payload=payload,
)
try:
await cls._bot.send_message(
chat_id=cls._chat_id,
text=text,
)
JournalService().log_ui_info(
event_type="auto_execution_alert_sent",
message="Отправлено Telegram-уведомление по paper execution.",
screen="auto",
action="execution_alert",
payload={
"source_event_type": event_type,
**payload,
},
)
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
except Exception:
pass
@classmethod
def _execution_alert_key(
cls,
*,
event_type: str,
payload: dict,
) -> str:
return (
f"{event_type}:"
f"{payload.get('symbol')}:"
f"{payload.get('side')}:"
f"{payload.get('entry_price')}:"
f"{payload.get('exit_price')}:"
f"{payload.get('size')}:"
f"{payload.get('pnl')}"
)
@classmethod
def _build_execution_alert_text(
cls,
*,
state,
event_type: str,
payload: dict,
) -> str:
symbol = str(payload.get("symbol") or state.symbol or "")
side = str(payload.get("side") or state.position_side or "")
leverage = payload.get("leverage") if payload.get("leverage") is not None else state.leverage
symbol_text = cls._format_alert_symbol(symbol)
leverage_text = cls._format_alert_leverage(leverage)
if event_type == "paper_position_opened":
entry_price = cls._format_price(payload.get("entry_price"))
size = cls._format_size(payload.get("size"))
side_icon = "🟢" if side == "LONG" else "🔴"
return (
f"<b>📄 Paper position opened {side_icon} {side}</b>\n\n"
f"{symbol_text} · {leverage_text}\n"
f"Entry: $ {entry_price}\n"
f"Size: {size}"
)
if event_type == "paper_position_closed":
entry_price = cls._format_price(payload.get("entry_price"))
exit_price = cls._format_price(payload.get("exit_price"))
size = cls._format_size(payload.get("size"))
pnl = cls._format_pnl(payload.get("pnl"))
return (
f"<b>✅ Paper position closed</b>\n\n"
f"{side} · {symbol_text} · {leverage_text}\n"
f"Entry: $ {entry_price}\n"
f"Exit: $ {exit_price}\n"
f"Size: {size}\n\n"
f"PnL: {pnl}"
)
return "<b>📄 Paper execution event</b>"
@classmethod
def _log_suppressed_strong_alert(
cls,
*,
signal: str,
symbol: str,
strategy: str,
repeat_count: int,
confidence: float,
leverage: object,
reason: str,
cooldown_left: float,
position_context: str,
) -> None:
try:
JournalService().log_ui_info(
event_type="auto_strong_signal_alert_suppressed",
message=f"Повторное уведомление о сильном сигнале {signal} подавлено.",
screen="auto",
action="strong_signal_alert",
payload={
"symbol": symbol,
"strategy": strategy,
"signal": signal,
"repeat_count": repeat_count,
"confidence": confidence,
"leverage": leverage,
"reason": reason,
"cooldown_left": cooldown_left,
"position_context": position_context,
},
)
except Exception:
pass
@classmethod
def _alert_priority(
cls,
*,
confidence: float,
repeat_count: int,
) -> str:
if confidence >= 0.8 and repeat_count >= 3:
return "HIGH"
if confidence >= 0.6 or repeat_count >= 2:
return "MEDIUM"
return "LOW"
@classmethod
def _priority_label(cls, priority: str) -> str:
mapping = {
"HIGH": "🚨 HIGH",
"MEDIUM": "⚡ MEDIUM",
"LOW": " LOW",
}
return mapping.get(priority, priority)
@classmethod
def _format_alert_symbol(cls, symbol: str) -> str:
if not symbol or symbol == "":
return ""
base_symbol = symbol.split("_", 1)[0]
parts = base_symbol.split("/", 1)
if len(parts) == 2:
return f"{parts[0]} / {parts[1]}"
return base_symbol
@classmethod
def _format_alert_leverage(cls, leverage: object) -> str:
if isinstance(leverage, (int, float)):
return f"x{leverage:g}"
return ""
@classmethod
def _signal_icon(cls, signal: str) -> str:
mapping = {
"BUY": "🟢",
"SELL": "🔴",
}
return mapping.get(signal, "")
@classmethod
def _build_strong_signal_alert_text(
cls,
*,
signal: str,
symbol: str,
strategy: str,
repeat_count: int,
confidence: float,
leverage: object,
reason: str,
priority: str,
position_context: str,
) -> str:
icon = cls._signal_icon(signal)
symbol_text = cls._format_alert_symbol(symbol)
leverage_text = cls._format_alert_leverage(leverage)
priority_text = cls._priority_label(priority)
return (
f"<b>{priority_text} · {icon} {signal}</b>\n\n"
f"{symbol_text} · {strategy} · {leverage_text}\n"
f"Position: {position_context}\n\n"
f"🧠 Confidence: {confidence:.2f}\n"
f"🔁 Repeats: {repeat_count}\n\n"
f"💡 Причина:\n"
f"{reason}"
)
@classmethod
def _format_price(cls, value: object) -> str:
return format_usd_price(value)
@classmethod
def _format_pnl(cls, value: object) -> str:
return format_usd_pnl(value)
@classmethod
def _format_size(cls, value: object) -> str:
try:
return f"{float(value):.8f}".rstrip("0").rstrip(".")
except (TypeError, ValueError):
return ""
@classmethod
async def _refresh_screen(cls, *, force: bool = False) -> None:
now = time.monotonic()
if now < cls._retry_after_until:
return
if not force and now - cls._last_ui_refresh_at < cls._ui_interval_seconds:
return
if not all(
[
cls._bot,
cls._chat_id,
cls._message_id,
cls._render_text,
cls._render_markup,
]
):
return
text = cls._render_text()
if text == cls._last_text:
return
try:
await cls._bot.edit_message_text(
chat_id=cls._chat_id,
message_id=cls._message_id,
text=text,
reply_markup=cls._render_markup(),
)
cls._last_text = text
cls._last_ui_refresh_at = now
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
except TelegramBadRequest as exc:
error_text = str(exc).lower()
if "message is not modified" in error_text:
cls._last_text = text
cls._last_ui_refresh_at = now
return
if "message to edit not found" in error_text:
cls._message_id = None
cls._render_text = None
cls._render_markup = None
cls._last_text = None
return
except Exception:
pass