Files
dzentra_bot/app/src/trading/auto/market_runtime.py

274 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/auto/market_runtime.py
from __future__ import annotations
import time
from src.core.numbers import safe_float
from src.core.types import JsonDict
from src.trading.auto.state import AutoTradeState
from src.trading.journal.service import JournalService
class AutoMarketRuntimeMixin:
_last_logged_market_state: str | None
_last_logged_market_trend: str | None
_last_logged_market_volatility: str | None
_last_logged_entry_block_reason: str | None
_last_logged_entry_block_at: float | None = None
_entry_block_log_ttl_seconds: int = 900
# синхронизировать market analysis payload в AutoTradeState
def _sync_market_analysis_state(
self,
*,
state: AutoTradeState,
payload: JsonDict | None,
) -> None:
if not isinstance(payload, dict):
return
previous_market_state = state.market_state
previous_market_trend = state.market_trend
previous_market_volatility = state.market_volatility
state.market_state = str(payload.get("market_state") or "")
state.market_trend = str(payload.get("trend") or payload.get("market_trend") or "")
state.market_volatility = str(payload.get("volatility") or payload.get("market_volatility") or "")
state.market_trend_strength = str(payload.get("market_trend_strength") or "")
state.market_trend_quality = str(payload.get("market_trend_quality") or "")
state.market_phase = str(payload.get("market_phase") or "")
state.market_phase_direction = str(payload.get("market_phase_direction") or "")
state.market_trend_gap_percent = safe_float(payload.get("market_trend_gap_percent"))
state.market_trend_consistency = safe_float(payload.get("market_trend_consistency"))
state.market_trend_efficiency = safe_float(payload.get("market_trend_efficiency"))
state.trend_quality_score = safe_float(payload.get("trend_quality_score"))
state.ema_distance_atr_ratio = safe_float(payload.get("ema_distance_atr_ratio"))
state.ema_distance_state = str(payload.get("ema_distance_state") or "")
state.entry_timing_state = str(payload.get("entry_timing_state") or "")
state.entry_timing_reason = str(payload.get("entry_timing_reason") or "")
state.ema_fast_slope_percent = safe_float(payload.get("ema_fast_slope_percent"))
state.ema_slow_slope_percent = safe_float(payload.get("ema_slow_slope_percent"))
state.candle_noise_score = safe_float(payload.get("candle_noise_score"))
state.price_position_score = safe_float(payload.get("price_position_score"))
state.htf_interval = str(payload.get("htf_interval") or "")
state.htf_atr_percent = safe_float(payload.get("htf_atr_percent"))
state.htf_atr_percent_baseline = safe_float(payload.get("htf_atr_percent_baseline"))
state.htf_volatility_ratio = safe_float(payload.get("htf_volatility_ratio"))
state.htf_volatility = str(payload.get("htf_volatility") or "")
state.market_analysis_interval = str(payload.get("interval") or payload.get("market_analysis_interval") or "")
state.market_analysis_reason = str(payload.get("reason") or payload.get("market_analysis_reason") or "")
state.momentum_state = str(payload.get("momentum_state") or "")
state.momentum_direction = str(payload.get("momentum_direction") or "")
state.momentum_change_percent = safe_float(payload.get("momentum_change_percent"))
state.momentum_strength = safe_float(payload.get("momentum_strength"))
state.breakout_level = safe_float(payload.get("breakout_level"))
state.breakout_distance_percent = safe_float(payload.get("breakout_distance_percent"))
state.breakout_reason = str(payload.get("breakout_reason") or "")
state.entry_block_reason = str(payload.get("entry_block_reason") or "")
state.entry_block_message = str(payload.get("entry_block_message") or "")
self._log_market_state_if_changed(
state=state,
payload=payload,
previous_market_state=previous_market_state,
previous_market_trend=previous_market_trend,
previous_market_volatility=previous_market_volatility,
)
self._log_entry_block_if_changed(
state=state,
payload=payload,
)
# записать entry-block событие, если причина изменилась или истёк TTL
def _log_entry_block_if_changed(
self,
*,
state: AutoTradeState,
payload: JsonDict,
) -> None:
reason = state.entry_block_reason
message = state.entry_block_message
if not reason or not message:
return
now = time.monotonic()
# status специально не входит в key:
# RUNNING / OBSERVING не должны создавать дубли одной и той же причины.
key = f"{state.symbol}:{state.strategy}:{reason}:{message}"
last_logged_at = type(self)._last_logged_entry_block_at
ttl_expired = (
last_logged_at is None
or now - last_logged_at >= type(self)._entry_block_log_ttl_seconds
)
if (
key == type(self)._last_logged_entry_block_reason
and not ttl_expired
):
return
type(self)._last_logged_entry_block_reason = key
type(self)._last_logged_entry_block_at = now
try:
JournalService().log_ui_info(
event_type="entry_blocked",
message=f"Вход в позицию не выполнен: {message}.",
screen="auto",
action="entry_diagnostics",
payload={
**payload,
"entry_block_reason": reason,
"entry_block_message": message,
"entry_block_key": key,
"entry_block_ttl_seconds": type(self)._entry_block_log_ttl_seconds,
"symbol": state.symbol,
"strategy": state.strategy,
"status": state.status,
"market_state": state.market_state,
"market_trend": state.market_trend,
"market_trend_strength": state.market_trend_strength,
"market_trend_quality": state.market_trend_quality,
"market_phase": state.market_phase,
"market_phase_direction": state.market_phase_direction,
"momentum_state": state.momentum_state,
"momentum_direction": state.momentum_direction,
"momentum_strength": state.momentum_strength,
"momentum_change_percent": state.momentum_change_percent,
"execution_quality": state.execution_quality,
"execution_quality_reason": state.execution_quality_reason,
"execution_confidence_score": state.execution_confidence_score,
"last_signal": state.last_signal,
"last_signal_confidence": state.last_signal_confidence,
"last_signal_reason": state.last_signal_reason,
},
)
except Exception:
pass
# записать market state / volatility событие, если состояние изменилось
def _log_market_state_if_changed(
self,
*,
state: AutoTradeState,
payload: JsonDict,
previous_market_state: str | None,
previous_market_trend: str | None,
previous_market_volatility: str | None,
) -> None:
market_state = state.market_state
market_trend = state.market_trend
market_volatility = state.market_volatility
if not market_state or market_state == "UNKNOWN":
return
state_changed = (
market_state != previous_market_state
and market_state != type(self)._last_logged_market_state
)
volatility_changed = (
market_volatility is not None
and market_volatility != previous_market_volatility
and market_volatility != type(self)._last_logged_market_volatility
)
if not state_changed and not volatility_changed:
return
journal_payload = {
**payload,
"previous_market_state": previous_market_state,
"previous_market_trend": previous_market_trend,
"previous_market_volatility": previous_market_volatility,
"current_market_state": market_state,
"current_market_trend": market_trend,
"current_market_volatility": market_volatility,
}
try:
if state_changed:
self._write_market_journal_event(
event_type="market_state_changed",
market_state=market_state,
message=self._market_state_message(market_state),
payload=journal_payload,
)
if volatility_changed:
self._write_market_journal_event(
event_type="market_volatility_changed",
market_state=market_state,
message=self._market_volatility_message(market_volatility),
payload=journal_payload,
)
except Exception:
pass
type(self)._last_logged_market_state = market_state
type(self)._last_logged_market_trend = market_trend
type(self)._last_logged_market_volatility = market_volatility
# записать market journal событие с нужным уровнем важности
def _write_market_journal_event(
self,
*,
event_type: str,
market_state: str,
message: str,
payload: JsonDict,
) -> None:
level = self._market_journal_level(market_state)
if level == "WARNING":
JournalService().log_ui_warning(
event_type=event_type,
message=message,
screen="auto",
action="market_analysis",
payload=payload,
)
return
JournalService().log_ui_info(
event_type=event_type,
message=message,
screen="auto",
action="market_analysis",
payload=payload,
)
# получить человекочитаемое сообщение по volatility
def _market_volatility_message(self, market_volatility: str | None) -> str:
messages = {
"LOW": "Волатильность изменена: низкая.",
"NORMAL": "Волатильность изменена: нормальная.",
"HIGH": "Волатильность изменена: высокая.",
}
return messages.get(str(market_volatility or ""), "Волатильность не определена.")
# определить уровень journal события для market state
def _market_journal_level(self, market_state: str | None) -> str:
if market_state == "HIGH_VOLATILITY":
return "WARNING"
return "INFO"
# получить человекочитаемое сообщение по market state
def _market_state_message(self, market_state: str) -> str:
messages = {
"TREND_UP": "Состояние рынка изменено: рост.",
"TREND_DOWN": "Состояние рынка изменено: снижение.",
"RANGE": "Состояние рынка изменено: нет выраженного направления.",
"HIGH_VOLATILITY": "Состояние рынка изменено: высокая волатильность.",
"LOW_VOLATILITY": "Состояние рынка изменено: низкая активность.",
}
return messages.get(market_state, "Состояние рынка анализируется.")