Files
sanguo_quant_live/risk-management/realtime-system/src/emergency_handler.py
T

251 lines
8.9 KiB
Python

"""
交易中断和紧急处理算法
============
提供多级紧急风险处置:
- 级别1:预警提示(不干预,只通知)
- 级别2:限制开仓(只允许平仓,不允许开仓)
- 级别3:逐步减仓(按比例逐步降低仓位)
- 级别4:紧急清仓(全部平仓,立即停止交易)
- 级别5:系统停机(整个交易系统停止运行)
"""
from typing import Dict, List, Optional, Callable, Tuple
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from risk_monitor import RiskAlert, AlertLevel, RealTimeRiskMonitor
from risk_calculator import RiskMetrics
class EmergencyLevel(Enum):
    """Escalation levels used by the emergency handler.

    Each member's value is the Chinese display label that is emitted when
    an action is serialized; the labels are runtime data and must not be
    altered.
    """

    # No intervention required.
    NORMAL = "正常"
    # Warning only: notify, do not intervene.
    ALERT = "预警"
    # New positions are blocked; closing is still allowed.
    RESTRICT = "限制开仓"
    # Positions are reduced step by step.
    REDUCE = "逐步减仓"
    # Everything is liquidated.
    EMERGENCY = "紧急清仓"
    # The trading system is halted.
    SHUTDOWN = "系统停机"
@dataclass
class EmergencyAction:
    """Record of one emergency action: what was decided, why, and the outcome."""

    # Unique identifier of the action.
    action_id: str
    # Emergency level the action was created for.
    level: EmergencyLevel
    # Alert that triggered the action; to_dict() tolerates a falsy value here.
    trigger_alert: RiskAlert
    # Creation time of the action.
    timestamp: datetime
    # Human-readable description of the action.
    description: str
    # Whether the action has been carried out.
    executed: bool = False
    # Free-text outcome, filled in on execution.
    result: str = ""

    def to_dict(self) -> Dict:
        """Return a plain-dict view of the action.

        The level is emitted as its enum value, the timestamp as an ISO-8601
        string, and the trigger alert via its own ``to_dict()`` (``None``
        when the alert is falsy).
        """
        alert = self.trigger_alert
        alert_payload = alert.to_dict() if alert else None
        return dict(
            action_id=self.action_id,
            level=self.level.value,
            trigger_alert=alert_payload,
            timestamp=self.timestamp.isoformat(),
            description=self.description,
            executed=self.executed,
            result=self.result,
        )
class EmergencyConfig:
    """Tunable thresholds for emergency handling."""

    def __init__(self,
                 critical_alerts_trigger_restrict: int = 1,
                 reduce_position_pct: float = 0.3,
                 max_reduce_steps: int = 3,
                 critical_trigger_emergency: bool = True):
        """Store the configuration values unchanged (no validation is done).

        Args:
            critical_alerts_trigger_restrict: Number of critical alerts at
                which opening new positions becomes restricted.
            reduce_position_pct: Fraction of the position reduced per step.
            max_reduce_steps: Maximum number of step-wise reductions.
            critical_trigger_emergency: Whether multiple critical alerts
                lead to emergency liquidation rather than step-wise
                reduction.
        """
        # Escalation switch and restriction threshold.
        self.critical_trigger_emergency: bool = critical_trigger_emergency
        self.critical_alerts_trigger_restrict: int = critical_alerts_trigger_restrict
        # Step-wise reduction parameters.
        self.max_reduce_steps: int = max_reduce_steps
        self.reduce_position_pct: float = reduce_position_pct
