""" 实时风控面板(主入口) ============ 整合风险计算、监控、紧急处理,提供统一接口 """ from typing import Dict, List, Optional, Callable, Tuple from datetime import datetime from risk_calculator import RealTimeRiskCalculator, RiskMetrics from risk_monitor import RealTimeRiskMonitor, ThresholdConfig, RiskAlert, AlertLevel from emergency_handler import EmergencyHandler, EmergencyConfig, EmergencyAction, EmergencyLevel class RealtimeRiskPanel: """实时风控面板 - 主入口""" def __init__(self, threshold_config: Optional[ThresholdConfig] = None, emergency_config: Optional[EmergencyConfig] = None, risk_style: str = "conservative"): """ 初始化 参数: risk_style: "conservative" / "aggressive" """ # 风险计算器 self.calculator = RealTimeRiskCalculator() # 阈值配置 if threshold_config is None: if risk_style == "conservative": threshold_config = ThresholdConfig.conservative() else: threshold_config = ThresholdConfig.aggressive() # 风险监控 self.monitor = RealTimeRiskMonitor(threshold_config, self.calculator) # 紧急处理 if emergency_config is None: emergency_config = EmergencyConfig() self.emergency = EmergencyHandler(emergency_config, self.monitor) # 设置回调 self.monitor.on_alert_callback = self._on_alert self.emergency.on_emergency_action = self._on_emergency # 外部回调 self.on_alert_external: Optional[Callable[[RiskAlert], None]] = None self.on_emergency_external: Optional[Callable[[EmergencyAction], None]] = None # 统计 self.update_count = 0 def _on_alert(self, alert: RiskAlert) -> None: """内部预警回调""" if self.on_alert_external: self.on_alert_external(alert) def _on_emergency(self, action: EmergencyAction) -> None: """内部紧急行动回调""" if self.on_emergency_external: self.on_emergency_external(action) def update(self, timestamp: datetime, total_capital: float, cash: float, liquidate_func: Optional[Callable[[], Tuple[bool, str]]] = None) -> Dict: """更新一次风控检查""" # 计算所有指标 metrics = self.monitor.update_and_check(timestamp, total_capital, cash) # 检查紧急情况并处理 new_actions = self.emergency.check_and_handle(liquidate_func) self.update_count += 1 return { 'metrics': metrics.to_dict(), 'new_alerts': [a.to_dict() for a in self.monitor.unhandled_alerts[-5:]], 'new_actions': [a.to_dict() for a in new_actions], 'current_emergency_level': self.emergency.current_level.value, 'can_open_position': self.emergency.can_open_position(), 'system_running': self.emergency.is_system_running() } def update_position(self, symbol: str, volume: int, price: float) -> None: """更新持仓信息""" market_value = volume * price self.calculator.update_position(symbol, volume, price, market_value) def remove_position(self, symbol: str) -> None: """移除持仓""" self.calculator.remove_position(symbol) def update_net_value(self, timestamp: datetime, net_value: float) -> None: """更新净值""" self.calculator.update_net_value(timestamp, net_value) def get_current_metrics(self) -> Optional[RiskMetrics]: """获取当前风险指标""" return self.calculator.last_metrics def get_all_alerts(self) -> List[Dict]: """获取所有预警""" return [a.to_dict() for a in self.monitor.get_all_alerts()] def get_unhandled_alerts(self) ->List[Dict]: """获取未处理预警""" return [a.to_dict() for a in self.monitor.get_unhandled_alerts()] def get_action_history(self) -> List[Dict]: """获取紧急行动历史""" return [a.to_dict() for a in self.emergency.get_action_history()] def get_current_level(self) -> str: """获取当前紧急级别""" return self.emergency.get_current_level().value def mark_alert_handled(self, alert_id: str) -> bool: """标记预警已处理""" success = self.monitor.mark_handled(alert_id) if success: # 重新评估紧急级别 _ = self.emergency.assess_emergency_level() return success def clear_all(self) -> None: """清除所有预警(风险解除)""" self.emergency.clear_all_alerts() def get_panel_summary(self) -> Dict: """获取面板汇总信息""" metrics = self.get_current_metrics() return { 'update_count': self.update_count, 'current_emergency_level': self.get_current_level(), 'total_alerts': len(self.monitor.get_all_alerts()), 'unhandled_alerts': len(self.monitor.get_unhandled_alerts()), 'total_actions': len(self.emergency.get_action_history()), 'current_metrics': metrics.to_dict() if metrics else None, 'can_open_position': self.emergency.can_open_position(), 'system_running': self.emergency.is_system_running(), 'need_emergency_liquidate': self.emergency.need_emergency_liquidate() }