07.4.3.14 — Auto Trading UI. Realistic Pricing & Debug Live Tools

This commit is contained in:
2026-05-09 01:34:46 +03:00
parent ee78f9774a
commit df76490783
15 changed files with 2161 additions and 464 deletions

View File

@@ -26,7 +26,7 @@ class AutoTradeRunner:
_current_screen: str | None = None
_analysis_interval_seconds = 5
_ui_interval_seconds = 60
_ui_interval_seconds = 5
_last_text: str | None = None
_last_ui_refresh_at: float = 0.0
@@ -550,17 +550,66 @@ class AutoTradeRunner:
except (TypeError, ValueError):
return ""
@classmethod
def _log_refresh_skip(cls, reason: str, payload: dict | None = None) -> None:
try:
JournalService().log_ui_info(
event_type="auto_screen_refresh_skipped",
message=f"Auto screen refresh skipped: {reason}",
screen="auto",
action="refresh_screen",
payload=payload or {},
)
except Exception:
pass
@classmethod
def _log_refresh_success(cls, payload: dict | None = None) -> None:
try:
JournalService().log_ui_info(
event_type="auto_screen_refreshed",
message="Auto screen refreshed.",
screen="auto",
action="refresh_screen",
payload=payload or {},
)
except Exception:
pass
@classmethod
def _log_refresh_error(cls, reason: str, payload: dict | None = None) -> None:
try:
JournalService().log_error(
"auto_screen_refresh_error",
f"Auto screen refresh error: {reason}",
payload or {},
)
except Exception:
pass
@classmethod
async def _refresh_screen(cls, *, force: bool = False) -> None:
if cls._current_screen != "auto":
cls._log_refresh_skip("current_screen_not_auto")
return
now = time.monotonic()
if now < cls._retry_after_until:
cls._log_refresh_skip(
"retry_after_active",
{"retry_after_until": cls._retry_after_until, "now": now},
)
return
if not force and now - cls._last_ui_refresh_at < cls._ui_interval_seconds:
cls._log_refresh_skip(
"ui_interval_not_reached",
{
"elapsed": round(now - cls._last_ui_refresh_at, 2),
"interval": cls._ui_interval_seconds,
},
)
return
if not all(
@@ -572,11 +621,22 @@ class AutoTradeRunner:
cls._render_markup,
]
):
cls._log_refresh_skip(
"screen_not_registered",
{
"has_bot": cls._bot is not None,
"chat_id": cls._chat_id,
"message_id": cls._message_id,
"has_render_text": cls._render_text is not None,
"has_render_markup": cls._render_markup is not None,
},
)
return
text = cls._render_text()
if text == cls._last_text:
cls._log_refresh_skip("text_not_changed")
return
try:
@@ -589,8 +649,23 @@ class AutoTradeRunner:
cls._last_text = text
cls._last_ui_refresh_at = now
cls._log_refresh_success(
{
"chat_id": cls._chat_id,
"message_id": cls._message_id,
"text_length": len(text),
}
)
except TelegramRetryAfter as exc:
cls._retry_after_until = time.monotonic() + exc.retry_after + 5
cls._log_refresh_error(
"telegram_retry_after",
{
"retry_after": exc.retry_after,
"retry_after_until": cls._retry_after_until,
},
)
except TelegramBadRequest as exc:
error_text = str(exc).lower()
@@ -598,6 +673,7 @@ class AutoTradeRunner:
if "message is not modified" in error_text:
cls._last_text = text
cls._last_ui_refresh_at = now
cls._log_refresh_skip("telegram_message_not_modified")
return
if "message to edit not found" in error_text:
@@ -605,7 +681,19 @@ class AutoTradeRunner:
cls._render_text = None
cls._render_markup = None
cls._last_text = None
cls._log_refresh_error(
"telegram_message_to_edit_not_found",
{"error": str(exc)},
)
return
except Exception:
pass
cls._log_refresh_error(
"telegram_bad_request",
{"error": str(exc)},
)
except Exception as exc:
cls._log_refresh_error(
"unexpected_refresh_error",
{"error": str(exc)},
)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import time
from datetime import datetime
from src.core.config import load_settings
@@ -49,6 +50,9 @@ class AutoTradeService:
previous_signal = state.last_signal
previous_decision_status = state.decision_status
if previous_signal != normalized_signal or state.signal_started_at is None:
state.signal_started_at = time.monotonic()
state.last_signal = normalized_signal
state.last_signal_repeat_count = repeat_count
@@ -85,6 +89,18 @@ class AutoTradeService:
return state
# установить капитал, выделенный под автоторговлю
def set_allocated_balance_usd(self, value: float) -> AutoTradeState:
state = self.get_state()
if value <= 0:
value = 1000.0
state.allocated_balance_usd = value
state.execution_block_reason = None
state.execution_size_adjustment_reason = None
return state
# получить текущее состояние автоторговли
def get_state(self) -> AutoTradeState:
if not self._state.symbol:
@@ -264,6 +280,7 @@ class AutoTradeService:
state.is_signal_confirmed = False
state.is_signal_ready = False
state.execution_block_reason = None
state.signal_started_at = None
# собрать контекст для стратегии
def _build_strategy_context(self) -> StrategyContext:
@@ -397,6 +414,9 @@ class AutoTradeService:
previous_signal = state.last_signal
previous_decision_status = state.decision_status
if previous_signal != signal or state.signal_started_at is None:
state.signal_started_at = time.monotonic()
state.last_signal = signal
state.last_signal_repeat_count = self._same_signal_count
state.last_signal_confidence = confidence

View File

@@ -11,13 +11,13 @@ class AutoTradeState:
status: str = "OFF"
# выбранная стратегия: TREND / GRID / SCALP
strategy: str | None = None
strategy: str | None = "TREND"
# торговый инструмент
symbol: str = ""
symbol: str = "BTC/USD_LEVERAGE"
# риск на одну сделку в %
risk_percent: float | None = None
risk_percent: float | None = 1.0
# текущий PnL
pnl_usd: float = 0.0
@@ -37,6 +37,9 @@ class AutoTradeState:
# причина последнего сигнала
last_signal_reason: str | None = None
# время начала текущего сигнала, monotonic timestamp
signal_started_at: float | None = None
# статус торгового решения: WAITING / CONFIRMING / READY / BLOCKED
decision_status: str = "WAITING"
@@ -68,7 +71,7 @@ class AutoTradeState:
leverage: float | None = 2.0
# stop loss по движению цены в %
stop_loss_percent: float | None = None
stop_loss_percent: float | None = 1.0
# take profit по движению цены в %
take_profit_percent: float | None = None
@@ -83,4 +86,10 @@ class AutoTradeState:
execution_block_reason: str | None = None
# причина авто-уменьшения размера позиции
execution_size_adjustment_reason: str | None = None
execution_size_adjustment_reason: str | None = None
# капитал, выделенный только под AutoTrade
allocated_balance_usd: float = 1000.0
# зафиксированный результат закрытых paper-сделок
realized_pnl_usd: float = 0.0

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import math
from datetime import datetime
from src.core.event_bus import EventBus
@@ -14,6 +15,7 @@ from src.trading.position.state import PositionState
class ExecutionEngine:
_position = PositionState()
_size_precision = 5
def get_position(self) -> PositionState:
return type(self)._position
@@ -58,8 +60,7 @@ class ExecutionEngine:
return ExecutionDecision("NONE", False, "Позиция уже открыта.")
try:
ticker = ExchangeService().get_price(state.symbol)
entry_price = ticker.price
entry_price = self._entry_price_for_side(state.symbol, side)
except Exception as exc:
return ExecutionDecision("NONE", False, f"Не удалось получить цену для paper execution: {exc}")
@@ -72,13 +73,22 @@ class ExecutionEngine:
False,
"Позиция не открыта: невозможно рассчитать size без Stop Loss.",
)
size = self._adjust_size_by_margin_limit(
state=state,
entry_price=entry_price,
size=size,
)
size = self._round_order_size(size)
if size <= 0:
return ExecutionDecision(
"NONE",
False,
"Позиция не открыта: итоговый size равен 0.",
)
type(self)._position = PositionState(
side=side,
symbol=state.symbol,
@@ -105,6 +115,7 @@ class ExecutionEngine:
"repeat_count": state.last_signal_repeat_count,
"reason": state.last_signal_reason,
"opened_at": now,
"pricing": "ask_for_long_bid_for_short",
}
JournalService().log_ui_info(
@@ -131,14 +142,14 @@ class ExecutionEngine:
return ExecutionDecision("NONE", False, "Нет направления для flip.")
try:
ticker = ExchangeService().get_price(state.symbol)
flip_price = ticker.price
exit_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
new_entry_price = self._entry_price_for_side(state.symbol, new_side)
except Exception as exc:
return ExecutionDecision("NONE", False, f"Ошибка получения цены для flip: {exc}")
now = self._now_time()
pnl = self._calculate_pnl(flip_price)
new_size = self._calculate_position_size(state, entry_price=flip_price)
pnl = self._calculate_pnl(exit_price)
new_size = self._calculate_position_size(state, entry_price=new_entry_price)
if new_size <= 0:
return ExecutionDecision(
@@ -146,13 +157,24 @@ class ExecutionEngine:
False,
"Flip отменён: невозможно рассчитать size без Stop Loss.",
)
new_size = self._adjust_size_by_margin_limit(
state=state,
entry_price=flip_price,
entry_price=new_entry_price,
size=new_size,
)
new_size = self._round_order_size(new_size)
if new_size <= 0:
return ExecutionDecision(
"NONE",
False,
"Flip отменён: итоговый size равен 0.",
)
state.realized_pnl_usd += pnl
old_side = position.side
old_entry_price = position.entry_price
old_size = position.size
@@ -162,7 +184,7 @@ class ExecutionEngine:
type(self)._position = PositionState(
side=new_side,
symbol=state.symbol,
entry_price=flip_price,
entry_price=new_entry_price,
size=new_size,
leverage=state.leverage,
unrealized_pnl_usd=0.0,
@@ -180,8 +202,8 @@ class ExecutionEngine:
"new_side": new_side,
"side": new_side,
"entry_price": old_entry_price,
"exit_price": flip_price,
"new_entry_price": flip_price,
"exit_price": exit_price,
"new_entry_price": new_entry_price,
"old_size": old_size,
"new_size": new_size,
"size": new_size,
@@ -195,6 +217,7 @@ class ExecutionEngine:
"opened_at": old_opened_at,
"closed_at": now,
"new_opened_at": now,
"pricing": "exit_by_side_then_entry_by_side",
}
JournalService().log_ui_info(
@@ -231,13 +254,14 @@ class ExecutionEngine:
exit_price = forced_exit_price
else:
try:
ticker = ExchangeService().get_price(state.symbol)
exit_price = ticker.price
exit_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
except Exception as exc:
return ExecutionDecision("NONE", False, f"Ошибка получения цены для закрытия: {exc}")
pnl = forced_pnl if forced_pnl is not None else self._calculate_pnl(exit_price)
state.realized_pnl_usd += pnl
now = self._now_time()
payload = {
@@ -258,6 +282,7 @@ class ExecutionEngine:
"is_forced": forced_reason is not None,
"opened_at": position.opened_at,
"closed_at": now,
"pricing": "bid_for_long_exit_ask_for_short_exit",
}
JournalService().log_ui_info(
@@ -293,8 +318,7 @@ class ExecutionEngine:
return None
try:
ticker = ExchangeService().get_price(position.symbol or state.symbol)
current_price = ticker.price
current_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
except Exception:
return None
@@ -327,34 +351,19 @@ class ExecutionEngine:
return None
def _is_stop_loss_hit(
self,
state: AutoTradeState,
price_move_percent: float,
) -> bool:
def _is_stop_loss_hit(self, state: AutoTradeState, price_move_percent: float) -> bool:
if state.stop_loss_percent is None:
return False
return price_move_percent <= -abs(state.stop_loss_percent)
def _is_take_profit_hit(
self,
state: AutoTradeState,
price_move_percent: float,
) -> bool:
def _is_take_profit_hit(self, state: AutoTradeState, price_move_percent: float) -> bool:
if state.take_profit_percent is None:
return False
return price_move_percent >= abs(state.take_profit_percent)
def _is_max_loss_hit(
self,
state: AutoTradeState,
unrealized_pnl: float,
) -> bool:
def _is_max_loss_hit(self, state: AutoTradeState, unrealized_pnl: float) -> bool:
if state.max_loss_usd is None:
return False
return unrealized_pnl <= -abs(state.max_loss_usd)
def _calculate_price_move_percent(self, current_price: float) -> float:
@@ -371,7 +380,7 @@ class ExecutionEngine:
return round(((entry - current_price) / entry) * 100, 4)
return 0.0
def _should_flip_position(self, state: AutoTradeState) -> bool:
position = type(self)._position
@@ -403,8 +412,7 @@ class ExecutionEngine:
return
try:
ticker = ExchangeService().get_price(position.symbol or state.symbol)
current_price = ticker.price
current_price = self._exit_price_for_side(position.symbol or state.symbol, position.side)
except Exception:
self._sync_state_from_position(state)
return
@@ -430,15 +438,14 @@ class ExecutionEngine:
if price is None:
try:
ticker = ExchangeService().get_price(state.symbol)
price = ticker.price
price = self._signal_entry_price(state)
except Exception:
return 0.0
if price <= 0:
return 0.0
balance_usd = 1000.0
balance_usd = state.allocated_balance_usd
target_risk_usd = balance_usd * (state.risk_percent / 100)
stop_loss_distance_usd = price * (state.stop_loss_percent / 100)
@@ -446,8 +453,7 @@ class ExecutionEngine:
return 0.0
size = target_risk_usd / stop_loss_distance_usd
return round(size, 8)
return self._round_size(size)
def _adjust_size_by_margin_limit(
self,
@@ -462,26 +468,85 @@ class ExecutionEngine:
state.execution_size_adjustment_reason = None
if max_percent is None or max_percent <= 0:
return round(size, 8)
return self._round_size(size)
leverage = state.leverage or 1.0
if leverage <= 0 or entry_price <= 0:
state.execution_block_reason = "Invalid leverage or entry price."
return 0.0
balance_usd = 1000.0
balance_usd = state.allocated_balance_usd
max_reserved_usd = balance_usd * (max_percent / 100)
max_notional_usd = max_reserved_usd * leverage
max_size = max_notional_usd / entry_price
if size <= max_size:
return round(size, 8)
return self._round_size(size)
state.execution_size_adjustment_reason = "MARGIN_LIMIT"
return self._round_size(max_size)
def _signal_entry_price(self, state: AutoTradeState) -> float:
if state.last_signal == "BUY":
return self._entry_price_for_side(state.symbol, "LONG")
if state.last_signal == "SELL":
return self._entry_price_for_side(state.symbol, "SHORT")
return self._market_last_price(state.symbol)
def _entry_price_for_side(self, symbol: str, side: str) -> float:
snapshot = ExchangeService().get_market_snapshot(symbol)
if side == "LONG":
return self._snapshot_price(snapshot, "ask_price", "last_price")
if side == "SHORT":
return self._snapshot_price(snapshot, "bid_price", "last_price")
return self._snapshot_price(snapshot, "last_price")
def _exit_price_for_side(self, symbol: str, side: str) -> float:
snapshot = ExchangeService().get_market_snapshot(symbol)
if side == "LONG":
return self._snapshot_price(snapshot, "bid_price", "last_price")
if side == "SHORT":
return self._snapshot_price(snapshot, "ask_price", "last_price")
return self._snapshot_price(snapshot, "last_price")
def _market_last_price(self, symbol: str) -> float:
snapshot = ExchangeService().get_market_snapshot(symbol)
return self._snapshot_price(snapshot, "last_price")
def _snapshot_price(
self,
snapshot: dict[str, object],
primary_key: str,
fallback_key: str | None = None,
) -> float:
raw_price = snapshot.get(primary_key)
if raw_price is None and fallback_key is not None:
raw_price = snapshot.get(fallback_key)
if raw_price is None:
raise ValueError(f"Market snapshot price '{primary_key}' is missing.")
price = float(raw_price)
if price <= 0:
raise ValueError(f"Market snapshot price '{primary_key}' is invalid: {price}")
return price
def _round_size(self, size: float) -> float:
factor = 10 ** self._size_precision
return math.floor(float(size) * factor) / factor
return round(max_size, 8)
def _calculate_pnl(self, current_price: float) -> float:
position = type(self)._position
@@ -504,5 +569,9 @@ class ExecutionEngine:
state.position_size = position.size
state.unrealized_pnl_usd = position.unrealized_pnl_usd
def _round_order_size(self, value: float) -> float:
factor = 10 ** self._size_precision
return math.floor(float(value) * factor) / factor
def _now_time(self) -> str:
return datetime.now().strftime("%H:%M:%S")

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from src.trading.strategies.base import BaseStrategy
from src.trading.strategies.hold import HoldStrategy
from src.trading.strategies.scalp import ScalpStrategy
from src.trading.strategies.trend import TrendStrategy
@@ -13,7 +14,7 @@ class StrategyRegistry:
"HOLD": HoldStrategy(),
"TREND": TrendStrategy(),
"GRID": HoldStrategy(),
"SCALP": HoldStrategy(),
"SCALP": ScalpStrategy(),
}
# получить стратегию по имени

View File

@@ -0,0 +1,156 @@
# app/src/trading/strategies/scalp.py
from __future__ import annotations
from src.integrations.exchange.service import ExchangeService
from src.trading.strategies.base import StrategyContext
from src.trading.strategies.signals import SignalResult, SignalType
class ScalpStrategy:
name = "SCALP"
_price_window: dict[str, list[float]] = {}
# короткое окно = быстрая реакция
_window_size = 4
# ниже порог = чувствительнее TREND
_threshold_percent = 0.02
# для scalp допускаем чуть больше шума
_min_direction_ratio = 0.55
def analyze(self, context: StrategyContext) -> SignalResult:
try:
ticker = ExchangeService().get_price(context.symbol)
except Exception as exc:
return SignalResult(
signal=SignalType.HOLD,
reason="Не удалось получить рыночную цену. Безопасный HOLD.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": context.symbol,
"error": str(exc),
},
)
symbol = ticker.symbol
current_price = float(ticker.price)
prices = self._price_window.setdefault(symbol, [])
prices.append(current_price)
if len(prices) > self._window_size:
prices.pop(0)
if len(prices) < self._window_size:
return SignalResult(
signal=SignalType.HOLD,
reason="Недостаточно данных для SCALP.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": symbol,
"price": current_price,
"window_size": len(prices),
"required_window_size": self._window_size,
},
)
first_price = prices[0]
last_price = prices[-1]
if first_price <= 0:
return SignalResult(
signal=SignalType.HOLD,
reason="Некорректная стартовая цена в окне SCALP.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": symbol,
"prices": prices,
},
)
change_percent = ((last_price - first_price) / first_price) * 100
direction_ratio = self._direction_ratio(prices, change_percent)
payload = {
"strategy": self.name,
"symbol": symbol,
"first_price": first_price,
"current_price": last_price,
"change_percent": round(change_percent, 5),
"direction_ratio": round(direction_ratio, 3),
"window_size": len(prices),
"threshold_percent": self._threshold_percent,
"min_direction_ratio": self._min_direction_ratio,
}
if (
change_percent >= self._threshold_percent
and direction_ratio >= self._min_direction_ratio
):
return SignalResult(
signal=SignalType.BUY,
reason="Быстрый краткосрочный импульс вверх.",
confidence=self._calculate_confidence(change_percent, direction_ratio),
payload=payload,
)
if (
change_percent <= -self._threshold_percent
and direction_ratio >= self._min_direction_ratio
):
return SignalResult(
signal=SignalType.SELL,
reason="Быстрый краткосрочный импульс вниз.",
confidence=self._calculate_confidence(change_percent, direction_ratio),
payload=payload,
)
return SignalResult(
signal=SignalType.HOLD,
reason="SCALP-импульс недостаточно сильный.",
confidence=0.0,
payload=payload,
)
def _direction_ratio(self, prices: list[float], change_percent: float) -> float:
if len(prices) < 2:
return 0.0
up_moves = 0
down_moves = 0
for previous_price, current_price in zip(prices, prices[1:]):
if current_price > previous_price:
up_moves += 1
elif current_price < previous_price:
down_moves += 1
total_moves = max(1, len(prices) - 1)
if change_percent >= 0:
return up_moves / total_moves
return down_moves / total_moves
def _calculate_confidence(
self,
change_percent: float,
direction_ratio: float,
) -> float:
strength = abs(change_percent) / self._threshold_percent
if strength < 1:
return 0.0
strength_score = min(1.0, strength / 2)
direction_score = min(1.0, direction_ratio)
confidence = 0.35 + (strength_score * 0.4) + (direction_score * 0.25)
return round(min(1.0, confidence), 2)

View File

@@ -10,28 +10,24 @@ from src.trading.strategies.signals import SignalResult, SignalType
class TrendStrategy:
name = "TREND"
_last_prices: dict[str, float] = {}
_threshold_percent = 0.02
_price_window: dict[str, list[float]] = {}
# рассчитать уверенность сигнала по силе движения цены
def _calculate_confidence(self, change_percent: float) -> float:
strength = abs(change_percent) / self._threshold_percent
# длиннее окно = меньше шума
_window_size = 8
if strength < 1:
return 0.0
# общий порог изменения за окно
_threshold_percent = 0.05
confidence = 0.35 + ((strength - 1) / 2) * 0.65
# сколько движений внутри окна должно быть в сторону сигнала
_min_direction_ratio = 0.6
return round(min(1.0, confidence), 2)
# анализ простого тренда по изменению цены
def analyze(self, context: StrategyContext) -> SignalResult:
try:
ticker = ExchangeService().get_price(context.symbol)
snapshot = ExchangeService().get_market_snapshot(context.symbol)
except Exception as exc:
return SignalResult(
signal=SignalType.HOLD,
reason="Не удалось получить рыночную цену. Безопасный HOLD.",
reason="Не удалось получить рыночный snapshot. Безопасный HOLD.",
confidence=0.0,
payload={
"strategy": self.name,
@@ -40,63 +36,159 @@ class TrendStrategy:
},
)
symbol = ticker.symbol
current_price = ticker.price
previous_price = self._last_prices.get(symbol)
symbol = str(snapshot.get("symbol") or context.symbol)
current_price = self._analysis_price(snapshot)
self._last_prices[symbol] = current_price
if previous_price is None or previous_price <= 0:
if current_price <= 0:
return SignalResult(
signal=SignalType.HOLD,
reason="Недостаточно данных для определения тренда.",
reason="Некорректная рыночная цена. Безопасный HOLD.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": symbol,
"snapshot": snapshot,
},
)
prices = self._price_window.setdefault(symbol, [])
prices.append(current_price)
if len(prices) > self._window_size:
prices.pop(0)
if len(prices) < self._window_size:
return SignalResult(
signal=SignalType.HOLD,
reason="Недостаточно данных для анализа тренда.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": symbol,
"price": current_price,
"window_size": len(prices),
"required_window_size": self._window_size,
},
)
change_percent = ((current_price - previous_price) / previous_price) * 100
first_price = prices[0]
last_price = prices[-1]
if change_percent >= self._threshold_percent:
if first_price <= 0:
return SignalResult(
signal=SignalType.HOLD,
reason="Некорректная стартовая цена в окне.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": symbol,
"prices": prices,
},
)
change_percent = ((last_price - first_price) / first_price) * 100
direction_ratio = self._direction_ratio(prices, change_percent)
payload = {
"strategy": self.name,
"symbol": symbol,
"analysis_price": last_price,
"first_price": first_price,
"current_price": last_price,
"last_price": snapshot.get("last_price"),
"bid_price": snapshot.get("bid_price"),
"ask_price": snapshot.get("ask_price"),
"change_percent": round(change_percent, 5),
"direction_ratio": round(direction_ratio, 3),
"window_size": len(prices),
"threshold_percent": self._threshold_percent,
"min_direction_ratio": self._min_direction_ratio,
}
if (
change_percent >= self._threshold_percent
and direction_ratio >= self._min_direction_ratio
):
return SignalResult(
signal=SignalType.BUY,
reason="Цена растёт выше порога тренда.",
confidence=self._calculate_confidence(change_percent),
payload={
"strategy": self.name,
"symbol": symbol,
"previous_price": previous_price,
"current_price": current_price,
"change_percent": round(change_percent, 5),
},
reason="Устойчивый рост цены в окне TREND.",
confidence=self._calculate_confidence(change_percent, direction_ratio),
payload=payload,
)
if change_percent <= -self._threshold_percent:
if (
change_percent <= -self._threshold_percent
and direction_ratio >= self._min_direction_ratio
):
return SignalResult(
signal=SignalType.SELL,
reason="Цена падает ниже порога тренда.",
confidence=self._calculate_confidence(change_percent),
payload={
"strategy": self.name,
"symbol": symbol,
"previous_price": previous_price,
"current_price": current_price,
"change_percent": round(change_percent, 5),
},
reason="Устойчивое снижение цены в окне TREND.",
confidence=self._calculate_confidence(change_percent, direction_ratio),
payload=payload,
)
return SignalResult(
signal=SignalType.HOLD,
reason="Изменение цены ниже порога тренда.",
reason="Тренд недостаточно устойчивый.",
confidence=0.0,
payload={
"strategy": self.name,
"symbol": symbol,
"previous_price": previous_price,
"current_price": current_price,
"change_percent": round(change_percent, 5),
},
)
payload=payload,
)
def _analysis_price(self, snapshot: dict[str, object]) -> float:
bid = self._safe_float(snapshot.get("bid_price"))
ask = self._safe_float(snapshot.get("ask_price"))
if bid is not None and ask is not None and bid > 0 and ask > 0:
return (bid + ask) / 2
last = self._safe_float(snapshot.get("last_price"))
if last is not None:
return last
return 0.0
def _safe_float(self, value: object) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _direction_ratio(self, prices: list[float], change_percent: float) -> float:
if len(prices) < 2:
return 0.0
up_moves = 0
down_moves = 0
for previous_price, current_price in zip(prices, prices[1:]):
if current_price > previous_price:
up_moves += 1
elif current_price < previous_price:
down_moves += 1
total_moves = max(1, len(prices) - 1)
if change_percent >= 0:
return up_moves / total_moves
return down_moves / total_moves
def _calculate_confidence(
self,
change_percent: float,
direction_ratio: float,
) -> float:
strength = abs(change_percent) / self._threshold_percent
if strength < 1:
return 0.0
strength_score = min(1.0, strength / 3)
direction_score = min(1.0, direction_ratio)
confidence = 0.3 + (strength_score * 0.4) + (direction_score * 0.3)
return round(min(1.0, confidence), 2)