Files
dzentra_bot/app/src/trading/execution/engine.py

508 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/src/trading/execution/engine.py
from __future__ import annotations
from datetime import datetime
from src.core.event_bus import EventBus
from src.integrations.exchange.service import ExchangeService
from src.trading.auto.state import AutoTradeState
from src.trading.execution.models import ExecutionDecision
from src.trading.journal.service import JournalService
from src.trading.position.state import PositionState
class ExecutionEngine:
    """Paper-trading execution engine driven by ``AutoTradeState`` signals.

    The single open position is stored on the class (``_position``), not on
    the instance, so every ``ExecutionEngine()`` reads and mutates the same
    position object.
    """

    # Balance (USD) assumed for position sizing and margin limits in paper mode.
    PAPER_BALANCE_USD = 1000.0

    # Shared by all instances; replaced wholesale on open / flip / close.
    _position = PositionState()

    def get_position(self) -> PositionState:
        """Return the shared, class-level position."""
        return type(self)._position

    def process(self, state: AutoTradeState) -> ExecutionDecision:
        """Run one execution step for *state* and report what was done.

        Order of checks: status must be ``RUNNING``; unrealized PnL is
        refreshed; risk limits may force a close; then a ready signal may
        flip or open a position.
        """
        self._sync_state_from_position(state)

        if state.status != "RUNNING":
            return ExecutionDecision("NONE", False, "Execution доступен только в режиме RUNNING.")

        self._update_unrealized_pnl(state)

        # Risk exits take priority over any new signal.
        risk_decision = self._risk_close_decision(state)
        if risk_decision is not None:
            return risk_decision

        if state.decision_status != "READY" or not state.is_signal_ready:
            return ExecutionDecision("NONE", False, "Сигнал ещё не готов к execution.")

        if self._should_flip_position(state):
            return self._flip_position(state)

        if state.last_signal == "BUY":
            return self._open_position_if_empty(state=state, side="LONG", action="OPEN_LONG")

        if state.last_signal == "SELL":
            return self._open_position_if_empty(state=state, side="SHORT", action="OPEN_SHORT")

        return ExecutionDecision("NONE", False, "Нет торгового действия.")

    def _fetch_price(self, symbol: str) -> float:
        """Return the latest price for *symbol* from ``ExchangeService``.

        Any exception raised by the service propagates; callers decide how
        to report it.
        """
        return ExchangeService().get_price(symbol).price

    def _open_position_if_empty(
        self,
        *,
        state: AutoTradeState,
        side: str,
        action: str,
    ) -> ExecutionDecision:
        """Open a paper position of *side* if no position is currently open.

        Returns a non-executed decision when a position already exists, the
        price cannot be fetched, or the resulting size is not positive.
        """
        position = type(self)._position
        if position.side != "NONE":
            self._sync_state_from_position(state)
            return ExecutionDecision("NONE", False, "Позиция уже открыта.")

        try:
            entry_price = self._fetch_price(state.symbol)
        except Exception as exc:
            return ExecutionDecision("NONE", False, f"Не удалось получить цену для paper execution: {exc}")

        now = self._now_time()

        size = self._calculate_position_size(state, entry_price=entry_price)
        if size <= 0:
            return ExecutionDecision(
                "NONE",
                False,
                "Позиция не открыта: невозможно рассчитать size без Stop Loss.",
            )

        size = self._adjust_size_by_margin_limit(
            state=state,
            entry_price=entry_price,
            size=size,
        )
        # The margin limit may block the trade (returns 0.0) or cap the size
        # down to zero after rounding; never open an empty position.
        if size <= 0:
            reason = state.execution_block_reason or "размер позиции после лимита маржи равен нулю."
            return ExecutionDecision("NONE", False, f"Позиция не открыта: {reason}")

        type(self)._position = PositionState(
            side=side,
            symbol=state.symbol,
            entry_price=entry_price,
            size=size,
            leverage=state.leverage,
            unrealized_pnl_usd=0.0,
            opened_at=now,
            updated_at=now,
        )
        self._sync_state_from_position(state)

        payload = {
            "execution_type": "ENTRY",
            "action": action,
            "symbol": state.symbol,
            "side": side,
            "entry_price": entry_price,
            "size": size,
            "leverage": state.leverage,
            "signal": state.last_signal,
            "confidence": state.last_signal_confidence,
            "repeat_count": state.last_signal_repeat_count,
            "reason": state.last_signal_reason,
            "opened_at": now,
        }
        JournalService().log_ui_info(
            event_type="paper_position_opened",
            message=f"Paper ENTRY открыта: {side} {state.symbol}",
            screen="auto",
            action="paper_execution",
            payload=payload,
        )
        EventBus.emit("paper_position_opened", payload)

        return ExecutionDecision(action, True, f"Paper ENTRY {side} открыта.")

    def _flip_position(self, state: AutoTradeState) -> ExecutionDecision:
        """Close the current position and open the opposite side at one price.

        The old position is left untouched if the price cannot be fetched or
        the new size is not positive.
        """
        position = type(self)._position
        if position.side == "NONE":
            self._sync_state_from_position(state)
            return ExecutionDecision("NONE", False, "Нет позиции для flip.")

        new_side = self._target_side_from_signal(state.last_signal)
        if new_side is None:
            return ExecutionDecision("NONE", False, "Нет направления для flip.")

        try:
            flip_price = self._fetch_price(state.symbol)
        except Exception as exc:
            return ExecutionDecision("NONE", False, f"Ошибка получения цены для flip: {exc}")

        now = self._now_time()
        # PnL of the position being closed; must be computed before it is replaced.
        pnl = self._calculate_pnl(flip_price)

        new_size = self._calculate_position_size(state, entry_price=flip_price)
        if new_size <= 0:
            return ExecutionDecision(
                "NONE",
                False,
                "Flip отменён: невозможно рассчитать size без Stop Loss.",
            )

        new_size = self._adjust_size_by_margin_limit(
            state=state,
            entry_price=flip_price,
            size=new_size,
        )
        # Same guard as on entry: do not flip into a zero-size position.
        if new_size <= 0:
            reason = state.execution_block_reason or "размер позиции после лимита маржи равен нулю."
            return ExecutionDecision("NONE", False, f"Flip отменён: {reason}")

        # Snapshot the old position for the journal before overwriting it.
        old_side = position.side
        old_entry_price = position.entry_price
        old_size = position.size
        old_leverage = position.leverage
        old_opened_at = position.opened_at

        type(self)._position = PositionState(
            side=new_side,
            symbol=state.symbol,
            entry_price=flip_price,
            size=new_size,
            leverage=state.leverage,
            unrealized_pnl_usd=0.0,
            opened_at=now,
            updated_at=now,
        )
        self._sync_state_from_position(state)

        flip_action = f"FLIP_{old_side}_TO_{new_side}"
        payload = {
            "execution_type": "FLIP",
            "action": flip_action,
            "symbol": state.symbol,
            "old_side": old_side,
            "new_side": new_side,
            "side": new_side,
            "entry_price": old_entry_price,
            "exit_price": flip_price,
            "new_entry_price": flip_price,
            "old_size": old_size,
            "new_size": new_size,
            "size": new_size,
            "old_leverage": old_leverage,
            "leverage": state.leverage,
            "pnl": pnl,
            "signal": state.last_signal,
            "confidence": state.last_signal_confidence,
            "repeat_count": state.last_signal_repeat_count,
            "reason": state.last_signal_reason,
            "opened_at": old_opened_at,
            "closed_at": now,
            "new_opened_at": now,
        }
        JournalService().log_ui_info(
            event_type="paper_position_flipped",
            message=f"Paper FLIP выполнен: {old_side} → {new_side} {state.symbol}",
            screen="auto",
            action="paper_execution",
            payload=payload,
        )
        EventBus.emit("paper_position_flipped", payload)

        return ExecutionDecision(
            flip_action,
            True,
            f"Paper FLIP выполнен: {old_side} → {new_side}.",
        )

    def _close_position(
        self,
        state: AutoTradeState,
        *,
        forced_reason: str | None = None,
        forced_exit_price: float | None = None,
        forced_pnl: float | None = None,
    ) -> ExecutionDecision:
        """Close the open position, journal it, and reset to an empty position.

        Args:
            state: Auto-trade state to read signal data from and sync back to.
            forced_reason: Risk reason (e.g. ``"STOP_LOSS"``); when given, the
                close is reported as forced.
            forced_exit_price: Exit price to use instead of fetching one.
            forced_pnl: PnL to record instead of recomputing it.
        """
        position = type(self)._position
        if position.side == "NONE":
            self._sync_state_from_position(state)
            return ExecutionDecision("NONE", False, "Нет открытой позиции для закрытия.")

        if forced_exit_price is not None:
            exit_price = forced_exit_price
        else:
            try:
                exit_price = self._fetch_price(state.symbol)
            except Exception as exc:
                return ExecutionDecision("NONE", False, f"Ошибка получения цены для закрытия: {exc}")

        pnl = forced_pnl if forced_pnl is not None else self._calculate_pnl(exit_price)
        now = self._now_time()
        is_forced = forced_reason is not None

        payload = {
            "execution_type": "EXIT",
            "action": "CLOSE",
            "symbol": state.symbol,
            "side": position.side,
            "entry_price": position.entry_price,
            "exit_price": exit_price,
            "size": position.size,
            "leverage": position.leverage,
            "pnl": pnl,
            "signal": state.last_signal,
            "confidence": state.last_signal_confidence,
            "repeat_count": state.last_signal_repeat_count,
            "reason": state.last_signal_reason,
            "risk_reason": forced_reason,
            "is_forced": is_forced,
            "opened_at": position.opened_at,
            "closed_at": now,
        }

        if is_forced:
            message = f"Paper EXIT закрыта по риску {forced_reason}: {position.side} {state.symbol}"
        else:
            message = f"Paper EXIT закрыта: {position.side} {state.symbol}"

        JournalService().log_ui_info(
            event_type="paper_position_closed",
            message=message,
            screen="auto",
            action="paper_execution",
            payload=payload,
        )
        EventBus.emit("paper_position_closed", payload)

        type(self)._position = PositionState()
        self._sync_state_from_position(state)

        if is_forced:
            return ExecutionDecision(
                f"FORCE_CLOSE_{forced_reason}",
                True,
                f"Paper EXIT выполнена по риску: {forced_reason}.",
            )
        return ExecutionDecision("CLOSE", True, "Paper EXIT выполнена.")

    def _risk_close_decision(self, state: AutoTradeState) -> ExecutionDecision | None:
        """Force-close the position if a risk limit is hit; otherwise ``None``.

        Limits are checked in the order MAX_LOSS, STOP_LOSS, TAKE_PROFIT.
        If the price cannot be fetched the check is skipped (returns ``None``).
        """
        position = type(self)._position
        if position.side == "NONE":
            return None

        try:
            current_price = self._fetch_price(position.symbol or state.symbol)
        except Exception:
            # Best effort: without a price no risk limit can be evaluated.
            return None

        price_move_percent = self._calculate_price_move_percent(current_price)
        unrealized_pnl = self._calculate_pnl(current_price)

        if self._is_max_loss_hit(state, unrealized_pnl):
            forced_reason = "MAX_LOSS"
        elif self._is_stop_loss_hit(state, price_move_percent):
            forced_reason = "STOP_LOSS"
        elif self._is_take_profit_hit(state, price_move_percent):
            forced_reason = "TAKE_PROFIT"
        else:
            return None

        return self._close_position(
            state,
            forced_reason=forced_reason,
            forced_exit_price=current_price,
            forced_pnl=unrealized_pnl,
        )

    def _is_stop_loss_hit(
        self,
        state: AutoTradeState,
        price_move_percent: float,
    ) -> bool:
        """Return True when the adverse move reaches the stop-loss percent.

        A threshold of ``None`` or ``0`` means the stop loss is not set;
        otherwise its absolute value is used.
        """
        threshold = state.stop_loss_percent
        if not threshold:
            # 0 would otherwise trigger on a fresh position with a 0% move.
            return False
        return price_move_percent <= -abs(threshold)

    def _is_take_profit_hit(
        self,
        state: AutoTradeState,
        price_move_percent: float,
    ) -> bool:
        """Return True when the favourable move reaches the take-profit percent.

        A threshold of ``None`` or ``0`` means the take profit is not set;
        otherwise its absolute value is used.
        """
        threshold = state.take_profit_percent
        if not threshold:
            # 0 would otherwise trigger on a fresh position with a 0% move.
            return False
        return price_move_percent >= abs(threshold)

    def _is_max_loss_hit(
        self,
        state: AutoTradeState,
        unrealized_pnl: float,
    ) -> bool:
        """Return True when the unrealized loss reaches ``max_loss_usd``.

        A limit of ``None`` or ``0`` means the max loss is not set; otherwise
        its absolute value is used.
        """
        limit = state.max_loss_usd
        if not limit:
            # 0 would otherwise trigger on a fresh position with zero PnL.
            return False
        return unrealized_pnl <= -abs(limit)

    def _calculate_price_move_percent(self, current_price: float) -> float:
        """Return the price move since entry in percent, signed by position side.

        Positive means the move is in the position's favour. Returns ``0.0``
        when there is no position or the entry price is not positive.
        """
        position = type(self)._position
        entry = position.entry_price or 0.0
        if entry <= 0:
            return 0.0

        if position.side == "LONG":
            return round(((current_price - entry) / entry) * 100, 4)
        if position.side == "SHORT":
            return round(((entry - current_price) / entry) * 100, 4)
        return 0.0

    def _should_flip_position(self, state: AutoTradeState) -> bool:
        """Return True when the last signal opposes the open position's side."""
        side = type(self)._position.side
        signal = state.last_signal
        return (side == "LONG" and signal == "SELL") or (side == "SHORT" and signal == "BUY")

    def _target_side_from_signal(self, signal: str | None) -> str | None:
        """Map ``"BUY"``/``"SELL"`` to ``"LONG"``/``"SHORT"``; anything else to ``None``."""
        if signal == "BUY":
            return "LONG"
        if signal == "SELL":
            return "SHORT"
        return None

    def _update_unrealized_pnl(self, state: AutoTradeState) -> None:
        """Refresh the position's unrealized PnL from the current price.

        *state* is synced from the position in every case, including when
        there is no position or the price cannot be fetched.
        """
        position = type(self)._position
        if position.side == "NONE":
            self._sync_state_from_position(state)
            return

        try:
            current_price = self._fetch_price(position.symbol or state.symbol)
        except Exception:
            # Best effort: keep the last known PnL when the price is unavailable.
            self._sync_state_from_position(state)
            return

        position.unrealized_pnl_usd = self._calculate_pnl(current_price)
        position.updated_at = self._now_time()
        self._sync_state_from_position(state)

    def _calculate_position_size(
        self,
        state: AutoTradeState,
        *,
        entry_price: float | None = None,
    ) -> float:
        """Return the size whose loss at the stop equals the risk budget.

        ``size = (balance * risk% / 100) / (price * stop_loss% / 100)``,
        rounded to 8 decimals. Returns ``0.0`` when risk percent or stop-loss
        percent is missing or non-positive, or when no positive price is
        available. When *entry_price* is ``None`` the price is fetched.
        """
        if state.risk_percent is None or state.risk_percent <= 0:
            return 0.0
        if state.stop_loss_percent is None or state.stop_loss_percent <= 0:
            return 0.0

        price = entry_price
        if price is None:
            try:
                price = self._fetch_price(state.symbol)
            except Exception:
                return 0.0
        if price <= 0:
            return 0.0

        target_risk_usd = self.PAPER_BALANCE_USD * (state.risk_percent / 100)
        stop_loss_distance_usd = price * (state.stop_loss_percent / 100)
        if stop_loss_distance_usd <= 0:
            return 0.0

        return round(target_risk_usd / stop_loss_distance_usd, 8)

    def _adjust_size_by_margin_limit(
        self,
        *,
        state: AutoTradeState,
        entry_price: float,
        size: float,
    ) -> float:
        """Cap *size* so reserved margin stays within the configured percent.

        Resets ``state.execution_block_reason`` and
        ``state.execution_size_adjustment_reason`` on every call. With no
        positive ``max_reserved_balance_percent`` the size is returned as is
        (rounded). Returns ``0.0`` and sets the block reason when leverage or
        entry price is not positive. When the size is capped, the adjustment
        reason is set to ``"MARGIN_LIMIT"``.
        """
        max_percent = state.max_reserved_balance_percent
        state.execution_block_reason = None
        state.execution_size_adjustment_reason = None

        if max_percent is None or max_percent <= 0:
            return round(size, 8)

        leverage = state.leverage or 1.0
        if leverage <= 0 or entry_price <= 0:
            state.execution_block_reason = "Invalid leverage or entry price."
            return 0.0

        max_reserved_usd = self.PAPER_BALANCE_USD * (max_percent / 100)
        max_notional_usd = max_reserved_usd * leverage
        max_size = max_notional_usd / entry_price

        if size <= max_size:
            return round(size, 8)

        state.execution_size_adjustment_reason = "MARGIN_LIMIT"
        return round(max_size, 8)

    def _calculate_pnl(self, current_price: float) -> float:
        """Return the open position's PnL in USD at *current_price* (4 decimals).

        Returns ``0.0`` when there is no LONG or SHORT position.
        """
        position = type(self)._position
        entry = position.entry_price or 0.0
        size = position.size or 0.0

        if position.side == "LONG":
            return round((current_price - entry) * size, 4)
        if position.side == "SHORT":
            return round((entry - current_price) * size, 4)
        return 0.0

    def _sync_state_from_position(self, state: AutoTradeState) -> None:
        """Copy side, entry price, size and unrealized PnL from the position to *state*."""
        position = type(self)._position
        state.position_side = position.side
        state.entry_price = position.entry_price
        state.position_size = position.size
        state.unrealized_pnl_usd = position.unrealized_pnl_usd

    def _now_time(self) -> str:
        """Return the current local time as ``HH:MM:SS`` (no date component)."""
        return datetime.now().strftime("%H:%M:%S")