class EmergencyHandler:
    """Multi-level emergency risk handler.

    Reads unhandled alerts from an optional risk monitor, maps them to an
    ``EmergencyLevel``, creates ``EmergencyAction`` records and executes
    them, optionally invoking a caller-supplied liquidation callback.
    """

    def __init__(self,
                 config: Optional[EmergencyConfig] = None,
                 monitor: Optional[RealTimeRiskMonitor] = None):
        """Initialise the handler.

        Args:
            config: Thresholds to use; a default ``EmergencyConfig`` is
                created when omitted.
            monitor: Source of alerts. Without a monitor the handler never
                leaves ``NORMAL`` on its own.
        """
        self.config = config if config is not None else EmergencyConfig()
        self.monitor = monitor
        # Every action created by check_and_handle, oldest first.
        self.action_history: List[EmergencyAction] = []
        # Level set by the most recently executed action.
        self.current_level: EmergencyLevel = EmergencyLevel.NORMAL
        # Number of step-wise reductions performed since the last reset.
        self.reduce_steps: int = 0
        # Optional hook invoked with each action after it was executed.
        self.on_emergency_action: Optional[Callable[[EmergencyAction], None]] = None
        # Monotonic counter that keeps action ids unique within one second.
        self._action_counter = 0

    def _generate_action_id(self) -> str:
        """Return a new id of the form ``ACTION-<YYYYmmddHHMMSS>-<counter>``."""
        self._action_counter += 1
        ts = datetime.now().strftime("%Y%m%d%H%M%S")
        return f"ACTION-{ts}-{self._action_counter:04d}"

    def assess_emergency_level(self) -> EmergencyLevel:
        """Derive the emergency level from the monitor's unhandled alerts.

        Precedence, highest first: the monitor reports emergency alerts ->
        SHUTDOWN; two or more critical alerts -> EMERGENCY (or REDUCE when
        ``critical_trigger_emergency`` is off); critical alerts at or above
        ``critical_alerts_trigger_restrict`` -> RESTRICT; any unhandled
        alert -> ALERT; otherwise NORMAL. Returns NORMAL without a monitor.
        """
        if self.monitor is None:
            return EmergencyLevel.NORMAL

        # One snapshot serves both the critical count and the "any alert"
        # test, so the monitor is queried only once.
        unhandled = list(self.monitor.get_unhandled_alerts())
        critical_count = sum(
            1 for alert in unhandled if alert.level == AlertLevel.CRITICAL
        )

        if self.monitor.has_emergency_alerts():
            return EmergencyLevel.SHUTDOWN
        if critical_count >= 2:
            if self.config.critical_trigger_emergency:
                return EmergencyLevel.EMERGENCY
            return EmergencyLevel.REDUCE
        if critical_count >= self.config.critical_alerts_trigger_restrict:
            return EmergencyLevel.RESTRICT
        if unhandled:
            return EmergencyLevel.ALERT
        return EmergencyLevel.NORMAL

    def create_action(self, level: EmergencyLevel, trigger: RiskAlert) -> EmergencyAction:
        """Build an (unexecuted) action for ``level`` triggered by ``trigger``."""
        descriptions = {
            EmergencyLevel.NORMAL: "系统正常运行",
            EmergencyLevel.ALERT: "风险预警,持续监控",
            EmergencyLevel.RESTRICT: "限制新开仓,只允许平仓",
            EmergencyLevel.REDUCE: f"逐步减仓,每次减{self.config.reduce_position_pct:.1%}",
            EmergencyLevel.EMERGENCY: "紧急清仓,全部平仓",
            EmergencyLevel.SHUTDOWN: "系统紧急停机",
        }
        return EmergencyAction(
            action_id=self._generate_action_id(),
            level=level,
            trigger_alert=trigger,
            timestamp=datetime.now(),
            description=descriptions.get(level, "未知级别"),
        )

    def execute_action(self, action: EmergencyAction,
                       liquidate_func: Optional[Callable[[], Tuple[bool, str]]] = None) -> bool:
        """Execute ``action`` and record the outcome on it.

        Args:
            action: Action to execute; ``executed``/``result`` are updated
                in place, and ``level`` is rewritten to EMERGENCY when a
                REDUCE action is escalated.
            liquidate_func: Callback returning ``(success, message)``. It is
                used both for a reduction step and for full liquidation.

        Returns:
            True when the action succeeded (or needed no order), False when
            liquidation failed, could not be attempted, or the level is
            unknown.
        """
        self.current_level = action.level

        if action.level == EmergencyLevel.NORMAL:
            action.executed = True
            action.result = "正常状态,无需操作"
            return True

        if action.level == EmergencyLevel.ALERT:
            action.executed = True
            action.result = "已发出预警,持续监控"
            return True

        if action.level == EmergencyLevel.RESTRICT:
            action.executed = True
            action.result = "已限制新开仓,只允许平仓"
            self.reduce_steps = 0
            return True

        if action.level == EmergencyLevel.REDUCE:
            if self.reduce_steps >= self.config.max_reduce_steps:
                # Reduction budget exhausted: escalate and fall through to
                # the EMERGENCY branch so the liquidation really happens.
                # Previously the action was only relabelled and True was
                # returned without doing anything.
                action.level = EmergencyLevel.EMERGENCY
                self.current_level = EmergencyLevel.EMERGENCY
            else:
                self.reduce_steps += 1
                if liquidate_func is None:
                    # No callback: the step is counted but nothing is
                    # executed here (action.executed stays False).
                    return True
                # NOTE(review): the callback takes no arguments, so
                # reduce_position_pct is not passed to it - confirm the
                # callback applies the intended fraction itself.
                success, msg = liquidate_func()
                action.executed = success
                status = "完成" if success else "失败"
                action.result = f"{self.reduce_steps}次减仓{status}: {msg}"
                return success

        if action.level == EmergencyLevel.EMERGENCY:
            if liquidate_func is None:
                # Make the reason for the failure visible on the record.
                action.result = "未提供清仓函数,紧急清仓未执行"
                return False
            success, msg = liquidate_func()
            action.executed = success
            status = "完成" if success else "失败"
            action.result = f"紧急清仓{status}: {msg}"
            return success

        if action.level == EmergencyLevel.SHUTDOWN:
            action.executed = True
            action.result = "系统已紧急停机,等待人工处理"
            return True

        return False

    def check_and_handle(self,
                         liquidate_func: Optional[Callable[[], Tuple[bool, str]]] = None) -> List[EmergencyAction]:
        """Assess the monitor's alerts and act when the level has changed.

        For every unhandled CRITICAL alert the level is re-assessed; when it
        is not NORMAL and differs from ``current_level`` an action is
        created, executed, appended to the history and passed to
        ``on_emergency_action``.

        Returns:
            The actions created during this call (empty without a monitor).
        """
        if self.monitor is None:
            return []

        new_actions: List[EmergencyAction] = []
        # Snapshot: callbacks or the liquidation function may change the
        # monitor's alert collection while we iterate.
        for alert in list(self.monitor.get_unhandled_alerts()):
            if alert.level != AlertLevel.CRITICAL:
                continue
            level = self.assess_emergency_level()
            # NOTE(review): because an unchanged level is skipped, a failed
            # liquidation is not retried and REDUCE runs once per level
            # transition - confirm this is intended.
            if level == EmergencyLevel.NORMAL or level == self.current_level:
                continue
            action = self.create_action(level, alert)
            self.execute_action(action, liquidate_func)
            new_actions.append(action)
            self.action_history.append(action)
            if self.on_emergency_action is not None:
                self.on_emergency_action(action)
        return new_actions

    def get_current_level(self) -> EmergencyLevel:
        """Return the level set by the most recently executed action."""
        return self.current_level

    def get_action_history(self) -> List[EmergencyAction]:
        """Return the action history (the internal list, not a copy)."""
        return self.action_history

    def get_unexecuted_actions(self) -> List[EmergencyAction]:
        """Return the recorded actions whose ``executed`` flag is False."""
        return [action for action in self.action_history if not action.executed]

    def clear_all_alerts(self) -> None:
        """Mark every unhandled alert as handled and reset to NORMAL.

        Intended to be called once the risk situation has been resolved;
        also resets the reduction step counter.
        """
        if self.monitor is not None:
            # Snapshot first: mark_handled may modify the collection that
            # get_unhandled_alerts returned.
            for alert in list(self.monitor.get_unhandled_alerts()):
                self.monitor.mark_handled(alert.alert_id)
        self.current_level = EmergencyLevel.NORMAL
        self.reduce_steps = 0

    def can_open_position(self) -> bool:
        """Return True when the current level is NORMAL or ALERT."""
        return self.current_level in (EmergencyLevel.NORMAL, EmergencyLevel.ALERT)

    def can_close_position(self) -> bool:
        """Return True: closing positions is allowed at every level."""
        return True

    def need_emergency_liquidate(self) -> bool:
        """Return True when the current level is EMERGENCY or SHUTDOWN."""
        return self.current_level in (EmergencyLevel.EMERGENCY, EmergencyLevel.SHUTDOWN)

    def is_system_running(self) -> bool:
        """Return True unless the current level is SHUTDOWN."""
        return self.current_level != EmergencyLevel.SHUTDOWN