Files
dzentra_bot/app/src/trading/auto/service.py

1535 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/auto/service.py
from __future__ import annotations

import asyncio
import logging
import math
import time
from datetime import datetime

from src.core.config import load_settings
from src.core.event_bus import EventBus
from src.trading.auto.state import AutoTradeState
from src.trading.execution.engine import ExecutionEngine
from src.trading.journal.service import JournalService
from src.trading.strategies.base import BaseStrategy, StrategyContext
from src.trading.strategies.registry import StrategyRegistry
from src.integrations.exchange.service import ExchangeService
class AutoTradeService:
# Auto-trading state object; declared on the class, so it is shared unless an instance shadows it.
_state = AutoTradeState()
# Handle of the background loop task; None until start_loop() creates one.
_loop_task: asyncio.Task | None = None
# Pause between run_cycle() calls inside the background loop, in seconds.
_loop_interval_seconds = 5
# minimum number of BUY / SELL repeats required to confirm a signal
_confirm_repeats = 2
# minimum time a BUY / SELL signal must be held to be confirmed, in seconds
_confirm_min_duration_seconds = 10
# minimum signal confidence required to become ready for a future execution
_ready_confidence = 0.3
# minimum resulting execution confidence required to allow an entry
_execution_confidence_required_score = 0.55
# Age after which the last signal is reset by _expire_runtime_if_needed(), in seconds.
_signal_ttl_seconds = 90
# Age after which the market analysis fields are reset by _expire_runtime_if_needed(), in seconds.
_market_analysis_ttl_seconds = 180
# De-duplication key of the last journaled runtime expiration.
_last_logged_runtime_expired_key: str | None = None
# Internal tracking of the current series of identical signals.
_last_signal_key: str | None = None
_last_signal_value: str | None = None
_last_signal_reason: str = ""
_last_signal_confidence: float = 0.0
_last_signal_payload: dict | None = None
_last_signal_started_at: float | None = None
# Last market values / entry-block key that were written to the journal (used to avoid duplicates).
_last_logged_market_state: str | None = None
_last_logged_market_trend: str | None = None
_last_logged_market_volatility: str | None = None
_last_logged_entry_block_reason: str | None = None
# Number of consecutive cycles that produced the same signal key.
_same_signal_count = 0
# Snapshot age thresholds (seconds): above the first one execution quality is BLOCKED,
# above the second one it is WARNING.
_max_snapshot_age_seconds = 5.0
_warning_snapshot_age_seconds = 2.0
# Spread thresholds in percent; separate enter/exit values give the WARNING and BLOCKED
# tiers hysteresis in _spread_execution_quality().
_spread_warning_enter_percent = 0.08
_spread_warning_exit_percent = 0.06
_spread_block_enter_percent = 0.15
_spread_block_exit_percent = 0.12
# De-duplication key of the last execution quality verdict seen by the journal logger.
_last_logged_execution_quality_key: str | None = None
def _spread_execution_quality(
self,
*,
state: AutoTradeState,
spread_percent: float | None,
) -> tuple[str | None, str | None, str | None, bool]:
if spread_percent is None:
return None, None, None, False
previous_quality = state.execution_quality
previous_reason = state.execution_quality_reason
if previous_quality == "BLOCKED" and previous_reason == "HIGH_SPREAD":
if spread_percent > self._spread_block_exit_percent:
return "BLOCKED", "HIGH_SPREAD", "высокий spread", False
if spread_percent > self._spread_warning_exit_percent:
return "WARNING", "WIDE_SPREAD", "spread повышен", False
return "GOOD", "MARKET_OK", "рынок готов", False
if previous_quality == "WARNING" and previous_reason == "WIDE_SPREAD":
if spread_percent >= self._spread_block_enter_percent:
return "BLOCKED", "HIGH_SPREAD", "высокий spread", False
if spread_percent > self._spread_warning_exit_percent:
return "WARNING", "WIDE_SPREAD", "spread повышен", False
return "GOOD", "MARKET_OK", "рынок готов", False
if spread_percent >= self._spread_block_enter_percent:
return "BLOCKED", "HIGH_SPREAD", "высокий spread", False
if spread_percent >= self._spread_warning_enter_percent:
return "WARNING", "WIDE_SPREAD", "spread повышен", False
return "GOOD", "MARKET_OK", "рынок готов", False
# debug: force a signal together with its decision state
def debug_force_signal(
    self,
    *,
    signal: str,
    confidence: float = 0.9,
    repeat_count: int = 2,
    reason: str = "DEBUG SIGNAL",
) -> AutoTradeState:
    """Overwrite the current signal/decision fields and emit ``auto_decision_changed``.

    Any value other than BUY / SELL / HOLD (after strip + upper) is treated
    as HOLD. BUY and SELL are marked confirmed and READY; HOLD is WAITING.
    """
    state = self.get_state()

    forced = signal.strip().upper()
    if forced not in {"BUY", "SELL", "HOLD"}:
        forced = "HOLD"

    signal_before = state.last_signal
    decision_before = state.decision_status

    # Restart the hold timer only for a new signal or when no timer exists yet.
    if state.signal_started_at is None or signal_before != forced:
        state.signal_started_at = time.monotonic()

    state.last_signal = forced
    state.last_signal_repeat_count = repeat_count
    state.last_signal_confidence = confidence
    state.last_signal_reason = reason

    # Confirmation is reported as already complete.
    state.signal_confirmation_seconds = self._confirm_min_duration_seconds
    state.signal_confirmation_required_seconds = self._confirm_min_duration_seconds
    state.signal_confirmation_missing_repeats = 0
    state.signal_confirmation_progress = 1.0
    state.signal_confirmation_reason = "debug confirmation"

    is_directional = forced != "HOLD"
    state.decision_status = "READY" if is_directional else "WAITING"
    state.decision_reason = "Debug READY signal." if is_directional else "Debug HOLD."
    state.is_signal_confirmed = is_directional
    state.is_signal_ready = is_directional

    intent = self._signal_intent(state=state, signal=state.last_signal)
    event = {
        "previous_signal": signal_before,
        "previous_decision_status": decision_before,
        "decision_status": state.decision_status,
        "signal": state.last_signal,
        "signal_intent": intent,
        "repeat_count": state.last_signal_repeat_count,
        "confidence": state.last_signal_confidence,
        "symbol": state.symbol,
        "strategy": state.strategy,
        "leverage": state.leverage,
        "reason": state.last_signal_reason,
        "debug": True,
    }
    EventBus.emit("auto_decision_changed", event)
    return state
