Files
dzentra_bot/app/src/trading/debug/execution.py

443 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/debug/execution.py
from __future__ import annotations
import math
from datetime import datetime
from src.integrations.exchange.service import ExchangeService
from src.trading.debug.state import DebugPositionState, DebugTradeState
from src.trading.execution.models import ExecutionDecision
class DebugExecutionEngine:
_size_precision = 5
def process(self, state: DebugTradeState) -> ExecutionDecision:
if state.status != "RUNNING":
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Execution доступен только в режиме RUNNING.",
)
self.update_unrealized_pnl(state)
risk_decision = self.risk_close_decision(state)
if risk_decision is not None:
return risk_decision
if state.decision_status != "READY" or not state.is_signal_ready:
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Сигнал ещё не готов к execution.",
)
if self._should_flip_position(state):
return self.flip_position(state)
if state.last_signal == "BUY":
return self.open_position_if_empty(state=state, side="LONG")
if state.last_signal == "SELL":
return self.open_position_if_empty(state=state, side="SHORT")
return ExecutionDecision("NONE", False, "[DEBUG] Нет торгового действия.")
def open_position_if_empty(
self,
*,
state: DebugTradeState,
side: str,
) -> ExecutionDecision:
if state.position.side != "NONE":
return ExecutionDecision("NONE", False, "[DEBUG] Позиция уже открыта.")
try:
entry_price = self._entry_price_for_side(state.symbol, side)
except Exception as exc:
return ExecutionDecision(
"NONE",
False,
f"[DEBUG] Не удалось получить цену входа: {exc}",
)
size = self.calculate_position_size(state, entry_price=entry_price)
if size <= 0:
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Позиция не открыта: невозможно рассчитать size.",
)
size = self.adjust_size_by_margin_limit(
state=state,
entry_price=entry_price,
size=size,
)
size = self._round_size(size)
if size <= 0:
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Позиция не открыта: итоговый size равен 0.",
)
now = self._now_time()
state.position = DebugPositionState(
side=side,
symbol=state.symbol,
entry_price=entry_price,
size=size,
leverage=state.leverage,
unrealized_pnl_usd=0.0,
opened_at=now,
updated_at=now,
)
return ExecutionDecision(
f"DEBUG_OPEN_{side}",
True,
f"[DEBUG] Paper позиция открыта: {side}.",
)
def flip_position(self, state: DebugTradeState) -> ExecutionDecision:
position = state.position
if position.side == "NONE":
return ExecutionDecision("NONE", False, "[DEBUG] Нет позиции для flip.")
new_side = self._target_side_from_signal(state.last_signal)
if new_side is None:
return ExecutionDecision("NONE", False, "[DEBUG] Нет направления для flip.")
try:
exit_price = self._exit_price_for_side(
position.symbol or state.symbol,
position.side,
)
new_entry_price = self._entry_price_for_side(state.symbol, new_side)
except Exception as exc:
return ExecutionDecision("NONE", False, f"[DEBUG] Ошибка цены для flip: {exc}")
pnl = self.calculate_pnl(state, exit_price)
new_size = self.calculate_position_size(state, entry_price=new_entry_price)
if new_size <= 0:
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Flip отменён: невозможно рассчитать new size.",
)
new_size = self.adjust_size_by_margin_limit(
state=state,
entry_price=new_entry_price,
size=new_size,
)
new_size = self._round_size(new_size)
if new_size <= 0:
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Flip отменён: итоговый new size равен 0.",
)
state.realized_pnl_usd += pnl
now = self._now_time()
old_side = position.side
state.position = DebugPositionState(
side=new_side,
symbol=state.symbol,
entry_price=new_entry_price,
size=new_size,
leverage=state.leverage,
unrealized_pnl_usd=0.0,
opened_at=now,
updated_at=now,
)
return ExecutionDecision(
f"DEBUG_FLIP_{old_side}_TO_{new_side}",
True,
f"[DEBUG] Flip выполнен: {old_side}{new_side}.",
)
def close_position(
self,
state: DebugTradeState,
*,
forced_reason: str | None = None,
) -> ExecutionDecision:
position = state.position
if position.side == "NONE":
return ExecutionDecision(
"NONE",
False,
"[DEBUG] Нет открытой позиции для закрытия.",
)
try:
exit_price = self._exit_price_for_side(
position.symbol or state.symbol,
position.side,
)
except Exception as exc:
return ExecutionDecision("NONE", False, f"[DEBUG] Ошибка цены закрытия: {exc}")
pnl = self.calculate_pnl(state, exit_price)
state.realized_pnl_usd += pnl
old_side = position.side
state.position = DebugPositionState()
action = f"DEBUG_CLOSE_{forced_reason}" if forced_reason else "DEBUG_CLOSE"
return ExecutionDecision(
action,
True,
f"[DEBUG] Позиция закрыта: {old_side}. PnL: {pnl:.4f}",
)
def risk_close_decision(self, state: DebugTradeState) -> ExecutionDecision | None:
position = state.position
if position.side == "NONE":
return None
try:
current_price = self._exit_price_for_side(
position.symbol or state.symbol,
position.side,
)
except Exception:
return None
price_move_percent = self.calculate_price_move_percent(state, current_price)
unrealized_pnl = self.calculate_pnl(state, current_price)
if self._is_max_loss_hit(state, unrealized_pnl):
return self.close_position(state, forced_reason="MAX_LOSS")
if self._is_stop_loss_hit(state, price_move_percent):
return self.close_position(state, forced_reason="STOP_LOSS")
if self._is_take_profit_hit(state, price_move_percent):
return self.close_position(state, forced_reason="TAKE_PROFIT")
return None
def update_unrealized_pnl(self, state: DebugTradeState) -> None:
position = state.position
if position.side == "NONE":
position.unrealized_pnl_usd = None
return
try:
current_price = self._exit_price_for_side(
position.symbol or state.symbol,
position.side,
)
except Exception:
return
position.unrealized_pnl_usd = self.calculate_pnl(state, current_price)
position.updated_at = self._now_time()
def calculate_position_size(
self,
state: DebugTradeState,
*,
entry_price: float | None = None,
) -> float:
if state.risk_percent is None or state.risk_percent <= 0:
return 0.0
if state.stop_loss_percent is None or state.stop_loss_percent <= 0:
return 0.0
price = entry_price
if price is None:
price = self._signal_entry_price(state)
if price <= 0:
return 0.0
target_risk_usd = state.allocated_balance_usd * (state.risk_percent / 100)
stop_loss_distance_usd = price * (state.stop_loss_percent / 100)
if stop_loss_distance_usd <= 0:
return 0.0
return self._round_size(target_risk_usd / stop_loss_distance_usd)
def adjust_size_by_margin_limit(
self,
*,
state: DebugTradeState,
entry_price: float,
size: float,
) -> float:
state.execution_block_reason = None
state.execution_size_adjustment_reason = None
max_percent = state.max_reserved_balance_percent
if max_percent is None or max_percent <= 0:
return self._round_size(size)
leverage = state.leverage or 1.0
if leverage <= 0 or entry_price <= 0:
state.execution_block_reason = "[DEBUG] Invalid leverage or entry price."
return 0.0
max_reserved_usd = state.allocated_balance_usd * (max_percent / 100)
max_notional_usd = max_reserved_usd * leverage
max_size = max_notional_usd / entry_price
if size <= max_size:
return self._round_size(size)
state.execution_size_adjustment_reason = "MARGIN_LIMIT"
return self._round_size(max_size)
def calculate_price_move_percent(
self,
state: DebugTradeState,
current_price: float,
) -> float:
position = state.position
entry = position.entry_price or 0.0
if entry <= 0:
return 0.0
if position.side == "LONG":
return round(((current_price - entry) / entry) * 100, 4)
if position.side == "SHORT":
return round(((entry - current_price) / entry) * 100, 4)
return 0.0
def calculate_pnl(self, state: DebugTradeState, current_price: float) -> float:
position = state.position
entry = position.entry_price or 0.0
size = position.size or 0.0
if position.side == "LONG":
return round((current_price - entry) * size, 4)
if position.side == "SHORT":
return round((entry - current_price) * size, 4)
return 0.0
def _should_flip_position(self, state: DebugTradeState) -> bool:
if state.position.side == "LONG" and state.last_signal == "SELL":
return True
if state.position.side == "SHORT" and state.last_signal == "BUY":
return True
return False
def _target_side_from_signal(self, signal: str | None) -> str | None:
if signal == "BUY":
return "LONG"
if signal == "SELL":
return "SHORT"
return None
def _is_stop_loss_hit(self, state: DebugTradeState, price_move_percent: float) -> bool:
if state.stop_loss_percent is None:
return False
return price_move_percent <= -abs(state.stop_loss_percent)
def _is_take_profit_hit(self, state: DebugTradeState, price_move_percent: float) -> bool:
if state.take_profit_percent is None:
return False
return price_move_percent >= abs(state.take_profit_percent)
def _is_max_loss_hit(self, state: DebugTradeState, unrealized_pnl: float) -> bool:
if state.max_loss_usd is None:
return False
return unrealized_pnl <= -abs(state.max_loss_usd)
def _signal_entry_price(self, state: DebugTradeState) -> float:
if state.last_signal == "BUY":
return self._entry_price_for_side(state.symbol, "LONG")
if state.last_signal == "SELL":
return self._entry_price_for_side(state.symbol, "SHORT")
return self._market_last_price(state.symbol)
def _entry_price_for_side(self, symbol: str, side: str) -> float:
snapshot = ExchangeService().get_fresh_market_snapshot(symbol)
if side == "LONG":
return self._snapshot_price(snapshot, "ask_price", "last_price")
if side == "SHORT":
return self._snapshot_price(snapshot, "bid_price", "last_price")
return self._snapshot_price(snapshot, "last_price")
def _exit_price_for_side(self, symbol: str, side: str) -> float:
snapshot = ExchangeService().get_fresh_market_snapshot(symbol)
if side == "LONG":
return self._snapshot_price(snapshot, "bid_price", "last_price")
if side == "SHORT":
return self._snapshot_price(snapshot, "ask_price", "last_price")
return self._snapshot_price(snapshot, "last_price")
def _market_last_price(self, symbol: str) -> float:
snapshot = ExchangeService().get_fresh_market_snapshot(symbol)
return self._snapshot_price(snapshot, "last_price")
def _snapshot_price(
self,
snapshot: dict[str, object],
primary_key: str,
fallback_key: str | None = None,
) -> float:
raw_price = snapshot.get(primary_key)
if raw_price is None and fallback_key is not None:
raw_price = snapshot.get(fallback_key)
if raw_price is None:
raise ValueError(f"Market snapshot price '{primary_key}' is missing.")
price = float(raw_price)
if price <= 0:
raise ValueError(f"Market snapshot price '{primary_key}' is invalid: {price}")
return price
def _round_size(self, size: float) -> float:
factor = 10 ** self._size_precision
return math.floor(float(size) * factor) / factor
def _now_time(self) -> str:
return datetime.now().strftime("%H:%M:%S")