Stage 04.3 - repositories, balance snapshots and environment mode fix

This commit is contained in:
2026-04-16 19:54:04 +03:00
parent 2c49bb70c0
commit 76fc122955
9 changed files with 262 additions and 8 deletions

View File

@@ -26,6 +26,8 @@ class Settings:
db_name: str
db_user: str
db_password: str
def is_demo_mode(self) -> bool:
return "demo" in self.exchange_base_url.lower()
def _parse_bool(raw_value: str, default: bool = False) -> bool:
value = (raw_value or "").strip().lower()
if not value:

View File

@@ -107,8 +107,9 @@ def _build_journal_status() -> ComponentStatus:
return ComponentStatus(name="Журнал", state="🔴", details=message)
def _resolve_mode_label(exchange_testnet: bool) -> str:
return "ДЕМО аккаунт" if exchange_testnet else "РЕАЛЬНЫЙ аккаунт"
def _resolve_mode_label(settings) -> str:
is_demo = "demo" in settings.exchange_base_url.lower()
return "ДЕМО аккаунт" if is_demo else "РЕАЛЬНЫЙ аккаунт"
def get_system_snapshot() -> SystemSnapshot:
@@ -134,7 +135,7 @@ def get_system_snapshot() -> SystemSnapshot:
app_version=APP_VERSION,
db_label=db_label,
timezone_name=settings.tz,
mode_label=_resolve_mode_label(settings.exchange_testnet),
mode_label=_resolve_mode_label(settings),
default_symbol=settings.default_symbol,
components=components,
)

View File

@@ -312,9 +312,15 @@ class ExchangeService:
else:
updated_at = "n/a"
source = (
"dzengi-demo-api"
if "demo" in self.settings.exchange_base_url.lower()
else "dzengi-api"
)
return TickerPrice(
symbol=symbol,
price=float(price_raw),
source="dzengi-demo-api",
source=source,
updated_at=updated_at,
)

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import json
from typing import Any
from src.storage.session import get_connection
class BalanceSnapshotRepository:
def add_snapshot(
self,
*,
source: str,
payload: dict[str, Any],
) -> None:
payload_json = json.dumps(payload, ensure_ascii=False)
with get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
'''
INSERT INTO balance_snapshots (source, payload_json)
VALUES (%s, %s::jsonb)
''',
(source, payload_json),
)
def count_snapshots(self) -> int:
with get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM balance_snapshots")
row = cursor.fetchone()
return int(row[0]) if row else 0
def list_recent_snapshots(self, limit: int = 5) -> list[dict[str, str]]:
with get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
'''
SELECT created_at, source, payload_json::text
FROM balance_snapshots
ORDER BY created_at DESC, id DESC
LIMIT %s
''',
(limit,),
)
rows = cursor.fetchall()
items: list[dict[str, str]] = []
for row in rows:
items.append(
{
"created_at": str(row[0]),
"source": str(row[1]),
"payload_json": str(row[2]),
}
)
return items

View File

@@ -0,0 +1,66 @@
from __future__ import annotations
import json
from typing import Any
from src.storage.session import get_connection
class OrderDraftRepository:
def add_draft(
self,
*,
symbol: str,
side: str,
order_type: str,
quantity: str,
status: str = "draft",
payload: dict[str, Any] | None = None,
) -> None:
payload_json = json.dumps(payload, ensure_ascii=False) if payload is not None else None
with get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
'''
INSERT INTO order_drafts (symbol, side, order_type, quantity, status, payload_json)
VALUES (%s, %s, %s, %s, %s, %s::jsonb)
''',
(symbol, side, order_type, quantity, status, payload_json),
)
def list_recent_drafts(self, limit: int = 10) -> list[dict[str, str]]:
with get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute(
'''
SELECT created_at, symbol, side, order_type, quantity::text, status
FROM order_drafts
ORDER BY created_at DESC, id DESC
LIMIT %s
''',
(limit,),
)
rows = cursor.fetchall()
items: list[dict[str, str]] = []
for row in rows:
items.append(
{
"created_at": str(row[0]),
"symbol": str(row[1]),
"side": str(row[2]),
"order_type": str(row[3]),
"quantity": str(row[4]),
"status": str(row[5]),
}
)
return items
def count_drafts(self) -> int:
with get_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM order_drafts")
row = cursor.fetchone()
return int(row[0]) if row else 0

View File

@@ -5,7 +5,7 @@ from aiogram.types import Message
from src.integrations.exchange.exceptions import ExchangeError
from src.integrations.exchange.models import BalanceSummary
from src.integrations.exchange.service import ExchangeService
from src.trading.accounts.service import AccountsService
from src.trading.journal.service import JournalService
@@ -126,7 +126,7 @@ def _safe_log_error(
@router.message(F.text == "💼 Портфель")
async def open_portfolio(message: Message) -> None:
service = ExchangeService()
service = AccountsService()
journal = JournalService()
user_id = message.from_user.id if message.from_user else None
@@ -143,7 +143,7 @@ async def open_portfolio(message: Message) -> None:
)
try:
balances = service.get_balance_summary()
balances = service.get_live_balance_summary()
except ExchangeError as exc:
_safe_log_error(
journal,

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
from src.integrations.exchange.models import BalanceSummary
from src.integrations.exchange.service import ExchangeService
from src.storage.repositories.balance_snapshots import BalanceSnapshotRepository
from src.trading.journal.service import JournalService
class AccountsService:
def __init__(self) -> None:
self.exchange_service = ExchangeService()
self.snapshot_repository = BalanceSnapshotRepository()
self.journal = JournalService()
def get_live_balance_summary(self) -> list[BalanceSummary]:
balances = self.exchange_service.get_balance_summary()
self._save_snapshot(balances)
return balances
def _save_snapshot(self, balances: list[BalanceSummary]) -> None:
payload = {
"assets": [
{
"currency": item.currency,
"available": item.available,
"locked": item.locked,
"source": item.source,
}
for item in balances
]
}
try:
self.snapshot_repository.add_snapshot(
source="portfolio_screen",
payload=payload,
)
except Exception as exc:
try:
self.journal.log_warning(
"balance_snapshot_error",
f"Не удалось сохранить snapshot баланса: {exc}",
{"assets_count": len(balances)},
)
except Exception:
pass
return
try:
self.journal.log_info(
"balance_snapshot_saved",
f"Snapshot баланса сохранён. Активов: {len(balances)}",
{"assets_count": len(balances)},
)
except Exception:
pass

View File

@@ -0,0 +1,14 @@
# 0011 — Repositories before Orders
## Решение
Сначала добавить repository слой для snapshots и drafts, и только потом переходить к order flow.
## Причины
- order flow без repositories быстро приводит к смешению SQL и бизнес-логики
- snapshots баланса — безопасный и полезный use case
- это подготавливает архитектуру к следующим этапам
## Последствия
- storage становится частью бизнес-логики
- Telegram handlers становятся тоньше
- следующий этап с order drafts будет проще и чище

View File

@@ -0,0 +1,49 @@
# Stage 04.3 — Repositories & Balance Snapshots
## Цель
Сделать storage частью бизнес-логики:
- вынести SQL в repositories
- добавить слой AccountsService
- начать сохранять состояние системы (snapshots)
---
## Что реализовано
### Repository слой
Добавлены:
#### 1. BalanceSnapshotRepository
Работа с таблицей `balance_snapshots`:
- `add_snapshot` — сохранение снимка баланса
- `count_snapshots`
- `list_recent_snapshots`
---
#### 2. OrderDraftRepository
Подготовка к order flow:
- `add_draft`
- `list_recent_drafts`
- `count_drafts`
(используется на следующих этапах)
---
### Service слой
Добавлен:
#### AccountsService
Функции:
- получение live баланса через ExchangeService
- сохранение snapshot в PostgreSQL
- логирование через Journal
```text
exchange → accounts service → repository → database