07.4.3.18.1 — Runtime Event Skeleton Architecture
This commit is contained in:
54
app/src/notifications/templates/execution.py
Normal file
54
app/src/notifications/templates/execution.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# app/src/notifications/templates/execution.py
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from src.notifications.models import NotificationMessage
|
||||
from src.runtime_events.event_types import RuntimeEventType
|
||||
from src.runtime_events.models import RuntimeEvent
|
||||
|
||||
|
||||
def build_execution_notification(event: RuntimeEvent) -> NotificationMessage | None:
|
||||
if event.event_type not in {
|
||||
RuntimeEventType.POSITION_OPENED,
|
||||
RuntimeEventType.POSITION_CLOSED,
|
||||
RuntimeEventType.POSITION_FLIPPED,
|
||||
}:
|
||||
return None
|
||||
|
||||
payload = event.payload
|
||||
|
||||
symbol = str(payload.get("symbol") or "—")
|
||||
side = str(payload.get("side") or payload.get("new_side") or "—")
|
||||
entry_price = payload.get("entry_price") or payload.get("new_entry_price")
|
||||
exit_price = payload.get("exit_price")
|
||||
pnl = payload.get("pnl")
|
||||
|
||||
if event.event_type == RuntimeEventType.POSITION_OPENED:
|
||||
title = "📄 Position opened"
|
||||
elif event.event_type == RuntimeEventType.POSITION_CLOSED:
|
||||
title = "✅ Position closed"
|
||||
else:
|
||||
title = "🔁 Position flipped"
|
||||
|
||||
lines = [
|
||||
f"<b>{title}</b>",
|
||||
"",
|
||||
f"{symbol} · {side}",
|
||||
]
|
||||
|
||||
if entry_price is not None:
|
||||
lines.append(f"Entry: $ {entry_price}")
|
||||
|
||||
if exit_price is not None:
|
||||
lines.append(f"Exit: $ {exit_price}")
|
||||
|
||||
if pnl is not None:
|
||||
lines.append("")
|
||||
lines.append(f"PnL: {pnl}")
|
||||
|
||||
return NotificationMessage(
|
||||
title=event.title,
|
||||
text="\n".join(lines),
|
||||
priority=event.priority,
|
||||
dedupe_key=event.dedupe_key,
|
||||
)
|
||||
Reference in New Issue
Block a user