# app/src/telegram/live/runner.py from __future__ import annotations import asyncio from dataclasses import dataclass from typing import Callable from aiogram import Bot from aiogram.exceptions import TelegramBadRequest @dataclass(slots=True) class LiveScreen: # имя live-экрана: market / portfolio screen: str # Telegram bot instance bot: Bot # чат, где находится live-экран chat_id: int # сообщение, которое нужно автообновлять message_id: int # функция сборки текста экрана render_text: Callable[[], str] # функция сборки клавиатуры экрана render_markup: Callable[[], object] # интервал обновления в секундах interval_seconds: int = 5 @dataclass(slots=True) class StaticScreen: # имя статичного экрана: journal / monitoring / etc screen: str # Telegram bot instance bot: Bot # чат, где находится экран chat_id: int # сообщение экрана message_id: int class ScreenRegistry: _screens: dict[str, list[StaticScreen]] = {} # зарегистрировать статичный экран @classmethod def register_screen(cls, static_screen: StaticScreen) -> None: screens = cls._screens.setdefault(static_screen.screen, []) screens[:] = [ item for item in screens if not ( item.chat_id == static_screen.chat_id and item.message_id == static_screen.message_id ) ] screens.append(static_screen) # удалить конкретное сообщение из всех статичных экранов без удаления из Telegram @classmethod def unregister_message( cls, *, chat_id: int, message_id: int, ) -> None: empty_screens: list[str] = [] for screen, screens in cls._screens.items(): screens[:] = [ item for item in screens if not ( item.chat_id == chat_id and item.message_id == message_id ) ] if not screens: empty_screens.append(screen) for screen in empty_screens: cls._screens.pop(screen, None) # удалить старые статичные экраны указанного типа @classmethod async def delete_screen( cls, *, screen: str, bot: Bot, chat_id: int, ) -> None: screens = cls._screens.get(screen, []) remaining: list[StaticScreen] = [] for static_screen in screens: if static_screen.chat_id != chat_id: remaining.append(static_screen) continue try: await bot.delete_message( chat_id=static_screen.chat_id, message_id=static_screen.message_id, ) except Exception: pass if remaining: cls._screens[screen] = remaining else: cls._screens.pop(screen, None) class LiveScreenRunner: _screens: dict[str, list[LiveScreen]] = {} _tasks: dict[str, asyncio.Task] = {} # зарегистрировать live-экран @classmethod def register_screen(cls, live_screen: LiveScreen) -> None: screens = cls._screens.setdefault(live_screen.screen, []) screens[:] = [ item for item in screens if not ( item.chat_id == live_screen.chat_id and item.message_id == live_screen.message_id ) ] screens.append(live_screen) # удалить конкретное сообщение из всех live-экранов без удаления из Telegram @classmethod def unregister_message( cls, *, chat_id: int, message_id: int, ) -> None: empty_screens: list[str] = [] for screen, screens in cls._screens.items(): screens[:] = [ item for item in screens if not ( item.chat_id == chat_id and item.message_id == message_id ) ] if not screens: empty_screens.append(screen) for screen in empty_screens: cls._screens.pop(screen, None) # удалить все live-экраны указанного типа из Telegram @classmethod async def delete_screen( cls, *, screen: str, bot: Bot, chat_id: int, ) -> None: screens = cls._screens.get(screen, []) remaining: list[LiveScreen] = [] for live_screen in screens: if live_screen.chat_id != chat_id: remaining.append(live_screen) continue try: await bot.delete_message( chat_id=live_screen.chat_id, message_id=live_screen.message_id, ) except Exception: pass if remaining: cls._screens[screen] = remaining else: cls._screens.pop(screen, None) # запустить автообновление группы экранов @classmethod def start(cls, screen: str) -> None: task = cls._tasks.get(screen) if task is not None and not task.done(): return cls._tasks[screen] = asyncio.create_task(cls._worker(screen)) # остановить автообновление группы экранов @classmethod def stop(cls, screen: str) -> None: task = cls._tasks.get(screen) if task is None: return task.cancel() cls._tasks.pop(screen, None) # фоновый цикл обновления группы экранов @classmethod async def _worker(cls, screen: str) -> None: while True: await cls._refresh_screen(screen) await asyncio.sleep(cls._screen_interval(screen)) # получить интервал обновления группы экранов @classmethod def _screen_interval(cls, screen: str) -> int: screens = cls._screens.get(screen, []) if not screens: return 5 return screens[0].interval_seconds # обновить все Telegram-сообщения live-экрана @classmethod async def _refresh_screen(cls, screen: str) -> None: screens = cls._screens.get(screen, []) if not screens: return alive_screens: list[LiveScreen] = [] for live_screen in screens: try: await live_screen.bot.edit_message_text( chat_id=live_screen.chat_id, message_id=live_screen.message_id, text=live_screen.render_text(), reply_markup=live_screen.render_markup(), ) alive_screens.append(live_screen) except TelegramBadRequest as exc: if "message is not modified" in str(exc).lower(): alive_screens.append(live_screen) continue except Exception: pass if alive_screens: cls._screens[screen] = alive_screens else: cls._screens.pop(screen, None)