Files
dzentra_bot/app/src/trading/auto/service.py

792 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/auto/service.py
from __future__ import annotations
import asyncio
import time
from datetime import datetime
from src.core.config import load_settings
from src.core.event_bus import EventBus
from src.trading.auto.state import AutoTradeState
from src.trading.execution.engine import ExecutionEngine
from src.trading.journal.service import JournalService
from src.trading.strategies.base import BaseStrategy, StrategyContext
from src.trading.strategies.registry import StrategyRegistry
class AutoTradeService:
_state = AutoTradeState()
_loop_task: asyncio.Task | None = None
_loop_interval_seconds = 5
# минимальное количество повторов BUY / SELL для подтверждения сигнала
_confirm_repeats = 2
# минимальная уверенность для готовности к будущему execution
_ready_confidence = 0.3
_last_signal_key: str | None = None
_last_signal_value: str | None = None
_last_signal_reason: str = ""
_last_signal_confidence: float = 0.0
_last_signal_payload: dict | None = None
_last_signal_started_at: float | None = None
_last_logged_market_state: str | None = None
_last_logged_market_trend: str | None = None
_last_logged_market_volatility: str | None = None
_same_signal_count = 0
# debug: принудительно выставить сигнал и decision
def debug_force_signal(
self,
*,
signal: str,
confidence: float = 0.9,
repeat_count: int = 2,
reason: str = "DEBUG SIGNAL",
) -> AutoTradeState:
state = self.get_state()
normalized_signal = signal.strip().upper()
if normalized_signal not in {"BUY", "SELL", "HOLD"}:
normalized_signal = "HOLD"
previous_signal = state.last_signal
previous_decision_status = state.decision_status
if previous_signal != normalized_signal or state.signal_started_at is None:
state.signal_started_at = time.monotonic()
state.last_signal = normalized_signal
state.last_signal_repeat_count = repeat_count
state.last_signal_confidence = confidence
state.last_signal_reason = reason
if normalized_signal == "HOLD":
state.decision_status = "WAITING"
state.decision_reason = "Debug HOLD."
state.is_signal_confirmed = False
state.is_signal_ready = False
else:
state.decision_status = "READY"
state.decision_reason = "Debug READY signal."
state.is_signal_confirmed = True
state.is_signal_ready = True
signal_intent = self._signal_intent(
state=state,
signal=state.last_signal,
)
EventBus.emit(
"auto_decision_changed",
{
"previous_signal": previous_signal,
"previous_decision_status": previous_decision_status,
"decision_status": state.decision_status,
"signal": state.last_signal,
"signal_intent": signal_intent,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
"symbol": state.symbol,
"strategy": state.strategy,
"leverage": state.leverage,
"reason": state.last_signal_reason,
"debug": True,
},
)
return state
# установить капитал, выделенный под автоторговлю
def set_allocated_balance_usd(self, value: float) -> AutoTradeState:
state = self.get_state()
if value <= 0:
value = 1000.0
state.allocated_balance_usd = value
state.execution_block_reason = None
state.execution_size_adjustment_reason = None
return state
# получить текущее состояние автоторговли
def get_state(self) -> AutoTradeState:
if not self._state.symbol:
self._state.symbol = load_settings().default_symbol
return self._state
# проверить, запущен ли background loop
def is_loop_running(self) -> bool:
return self._loop_task is not None and not self._loop_task.done()
# запустить background loop, если он ещё не запущен
def start_loop(self) -> None:
if self.is_loop_running():
return
self._loop_task = asyncio.create_task(self._loop_worker())
# остановить background loop
def stop_loop(self) -> None:
if self._loop_task is None:
return
self._loop_task.cancel()
self._loop_task = None
# рабочий цикл автоторговли
async def _loop_worker(self) -> None:
while True:
state = self.get_state()
if state.status == "OFF":
break
self.run_cycle()
await asyncio.sleep(self._loop_interval_seconds)
# запустить активную торговлю
def start(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if state.status == "RUNNING":
return state, "Автоторговля уже активна."
if state.status == "OBSERVING":
state.status = "RUNNING"
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля активирована."
state.status = "RUNNING"
self._reset_signal_tracking()
state.last_signal = "HOLD"
state.signal_started_at = time.monotonic()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля запущена."
# включить режим наблюдения
def observe(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if previous_status == "OBSERVING":
return state, "Режим наблюдения уже включён."
state.status = "OBSERVING"
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
if previous_status == "OFF":
return state, "Включён режим наблюдения."
return state, "Автоторговля переведена в режим наблюдения."
# полностью выключить автоторговлю
def stop(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if state.status == "OFF":
self.stop_loop()
return state, "Автоторговля уже выключена."
state.status = "OFF"
self.stop_loop()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля выключена."
# установить инструмент
def set_symbol(self, symbol: str) -> AutoTradeState:
state = self.get_state()
state.symbol = symbol
self._reset_signal_tracking()
return state
# установить стратегию
def set_strategy(self, strategy: str) -> AutoTradeState:
state = self.get_state()
state.strategy = strategy.strip().upper()
self._reset_signal_tracking()
return state
# установить риск
def set_risk_percent(self, risk_percent: float) -> AutoTradeState:
state = self.get_state()
state.risk_percent = risk_percent
return state
# установить плечо
def set_leverage(self, leverage: float) -> AutoTradeState:
state = self.get_state()
state.leverage = leverage
return state
# установить stop loss в %
def set_stop_loss_percent(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.stop_loss_percent = value
return state
# установить take profit в %
def set_take_profit_percent(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.take_profit_percent = value
return state
# установить max loss в USD
def set_max_loss_usd(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.max_loss_usd = value
return state
# установить максимальное использование баланса под маржу
def set_max_reserved_balance_percent(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.max_reserved_balance_percent = value
state.execution_block_reason = None
return state
# сбросить внутренний трекинг сигналов
def _reset_signal_tracking(self) -> None:
self._last_signal_key = None
self._last_signal_value = None
self._last_signal_reason = ""
self._last_signal_confidence = 0.0
self._last_signal_payload = None
self._last_signal_started_at = None
self._same_signal_count = 0
state = self.get_state()
state.last_signal_repeat_count = 0
state.last_signal_confidence = 0.0
state.last_signal_reason = None
state.decision_status = "WAITING"
state.decision_reason = None
state.is_signal_confirmed = False
state.is_signal_ready = False
state.execution_block_reason = None
state.signal_started_at = None
# собрать контекст для стратегии
def _build_strategy_context(self) -> StrategyContext:
state = self.get_state()
return StrategyContext(
symbol=state.symbol,
status=state.status,
risk_percent=state.risk_percent,
)
# получить стратегию для текущего цикла
def _get_strategy(self) -> BaseStrategy:
state = self.get_state()
return StrategyRegistry.get(state.strategy)
# определить смысл сигнала с учетом открытой позиции
def _signal_intent(self, *, state: AutoTradeState, signal: str | None) -> str:
normalized_signal = (signal or "HOLD").upper()
position_side = str(getattr(state, "position_side", "NONE") or "NONE").upper()
if normalized_signal == "HOLD":
return "HOLD_MARKET"
if normalized_signal not in {"BUY", "SELL"}:
return "NOISE"
if position_side == "NONE":
return "ENTRY_CANDIDATE"
if position_side == "LONG" and normalized_signal == "BUY":
return "REINFORCE_POSITION"
if position_side == "SHORT" and normalized_signal == "SELL":
return "REINFORCE_POSITION"
if position_side == "LONG" and normalized_signal == "SELL":
return "REVERSAL_CANDIDATE"
if position_side == "SHORT" and normalized_signal == "BUY":
return "REVERSAL_CANDIDATE"
return "NOISE"
# обновить статус решения по текущему сигналу
def _update_decision_state(
self,
*,
state: AutoTradeState,
signal: str,
confidence: float,
) -> None:
state.is_signal_confirmed = False
state.is_signal_ready = False
if signal == "HOLD":
state.decision_status = "WAITING"
state.decision_reason = "Нет торгового направления."
return
if self._same_signal_count < self._confirm_repeats:
state.decision_status = "CONFIRMING"
state.decision_reason = (
f"Сигнал {signal} подтверждается: "
f"{self._same_signal_count}/{self._confirm_repeats} повторов."
)
return
state.is_signal_confirmed = True
if confidence < self._ready_confidence:
state.decision_status = "BLOCKED"
state.decision_reason = (
f"Сигнал {signal} подтверждён, но уверенность низкая: "
f"{confidence:.2f} < {self._ready_confidence:.2f}."
)
return
state.is_signal_ready = True
state.decision_status = "READY"
state.decision_reason = (
f"Сигнал {signal} подтверждён и готов к будущему execution."
)
# записать новый сигнал и итог предыдущей серии при смене сигнала
def _log_signal_if_changed(
self,
*,
strategy_name: str,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
signal_key = f"{state.status}:{state.symbol}:{strategy_name}:{signal}"
previous_signal = self._last_signal_value
previous_count = self._same_signal_count
is_same_signal = signal_key == self._last_signal_key
now = time.monotonic()
if is_same_signal:
self._same_signal_count += 1
self._last_signal_reason = reason
self._last_signal_confidence = confidence
self._last_signal_payload = payload
self._update_signal_state_fields(
state=state,
signal=signal,
reason=reason,
confidence=confidence,
)
return
if previous_signal is not None and previous_signal != signal:
if previous_count > 1:
self._log_signal_summary(
strategy_name=strategy_name,
state=state,
previous_signal=previous_signal,
previous_count=previous_count,
next_signal=signal,
reason=self._last_signal_reason,
confidence=self._last_signal_confidence,
payload=self._last_signal_payload,
duration_seconds=self._signal_duration_seconds(now=now),
)
else:
self._log_signal_event(
strategy_name=strategy_name,
state=state,
signal=previous_signal,
reason=f"{previous_signal} завершился без серии.",
confidence=self._last_signal_confidence,
payload={
"previous_signal": previous_signal,
"next_signal": signal,
},
)
self._last_signal_key = signal_key
self._last_signal_value = signal
self._last_signal_reason = reason
self._last_signal_confidence = confidence
self._last_signal_payload = payload
self._last_signal_started_at = now
self._same_signal_count = 1
self._update_signal_state_fields(
state=state,
signal=signal,
reason=reason,
confidence=confidence,
)
def _signal_duration_seconds(self, *, now: float) -> int:
if self._last_signal_started_at is None:
return max(0, int(self._same_signal_count * self._loop_interval_seconds))
return max(0, int(now - self._last_signal_started_at))
def _format_duration(self, total_seconds: int) -> str:
total_seconds = max(0, int(total_seconds))
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
if hours > 0:
return f"{hours}ч {minutes:02d}м {seconds:02d}с"
if minutes > 0:
return f"{minutes}м {seconds:02d}с"
return f"{seconds}с"
# обновить поля state для экрана автоторговли
def _update_signal_state_fields(
self,
*,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
) -> None:
previous_signal = state.last_signal
previous_decision_status = state.decision_status
if previous_signal != signal or state.signal_started_at is None:
state.signal_started_at = time.monotonic()
state.last_signal = signal
state.last_signal_repeat_count = self._same_signal_count
state.last_signal_confidence = confidence
state.last_signal_reason = reason
self._update_decision_state(
state=state,
signal=signal,
confidence=confidence,
)
signal_intent = self._signal_intent(
state=state,
signal=state.last_signal,
)
if (
previous_decision_status != state.decision_status
and state.decision_status == "READY"
):
self._log_ready_signal(
state=state,
signal=state.last_signal,
reason=state.last_signal_reason or reason,
confidence=state.last_signal_confidence,
signal_intent=signal_intent,
)
if previous_signal != state.last_signal:
EventBus.emit(
"auto_signal_changed",
{
"previous_signal": previous_signal,
"signal": state.last_signal,
"signal_intent": signal_intent,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
},
)
if previous_decision_status != state.decision_status:
EventBus.emit(
"auto_decision_changed",
{
"previous_decision_status": previous_decision_status,
"decision_status": state.decision_status,
"signal": state.last_signal,
"signal_intent": signal_intent,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
"symbol": state.symbol,
"strategy": state.strategy,
"leverage": state.leverage,
"reason": state.last_signal_reason,
},
)
# одиночные BUY / SELL больше не пишем в журнал как полезные события
def _log_signal_event(
self,
*,
strategy_name: str,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
return
# записать итог серии одинаковых сигналов при смене сигнала
def _log_signal_summary(
self,
*,
strategy_name: str,
state: AutoTradeState,
previous_signal: str,
previous_count: int,
next_signal: str,
reason: str,
confidence: float,
payload: dict | None,
duration_seconds: int,
) -> None:
if previous_signal != "HOLD":
return
duration_text = self._format_duration(duration_seconds)
signal_intent = "HOLD_MARKET"
try:
JournalService().log_ui_info(
event_type="signal_summary",
message=(
f"🟡 HOLD {duration_text} завершён сигналом {next_signal}."
),
screen="auto",
action="signal_summary",
payload={
"strategy": strategy_name,
"status": state.status,
"symbol": state.symbol,
"signal": previous_signal,
"next_signal": next_signal,
"signal_intent": signal_intent,
"repeat_count": previous_count,
"duration_seconds": duration_seconds,
"duration_text": duration_text,
"confidence": confidence,
"reason": reason,
"is_strong_signal": False,
"is_aggregated": True,
"payload": payload or {},
},
)
except Exception:
pass
def _log_ready_signal(
self,
*,
state: AutoTradeState,
signal: str | None,
reason: str,
confidence: float,
signal_intent: str,
) -> None:
normalized_signal = (signal or "HOLD").upper()
if normalized_signal not in {"BUY", "SELL"}:
return
try:
JournalService().log_ui_info(
event_type="signal_ready",
message=f"Сигнал {normalized_signal} готов к исполнению.",
screen="auto",
action="signal_ready",
payload={
"strategy": state.strategy,
"status": state.status,
"symbol": state.symbol,
"signal": normalized_signal,
"signal_intent": signal_intent,
"confidence": confidence,
"reason": reason,
"repeat_count": state.last_signal_repeat_count,
"position_side": state.position_side,
"decision_status": state.decision_status,
"is_strong_signal": confidence > self._ready_confidence,
"is_aggregated": False,
},
)
except Exception:
pass
def _sync_market_analysis_state(
self,
*,
state: AutoTradeState,
payload: dict | None,
) -> None:
if not isinstance(payload, dict):
return
previous_market_state = state.market_state
previous_market_trend = state.market_trend
previous_market_volatility = state.market_volatility
state.market_state = payload.get("market_state")
state.market_trend = payload.get("market_trend")
state.market_volatility = payload.get("market_volatility")
state.market_analysis_interval = payload.get("market_analysis_interval")
state.market_analysis_reason = payload.get("market_analysis_reason")
self._log_market_state_if_changed(
state=state,
payload=payload,
previous_market_state=previous_market_state,
previous_market_trend=previous_market_trend,
previous_market_volatility=previous_market_volatility,
)
def _log_market_state_if_changed(
self,
*,
state: AutoTradeState,
payload: dict,
previous_market_state: str | None,
previous_market_trend: str | None,
previous_market_volatility: str | None,
) -> None:
market_state = state.market_state
market_trend = state.market_trend
market_volatility = state.market_volatility
if not market_state or market_state == "UNKNOWN":
return
state_changed = (
market_state != previous_market_state
and market_state != type(self)._last_logged_market_state
)
trend_changed = (
market_trend is not None
and market_trend != previous_market_trend
and market_trend != type(self)._last_logged_market_trend
)
volatility_changed = (
market_volatility is not None
and market_volatility != previous_market_volatility
and market_volatility != type(self)._last_logged_market_volatility
)
if not state_changed and not trend_changed and not volatility_changed:
return
type(self)._last_logged_market_state = market_state
type(self)._last_logged_market_trend = market_trend
type(self)._last_logged_market_volatility = market_volatility
level = self._market_journal_level(market_state)
message = self._market_state_message(market_state)
journal_payload = {
**payload,
"previous_market_state": previous_market_state,
"previous_market_trend": previous_market_trend,
"previous_market_volatility": previous_market_volatility,
"current_market_state": market_state,
"current_market_trend": market_trend,
"current_market_volatility": market_volatility,
}
try:
if level == "WARNING":
JournalService().log_ui_warning(
event_type="market_state_changed",
message=message,
screen="auto",
action="market_analysis",
payload=journal_payload,
)
return
JournalService().log_ui_info(
event_type="market_state_changed",
message=message,
screen="auto",
action="market_analysis",
payload=journal_payload,
)
except Exception:
pass
def _market_journal_level(self, market_state: str) -> str:
if market_state == "HIGH_VOLATILITY":
return "WARNING"
return "INFO"
def _market_state_message(self, market_state: str) -> str:
messages = {
"TREND_UP": "📈 Рынок перешёл в рост.",
"TREND_DOWN": "📉 Рынок перешёл в снижение.",
"RANGE": "🟰 На рынке нет выраженного направления.",
"HIGH_VOLATILITY": "⚠️ Рынок стал слишком волатильным.",
"LOW_VOLATILITY": "💤 Рынок почти не движется.",
}
return messages.get(market_state, "⏳ Состояние рынка анализируется.")
def run_cycle(self) -> AutoTradeState:
state = self.get_state()
if state.status == "OFF":
return state
strategy = self._get_strategy()
context = self._build_strategy_context()
result = strategy.analyze(context)
self._sync_market_analysis_state(
state=state,
payload=result.payload,
)
state.last_check_at = datetime.now().strftime("%H:%M:%S")
self._log_signal_if_changed(
strategy_name=strategy.name,
state=state,
signal=result.signal.value,
reason=result.reason,
confidence=result.confidence,
payload=result.payload,
)
ExecutionEngine().process(state)
return state