Files
dzentra_bot/app/src/trading/auto/service.py

407 lines
14 KiB
Python

# app/src/trading/auto/service.py
from __future__ import annotations
import asyncio
from datetime import datetime
from src.core.config import load_settings
from src.trading.auto.state import AutoTradeState
from src.trading.journal.service import JournalService
from src.trading.strategies.base import BaseStrategy, StrategyContext
from src.trading.strategies.registry import StrategyRegistry
class AutoTradeService:
_state = AutoTradeState()
_loop_task: asyncio.Task | None = None
_loop_interval_seconds = 5
# минимальное количество повторов BUY / SELL для подтверждения сигнала
_confirm_repeats = 3
# минимальная уверенность для готовности к будущему execution
_ready_confidence = 0.7
_last_signal_key: str | None = None
_last_signal_value: str | None = None
_last_signal_reason: str = ""
_last_signal_confidence: float = 0.0
_last_signal_payload: dict | None = None
_same_signal_count = 0
# получить текущее состояние автоторговли
def get_state(self) -> AutoTradeState:
if not self._state.symbol:
self._state.symbol = load_settings().default_symbol
return self._state
# проверить, запущен ли background loop
def is_loop_running(self) -> bool:
return self._loop_task is not None and not self._loop_task.done()
# запустить background loop, если он ещё не запущен
def start_loop(self) -> None:
if self.is_loop_running():
return
self._loop_task = asyncio.create_task(self._loop_worker())
# остановить background loop
def stop_loop(self) -> None:
if self._loop_task is None:
return
self._loop_task.cancel()
self._loop_task = None
# рабочий цикл автоторговли
async def _loop_worker(self) -> None:
while True:
state = self.get_state()
if state.status == "OFF":
break
self.run_cycle()
await asyncio.sleep(self._loop_interval_seconds)
# запустить активную торговлю
def start(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
if state.status == "RUNNING":
self.start_loop()
return state, "Автоторговля уже активна."
if state.status == "OBSERVING":
state.status = "RUNNING"
self.start_loop()
return state, "Автоторговля активирована."
state.status = "RUNNING"
self.start_loop()
return state, "Автоторговля запущена."
# включить режим наблюдения
def observe(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if previous_status == "OBSERVING":
self.start_loop()
return state, "Режим наблюдения уже включён."
state.status = "OBSERVING"
self.start_loop()
if previous_status == "OFF":
return state, "Включён режим наблюдения."
return state, "Автоторговля переведена в режим наблюдения."
# полностью выключить автоторговлю
def stop(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
if state.status == "OFF":
self.stop_loop()
return state, "Автоторговля уже выключена."
state.status = "OFF"
self.stop_loop()
return state, "Автоторговля выключена."
# установить инструмент
def set_symbol(self, symbol: str) -> AutoTradeState:
state = self.get_state()
state.symbol = symbol
self._reset_signal_tracking()
return state
# установить стратегию
def set_strategy(self, strategy: str) -> AutoTradeState:
state = self.get_state()
state.strategy = strategy.strip().upper()
self._reset_signal_tracking()
return state
# установить риск
def set_risk_percent(self, risk_percent: float) -> AutoTradeState:
state = self.get_state()
state.risk_percent = risk_percent
return state
# установить плечо
def set_leverage(self, leverage: float) -> AutoTradeState:
state = self.get_state()
state.leverage = leverage
return state
# сбросить внутренний трекинг сигналов
def _reset_signal_tracking(self) -> None:
self._last_signal_key = None
self._last_signal_value = None
self._last_signal_reason = ""
self._last_signal_confidence = 0.0
self._last_signal_payload = None
self._same_signal_count = 0
state = self.get_state()
state.last_signal_repeat_count = 0
state.last_signal_confidence = 0.0
state.last_signal_reason = None
state.decision_status = "WAITING"
state.decision_reason = None
state.is_signal_confirmed = False
state.is_signal_ready = False
# собрать контекст для стратегии
def _build_strategy_context(self) -> StrategyContext:
state = self.get_state()
return StrategyContext(
symbol=state.symbol,
status=state.status,
risk_percent=state.risk_percent,
)
# получить стратегию для текущего цикла
def _get_strategy(self) -> BaseStrategy:
state = self.get_state()
return StrategyRegistry.get(state.strategy)
# обновить статус решения по текущему сигналу
def _update_decision_state(
self,
*,
state: AutoTradeState,
signal: str,
confidence: float,
) -> None:
state.is_signal_confirmed = False
state.is_signal_ready = False
if signal == "HOLD":
state.decision_status = "WAITING"
state.decision_reason = "Нет торгового направления."
return
if self._same_signal_count < self._confirm_repeats:
state.decision_status = "CONFIRMING"
state.decision_reason = (
f"Сигнал {signal} подтверждается: "
f"{self._same_signal_count}/{self._confirm_repeats} повторов."
)
return
state.is_signal_confirmed = True
if confidence < self._ready_confidence:
state.decision_status = "BLOCKED"
state.decision_reason = (
f"Сигнал {signal} подтверждён, но уверенность низкая: "
f"{confidence:.2f} < {self._ready_confidence:.2f}."
)
return
state.is_signal_ready = True
state.decision_status = "READY"
state.decision_reason = (
f"Сигнал {signal} подтверждён и готов к будущему execution."
)
# записать новый сигнал и итог предыдущей серии при смене сигнала
def _log_signal_if_changed(
self,
*,
strategy_name: str,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
signal_key = f"{state.status}:{state.symbol}:{strategy_name}:{signal}:{reason}"
previous_signal = self._last_signal_value
previous_count = self._same_signal_count
is_same_signal = signal_key == self._last_signal_key
if is_same_signal:
self._same_signal_count += 1
self._update_signal_state_fields(
state=state,
signal=signal,
reason=reason,
confidence=confidence,
)
return
if previous_signal is not None:
if previous_count > 1:
self._log_signal_summary(
strategy_name=strategy_name,
state=state,
previous_signal=previous_signal,
previous_count=previous_count,
next_signal=signal,
reason=self._last_signal_reason,
confidence=self._last_signal_confidence,
payload=self._last_signal_payload,
)
else:
self._log_signal_event(
strategy_name=strategy_name,
state=state,
signal=previous_signal,
reason=f"{previous_signal} завершился без серии.",
confidence=self._last_signal_confidence,
payload={
"previous_signal": previous_signal,
"next_signal": signal,
},
)
self._last_signal_key = signal_key
self._last_signal_value = signal
self._last_signal_reason = reason
self._last_signal_confidence = confidence
self._last_signal_payload = payload
self._same_signal_count = 1
self._update_signal_state_fields(
state=state,
signal=signal,
reason=reason,
confidence=confidence,
)
# обновить поля state для экрана автоторговли
def _update_signal_state_fields(
self,
*,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
) -> None:
state.last_signal = signal
state.last_signal_repeat_count = self._same_signal_count
state.last_signal_confidence = confidence
state.last_signal_reason = reason
self._update_decision_state(
state=state,
signal=signal,
confidence=confidence,
)
# записать одиночный сигнал в журнал
def _log_signal_event(
self,
*,
strategy_name: str,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
emoji_map = {
"BUY": "🟢",
"SELL": "🔴",
"HOLD": "🟡",
}
emoji = emoji_map.get(signal, "")
try:
JournalService().log_ui_info(
event_type="auto_signal_generated",
message=f"{emoji} Сигнал автоторговли {signal}: {reason}",
screen="auto",
action="run_cycle",
payload={
"strategy": strategy_name,
"status": state.status,
"symbol": state.symbol,
"signal": signal,
"confidence": confidence,
"reason": reason,
"repeat_count": 1,
"is_strong_signal": confidence > self._ready_confidence,
"is_aggregated": False,
"payload": payload or {},
},
)
except Exception:
pass
# записать итог серии одинаковых сигналов при смене сигнала
def _log_signal_summary(
self,
*,
strategy_name: str,
state: AutoTradeState,
previous_signal: str,
previous_count: int,
next_signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
emoji_map = {
"BUY": "🟢",
"SELL": "🔴",
"HOLD": "🟡",
}
emoji = emoji_map.get(previous_signal, "")
try:
JournalService().log_ui_info(
event_type="auto_signal_summary",
message=(
f"{emoji} {previous_count} {previous_signal} подряд "
f"до смены на {next_signal}"
),
screen="auto",
action="signal_summary",
payload={
"strategy": strategy_name,
"status": state.status,
"symbol": state.symbol,
"signal": previous_signal,
"next_signal": next_signal,
"repeat_count": previous_count,
"confidence": confidence,
"reason": reason,
"is_strong_signal": confidence > self._ready_confidence,
"is_aggregated": True,
"payload": payload or {},
},
)
except Exception:
pass
# выполнить один цикл анализа рынка
def run_cycle(self) -> AutoTradeState:
state = self.get_state()
if state.status == "OFF":
return state
strategy = self._get_strategy()
context = self._build_strategy_context()
result = strategy.analyze(context)
state.last_check_at = datetime.now().strftime("%H:%M:%S")
self._log_signal_if_changed(
strategy_name=strategy.name,
state=state,
signal=result.signal.value,
reason=result.reason,
confidence=result.confidence,
payload=result.payload,
)
return state