Files
dzentra_bot/app/src/telegram/live/runner.py

148 lines
4.2 KiB
Python

# app/src/telegram/live/runner.py
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Callable
from aiogram import Bot
@dataclass(slots=True)
class LiveScreen:
# имя live-экрана: market / portfolio / journal
screen: str
# Telegram bot instance
bot: Bot
# чат, где находится live-экран
chat_id: int
# сообщение, которое нужно автообновлять
message_id: int
# функция сборки текста экрана
render_text: Callable[[], str]
# функция сборки клавиатуры экрана
render_markup: Callable[[], object]
# интервал обновления в секундах
interval_seconds: int = 5
class LiveScreenRunner:
_screens: dict[str, list[LiveScreen]] = {}
_tasks: dict[str, asyncio.Task] = {}
# зарегистрировать live-экран
@classmethod
def register_screen(cls, live_screen: LiveScreen) -> None:
screens = cls._screens.setdefault(live_screen.screen, [])
screens[:] = [
item
for item in screens
if not (
item.chat_id == live_screen.chat_id
and item.message_id == live_screen.message_id
)
]
screens.append(live_screen)
# удалить все live-экраны указанного типа из Telegram
@classmethod
async def delete_screen(
cls,
*,
screen: str,
bot: Bot,
chat_id: int,
) -> None:
screens = cls._screens.get(screen, [])
remaining: list[LiveScreen] = []
for live_screen in screens:
if live_screen.chat_id != chat_id:
remaining.append(live_screen)
continue
try:
await bot.delete_message(
chat_id=live_screen.chat_id,
message_id=live_screen.message_id,
)
except Exception:
pass
if remaining:
cls._screens[screen] = remaining
else:
cls._screens.pop(screen, None)
# запустить автообновление группы экранов
@classmethod
def start(cls, screen: str) -> None:
task = cls._tasks.get(screen)
if task is not None and not task.done():
return
cls._tasks[screen] = asyncio.create_task(cls._worker(screen))
# остановить автообновление группы экранов
@classmethod
def stop(cls, screen: str) -> None:
task = cls._tasks.get(screen)
if task is None:
return
task.cancel()
cls._tasks.pop(screen, None)
# фоновый цикл обновления группы экранов
@classmethod
async def _worker(cls, screen: str) -> None:
while True:
await cls._refresh_screen(screen)
await asyncio.sleep(cls._screen_interval(screen))
# получить интервал обновления группы экранов
@classmethod
def _screen_interval(cls, screen: str) -> int:
screens = cls._screens.get(screen, [])
if not screens:
return 5
return screens[0].interval_seconds
# обновить все Telegram-сообщения live-экрана
@classmethod
async def _refresh_screen(cls, screen: str) -> None:
screens = cls._screens.get(screen, [])
if not screens:
return
alive_screens: list[LiveScreen] = []
for live_screen in screens:
try:
await live_screen.bot.edit_message_text(
chat_id=live_screen.chat_id,
message_id=live_screen.message_id,
text=live_screen.render_text(),
reply_markup=live_screen.render_markup(),
)
alive_screens.append(live_screen)
except Exception:
pass
if alive_screens:
cls._screens[screen] = alive_screens
else:
cls._screens.pop(screen, None)