Files
dzentra_bot/app/src/trading/market_analysis/service.py

489 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/market_analysis/service.py
from __future__ import annotations
from src.integrations.exchange.service import ExchangeService
from src.trading.market_analysis.indicators import atr, ema, rsi
from src.trading.market_analysis.models import (
MarketAnalysisResult,
MarketPhase,
MarketState,
TrendDirection,
TrendQuality,
TrendStrength,
VolatilityState,
)
# Classifies the market for a symbol from its recent candles using the
# ema / atr / rsi indicator helpers. The attributes below are the tuning
# constants read by the methods of this class.
class MarketAnalysisService:
# Periods passed to ema() for the fast and the slow average in analyze().
_fast_ema_period = 20
_slow_ema_period = 50
# Period passed to atr() in analyze().
_atr_period = 14
# Period passed to rsi() in analyze().
_rsi_period = 14
# analyze() returns the UNKNOWN result when fewer candles than this arrive.
_min_candles = 60
# ATR as a percentage of the last close: below the first value volatility is
# classified LOW, above the second it is classified HIGH.
_low_volatility_atr_percent = 0.05
_high_volatility_atr_percent = 1.8
# Minimum fast-vs-slow EMA gap (percent of the slow EMA, either sign) for the
# trend to be classified UP / DOWN instead of FLAT.
_trend_gap_percent = 0.03
# Number of most recent closes examined by _trend_consistency().
_trend_consistency_window = 20
# Number of candles to look back when analyze() measures the recent price change.
_phase_window = 5
# Minimum recent price change (percent, either sign) for the phase direction
# to be classified UP / DOWN instead of FLAT.
_phase_direction_threshold_percent = 0.03
def analyze(
    self,
    symbol: str,
    *,
    interval: str = "5m",
    limit: int = 200,
) -> MarketAnalysisResult:
    """Fetch candles for ``symbol`` and classify the current market.

    Candles are requested via ``ExchangeService().get_klines``. When the
    request raises, when fewer than ``_min_candles`` candles are returned,
    or when the last close / EMA / ATR values are unusable, the neutral
    result built by ``_unknown`` is returned instead of raising.

    Args:
        symbol: Symbol forwarded to the exchange request.
        interval: Candle interval forwarded to the exchange request.
        limit: Number of candles requested from the exchange.

    Returns:
        A ``MarketAnalysisResult`` with the classified state, trend,
        volatility and phase plus a flat ``payload`` dict mirroring them.
        ``is_trade_allowed`` is true only for the TREND_UP and TREND_DOWN
        states.
    """

    def optional_round(value, digits):
        # Optional metrics stay None in the payload instead of being rounded.
        return round(value, digits) if value is not None else None

    try:
        batch = ExchangeService().get_klines(
            symbol=symbol,
            interval=interval,
            limit=limit,
        )
    except Exception as exc:
        # A failed fetch is reported as an UNKNOWN result rather than raised.
        return self._unknown(
            symbol=symbol,
            interval=interval,
            reason=f"Не удалось получить свечи: {exc}",
        )

    candles = batch.candles
    closes = [candle.close_price for candle in candles]
    candles_total = len(candles)
    if candles_total < self._min_candles:
        return self._unknown(
            symbol=batch.symbol,
            interval=interval,
            reason="Недостаточно свечей для анализа рынка.",
            candles_count=candles_total,
        )

    if closes:
        close_price = closes[-1]
    else:
        close_price = None
    ema_fast = ema(closes, self._fast_ema_period)
    ema_slow = ema(closes, self._slow_ema_period)
    atr_value = atr(candles, self._atr_period)
    rsi_value = rsi(closes, self._rsi_period)

    # RSI is optional; the last close, both EMAs and ATR are required.
    price_unusable = close_price is None or close_price <= 0
    indicators_missing = ema_fast is None or ema_slow is None or atr_value is None
    if price_unusable or indicators_missing:
        return self._unknown(
            symbol=batch.symbol,
            interval=interval,
            reason="Недостаточно данных для расчёта EMA / ATR.",
            candles_count=candles_total,
        )

    # ATR expressed as a percentage of the last close.
    atr_percent = (atr_value / close_price) * 100
    trend_gap_percent = self._trend_gap_percent_value(
        ema_fast=ema_fast, ema_slow=ema_slow
    )
    volatility = self._classify_volatility(atr_percent)
    trend = self._classify_trend(ema_fast=ema_fast, ema_slow=ema_slow)
    trend_strength = self._classify_trend_strength(trend_gap_percent)
    trend_consistency = self._trend_consistency(closes=closes, trend=trend)
    trend_quality = self._classify_trend_quality(trend_consistency)
    phase_change_percent = self._recent_change_percent(
        closes=closes, window=self._phase_window
    )
    phase_direction = self._classify_phase_direction(phase_change_percent)
    market_phase, phase_reason = self._classify_market_phase(
        trend=trend,
        volatility=volatility,
        trend_strength=trend_strength,
        trend_quality=trend_quality,
        rsi_value=rsi_value,
        phase_direction=phase_direction,
    )
    state = self._classify_market_state(trend=trend, volatility=volatility)

    tradable_states = {MarketState.TREND_UP, MarketState.TREND_DOWN}
    is_trade_allowed = state in tradable_states
    reason = self._reason(
        state=state,
        trend=trend,
        volatility=volatility,
        atr_percent=atr_percent,
        rsi_value=rsi_value,
    )

    # Flat dict copy of the analysis: enum members as their values, floats rounded.
    payload = {
        "symbol": batch.symbol,
        "interval": interval,
        "market_state": state.value,
        "trend": trend.value,
        "volatility": volatility.value,
        "market_trend_strength": trend_strength.value,
        "market_trend_quality": trend_quality.value,
        "market_phase": market_phase.value,
        "market_phase_direction": phase_direction.value,
        "market_phase_change_percent": optional_round(phase_change_percent, 5),
        "market_phase_reason": phase_reason,
        "market_trend_gap_percent": optional_round(trend_gap_percent, 5),
        "market_trend_consistency": optional_round(trend_consistency, 3),
        "close_price": close_price,
        "ema_fast_period": self._fast_ema_period,
        "ema_slow_period": self._slow_ema_period,
        "ema_fast": round(ema_fast, 8),
        "ema_slow": round(ema_slow, 8),
        "atr_period": self._atr_period,
        "atr": round(atr_value, 8),
        "atr_percent": round(atr_percent, 4),
        "rsi_period": self._rsi_period,
        "rsi": optional_round(rsi_value, 2),
        "candles_count": candles_total,
        "is_trade_allowed": is_trade_allowed,
    }

    return MarketAnalysisResult(
        symbol=batch.symbol,
        interval=interval,
        state=state,
        trend=trend,
        volatility=volatility,
        close_price=close_price,
        ema_fast=ema_fast,
        ema_slow=ema_slow,
        atr=atr_value,
        atr_percent=atr_percent,
        rsi=rsi_value,
        candles_count=candles_total,
        reason=reason,
        is_trade_allowed=is_trade_allowed,
        payload=payload,
        trend_strength=trend_strength,
        trend_quality=trend_quality,
        market_phase=market_phase,
        trend_gap_percent=trend_gap_percent,
        trend_consistency=trend_consistency,
        phase_direction=phase_direction,
        phase_change_percent=phase_change_percent,
        phase_reason=phase_reason,
    )
def _trend_gap_percent_value(
self,
*,
ema_fast: float,
ema_slow: float,
) -> float | None:
if ema_slow <= 0:
return None
return ((ema_fast - ema_slow) / ema_slow) * 100
def _classify_trend(
    self,
    *,
    ema_fast: float,
    ema_slow: float,
) -> TrendDirection:
    """Classify trend direction from the fast-vs-slow EMA gap.

    The gap comes from ``_trend_gap_percent_value``. A gap of at least
    ``_trend_gap_percent`` in either direction is UP / DOWN, a smaller gap
    is FLAT, and a gap that cannot be computed is UNKNOWN.
    """
    gap = self._trend_gap_percent_value(ema_fast=ema_fast, ema_slow=ema_slow)
    if gap is None:
        return TrendDirection.UNKNOWN

    threshold = self._trend_gap_percent
    if gap >= threshold:
        direction = TrendDirection.UP
    elif gap <= -threshold:
        direction = TrendDirection.DOWN
    else:
        direction = TrendDirection.FLAT
    return direction
def _classify_trend_strength(
    self,
    trend_gap_percent: float | None,
) -> TrendStrength:
    """Bucket the absolute EMA gap (in percent) into a trend strength.

    Returns UNKNOWN for a missing gap, WEAK below 0.08, NORMAL below 0.25
    and STRONG otherwise.
    """
    if trend_gap_percent is None:
        return TrendStrength.UNKNOWN

    magnitude = abs(trend_gap_percent)
    # Exclusive upper bounds, checked in ascending order.
    buckets = (
        (0.08, TrendStrength.WEAK),
        (0.25, TrendStrength.NORMAL),
    )
    for upper_bound, strength in buckets:
        if magnitude < upper_bound:
            return strength
    return TrendStrength.STRONG
def _trend_consistency(
self,
*,
closes: list[float],
trend: TrendDirection,
) -> float | None:
if len(closes) < 2:
return None
window = closes[-self._trend_consistency_window :]
if len(window) < 2:
return None
up_moves = 0
down_moves = 0
for previous_price, current_price in zip(window, window[1:]):
if current_price > previous_price:
up_moves += 1
elif current_price < previous_price:
down_moves += 1
total_moves = max(1, len(window) - 1)
if trend == TrendDirection.UP:
return up_moves / total_moves
if trend == TrendDirection.DOWN:
return down_moves / total_moves
return None
def _classify_trend_quality(
    self,
    trend_consistency: float | None,
) -> TrendQuality:
    """Map a trend-consistency ratio to a quality label.

    Returns UNKNOWN for a missing ratio, CLEAN for a ratio of at least 0.6
    and NOISY otherwise.
    """
    if trend_consistency is None:
        return TrendQuality.UNKNOWN
    return TrendQuality.CLEAN if trend_consistency >= 0.6 else TrendQuality.NOISY
def _recent_change_percent(
self,
*,
closes: list[float],
window: int,
) -> float | None:
if window <= 0 or len(closes) < window + 1:
return None
first_price = closes[-(window + 1)]
last_price = closes[-1]
if first_price <= 0:
return None
return ((last_price - first_price) / first_price) * 100
def _classify_phase_direction(
    self,
    change_percent: float | None,
) -> TrendDirection:
    """Classify the direction of the recent price change.

    A change of at least ``_phase_direction_threshold_percent`` in either
    direction is UP / DOWN, a smaller change is FLAT, and a missing change
    is UNKNOWN.
    """
    if change_percent is None:
        return TrendDirection.UNKNOWN

    threshold = self._phase_direction_threshold_percent
    if change_percent >= threshold:
        direction = TrendDirection.UP
    elif change_percent <= -threshold:
        direction = TrendDirection.DOWN
    else:
        direction = TrendDirection.FLAT
    return direction
def _is_counter_trend_move(
    self,
    *,
    trend: TrendDirection,
    phase_direction: TrendDirection,
) -> bool:
    """Tell whether the recent move runs against the prevailing trend.

    True only for an UP trend with a DOWN phase direction or a DOWN trend
    with an UP phase direction; any other combination is False.
    """
    dips_in_uptrend = (
        trend == TrendDirection.UP and phase_direction == TrendDirection.DOWN
    )
    bounces_in_downtrend = (
        trend == TrendDirection.DOWN and phase_direction == TrendDirection.UP
    )
    return dips_in_uptrend or bounces_in_downtrend
def _classify_market_phase(
    self,
    *,
    trend: TrendDirection,
    volatility: VolatilityState,
    trend_strength: TrendStrength,
    trend_quality: TrendQuality,
    rsi_value: float | None,
    phase_direction: TrendDirection,
) -> tuple[MarketPhase, str]:
    """Classify the market phase and return it with a machine-readable reason.

    Rules are applied in order; the first match wins:

    1. LOW volatility -> SQUEEZE.
    2. FLAT trend -> RANGE.
    3. Trend that is neither UP nor DOWN -> UNKNOWN.
    4. WEAK trend strength -> RANGE.
    5. Recent move against the trend -> PULLBACK. The reason is the
       RSI-specific code when RSI confirms the move (below 45 in an
       uptrend, above 55 in a downtrend), otherwise ``COUNTER_TREND_MOVE``.
    6. Anything else -> IMPULSE.

    Args:
        trend: Prevailing trend direction.
        volatility: Volatility classification.
        trend_strength: Strength bucket of the trend.
        trend_quality: Accepted for interface compatibility; not consulted
            by any rule here.
        rsi_value: RSI value, or ``None`` when it is not available.
        phase_direction: Direction of the recent price change.

    Returns:
        A ``(MarketPhase, reason_code)`` tuple.
    """
    if volatility == VolatilityState.LOW:
        return MarketPhase.SQUEEZE, "LOW_VOLATILITY_SQUEEZE"
    if trend == TrendDirection.FLAT:
        return MarketPhase.RANGE, "FLAT_TREND_RANGE"
    if trend not in {TrendDirection.UP, TrendDirection.DOWN}:
        return MarketPhase.UNKNOWN, "UNKNOWN_TREND"
    if trend_strength == TrendStrength.WEAK:
        return MarketPhase.RANGE, "WEAK_TREND_RANGE"

    if self._is_counter_trend_move(
        trend=trend,
        phase_direction=phase_direction,
    ):
        # The RSI-confirmed reasons require the same counter-trend price move,
        # so they must be tested inside this branch. Checked after the generic
        # return they could never be reached.
        if rsi_value is not None:
            if trend == TrendDirection.UP and rsi_value < 45:
                return (
                    MarketPhase.PULLBACK,
                    "UPTREND_RSI_PULLBACK_CONFIRMED_BY_PRICE",
                )
            if trend == TrendDirection.DOWN and rsi_value > 55:
                return (
                    MarketPhase.PULLBACK,
                    "DOWNTREND_RSI_PULLBACK_CONFIRMED_BY_PRICE",
                )
        return MarketPhase.PULLBACK, "COUNTER_TREND_MOVE"

    return MarketPhase.IMPULSE, "WITH_TREND_OR_NEUTRAL_MOVE"
def _classify_volatility(self, atr_percent: float) -> VolatilityState:
    """Classify volatility from ATR expressed as a percentage of price.

    A zero or negative value is UNKNOWN. Otherwise the value is LOW below
    ``_low_volatility_atr_percent``, HIGH above
    ``_high_volatility_atr_percent`` and NORMAL in between.
    """
    if atr_percent <= 0:
        return VolatilityState.UNKNOWN

    if atr_percent < self._low_volatility_atr_percent:
        level = VolatilityState.LOW
    elif atr_percent > self._high_volatility_atr_percent:
        level = VolatilityState.HIGH
    else:
        level = VolatilityState.NORMAL
    return level
def _classify_market_state(
    self,
    *,
    trend: TrendDirection,
    volatility: VolatilityState,
) -> MarketState:
    """Combine trend and volatility into a single market state.

    Volatility takes precedence over trend: UNKNOWN volatility gives an
    UNKNOWN state, HIGH / LOW volatility give the matching volatility
    state, and only otherwise does the trend decide (UP -> TREND_UP,
    DOWN -> TREND_DOWN, FLAT -> RANGE, anything else -> UNKNOWN).

    Args:
        trend: Trend direction classification.
        volatility: Volatility classification.

    Returns:
        The resulting ``MarketState``.
    """
    # Volatility is UNKNOWN when ATR% is zero or negative. Without this guard
    # such a market fell through to the trend checks and could be reported as
    # TREND_UP / TREND_DOWN, which analyze() treats as tradable, even though
    # merely LOW volatility blocks trading.
    if volatility == VolatilityState.UNKNOWN:
        return MarketState.UNKNOWN
    if volatility == VolatilityState.HIGH:
        return MarketState.HIGH_VOLATILITY
    if volatility == VolatilityState.LOW:
        return MarketState.LOW_VOLATILITY
    if trend == TrendDirection.UP:
        return MarketState.TREND_UP
    if trend == TrendDirection.DOWN:
        return MarketState.TREND_DOWN
    if trend == TrendDirection.FLAT:
        return MarketState.RANGE
    return MarketState.UNKNOWN
def _reason(
    self,
    *,
    state: MarketState,
    trend: TrendDirection,
    volatility: VolatilityState,
    atr_percent: float,
    rsi_value: float | None,
) -> str:
    """Build the human-readable explanation for a classified market state.

    Messages for known states carry the ATR percentage and, when available,
    the RSI value. The fallback message for any other state reports the
    trend and volatility instead.
    """
    # The RSI fragment is omitted entirely when RSI is unavailable.
    if rsi_value is None:
        rsi_text = ""
    else:
        rsi_text = f", RSI={rsi_value:.2f}"

    if state == MarketState.TREND_UP:
        message = f"Рынок перешёл в рост. ATR={atr_percent:.2f}%{rsi_text}."
    elif state == MarketState.TREND_DOWN:
        message = f"Рынок перешёл в снижение. ATR={atr_percent:.2f}%{rsi_text}."
    elif state == MarketState.RANGE:
        message = f"На рынке нет выраженного направления. ATR={atr_percent:.2f}%{rsi_text}."
    elif state == MarketState.HIGH_VOLATILITY:
        message = f"Рынок стал слишком волатильным. ATR={atr_percent:.2f}%{rsi_text}."
    elif state == MarketState.LOW_VOLATILITY:
        message = f"Рынок почти не движется. ATR={atr_percent:.2f}%{rsi_text}."
    else:
        message = f"Состояние рынка не определено. Trend={trend}, volatility={volatility}."
    return message
def _unknown(
    self,
    *,
    symbol: str,
    interval: str,
    reason: str,
    candles_count: int = 0,
) -> MarketAnalysisResult:
    """Build the neutral result returned when the market cannot be analysed.

    Every classification is set to its UNKNOWN member, every numeric metric
    to ``None`` and trading is disallowed. ``reason`` is used both as the
    result reason and as the phase reason.

    Args:
        symbol: Symbol to report in the result.
        interval: Candle interval to report in the result.
        reason: Explanation of why no analysis is available.
        candles_count: Number of candles that were received, if any.
    """
    unknown_direction = TrendDirection.UNKNOWN
    unknown_strength = TrendStrength.UNKNOWN
    unknown_quality = TrendQuality.UNKNOWN
    unknown_phase = MarketPhase.UNKNOWN

    payload = {
        "symbol": symbol,
        "interval": interval,
        "market_state": MarketState.UNKNOWN.value,
        "trend": unknown_direction.value,
        "volatility": VolatilityState.UNKNOWN.value,
        "market_trend_strength": unknown_strength.value,
        "market_trend_quality": unknown_quality.value,
        "market_phase": unknown_phase.value,
        "market_phase_direction": unknown_direction.value,
        "market_phase_change_percent": None,
        "market_phase_reason": reason,
        "market_trend_gap_percent": None,
        "market_trend_consistency": None,
        "candles_count": candles_count,
        "is_trade_allowed": False,
        "reason": reason,
    }

    return MarketAnalysisResult(
        symbol=symbol,
        interval=interval,
        state=MarketState.UNKNOWN,
        trend=unknown_direction,
        volatility=VolatilityState.UNKNOWN,
        close_price=None,
        ema_fast=None,
        ema_slow=None,
        atr=None,
        atr_percent=None,
        rsi=None,
        candles_count=candles_count,
        reason=reason,
        is_trade_allowed=False,
        payload=payload,
        trend_strength=unknown_strength,
        trend_quality=unknown_quality,
        market_phase=unknown_phase,
        trend_gap_percent=None,
        trend_consistency=None,
        phase_direction=unknown_direction,
        phase_change_percent=None,
        phase_reason=reason,
    )