# установить капитал, выделенный под автоторговлю
def set_allocated_balance_usd(self, value: float) -> AutoTradeState:
state = self.get_state()
if value <= 0:
value = 1000.0
state.allocated_balance_usd = value
state.execution_block_reason = None
state.execution_size_adjustment_reason = None
return state
# получить текущее состояние автоторговли
def get_state(self) -> AutoTradeState:
if not self._state.symbol:
self._state.symbol = load_settings().default_symbol
return self._state
# check whether the background loop is running
def is_loop_running(self) -> bool:
    """Return True when a loop task exists and has not finished."""
    task = self._loop_task
    if task is None:
        return False
    return not task.done()
# start the background loop unless it is already running
def start_loop(self) -> None:
    """Create the loop task via ``asyncio.create_task`` if none is running."""
    if not self.is_loop_running():
        self._loop_task = asyncio.create_task(self._loop_worker())
# stop the background loop
def stop_loop(self) -> None:
    """Cancel the loop task, if any, and forget its handle."""
    task = self._loop_task
    if task is not None:
        task.cancel()
        self._loop_task = None
# рабочий цикл автоторговли
async def _loop_worker(self) -> None:
while True:
state = self.get_state()
if state.status == "OFF":
break
self.run_cycle()
await asyncio.sleep(self._loop_interval_seconds)
# запустить активную торговлю
def start(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if state.status == "RUNNING":
return state, "Автоторговля уже активна."
if state.status == "OBSERVING":
state.status = "RUNNING"
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля активирована."
state.status = "RUNNING"
self._reset_signal_tracking()
state.last_signal = "HOLD"
state.signal_started_at = time.monotonic()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля запущена."
# включить режим наблюдения
def observe(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if previous_status == "OBSERVING":
return state, "Режим наблюдения уже включён."
state.status = "OBSERVING"
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
if previous_status == "OFF":
return state, "Включён режим наблюдения."
return state, "Автоторговля переведена в режим наблюдения."
# полностью выключить автоторговлю
def stop(self) -> tuple[AutoTradeState, str]:
state = self.get_state()
previous_status = state.status
if state.status == "OFF":
self.stop_loop()
return state, "Автоторговля уже выключена."
state.status = "OFF"
self.stop_loop()
EventBus.emit(
"auto_status_changed",
{
"previous_status": previous_status,
"status": state.status,
},
)
return state, "Автоторговля выключена."
# set the trading instrument
def set_symbol(self, symbol: str) -> AutoTradeState:
    """Store the new symbol, reset signal tracking and the strategy runtime.

    ``StrategyRegistry.reset_runtime`` is called for both the previous and
    the new symbol.
    """
    state = self.get_state()
    old_symbol, state.symbol = state.symbol, symbol
    self._reset_signal_tracking()
    for affected in (old_symbol, symbol):
        StrategyRegistry.reset_runtime(symbol=affected)
    return state
# set the strategy
def set_strategy(self, strategy: str) -> AutoTradeState:
    """Store the strategy name (stripped, upper-cased) and reset tracking.

    ``StrategyRegistry.reset_runtime`` is called with the previous and the
    new strategy name.
    """
    state = self.get_state()
    old_strategy = state.strategy
    new_strategy = strategy.strip().upper()
    state.strategy = new_strategy
    self._reset_signal_tracking()
    for affected in (old_strategy, new_strategy):
        StrategyRegistry.reset_runtime(affected)
    return state
# установить риск
def set_risk_percent(self, risk_percent: float) -> AutoTradeState:
state = self.get_state()
state.risk_percent = risk_percent
return state
# установить плечо
def set_leverage(self, leverage: float) -> AutoTradeState:
state = self.get_state()
state.leverage = leverage
return state
# установить stop loss в %
def set_stop_loss_percent(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.stop_loss_percent = value
return state
# установить take profit в %
def set_take_profit_percent(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.take_profit_percent = value
return state
# установить max loss в USD
def set_max_loss_usd(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.max_loss_usd = value
return state
# установить максимальное использование баланса под маржу
def set_max_reserved_balance_percent(self, value: float | None) -> AutoTradeState:
state = self.get_state()
state.max_reserved_balance_percent = value
state.execution_block_reason = None
return state
# сбросить внутренний трекинг сигналов
def _reset_signal_tracking(self) -> None:
self._last_signal_key = None
self._last_signal_value = None
self._last_signal_reason = ""
self._last_signal_confidence = 0.0
self._last_signal_payload = None
self._last_signal_started_at = None
self._same_signal_count = 0
state = self.get_state()
state.last_signal_repeat_count = 0
state.last_signal_confidence = 0.0
state.last_signal_reason = None
state.decision_status = "WAITING"
state.decision_reason = None
state.is_signal_confirmed = False
state.is_signal_ready = False
state.signal_confirmation_seconds = 0
state.signal_confirmation_required_seconds = self._confirm_min_duration_seconds
state.signal_confirmation_missing_repeats = self._confirm_repeats
state.signal_confirmation_progress = 0.0
state.signal_confirmation_reason = None
state.execution_block_reason = None
state.execution_semantic_status = None
state.execution_semantic_message = None
state.execution_semantic_reason = None
state.execution_quality = None
state.execution_quality_reason = None
state.execution_quality_message = None
state.execution_confidence_score = None
state.execution_confidence_level = None
state.execution_confidence_required_score = self._execution_confidence_required_score
state.execution_confidence_reason = None
state.execution_confidence_factors = None
state.signal_started_at = None
state.signal_updated_at = None
state.market_state = None
state.market_trend = None
state.market_volatility = None
state.market_analysis_interval = None
state.market_analysis_reason = None
state.market_analysis_updated_at = None
state.market_runtime_degraded = False
state.market_trend_strength = None
state.market_trend_quality = None
state.market_phase = None
state.market_phase_direction = None
state.entry_block_reason = None
state.entry_block_message = None
state.runtime_expired_reason = None
state.runtime_expired_message = None
state.snapshot_age_seconds = None
state.spread_percent = None
# build the context passed to the strategy
def _build_strategy_context(self) -> StrategyContext:
    """Create a ``StrategyContext`` from the state's symbol, status and risk percent."""
    snapshot = self.get_state()
    context_fields = {
        "symbol": snapshot.symbol,
        "status": snapshot.status,
        "risk_percent": snapshot.risk_percent,
    }
    return StrategyContext(**context_fields)
# get the strategy for the current cycle
def _get_strategy(self) -> BaseStrategy:
    """Look up the strategy named in the state through ``StrategyRegistry``."""
    strategy_name = self.get_state().strategy
    return StrategyRegistry.get(strategy_name)
# определить смысл сигнала с учетом открытой позиции
def _signal_intent(self, *, state: AutoTradeState, signal: str | None) -> str:
normalized_signal = (signal or "HOLD").upper()
position_side = str(getattr(state, "position_side", "NONE") or "NONE").upper()
if normalized_signal == "HOLD":
return "HOLD_MARKET"
if normalized_signal not in {"BUY", "SELL"}:
return "NOISE"
if position_side == "NONE":
return "ENTRY_CANDIDATE"
if position_side == "LONG" and normalized_signal == "BUY":
return "REINFORCE_POSITION"
if position_side == "SHORT" and normalized_signal == "SELL":
return "REINFORCE_POSITION"
if position_side == "LONG" and normalized_signal == "SELL":
return "REVERSAL_CANDIDATE"
if position_side == "SHORT" and normalized_signal == "BUY":
return "REVERSAL_CANDIDATE"
return "NOISE"
# обновить статус решения по текущему сигналу
def _update_decision_state(
self,
*,
state: AutoTradeState,
signal: str,
confidence: float,
) -> None:
state.is_signal_confirmed = False
state.is_signal_ready = False
state.signal_confirmation_required_seconds = self._confirm_min_duration_seconds
if signal == "HOLD":
state.signal_confirmation_seconds = 0
state.signal_confirmation_missing_repeats = self._confirm_repeats
state.signal_confirmation_progress = 0.0
state.signal_confirmation_reason = None
state.decision_status = "WAITING"
state.decision_reason = "Нет торгового направления."
return
now = time.monotonic()
if state.signal_started_at is None:
signal_age_seconds = 0
else:
signal_age_seconds = max(0, int(now - float(state.signal_started_at)))
missing_repeats = max(0, self._confirm_repeats - self._same_signal_count)
missing_seconds = max(
0,
self._confirm_min_duration_seconds - signal_age_seconds,
)
repeat_progress = min(
1.0,
self._same_signal_count / max(1, self._confirm_repeats),
)
time_progress = min(
1.0,
signal_age_seconds / max(1, self._confirm_min_duration_seconds),
)
confirmation_progress = min(repeat_progress, time_progress)
state.signal_confirmation_seconds = signal_age_seconds
state.signal_confirmation_missing_repeats = missing_repeats
state.signal_confirmation_progress = round(confirmation_progress, 3)
if missing_repeats > 0 or missing_seconds > 0:
state.decision_status = "CONFIRMING"
state.signal_confirmation_reason = (
f"{self._same_signal_count}/{self._confirm_repeats} повторов, "
f"{signal_age_seconds}/{self._confirm_min_duration_seconds}с"
)
state.decision_reason = (
f"Сигнал {signal} подтверждается: "
f"{self._same_signal_count}/{self._confirm_repeats} повторов, "
f"{signal_age_seconds}/{self._confirm_min_duration_seconds}с."
)
return
state.is_signal_confirmed = True
state.signal_confirmation_reason = "сигнал подтверждён"
if confidence < self._ready_confidence:
state.decision_status = "BLOCKED"
state.decision_reason = (
f"Сигнал {signal} подтверждён, но уверенность низкая: "
f"{confidence:.2f} < {self._ready_confidence:.2f}."
)
return
self._sync_execution_confidence_state(
state=state,
signal=signal,
confidence=confidence,
)
if (
state.execution_confidence_score is not None
and state.execution_confidence_score < self._execution_confidence_required_score
):
state.decision_status = "BLOCKED"
state.decision_reason = (
f"Execution confidence низкий: "
f"{state.execution_confidence_score:.2f} < "
f"{self._execution_confidence_required_score:.2f}."
)
return
state.is_signal_ready = True
state.signal_confirmation_progress = 1.0
state.decision_status = "READY"
state.decision_reason = (
f"Сигнал {signal} подтверждён по повторам и времени удержания."
)
# записать новый сигнал и итог предыдущей серии при смене сигнала
def _log_signal_if_changed(
self,
*,
strategy_name: str,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
signal_key = f"{state.status}:{state.symbol}:{strategy_name}:{signal}"
previous_signal = self._last_signal_value
previous_count = self._same_signal_count
is_same_signal = signal_key == self._last_signal_key
now = time.monotonic()
if is_same_signal:
self._same_signal_count += 1
self._last_signal_reason = reason
self._last_signal_confidence = confidence
self._last_signal_payload = payload
self._update_signal_state_fields(
state=state,
signal=signal,
reason=reason,
confidence=confidence,
)
return
if previous_signal is not None and previous_signal != signal:
if previous_count > 1:
self._log_signal_summary(
strategy_name=strategy_name,
state=state,
previous_signal=previous_signal,
previous_count=previous_count,
next_signal=signal,
reason=self._last_signal_reason,
confidence=self._last_signal_confidence,
payload=self._last_signal_payload,
duration_seconds=self._signal_duration_seconds(now=now),
)
else:
self._log_signal_event(
strategy_name=strategy_name,
state=state,
signal=previous_signal,
reason=f"{previous_signal} завершился без серии.",
confidence=self._last_signal_confidence,
payload={
"previous_signal": previous_signal,
"next_signal": signal,
},
)
self._last_signal_key = signal_key
self._last_signal_value = signal
self._last_signal_reason = reason
self._last_signal_confidence = confidence
self._last_signal_payload = payload
self._last_signal_started_at = now
self._same_signal_count = 1
self._update_signal_state_fields(
state=state,
signal=signal,
reason=reason,
confidence=confidence,
)
def _signal_duration_seconds(self, *, now: float) -> int:
if self._last_signal_started_at is None:
return max(0, int(self._same_signal_count * self._loop_interval_seconds))
return max(0, int(now - self._last_signal_started_at))
def _format_duration(self, total_seconds: int) -> str:
total_seconds = max(0, int(total_seconds))
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
if hours > 0:
return f"{hours}ч {minutes:02d}м {seconds:02d}с"
if minutes > 0:
return f"{minutes}м {seconds:02d}с"
return f"{seconds}с"
# обновить поля state для экрана автоторговли
def _update_signal_state_fields(
self,
*,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
) -> None:
previous_signal = state.last_signal
previous_decision_status = state.decision_status
if previous_signal != signal or state.signal_started_at is None:
state.signal_started_at = time.monotonic()
state.last_signal = signal
state.last_signal_repeat_count = self._same_signal_count
state.last_signal_confidence = confidence
state.last_signal_reason = reason
state.signal_updated_at = time.monotonic()
state.runtime_expired_reason = None
state.runtime_expired_message = None
self._update_decision_state(
state=state,
signal=signal,
confidence=confidence,
)
signal_intent = self._signal_intent(
state=state,
signal=state.last_signal,
)
if (
previous_decision_status != state.decision_status
and state.decision_status == "READY"
):
self._log_ready_signal(
state=state,
signal=state.last_signal,
reason=state.last_signal_reason or reason,
confidence=state.last_signal_confidence,
signal_intent=signal_intent,
)
if previous_signal != state.last_signal:
EventBus.emit(
"auto_signal_changed",
{
"previous_signal": previous_signal,
"signal": state.last_signal,
"signal_intent": signal_intent,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
},
)
if previous_decision_status != state.decision_status:
EventBus.emit(
"auto_decision_changed",
{
"previous_decision_status": previous_decision_status,
"decision_status": state.decision_status,
"signal": state.last_signal,
"signal_intent": signal_intent,
"repeat_count": state.last_signal_repeat_count,
"confidence": state.last_signal_confidence,
"symbol": state.symbol,
"strategy": state.strategy,
"leverage": state.leverage,
"reason": state.last_signal_reason,
},
)
# одиночные BUY / SELL больше не пишем в журнал как полезные события
def _log_signal_event(
self,
*,
strategy_name: str,
state: AutoTradeState,
signal: str,
reason: str,
confidence: float,
payload: dict | None,
) -> None:
return
# записать итог серии одинаковых сигналов при смене сигнала
def _log_signal_summary(
self,
*,
strategy_name: str,
state: AutoTradeState,
previous_signal: str,
previous_count: int,
next_signal: str,
reason: str,
confidence: float,
payload: dict | None,
duration_seconds: int,
) -> None:
if previous_signal != "HOLD":
return
duration_text = self._format_duration(duration_seconds)
signal_intent = "HOLD_MARKET"
try:
JournalService().log_ui_info(
event_type="signal_summary",
message=(
f"HOLD длился {duration_text} и завершился сигналом {next_signal}."
),
screen="auto",
action="signal_summary",
payload={
"strategy": strategy_name,
"status": state.status,
"symbol": state.symbol,
"signal": previous_signal,
"next_signal": next_signal,
"signal_intent": signal_intent,
"repeat_count": previous_count,
"duration_seconds": duration_seconds,
"duration_text": duration_text,
"confidence": confidence,
"reason": reason,
"is_strong_signal": False,
"is_aggregated": True,
"payload": payload or {},
},
)
except Exception:
pass
def _log_ready_signal(
self,
*,
state: AutoTradeState,
signal: str | None,
reason: str,
confidence: float,
signal_intent: str,
) -> None:
normalized_signal = (signal or "HOLD").upper()
if normalized_signal not in {"BUY", "SELL"}:
return
try:
JournalService().log_ui_info(
event_type="signal_ready",
message=(
f"Сигнал {normalized_signal} подтверждён и готов к исполнению."
),
screen="auto",
action="signal_ready",
payload={
"strategy": state.strategy,
"status": state.status,
"symbol": state.symbol,
"signal": normalized_signal,
"signal_intent": signal_intent,
"confidence": confidence,
"reason": reason,
"repeat_count": state.last_signal_repeat_count,
"position_side": state.position_side,
"decision_status": state.decision_status,
"is_strong_signal": confidence > self._ready_confidence,
"is_aggregated": False,
"confirmation_seconds": state.signal_confirmation_seconds,
"confirmation_required_seconds": state.signal_confirmation_required_seconds,
"confirmation_progress": state.signal_confirmation_progress,
},
)
except Exception:
pass
def _sync_market_analysis_state(
self,
*,
state: AutoTradeState,
payload: dict | None,
) -> None:
if not isinstance(payload, dict):
return
previous_market_state = state.market_state
previous_market_trend = state.market_trend
previous_market_volatility = state.market_volatility
state.market_state = payload.get("market_state")
state.market_trend = payload.get("market_trend")
state.market_volatility = payload.get("market_volatility")
state.market_trend_strength = payload.get("market_trend_strength")
state.market_trend_quality = payload.get("market_trend_quality")
state.market_phase = payload.get("market_phase")
state.market_phase_direction = payload.get("market_phase_direction")
state.market_analysis_interval = payload.get("market_analysis_interval")
state.market_analysis_reason = payload.get("market_analysis_reason")
state.market_analysis_updated_at = time.monotonic()
state.entry_block_reason = payload.get("entry_block_reason")
state.entry_block_message = payload.get("entry_block_message")
self._log_market_state_if_changed(
state=state,
payload=payload,
previous_market_state=previous_market_state,
previous_market_trend=previous_market_trend,
previous_market_volatility=previous_market_volatility,
)
self._log_entry_block_if_changed(
state=state,
payload=payload,
)
def _log_entry_block_if_changed(
self,
*,
state: AutoTradeState,
payload: dict,
) -> None:
reason = state.entry_block_reason
message = state.entry_block_message
if not reason or not message:
return
key = f"{state.status}:{state.symbol}:{state.strategy}:{reason}:{message}"
if key == type(self)._last_logged_entry_block_reason:
return
type(self)._last_logged_entry_block_reason = key
try:
JournalService().log_ui_info(
event_type="entry_blocked",
message=f"Вход в позицию не выполнен: {message}.",
screen="auto",
action="entry_diagnostics",
payload={
**payload,
"entry_block_reason": reason,
"entry_block_message": message,
"symbol": state.symbol,
"strategy": state.strategy,
"status": state.status,
},
)
except Exception:
pass
def _log_market_state_if_changed(
self,
*,
state: AutoTradeState,
payload: dict,
previous_market_state: str | None,
previous_market_trend: str | None,
previous_market_volatility: str | None,
) -> None:
market_state = state.market_state
market_trend = state.market_trend
market_volatility = state.market_volatility
if not market_state or market_state == "UNKNOWN":
return
state_changed = (
market_state != previous_market_state
and market_state != type(self)._last_logged_market_state
)
volatility_changed = (
market_volatility is not None
and market_volatility != previous_market_volatility
and market_volatility != type(self)._last_logged_market_volatility
)
if not state_changed and not volatility_changed:
return
journal_payload = {
**payload,
"previous_market_state": previous_market_state,
"previous_market_trend": previous_market_trend,
"previous_market_volatility": previous_market_volatility,
"current_market_state": market_state,
"current_market_trend": market_trend,
"current_market_volatility": market_volatility,
}
try:
if state_changed:
self._write_market_journal_event(
event_type="market_state_changed",
market_state=market_state,
message=self._market_state_message(market_state),
payload=journal_payload,
)
if volatility_changed:
self._write_market_journal_event(
event_type="market_volatility_changed",
market_state=market_state,
message=self._market_volatility_message(market_volatility),
payload=journal_payload,
)
except Exception:
pass
type(self)._last_logged_market_state = market_state
type(self)._last_logged_market_trend = market_trend
type(self)._last_logged_market_volatility = market_volatility
def _write_market_journal_event(
    self,
    *,
    event_type: str,
    market_state: str,
    message: str,
    payload: dict,
) -> None:
    """Write one market analysis journal entry.

    The entry is written with ``log_ui_warning`` when
    ``_market_journal_level`` reports WARNING for the market state, and with
    ``log_ui_info`` otherwise.
    """
    level = self._market_journal_level(market_state)
    journal = JournalService()
    write = journal.log_ui_warning if level == "WARNING" else journal.log_ui_info
    write(
        event_type=event_type,
        message=message,
        screen="auto",
        action="market_analysis",
        payload=payload,
    )
def _market_volatility_message(self, market_volatility: str | None) -> str:
messages = {
"LOW": "Волатильность изменена: низкая.",
"NORMAL": "Волатильность изменена: нормальная.",
"HIGH": "Волатильность изменена: высокая.",
}
return messages.get(str(market_volatility or ""), "Волатильность не определена.")
def _market_journal_level(self, market_state: str) -> str:
if market_state == "HIGH_VOLATILITY":
return "WARNING"
return "INFO"
def _market_state_message(self, market_state: str) -> str:
messages = {
"TREND_UP": "Состояние рынка изменено: рост.",
"TREND_DOWN": "Состояние рынка изменено: снижение.",
"RANGE": "Состояние рынка изменено: нет выраженного направления.",
"HIGH_VOLATILITY": "Состояние рынка изменено: высокая волатильность.",
"LOW_VOLATILITY": "Состояние рынка изменено: низкая активность.",
}
return messages.get(market_state, "Состояние рынка анализируется.")
def _expire_runtime_if_needed(self, state: AutoTradeState) -> None:
now = time.monotonic()
signal_updated_at = getattr(state, "signal_updated_at", None)
if signal_updated_at is not None:
signal_age = now - float(signal_updated_at)
if signal_age > self._signal_ttl_seconds:
previous_signal = state.last_signal
self._reset_signal_tracking()
state.runtime_expired_reason = "SIGNAL_TTL_EXPIRED"
state.runtime_expired_message = "сигнал устарел и был сброшен"
self._log_runtime_expired_if_changed(
state=state,
reason="SIGNAL_TTL_EXPIRED",
message="Сигнал устарел и был сброшен.",
payload={
"previous_signal": previous_signal,
"signal_age_seconds": int(signal_age),
"signal_ttl_seconds": self._signal_ttl_seconds,
},
)
return
market_updated_at = getattr(state, "market_analysis_updated_at", None)
if market_updated_at is not None:
market_age = now - float(market_updated_at)
if market_age > self._market_analysis_ttl_seconds:
state.market_state = None
state.market_trend = None
state.market_volatility = None
state.market_analysis_interval = None
state.market_analysis_reason = None
state.market_analysis_updated_at = None
state.entry_block_reason = None
state.entry_block_message = None
state.runtime_expired_reason = "MARKET_ANALYSIS_TTL_EXPIRED"
state.runtime_expired_message = "анализ рынка устарел"
self._log_runtime_expired_if_changed(
state=state,
reason="MARKET_ANALYSIS_TTL_EXPIRED",
message="Анализ рынка устарел и был сброшен.",
payload={
"market_age_seconds": int(market_age),
"market_analysis_ttl_seconds": self._market_analysis_ttl_seconds,
},
)
def _log_runtime_expired_if_changed(
self,
*,
state: AutoTradeState,
reason: str,
message: str,
payload: dict,
) -> None:
key = f"{state.status}:{state.symbol}:{state.strategy}:{reason}"
if key == type(self)._last_logged_runtime_expired_key:
return
type(self)._last_logged_runtime_expired_key = key
try:
JournalService().log_ui_warning(
event_type="runtime_expired",
message=message,
screen="auto",
action="runtime_expiration",
payload={
**payload,
"symbol": state.symbol,
"strategy": state.strategy,
"status": state.status,
"runtime_expired_reason": reason,
},
)
except Exception:
pass
def _sync_execution_quality_state(self, state: AutoTradeState) -> None:
try:
snapshot = ExchangeService().get_market_snapshot(
state.symbol,
runtime_key="auto",
)
except Exception as exc:
fallback_price = None
try:
fallback_price = float(
ExchangeService().get_price(
state.symbol,
runtime_key="auto",
).price
)
except Exception:
pass
state.snapshot_age_seconds = None
state.spread_percent = None
if fallback_price is not None and fallback_price > 0:
state.execution_quality = "WARNING"
state.execution_quality_reason = "SNAPSHOT_UNAVAILABLE"
state.execution_quality_message = "нет depth snapshot"
state.market_runtime_degraded = True
else:
state.execution_quality = "BLOCKED"
state.execution_quality_reason = "SNAPSHOT_ERROR"
state.execution_quality_message = "нет данных рынка"
state.market_runtime_degraded = True
self._log_execution_quality_if_changed(
state=state,
payload={
"error": str(exc),
"error_type": type(exc).__name__,
"fallback_price_available": fallback_price is not None,
},
)
return
bid_price = self._safe_float(snapshot.get("bid_price"))
ask_price = self._safe_float(snapshot.get("ask_price"))
last_price = self._safe_float(snapshot.get("last_price"))
age_seconds = self._safe_float(snapshot.get("age_seconds"))
is_fresh = bool(snapshot.get("is_fresh", False))
source = str(snapshot.get("source") or "")
state.snapshot_age_seconds = age_seconds
state.spread_percent = self._spread_percent(
bid_price=bid_price,
ask_price=ask_price,
)
if age_seconds is not None and age_seconds > self._max_snapshot_age_seconds:
state.execution_quality = "BLOCKED"
state.execution_quality_reason = "STALE_SNAPSHOT"
state.execution_quality_message = "snapshot устарел"
state.market_runtime_degraded = True
elif age_seconds is not None and age_seconds > self._warning_snapshot_age_seconds:
state.execution_quality = "WARNING"
state.execution_quality_reason = "AGING_SNAPSHOT"
state.execution_quality_message = "snapshot стареет"
state.market_runtime_degraded = not is_fresh
elif state.spread_percent is not None:
(
state.execution_quality,
state.execution_quality_reason,
state.execution_quality_message,
state.market_runtime_degraded,
) = self._spread_execution_quality(
state=state,
spread_percent=state.spread_percent,
)
else:
state.execution_quality = "GOOD"
state.execution_quality_reason = "MARKET_OK"
state.execution_quality_message = "рынок готов"
state.market_runtime_degraded = False
if state.execution_quality == "BLOCKED":
state.execution_block_reason = state.execution_quality_message
elif state.execution_block_reason == state.execution_quality_message:
state.execution_block_reason = None
self._log_execution_quality_if_changed(
state=state,
payload={
"symbol": state.symbol,
"strategy": state.strategy,
"bid_price": bid_price,
"ask_price": ask_price,
"last_price": last_price,
"snapshot_age_seconds": age_seconds,
"spread_percent": state.spread_percent,
"is_fresh": is_fresh,
"source": source,
"execution_quality": state.execution_quality,
"execution_quality_reason": state.execution_quality_reason,
"execution_quality_message": state.execution_quality_message,
"market_runtime_degraded": state.market_runtime_degraded,
"max_snapshot_age_seconds": self._max_snapshot_age_seconds,
"warning_snapshot_age_seconds": self._warning_snapshot_age_seconds,
"spread_warning_enter_percent": self._spread_warning_enter_percent,
"spread_warning_exit_percent": self._spread_warning_exit_percent,
"spread_block_enter_percent": self._spread_block_enter_percent,
"spread_block_exit_percent": self._spread_block_exit_percent,
},
)
def _spread_percent(
self,
*,
bid_price: float | None,
ask_price: float | None,
) -> float | None:
if bid_price is None or ask_price is None:
return None
if bid_price <= 0 or ask_price <= 0:
return None
mid_price = (bid_price + ask_price) / 2
if mid_price <= 0:
return None
spread = ask_price - bid_price
if spread < 0:
return None
return round((spread / mid_price) * 100, 5)
def _safe_float(self, value: object) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _log_execution_quality_if_changed(
self,
*,
state: AutoTradeState,
payload: dict,
) -> None:
quality = state.execution_quality
reason = state.execution_quality_reason
message = state.execution_quality_message
if not quality or not reason or not message:
return
key = f"{state.status}:{state.symbol}:{state.strategy}:{quality}:{reason}:{message}"
if key == type(self)._last_logged_execution_quality_key:
return
type(self)._last_logged_execution_quality_key = key
if quality == "GOOD":
return
try:
log_payload = {
**payload,
"status": state.status,
"symbol": state.symbol,
"strategy": state.strategy,
}
if quality == "BLOCKED":
JournalService().log_ui_warning(
event_type="execution_quality_changed",
message=f"Качество исполнения: {message}.",
screen="auto",
action="execution_quality",
payload=log_payload,
)
return
JournalService().log_ui_info(
event_type="execution_quality_changed",
message=f"Качество исполнения: {message}.",
screen="auto",
action="execution_quality",
payload=log_payload,
)
except Exception:
pass
def _sync_execution_confidence_state(
self,
*,
state: AutoTradeState,
signal: str,
confidence: float,
) -> None:
if signal not in {"BUY", "SELL"}:
state.execution_confidence_score = None
state.execution_confidence_level = None
state.execution_confidence_required_score = self._execution_confidence_required_score
state.execution_confidence_reason = None
state.execution_confidence_factors = None
return
signal_score = self._clamp_score(confidence)
confirmation_score = self._clamp_score(state.signal_confirmation_progress)
market_score = self._market_confidence_score(state)
execution_score = self._execution_quality_confidence_score(state)
score = (
signal_score * 0.35
+ confirmation_score * 0.20
+ market_score * 0.25
+ execution_score * 0.20
)
score = round(self._clamp_score(score), 3)
state.execution_confidence_score = score
state.execution_confidence_required_score = self._execution_confidence_required_score
state.execution_confidence_level = self._execution_confidence_level(score)
state.execution_confidence_reason = self._execution_confidence_reason(state)
state.execution_confidence_factors = {
"signal_score": round(signal_score, 3),
"confirmation_score": round(confirmation_score, 3),
"market_score": round(market_score, 3),
"execution_score": round(execution_score, 3),
"required_score": self._execution_confidence_required_score,
"market_state": state.market_state,
"market_trend": state.market_trend,
"market_trend_strength": state.market_trend_strength,
"market_trend_quality": state.market_trend_quality,
"market_phase": state.market_phase,
"execution_quality": state.execution_quality,
"execution_quality_reason": state.execution_quality_reason,
"spread_percent": state.spread_percent,
}
def _market_confidence_score(self, state: AutoTradeState) -> float:
market_state = state.market_state
strength = state.market_trend_strength
quality = state.market_trend_quality
phase = state.market_phase
if market_state in {"HIGH_VOLATILITY", "LOW_VOLATILITY", "RANGE", "UNKNOWN", None}:
return 0.25
score = 0.65
if strength == "STRONG":
score += 0.2
elif strength == "NORMAL":
score += 0.1
elif strength == "WEAK":
score -= 0.25
if quality == "CLEAN":
score += 0.1
elif quality == "NOISY":
score -= 0.25
if phase == "IMPULSE":
score += 0.1
elif phase == "PULLBACK":
score -= 0.25
elif phase in {"RANGE", "SQUEEZE"}:
score -= 0.3
return self._clamp_score(score)
def _execution_quality_confidence_score(self, state: AutoTradeState) -> float:
quality = state.execution_quality
reason = state.execution_quality_reason
if quality == "GOOD":
return 1.0
if quality == "WARNING":
if reason == "WIDE_SPREAD":
return 0.65
if reason == "AGING_SNAPSHOT":
return 0.6
if reason == "SNAPSHOT_UNAVAILABLE":
return 0.55
return 0.6
if quality == "BLOCKED":
return 0.0
return 0.5
def _execution_confidence_level(self, score: float) -> str:
if score >= 0.75:
return "HIGH"
if score >= self._execution_confidence_required_score:
return "NORMAL"
return "LOW"
def _execution_confidence_reason(self, state: AutoTradeState) -> str:
score = state.execution_confidence_score
if score is None:
return "execution confidence не рассчитан"
if score < self._execution_confidence_required_score:
return "низкая совокупная уверенность входа"
if state.execution_confidence_level == "HIGH":
return "высокая совокупная уверенность входа"
return "достаточная совокупная уверенность входа"
def _clamp_score(self, value: float | int | None) -> float:
if value is None:
return 0.0
return max(0.0, min(1.0, float(value)))
def _sync_execution_semantic_state(self, state: AutoTradeState) -> None:
if state.execution_quality == "BLOCKED":
state.execution_semantic_status = "BLOCKED"
state.execution_semantic_message = self._execution_block_semantic_message(state)
state.execution_semantic_reason = state.execution_quality_reason
return
if state.decision_status == "BLOCKED":
state.execution_semantic_status = "BLOCKED"
if (
state.execution_confidence_score is not None
and state.execution_confidence_score < self._execution_confidence_required_score
):
state.execution_semantic_message = "⛔ Исполнение · низкая уверенность"
state.execution_semantic_reason = state.execution_confidence_reason
return
state.execution_semantic_message = "⛔ Исполнение · сигнал заблокирован"
state.execution_semantic_reason = state.decision_reason
return
if state.position_side != "NONE":
state.execution_semantic_status = "POSITION_OPEN"
state.execution_semantic_message = "📌 Исполнение · позиция открыта"
state.execution_semantic_reason = state.last_execution_reason
return
if state.decision_status == "READY" and state.is_signal_ready:
state.execution_semantic_status = "READY"
state.execution_semantic_message = "✅ Исполнение · готово"
state.execution_semantic_reason = state.decision_reason
return
if state.decision_status == "CONFIRMING":
state.execution_semantic_status = "WAITING_SIGNAL"
state.execution_semantic_message = "⏳ Исполнение · ждёт подтверждения"
state.execution_semantic_reason = state.decision_reason
return
if state.last_signal in {"BUY", "SELL"}:
state.execution_semantic_status = "WAITING_SIGNAL"
state.execution_semantic_message = "⏳ Исполнение · сигнал проверяется"
state.execution_semantic_reason = state.decision_reason
return
state.execution_semantic_status = "IDLE"
state.execution_semantic_message = ""
state.execution_semantic_reason = state.decision_reason
def _execution_block_semantic_message(self, state: AutoTradeState) -> str:
reason = state.execution_quality_reason
if reason == "STALE_SNAPSHOT":
return "⛔ Исполнение · рынок неактуален"
if reason == "HIGH_SPREAD":
return "⛔ Исполнение · высокий spread"
if reason == "SNAPSHOT_ERROR":
return "⛔ Исполнение · нет данных рынка"
if reason == "SNAPSHOT_UNAVAILABLE":
return "⚠️ Исполнение · нет стакана"
return "⛔ Исполнение · заблокировано"
def run_cycle(self) -> AutoTradeState:
    """Run one auto-trade cycle and return the updated state.

    When the status is ``"OFF"`` the state is returned untouched. Otherwise
    the cycle runs these steps in order:

    1. expire runtime data if needed
    2. run the strategy on a freshly built context
    3. sync market analysis from the strategy payload
    4. sync execution quality and stamp ``last_check_at`` (``HH:MM:SS``)
    5. log the signal if it changed
    6. hand the state to the execution engine unless quality is ``BLOCKED``
    7. refresh the semantic execution status

    Returns:
        The state object obtained from ``get_state()``, mutated in place.
    """
    state = self.get_state()
    if state.status == "OFF":
        return state

    self._expire_runtime_if_needed(state)

    strategy = self._get_strategy()
    ctx = self._build_strategy_context()
    analysis = strategy.analyze(ctx)

    self._sync_market_analysis_state(state=state, payload=analysis.payload)
    self._sync_execution_quality_state(state)
    state.last_check_at = datetime.now().strftime("%H:%M:%S")

    self._log_signal_if_changed(
        strategy_name=strategy.name,
        state=state,
        signal=analysis.signal.value,
        reason=analysis.reason,
        confidence=analysis.confidence,
        payload=analysis.payload,
    )

    # The engine is skipped while execution quality is BLOCKED; the semantic
    # status below is refreshed either way.
    execution_allowed = state.execution_quality != "BLOCKED"
    if execution_allowed:
        ExecutionEngine().process(state)

    self._sync_execution_semantic_state(state)
    return state