07.4.3.18.2 — Runtime Notification Migration

This commit is contained in:
2026-05-09 21:46:58 +03:00
parent 7e5ecc2ed9
commit c3cf446143
9 changed files with 665 additions and 370 deletions

View File

@@ -11,10 +11,12 @@ from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter
from src.core.event_bus import EventBus
from src.integrations.exchange.market_data_runner import MarketDataRunner
from src.notifications.targets import NotificationTargetRegistry
from src.runtime_events.event_types import RuntimeEventType
from src.runtime_events.models import RuntimeEvent
from src.runtime_events.publisher import RuntimeEventPublisher
from src.trading.auto.service import AutoTradeService
from src.trading.journal.service import JournalService
from src.telegram.ui.currency_ui import format_usd_pnl, format_usd_price
from src.notifications.targets import NotificationTargetRegistry
class AutoTradeRunner:
@@ -34,12 +36,6 @@ class AutoTradeRunner:
_last_event_version: int = 0
_retry_after_until: float = 0.0
_last_strong_alert_key: str | None = None
_strong_alert_cooldown_seconds = 120
_last_strong_alert_at_by_key: dict[str, float] = {}
_last_execution_alert_key: str | None = None
@classmethod
def register_screen(
cls,
@@ -210,7 +206,7 @@ class AutoTradeRunner:
if signal not in {"BUY", "SELL"}:
return
await cls._send_strong_signal_alert(state=state, payload=payload)
cls._publish_strong_signal_event(state=state, payload=payload)
return
if event_type in {
@@ -218,18 +214,15 @@ class AutoTradeRunner:
"paper_position_closed",
"paper_position_flipped",
}:
await cls._send_execution_alert(
cls._publish_execution_event(
state=state,
event_type=event_type,
event_type=str(event_type),
payload=payload,
)
return
@classmethod
async def _send_strong_signal_alert(cls, *, state, payload: dict) -> None:
if cls._bot is None or cls._chat_id is None:
return
def _publish_strong_signal_event(cls, *, state, payload: dict) -> None:
signal = str(payload.get("signal", "")).upper()
symbol = str(payload.get("symbol") or state.symbol or "")
strategy = str(payload.get("strategy") or state.strategy or "")
@@ -237,7 +230,6 @@ class AutoTradeRunner:
confidence = float(payload.get("confidence") or state.last_signal_confidence or 0.0)
leverage = payload.get("leverage") if payload.get("leverage") is not None else state.leverage
reason = str(payload.get("reason") or state.last_signal_reason or "")
position_context = str(getattr(state, "position_side", "NONE") or "NONE")
priority = cls._alert_priority(
@@ -245,58 +237,11 @@ class AutoTradeRunner:
repeat_count=repeat_count,
)
alert_key = (
f"signal:{position_context}:{symbol}:{strategy}:{signal}:"
f"{repeat_count}:{confidence:.2f}:"
f"{state.decision_status}:{reason}"
)
now = time.monotonic()
last_alert_at = cls._last_strong_alert_at_by_key.get(alert_key)
if last_alert_at is not None:
elapsed = now - last_alert_at
if elapsed < cls._strong_alert_cooldown_seconds:
cls._log_suppressed_strong_alert(
signal=signal,
symbol=symbol,
strategy=strategy,
repeat_count=repeat_count,
confidence=confidence,
leverage=leverage,
reason=reason,
cooldown_left=round(cls._strong_alert_cooldown_seconds - elapsed, 2),
position_context=position_context,
)
return
cls._last_strong_alert_key = alert_key
cls._last_strong_alert_at_by_key[alert_key] = now
text = cls._build_strong_signal_alert_text(
signal=signal,
symbol=symbol,
strategy=strategy,
repeat_count=repeat_count,
confidence=confidence,
leverage=leverage,
reason=reason,
priority=priority,
position_context=position_context,
)
try:
await cls._bot.send_message(
chat_id=cls._chat_id,
text=text,
)
JournalService().log_ui_info(
event_type="auto_strong_signal_alert_sent",
message=f"Отправлено уведомление о сильном сигнале {signal}.",
screen="auto",
action="strong_signal_alert",
RuntimeEventPublisher.publish(
RuntimeEvent(
event_type=RuntimeEventType.AUTO_SIGNAL_READY,
source="auto_trade_runner",
title=f"Auto strong signal {signal}",
payload={
"symbol": symbol,
"strategy": strategy,
@@ -305,76 +250,90 @@ class AutoTradeRunner:
"confidence": confidence,
"leverage": leverage,
"reason": reason,
"priority": priority,
"position_context": position_context,
"decision_status": state.decision_status,
},
priority=priority.lower(),
dedupe_key=(
f"auto_signal_ready:"
f"{position_context}:"
f"{symbol}:"
f"{strategy}:"
f"{signal}:"
f"{repeat_count}:"
f"{confidence:.2f}:"
f"{state.decision_status}:"
f"{reason}"
),
)
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
except Exception:
pass
)
@classmethod
async def _send_execution_alert(
def _publish_execution_event(
cls,
*,
state,
event_type: str,
payload: dict,
) -> None:
if cls._bot is None or cls._chat_id is None:
runtime_event_type = cls._runtime_execution_event_type(event_type)
if runtime_event_type is None:
return
alert_key = cls._execution_alert_key(
event_type=event_type,
payload=payload,
)
symbol = str(payload.get("symbol") or state.symbol or "")
side = str(payload.get("side") or getattr(state, "position_side", "") or "")
old_side = str(payload.get("old_side") or "")
new_side = str(payload.get("new_side") or side or "")
if alert_key == cls._last_execution_alert_key:
return
cls._last_execution_alert_key = alert_key
text = cls._build_execution_alert_text(
state=state,
event_type=event_type,
payload=payload,
)
try:
await cls._bot.send_message(
chat_id=cls._chat_id,
text=text,
)
JournalService().log_ui_info(
event_type="auto_execution_alert_sent",
message="Отправлено Telegram-уведомление по paper execution.",
screen="auto",
action="execution_alert",
RuntimeEventPublisher.publish(
RuntimeEvent(
event_type=runtime_event_type,
source="auto_trade_runner",
title=cls._execution_event_title(runtime_event_type),
payload={
"source_event_type": event_type,
"symbol": symbol,
"side": side,
"old_side": old_side,
"new_side": new_side,
"leverage": payload.get("leverage") if payload.get("leverage") is not None else state.leverage,
**payload,
},
priority="normal",
dedupe_key=cls._execution_dedupe_key(
runtime_event_type=runtime_event_type,
payload=payload,
),
)
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
except Exception:
pass
)
@classmethod
def _execution_alert_key(
def _runtime_execution_event_type(cls, event_type: str) -> RuntimeEventType | None:
mapping = {
"paper_position_opened": RuntimeEventType.POSITION_OPENED,
"paper_position_closed": RuntimeEventType.POSITION_CLOSED,
"paper_position_flipped": RuntimeEventType.POSITION_FLIPPED,
}
return mapping.get(event_type)
@classmethod
def _execution_event_title(cls, event_type: RuntimeEventType) -> str:
mapping = {
RuntimeEventType.POSITION_OPENED: "Paper position opened",
RuntimeEventType.POSITION_CLOSED: "Paper position closed",
RuntimeEventType.POSITION_FLIPPED: "Paper position flipped",
}
return mapping.get(event_type, "Paper execution event")
@classmethod
def _execution_dedupe_key(
cls,
*,
event_type: str,
runtime_event_type: RuntimeEventType,
payload: dict,
) -> str:
return (
f"{event_type}:"
f"{runtime_event_type.value}:"
f"{payload.get('symbol')}:"
f"{payload.get('side')}:"
f"{payload.get('old_side')}:"
@@ -385,119 +344,11 @@ class AutoTradeRunner:
f"{payload.get('size')}:"
f"{payload.get('old_size')}:"
f"{payload.get('new_size')}:"
f"{payload.get('pnl')}"
f"{payload.get('pnl')}:"
f"{payload.get('risk_reason')}:"
f"{payload.get('is_forced')}:"
f"{payload.get('is_forced')}"
)
@classmethod
def _build_execution_alert_text(
cls,
*,
state,
event_type: str,
payload: dict,
) -> str:
symbol = str(payload.get("symbol") or state.symbol or "")
side = str(payload.get("side") or state.position_side or "")
leverage = payload.get("leverage") if payload.get("leverage") is not None else state.leverage
symbol_text = cls._format_alert_symbol(symbol)
leverage_text = cls._format_alert_leverage(leverage)
if event_type == "paper_position_opened":
entry_price = cls._format_price(payload.get("entry_price"))
size = cls._format_size(payload.get("size"))
side_icon = "🟢" if side == "LONG" else "🔴"
return (
f"<b>📄 Paper position opened {side_icon} {side}</b>\n\n"
f"{symbol_text} · {leverage_text}\n"
f"Entry: $ {entry_price}\n"
f"Size: {size}"
)
if event_type == "paper_position_closed":
entry_price = cls._format_price(payload.get("entry_price"))
exit_price = cls._format_price(payload.get("exit_price"))
size = cls._format_size(payload.get("size"))
pnl = cls._format_pnl(payload.get("pnl"))
risk_reason = payload.get("risk_reason")
risk_line = f"\nRisk: {risk_reason}" if risk_reason else ""
return (
f"<b>✅ Paper position closed</b>\n\n"
f"{side} · {symbol_text} · {leverage_text}\n"
f"Entry: $ {entry_price}\n"
f"Exit: $ {exit_price}\n"
f"Size: {size}\n\n"
f"PnL: {pnl}"
f"{risk_line}"
)
if event_type == "paper_position_flipped":
old_side = str(payload.get("old_side") or "")
new_side = str(payload.get("new_side") or side or "")
entry_price = cls._format_price(payload.get("entry_price"))
exit_price = cls._format_price(payload.get("exit_price"))
new_entry_price = cls._format_price(payload.get("new_entry_price"))
old_size = cls._format_size(payload.get("old_size"))
new_size = cls._format_size(payload.get("new_size"))
pnl = cls._format_pnl(payload.get("pnl"))
old_icon = "🟢" if old_side == "LONG" else "🔴"
new_icon = "🟢" if new_side == "LONG" else "🔴"
return (
f"<b>🔁 Paper position flipped {old_icon} {old_side}"
f"{new_icon} {new_side}</b>\n\n"
f"{symbol_text} · {leverage_text}\n\n"
f"Old entry: $ {entry_price}\n"
f"Exit: $ {exit_price}\n"
f"Old size: {old_size}\n\n"
f"New entry: $ {new_entry_price}\n"
f"New size: {new_size}\n\n"
f"PnL: {pnl}"
)
return "<b>📄 Paper execution event</b>"
@classmethod
def _log_suppressed_strong_alert(
cls,
*,
signal: str,
symbol: str,
strategy: str,
repeat_count: int,
confidence: float,
leverage: object,
reason: str,
cooldown_left: float,
position_context: str,
) -> None:
try:
JournalService().log_ui_info(
event_type="auto_strong_signal_alert_suppressed",
message=f"Повторное уведомление о сильном сигнале {signal} подавлено.",
screen="auto",
action="strong_signal_alert",
payload={
"symbol": symbol,
"strategy": strategy,
"signal": signal,
"repeat_count": repeat_count,
"confidence": confidence,
"leverage": leverage,
"reason": reason,
"cooldown_left": cooldown_left,
"position_context": position_context,
},
)
except Exception:
pass
@classmethod
def _alert_priority(
cls,
@@ -513,87 +364,6 @@ class AutoTradeRunner:
return "LOW"
@classmethod
def _priority_label(cls, priority: str) -> str:
mapping = {
"HIGH": "🚨 HIGH",
"MEDIUM": "⚡ MEDIUM",
"LOW": " LOW",
}
return mapping.get(priority, priority)
@classmethod
def _format_alert_symbol(cls, symbol: str) -> str:
if not symbol or symbol == "":
return ""
base_symbol = symbol.split("_", 1)[0]
parts = base_symbol.split("/", 1)
if len(parts) == 2:
return f"{parts[0]} / {parts[1]}"
return base_symbol
@classmethod
def _format_alert_leverage(cls, leverage: object) -> str:
if isinstance(leverage, (int, float)):
return f"x{leverage:g}"
return ""
@classmethod
def _signal_icon(cls, signal: str) -> str:
mapping = {
"BUY": "🟢",
"SELL": "🔴",
}
return mapping.get(signal, "")
@classmethod
def _build_strong_signal_alert_text(
cls,
*,
signal: str,
symbol: str,
strategy: str,
repeat_count: int,
confidence: float,
leverage: object,
reason: str,
priority: str,
position_context: str,
) -> str:
icon = cls._signal_icon(signal)
symbol_text = cls._format_alert_symbol(symbol)
leverage_text = cls._format_alert_leverage(leverage)
priority_text = cls._priority_label(priority)
return (
f"<b>{priority_text} · {icon} {signal}</b>\n\n"
f"{symbol_text} · {strategy} · {leverage_text}\n"
f"Position: {position_context}\n\n"
f"🧠 Confidence: {confidence:.2f}\n"
f"🔁 Repeats: {repeat_count}\n\n"
f"💡 Причина:\n"
f"{reason}"
)
@classmethod
def _format_price(cls, value: object) -> str:
return format_usd_price(value)
@classmethod
def _format_pnl(cls, value: object) -> str:
return format_usd_pnl(value)
@classmethod
def _format_size(cls, value: object) -> str:
try:
return f"{float(value):.8f}".rstrip("0").rstrip(".")
except (TypeError, ValueError):
return ""
@classmethod
def _log_refresh_skip(cls, reason: str, payload: dict | None = None) -> None:
try: