Files
dzentra_bot/app/src/trading/execution/engine.py

1102 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/execution/engine.py
from __future__ import annotations
import time
import math
from dataclasses import dataclass
from datetime import datetime
from src.core.event_bus import EventBus
from src.integrations.exchange.service import ExchangeService
from src.trading.auto.state import AutoTradeState
from src.trading.execution.models import ExecutionDecision
from src.trading.journal.service import JournalService
from src.trading.position.state import PositionState
from src.core.numbers import safe_float
from src.core.types import JsonDict, NumericLike
@dataclass(slots=True)
class _ExecutionPrice:
price: float
source: str
age_seconds: float | None
updated_at: str
pricing_role: str
class ExecutionEngine:
_position = PositionState()
_size_precision = 5
_min_flip_confidence = 0.75
_min_flip_repeat_count = 3
_min_flip_hold_seconds = 60
_loss_flip_confidence = 0.9
_last_flip_block_key: str | None = None
def get_position(self) -> PositionState:
return type(self)._position
def process(self, state: AutoTradeState) -> ExecutionDecision:
self._sync_state_from_position(state)
if state.status != "RUNNING":
return ExecutionDecision("NONE", False, "Execution доступен только в режиме RUNNING.")
self._update_unrealized_pnl(state)
risk_decision = self._risk_close_decision(state)
if risk_decision is not None:
return risk_decision
if state.decision_status != "READY" or not state.is_signal_ready:
return ExecutionDecision("NONE", False, "Сигнал ещё не готов к execution.")
if self._should_flip_position(state):
flip_block_reason = self._flip_block_reason(state)
if flip_block_reason is not None:
return self._block_flip(state, flip_block_reason)
return self._flip_position(state)
if state.last_signal == "BUY":
return self._open_position_if_empty(state=state, side="LONG", action="OPEN_LONG")
if state.last_signal == "SELL":
return self._open_position_if_empty(state=state, side="SHORT", action="OPEN_SHORT")
return ExecutionDecision("NONE", False, "Нет торгового действия.")
def _open_position_if_empty(
self,
*,
state: AutoTradeState,
side: str,
action: str,
) -> ExecutionDecision:
position = type(self)._position
if position.side != "NONE":
self._sync_state_from_position(state)
return ExecutionDecision("NONE", False, "Позиция уже открыта.")
try:
entry = self._entry_price_for_side(state.symbol, side)
entry_price = entry.price
except Exception as exc:
return ExecutionDecision("NONE", False, f"Не удалось получить цену для paper execution: {exc}")
now = self._now_time()
size = self._calculate_position_size(state, entry_price=entry_price)
if size <= 0:
return ExecutionDecision(
"NONE",
False,
"Позиция не открыта: невозможно рассчитать adaptive size.",
)
size = self._adjust_size_by_margin_limit(
state=state,
entry_price=entry_price,
size=size,
)
self._sync_effective_risk_after_margin_limit(
state,
base_size=state.adaptive_size_base or 0.0,
final_size=size,
)
size = self._round_size(size)
if size <= 0:
return ExecutionDecision(
"NONE",
False,
"Позиция не открыта: итоговый size равен 0.",
)
type(self)._position = PositionState(
side=side,
symbol=state.symbol,
entry_price=entry_price,
size=size,
leverage=state.leverage,
unrealized_pnl_usd=0.0,
opened_at=now,
updated_at=now,
)
self._sync_state_from_position(state)
state.execution_block_reason = None
state.last_flip_block_reason = None
state.last_execution_action = action
state.last_execution_reason = f"Позиция {side} открыта."
payload: JsonDict = {
"execution_type": "ENTRY",
"action": action,
"symbol": state.symbol,
"side": side,
"entry_price": entry_price,
"size": size,
"leverage": state.leverage,
"signal": state.last_signal,
"confidence": state.last_signal_confidence,
"execution_confidence_score": state.execution_confidence_score,
"execution_confidence_level": state.execution_confidence_level,
"execution_confidence_reason": state.execution_confidence_reason,
"adaptive_size_multiplier": state.adaptive_size_multiplier,
"adaptive_size_reason": state.adaptive_size_reason,
"adaptive_size_factors": state.adaptive_size_factors,
"effective_risk_percent": state.effective_risk_percent,
"effective_target_risk_usd": state.effective_target_risk_usd,
"adaptive_size_base": state.adaptive_size_base,
"adaptive_size_final": state.adaptive_size_final,
"repeat_count": state.last_signal_repeat_count,
"reason": state.last_signal_reason,
"opened_at": now,
"pricing": "ask_for_long_bid_for_short",
"pricing_role": entry.pricing_role,
"price_source": entry.source,
"price_age_seconds": entry.age_seconds,
"price_updated_at": entry.updated_at,
}
JournalService().log_ui_info(
event_type="position_opened",
message=f"Позиция {side} открыта: {state.symbol}.",
screen="auto",
action="paper_execution",
payload=payload,
)
EventBus.emit("paper_position_opened", payload)
return ExecutionDecision(action, True, f"Позиция {side} открыта.")
def _flip_position(self, state: AutoTradeState) -> ExecutionDecision:
position = type(self)._position
if position.side == "NONE":
self._sync_state_from_position(state)
return ExecutionDecision("NONE", False, "Нет позиции для flip.")
new_side = self._target_side_from_signal(state.last_signal)
if new_side is None:
return ExecutionDecision("NONE", False, "Нет направления для flip.")
try:
exit_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
entry_execution = self._entry_price_for_side(state.symbol, new_side)
exit_price = exit_execution.price
new_entry_price = entry_execution.price
except Exception as exc:
return ExecutionDecision("NONE", False, f"Ошибка получения цены для flip: {exc}")
now = self._now_time()
pnl = self._calculate_pnl(exit_price)
new_size = self._calculate_position_size(state, entry_price=new_entry_price)
if new_size <= 0:
return ExecutionDecision(
"NONE",
False,
"Flip отменён: невозможно рассчитать adaptive size.",
)
new_size = self._adjust_size_by_margin_limit(
state=state,
entry_price=new_entry_price,
size=new_size,
)
self._sync_effective_risk_after_margin_limit(
state,
base_size=state.adaptive_size_base or 0.0,
final_size=new_size,
)
new_size = self._round_size(new_size)
if new_size <= 0:
return ExecutionDecision(
"NONE",
False,
"Flip отменён: итоговый size равен 0.",
)
state.realized_pnl_usd += pnl
state.cycle_realized_pnl_usd += pnl
state.cycle_closed_trades += 1
if pnl > 0:
state.cycle_winning_trades += 1
old_side = position.side
old_entry_price = position.entry_price
old_size = position.size
old_leverage = position.leverage
old_opened_at = position.opened_at
state.last_flip_old_side = old_side
state.last_flip_new_side = new_side
state.last_flip_pnl_usd = pnl
state.last_flip_reason = state.last_signal_reason
state.last_flip_monotonic_at = time.monotonic()
type(self)._position = PositionState(
side=new_side,
symbol=state.symbol,
entry_price=new_entry_price,
size=new_size,
leverage=state.leverage,
unrealized_pnl_usd=0.0,
opened_at=now,
updated_at=now,
)
self._sync_state_from_position(state)
state.execution_block_reason = None
state.last_flip_block_reason = None
state.last_execution_action = f"FLIP_{old_side}_TO_{new_side}"
state.last_execution_reason = "Направление позиции изменено."
state.last_flip_at = now
type(self)._last_flip_block_key = None
payload: JsonDict = {
"execution_type": "FLIP",
"action": f"FLIP_{old_side}_TO_{new_side}",
"symbol": state.symbol,
"old_side": old_side,
"new_side": new_side,
"side": new_side,
"entry_price": old_entry_price,
"exit_price": exit_price,
"new_entry_price": new_entry_price,
"old_size": old_size,
"new_size": new_size,
"size": new_size,
"old_leverage": old_leverage,
"leverage": state.leverage,
"pnl": pnl,
"signal": state.last_signal,
"confidence": state.last_signal_confidence,
"execution_confidence_score": state.execution_confidence_score,
"execution_confidence_level": state.execution_confidence_level,
"execution_confidence_reason": state.execution_confidence_reason,
"adaptive_size_multiplier": state.adaptive_size_multiplier,
"adaptive_size_reason": state.adaptive_size_reason,
"adaptive_size_factors": state.adaptive_size_factors,
"effective_risk_percent": state.effective_risk_percent,
"effective_target_risk_usd": state.effective_target_risk_usd,
"adaptive_size_base": state.adaptive_size_base,
"adaptive_size_final": state.adaptive_size_final,
"repeat_count": state.last_signal_repeat_count,
"reason": state.last_signal_reason,
"opened_at": old_opened_at,
"closed_at": now,
"new_opened_at": now,
"pricing": "exit_by_side_then_entry_by_side",
"exit_pricing_role": exit_execution.pricing_role,
"exit_price_source": exit_execution.source,
"exit_price_age_seconds": exit_execution.age_seconds,
"exit_price_updated_at": exit_execution.updated_at,
"entry_pricing_role": entry_execution.pricing_role,
"entry_price_source": entry_execution.source,
"entry_price_age_seconds": entry_execution.age_seconds,
"entry_price_updated_at": entry_execution.updated_at,
}
JournalService().log_ui_info(
event_type="position_flipped",
message=f"Направление позиции изменено: {old_side}{new_side}.",
screen="auto",
action="paper_execution",
payload=payload,
)
EventBus.emit("paper_position_flipped", payload)
return ExecutionDecision(
f"FLIP_{old_side}_TO_{new_side}",
True,
f"Направление позиции изменено: {old_side}{new_side}.",
)
def _close_position(
self,
state: AutoTradeState,
*,
forced_reason: str | None = None,
forced_exit_price: NumericLike | None = None,
forced_pnl: NumericLike | None = None,
forced_price_meta: _ExecutionPrice | None = None,
) -> ExecutionDecision:
position = type(self)._position
if position.side == "NONE":
self._sync_state_from_position(state)
return ExecutionDecision("NONE", False, "Нет открытой позиции для закрытия.")
if forced_exit_price is not None:
exit_price = safe_float(forced_exit_price) or 0.0
exit_execution = forced_price_meta
else:
try:
exit_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
exit_price = exit_execution.price
except Exception as exc:
return ExecutionDecision("NONE", False, f"Ошибка получения цены для закрытия: {exc}")
pnl = (
safe_float(forced_pnl)
if forced_pnl is not None
else self._calculate_pnl(exit_price)
)
if pnl is None:
pnl = 0.0
state.realized_pnl_usd += pnl
state.cycle_realized_pnl_usd += pnl
state.cycle_closed_trades += 1
if pnl > 0:
state.cycle_winning_trades += 1
now = self._now_time()
payload: JsonDict = {
"execution_type": "EXIT",
"action": "CLOSE",
"symbol": state.symbol,
"side": position.side,
"entry_price": position.entry_price,
"exit_price": exit_price,
"size": position.size,
"leverage": position.leverage,
"pnl": pnl,
"signal": state.last_signal,
"confidence": state.last_signal_confidence,
"repeat_count": state.last_signal_repeat_count,
"reason": state.last_signal_reason,
"risk_reason": forced_reason,
"is_forced": forced_reason is not None,
"opened_at": position.opened_at,
"closed_at": now,
"pricing": "bid_for_long_exit_ask_for_short_exit",
"pricing_role": exit_execution.pricing_role if exit_execution else None,
"price_source": exit_execution.source if exit_execution else None,
"price_age_seconds": exit_execution.age_seconds if exit_execution else None,
"price_updated_at": exit_execution.updated_at if exit_execution else None,
}
close_reason = forced_reason or "MANUAL"
JournalService().log_ui_info(
event_type="position_closed",
message=f"Позиция {position.side} закрыта: {close_reason}.",
screen="auto",
action="paper_execution",
payload=payload,
)
EventBus.emit("paper_position_closed", payload)
type(self)._position = PositionState()
self._sync_state_from_position(state)
state.execution_block_reason = None
state.last_flip_block_reason = None
state.last_execution_action = (
f"FORCE_CLOSE_{forced_reason}"
if forced_reason is not None
else "CLOSE"
)
state.last_execution_reason = (
f"Позиция закрыта по правилу защиты: {forced_reason}."
if forced_reason is not None
else "Позиция закрыта."
)
type(self)._last_flip_block_key = None
if forced_reason is not None:
return ExecutionDecision(
f"FORCE_CLOSE_{forced_reason}",
True,
f"Позиция закрыта по правилу защиты: {forced_reason}.",
)
return ExecutionDecision("CLOSE", True, "Позиция закрыта.")
def _risk_close_decision(self, state: AutoTradeState) -> ExecutionDecision | None:
position = type(self)._position
if position.side == "NONE":
return None
try:
current_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
current_price = current_execution.price
except Exception:
return None
price_move_percent = self._calculate_price_move_percent(current_price)
unrealized_pnl = self._calculate_pnl(current_price)
if self._is_max_loss_hit(state, unrealized_pnl):
return self._close_position(
state,
forced_reason="MAX_LOSS",
forced_exit_price=current_price,
forced_pnl=unrealized_pnl,
forced_price_meta=current_execution,
)
if self._is_stop_loss_hit(state, price_move_percent):
return self._close_position(
state,
forced_reason="STOP_LOSS",
forced_exit_price=current_price,
forced_pnl=unrealized_pnl,
forced_price_meta=current_execution,
)
if self._is_take_profit_hit(state, price_move_percent):
return self._close_position(
state,
forced_reason="TAKE_PROFIT",
forced_exit_price=current_price,
forced_pnl=unrealized_pnl,
forced_price_meta=current_execution,
)
return None
def _is_stop_loss_hit(self, state: AutoTradeState, price_move_percent: float) -> bool:
if state.stop_loss_percent is None:
return False
return price_move_percent <= -abs(state.stop_loss_percent)
def _is_take_profit_hit(self, state: AutoTradeState, price_move_percent: float) -> bool:
if state.take_profit_percent is None:
return False
return price_move_percent >= abs(state.take_profit_percent)
def _is_max_loss_hit(self, state: AutoTradeState, unrealized_pnl: float) -> bool:
if state.max_loss_usd is None:
return False
return unrealized_pnl <= -abs(state.max_loss_usd)
def _calculate_price_move_percent(
self,
current_price: NumericLike | None,
) -> float:
position = type(self)._position
price = safe_float(current_price) or 0.0
entry = safe_float(position.entry_price) or 0.0
if entry <= 0:
return 0.0
if position.side == "LONG":
return round(((price - entry) / entry) * 100, 4)
if position.side == "SHORT":
return round(((entry - price) / entry) * 100, 4)
return 0.0
def _should_flip_position(self, state: AutoTradeState) -> bool:
position = type(self)._position
if position.side == "NONE":
return False
if position.side == "LONG" and state.last_signal == "SELL":
return True
if position.side == "SHORT" and state.last_signal == "BUY":
return True
return False
def _flip_block_reason(self, state: AutoTradeState) -> str | None:
position = type(self)._position
confidence = safe_float(state.last_signal_confidence) or 0.0
repeat_count = int(safe_float(state.last_signal_repeat_count) or 0)
unrealized_pnl = safe_float(state.unrealized_pnl_usd) or 0.0
hold_seconds = self._position_hold_seconds(position)
momentum_direction = getattr(state, "momentum_direction", None)
momentum_state = getattr(state, "momentum_state", None)
signal = (state.last_signal or "").upper()
if confidence < self._min_flip_confidence:
return (
"уверенность сигнала ниже порога "
f"({confidence:.2f} < {self._min_flip_confidence:.2f})"
)
if repeat_count < self._min_flip_repeat_count:
return (
"сигнал ещё не подтверждён нужным количеством повторов "
f"({repeat_count} < {self._min_flip_repeat_count})"
)
if hold_seconds is not None and hold_seconds < self._min_flip_hold_seconds:
return (
"позиция открыта слишком недавно "
f"({hold_seconds}с < {self._min_flip_hold_seconds}с)"
)
if signal == "BUY" and momentum_direction == "DOWN":
return "momentum направлен против BUY сигнала"
if signal == "SELL" and momentum_direction == "UP":
return "momentum направлен против SELL сигнала"
if momentum_state in {"BREAKOUT_UP", "BREAKOUT_DOWN"}:
if confidence < 0.85:
return (
"flip заблокирован во время breakout impulse "
f"({confidence:.2f} < 0.85)"
)
if unrealized_pnl < 0 and confidence < self._loss_flip_confidence:
return (
"позиция сейчас в минусе, а сигнал недостаточно сильный "
f"({confidence:.2f} < {self._loss_flip_confidence:.2f})"
)
return None
def _block_flip(
self,
state: AutoTradeState,
reason: str,
) -> ExecutionDecision:
position = type(self)._position
confidence = safe_float(state.last_signal_confidence) or 0.0
state.execution_block_reason = reason
state.last_flip_block_reason = reason
state.last_execution_action = "FLIP_BLOCKED"
state.last_execution_reason = reason
block_key = (
f"{position.side}:"
f"{state.last_signal}:"
f"{state.last_signal_repeat_count}:"
f"{confidence:.2f}:"
f"{reason}"
)
if block_key != type(self)._last_flip_block_key:
type(self)._last_flip_block_key = block_key
payload: JsonDict = {
"execution_type": "FLIP_BLOCKED",
"symbol": state.symbol,
"position_side": position.side,
"signal": state.last_signal,
"confidence": confidence,
"repeat_count": state.last_signal_repeat_count,
"reason": reason,
"unrealized_pnl_usd": state.unrealized_pnl_usd,
"opened_at": position.opened_at,
"updated_at": position.updated_at,
}
JournalService().log_ui_warning(
event_type="position_flip_blocked",
message=f"Смена направления позиции заблокирована: {reason}.",
screen="auto",
action="paper_execution",
payload=payload,
)
EventBus.emit("paper_flip_blocked", payload)
return ExecutionDecision("NONE", False, reason)
def _position_hold_seconds(self, position: PositionState) -> int | None:
if not position.opened_at:
return None
try:
opened_at = datetime.strptime(position.opened_at, "%H:%M:%S")
now = datetime.strptime(self._now_time(), "%H:%M:%S")
seconds = int((now - opened_at).total_seconds())
if seconds < 0:
seconds += 24 * 60 * 60
return seconds
except Exception:
return None
def _target_side_from_signal(self, signal: str | None) -> str | None:
if signal == "BUY":
return "LONG"
if signal == "SELL":
return "SHORT"
return None
def _update_unrealized_pnl(self, state: AutoTradeState) -> None:
position = type(self)._position
if position.side == "NONE":
self._sync_state_from_position(state)
return
try:
current_execution = self._exit_price_for_side(position.symbol or state.symbol, position.side)
current_price = current_execution.price
except Exception:
self._sync_state_from_position(state)
return
position.unrealized_pnl_usd = self._calculate_pnl(current_price)
position.updated_at = self._now_time()
self._sync_state_from_position(state)
def _calculate_position_size(
self,
state: AutoTradeState,
*,
entry_price: float | None = None,
) -> float:
if state.risk_percent is None or state.risk_percent <= 0:
self._sync_adaptive_size_state(state, base_size=0.0, final_size=0.0, multiplier=0.0)
return 0.0
if state.stop_loss_percent is None or state.stop_loss_percent <= 0:
self._sync_adaptive_size_state(state, base_size=0.0, final_size=0.0, multiplier=0.0)
return 0.0
price = entry_price
if price is None:
try:
price = self._signal_entry_price(state).price
except Exception:
self._sync_adaptive_size_state(state, base_size=0.0, final_size=0.0, multiplier=0.0)
return 0.0
if price <= 0:
self._sync_adaptive_size_state(state, base_size=0.0, final_size=0.0, multiplier=0.0)
return 0.0
balance_usd = state.allocated_balance_usd
target_risk_usd = balance_usd * (state.risk_percent / 100)
stop_loss_distance_usd = price * (state.stop_loss_percent / 100)
if stop_loss_distance_usd <= 0:
self._sync_adaptive_size_state(state, base_size=0.0, final_size=0.0, multiplier=0.0)
return 0.0
base_size = target_risk_usd / stop_loss_distance_usd
multiplier = self._adaptive_size_multiplier(state)
final_size = base_size * multiplier
self._sync_adaptive_size_state(
state,
base_size=base_size,
final_size=final_size,
multiplier=multiplier,
)
return self._round_size(final_size)
def _adaptive_size_multiplier(self, state: AutoTradeState) -> float:
multiplier = 1.0
execution_confidence_score = getattr(state, "execution_confidence_score", None)
score_raw = safe_float(execution_confidence_score)
if score_raw is not None:
score = max(0.0, min(1.0, score_raw))
if score < 0.55:
multiplier *= 0.0
elif score < 0.65:
multiplier *= 0.65
elif score < 0.75:
multiplier *= 0.85
elif score >= 0.85:
multiplier *= 1.15
market_state = getattr(state, "market_state", None)
market_trend_strength = getattr(state, "market_trend_strength", None)
market_trend_quality = getattr(state, "market_trend_quality", None)
market_phase = getattr(state, "market_phase", None)
if market_state in {"HIGH_VOLATILITY", "LOW_VOLATILITY", "RANGE"}:
multiplier *= 0.65
if market_trend_strength == "STRONG":
multiplier *= 1.1
elif market_trend_strength == "WEAK":
multiplier *= 0.75
if market_trend_quality == "CLEAN":
multiplier *= 1.05
elif market_trend_quality == "NOISY":
multiplier *= 0.75
if market_phase == "IMPULSE":
multiplier *= 1.1
elif market_phase == "PULLBACK":
multiplier *= 0.8
elif market_phase in {"RANGE", "SQUEEZE"}:
multiplier *= 0.7
momentum_state = getattr(state, "momentum_state", None)
momentum_direction = getattr(state, "momentum_direction", None)
momentum_strength = getattr(state, "momentum_strength", None)
signal = (state.last_signal or "").upper()
if momentum_state in {"BREAKOUT_UP", "BREAKOUT_DOWN"}:
multiplier *= 1.15
elif momentum_state in {"MOMENTUM_UP", "MOMENTUM_DOWN"}:
multiplier *= 1.05
if momentum_strength is not None:
strength = safe_float(momentum_strength)
if strength is not None:
if strength >= 1.5:
multiplier *= 1.1
elif strength <= 0.7:
multiplier *= 0.8
if signal == "BUY":
if momentum_direction == "DOWN":
multiplier *= 0.75
if signal == "SELL":
if momentum_direction == "UP":
multiplier *= 0.75
execution_quality = getattr(state, "execution_quality", None)
execution_quality_reason = getattr(state, "execution_quality_reason", None)
if execution_quality == "BLOCKED":
multiplier *= 0.0
elif execution_quality == "WARNING":
if execution_quality_reason == "WIDE_SPREAD":
multiplier *= 0.75
elif execution_quality_reason == "AGING_SNAPSHOT":
multiplier *= 0.8
elif execution_quality_reason == "SNAPSHOT_UNAVAILABLE":
multiplier *= 0.7
else:
multiplier *= 0.8
return round(max(0.0, min(1.25, multiplier)), 4)
def _sync_adaptive_size_state(
self,
state: AutoTradeState,
*,
base_size: float,
final_size: float,
multiplier: float,
) -> None:
reason = self._adaptive_size_reason(multiplier)
state.adaptive_size_base = self._round_size(base_size)
state.adaptive_size_final = self._round_size(final_size)
state.adaptive_size_multiplier = multiplier
if multiplier != 1:
state.adaptive_size_changed_at = time.monotonic()
base_risk_percent = safe_float(state.risk_percent) or 0.0
state.effective_risk_percent = round(
base_risk_percent * multiplier,
4,
)
state.effective_target_risk_usd = round(
state.allocated_balance_usd
* (state.effective_risk_percent / 100),
4,
)
state.adaptive_size_reason = reason
state.adaptive_size_factors = {
"execution_confidence_score": getattr(state, "execution_confidence_score", None),
"execution_confidence_level": getattr(state, "execution_confidence_level", None),
"market_state": getattr(state, "market_state", None),
"market_trend_strength": getattr(state, "market_trend_strength", None),
"market_trend_quality": getattr(state, "market_trend_quality", None),
"market_phase": getattr(state, "market_phase", None),
"momentum_state": getattr(state, "momentum_state", None),
"momentum_direction": getattr(state, "momentum_direction", None),
"momentum_strength": getattr(state, "momentum_strength", None),
"execution_quality": getattr(state, "execution_quality", None),
"execution_quality_reason": getattr(state, "execution_quality_reason", None),
"spread_percent": getattr(state, "spread_percent", None),
"base_size": self._round_size(base_size),
"final_size": self._round_size(final_size),
"multiplier": multiplier,
}
if multiplier <= 0:
state.execution_size_adjustment_reason = "ADAPTIVE_SIZE_ZERO"
elif multiplier < 1:
state.execution_size_adjustment_reason = "ADAPTIVE_SIZE_REDUCED"
elif multiplier > 1:
state.execution_size_adjustment_reason = "ADAPTIVE_SIZE_INCREASED"
else:
state.execution_size_adjustment_reason = None
def _sync_effective_risk_after_margin_limit(
self,
state: AutoTradeState,
*,
base_size: float,
final_size: float,
) -> None:
adaptive_final = safe_float(state.adaptive_size_final) or 0.0
if adaptive_final <= 0:
state.effective_risk_percent = 0.0
state.effective_target_risk_usd = 0.0
return
margin_ratio = max(
0.0,
min(1.0, final_size / adaptive_final),
)
current_effective_risk = safe_float(state.effective_risk_percent) or 0.0
state.effective_risk_percent = round(
current_effective_risk * margin_ratio,
4,
)
state.effective_target_risk_usd = round(
state.allocated_balance_usd
* (state.effective_risk_percent / 100),
4,
)
def _adaptive_size_reason(self, multiplier: float) -> str:
if multiplier <= 0:
return "adaptive size заблокировал вход"
if multiplier < 0.75:
return "размер позиции сильно уменьшен по risk/runtime факторам"
if multiplier < 1:
return "размер позиции умеренно уменьшен по risk/runtime факторам"
if multiplier > 1:
return "размер позиции увеличен при сильном execution context"
return "размер позиции без adaptive корректировки"
def _adjust_size_by_margin_limit(
self,
*,
state: AutoTradeState,
entry_price: float,
size: float,
) -> float:
max_percent = state.max_reserved_balance_percent
if max_percent is None or max_percent <= 0:
return self._round_size(size)
leverage = state.leverage or 1.0
if leverage <= 0 or entry_price <= 0:
state.execution_block_reason = "Invalid leverage or entry price."
return 0.0
balance_usd = state.allocated_balance_usd
max_reserved_usd = balance_usd * (max_percent / 100)
max_notional_usd = max_reserved_usd * leverage
max_size = max_notional_usd / entry_price
if size <= max_size:
return self._round_size(size)
state.execution_size_adjustment_reason = "MARGIN_LIMIT"
limited_size = self._round_size(max_size)
adaptive_final = safe_float(state.adaptive_size_final) or 0.0
if adaptive_final > 0:
effective_multiplier = limited_size / adaptive_final
if effective_multiplier < 0.5:
state.adaptive_size_reason = (
"размер позиции сильно ограничен margin limit"
)
else:
state.adaptive_size_reason = (
"размер позиции ограничен margin limit"
)
return limited_size
def _signal_entry_price(self, state: AutoTradeState) -> _ExecutionPrice:
if state.last_signal == "BUY":
return self._entry_price_for_side(state.symbol, "LONG")
if state.last_signal == "SELL":
return self._entry_price_for_side(state.symbol, "SHORT")
return self._market_last_price(state.symbol)
def _entry_price_for_side(self, symbol: str, side: str) -> _ExecutionPrice:
snapshot = ExchangeService().get_execution_snapshot(symbol)
if side == "LONG":
return _ExecutionPrice(
price=self._snapshot_price(snapshot.ask_price, "ask_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="LONG_ENTRY_ASK",
)
if side == "SHORT":
return _ExecutionPrice(
price=self._snapshot_price(snapshot.bid_price, "bid_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="SHORT_ENTRY_BID",
)
return _ExecutionPrice(
price=self._snapshot_price(snapshot.last_price, "last_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="ENTRY_LAST",
)
def _exit_price_for_side(self, symbol: str, side: str) -> _ExecutionPrice:
snapshot = ExchangeService().get_execution_snapshot(symbol)
if side == "LONG":
return _ExecutionPrice(
price=self._snapshot_price(snapshot.bid_price, "bid_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="LONG_EXIT_BID",
)
if side == "SHORT":
return _ExecutionPrice(
price=self._snapshot_price(snapshot.ask_price, "ask_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="SHORT_EXIT_ASK",
)
return _ExecutionPrice(
price=self._snapshot_price(snapshot.last_price, "last_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="EXIT_LAST",
)
def _market_last_price(self, symbol: str) -> _ExecutionPrice:
snapshot = ExchangeService().get_execution_snapshot(symbol)
return _ExecutionPrice(
price=self._snapshot_price(snapshot.last_price, "last_price"),
source=snapshot.source,
age_seconds=snapshot.age_seconds,
updated_at=snapshot.updated_at,
pricing_role="MARKET_LAST",
)
def _snapshot_price(
self,
raw_price: NumericLike | None,
name: str,
) -> float:
if raw_price is None:
raise ValueError(
f"Execution snapshot price '{name}' is missing."
)
price = safe_float(raw_price)
if price is None:
raise ValueError(
f"Execution snapshot price '{name}' is invalid."
)
if price <= 0:
raise ValueError(
f"Execution snapshot price '{name}' is invalid: {price}"
)
return price
def _round_size(self, size: NumericLike | None) -> float:
value = safe_float(size)
if value is None:
return 0.0
factor = 10 ** self._size_precision
return math.floor(value * factor) / factor
def _calculate_pnl(
self,
current_price: NumericLike | None,
) -> float:
position = type(self)._position
price = safe_float(current_price) or 0.0
entry = safe_float(position.entry_price) or 0.0
size = safe_float(position.size) or 0.0
if position.side == "LONG":
return round((price - entry) * size, 4)
if position.side == "SHORT":
return round((entry - price) * size, 4)
return 0.0
def _sync_state_from_position(self, state: AutoTradeState) -> None:
position = type(self)._position
state.position_side = position.side
state.entry_price = position.entry_price
state.position_size = position.size
state.unrealized_pnl_usd = position.unrealized_pnl_usd
def _now_time(self) -> str:
return datetime.now().strftime("%H:%M:%S")