# File: sanguo_quant_live/risk-management/realtime-system/src/realtime_risk_panel.py

"""
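Real-time risk control panel (main entry point)
===============================================
Integrates risk calculation, monitoring, and emergency handling behind a single unified interface.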
"""
from typing import Dict, List, Optional, Callable, Tuple
from datetime import datetime
from risk_calculator import RealTimeRiskCalculator, RiskMetrics
from risk_monitor import RealTimeRiskMonitor, ThresholdConfig, RiskAlert, AlertLevel
from emergency_handler import EmergencyHandler, EmergencyConfig, EmergencyAction, EmergencyLevel
class RealtimeRiskPanel:
    """Real-time risk control panel (main entry point).

    Facade over a ``RealTimeRiskCalculator``, a ``RealTimeRiskMonitor`` and an
    ``EmergencyHandler``. The panel registers itself as the monitor's
    ``on_alert_callback`` and the handler's ``on_emergency_action`` and relays
    whatever they pass in to the optional ``on_alert_external`` /
    ``on_emergency_external`` callbacks.
    """

    # Risk styles that map to a ThresholdConfig preset.
    RISK_STYLES = ("conservative", "aggressive")
    # Number of trailing unhandled alerts reported by ``update``.
    # Must be >= 1: a value of 0 would make the ``[-0:]`` slice return all.
    RECENT_ALERT_LIMIT = 5

    def __init__(self,
                 threshold_config: Optional[ThresholdConfig] = None,
                 emergency_config: Optional[EmergencyConfig] = None,
                 risk_style: str = "conservative"):
        """Build and wire the calculator, monitor and emergency handler.

        Args:
            threshold_config: Thresholds handed to the monitor. When ``None``
                a preset is selected from ``risk_style``.
            emergency_config: Settings handed to the emergency handler. When
                ``None`` a default ``EmergencyConfig()`` is used.
            risk_style: ``"conservative"`` or ``"aggressive"``; case and
                surrounding whitespace are ignored. Only consulted when
                ``threshold_config`` is ``None``.

        Raises:
            ValueError: If a preset is needed and ``risk_style`` is not a
                supported style. Previously any unrecognized value silently
                selected the aggressive preset.
        """
        # Resolve configuration first so a bad style fails before any
        # component is constructed.
        if threshold_config is None:
            style = self._resolve_risk_style(risk_style)
            if style == "conservative":
                threshold_config = ThresholdConfig.conservative()
            else:
                threshold_config = ThresholdConfig.aggressive()
        if emergency_config is None:
            emergency_config = EmergencyConfig()

        # External callbacks and counters exist before the internal hooks are
        # registered, so a hook firing early cannot hit a missing attribute.
        self.on_alert_external: Optional[Callable[[RiskAlert], None]] = None
        self.on_emergency_external: Optional[Callable[[EmergencyAction], None]] = None
        self.update_count = 0

        # Components: calculator -> monitor -> emergency handler.
        self.calculator = RealTimeRiskCalculator()
        self.monitor = RealTimeRiskMonitor(threshold_config, self.calculator)
        self.emergency = EmergencyHandler(emergency_config, self.monitor)

        # Internal hooks that relay to the external callbacks.
        self.monitor.on_alert_callback = self._on_alert
        self.emergency.on_emergency_action = self._on_emergency

    @classmethod
    def _resolve_risk_style(cls, risk_style: str) -> str:
        """Return ``risk_style`` lower-cased and stripped, or raise ``ValueError``."""
        normalized = risk_style.strip().lower() if isinstance(risk_style, str) else None
        if normalized not in cls.RISK_STYLES:
            raise ValueError(
                f"Unknown risk_style {risk_style!r}; expected one of {cls.RISK_STYLES}"
            )
        return normalized

    def _on_alert(self, alert: RiskAlert) -> None:
        """Relay an alert to ``on_alert_external`` when one is set."""
        if self.on_alert_external:
            self.on_alert_external(alert)

    def _on_emergency(self, action: EmergencyAction) -> None:
        """Relay an emergency action to ``on_emergency_external`` when one is set."""
        if self.on_emergency_external:
            self.on_emergency_external(action)

    def update(self, timestamp: datetime,
               total_capital: float,
               cash: float,
               liquidate_func: Optional[Callable[[], Tuple[bool, str]]] = None) -> Dict:
        """Run one risk-check cycle.

        Calls ``monitor.update_and_check`` and then
        ``emergency.check_and_handle``, and increments ``update_count``.

        Args:
            timestamp: Time of this update, forwarded to the monitor.
            total_capital: Forwarded to the monitor.
            cash: Forwarded to the monitor.
            liquidate_func: Optional callable forwarded unchanged to
                ``emergency.check_and_handle``.

        Returns:
            A dict with keys ``metrics``, ``new_alerts``, ``new_actions``,
            ``current_emergency_level``, ``can_open_position`` and
            ``system_running``.
        """
        metrics = self.monitor.update_and_check(timestamp, total_capital, cash)
        new_actions = self.emergency.check_and_handle(liquidate_func)
        self.update_count += 1

        # NOTE(review): despite the key name, ``new_alerts`` is the tail of the
        # monitor's unhandled list, not only alerts raised by this call —
        # confirm that this is what consumers expect.
        recent_unhandled = self.monitor.unhandled_alerts[-self.RECENT_ALERT_LIMIT:]
        return {
            'metrics': metrics.to_dict(),
            'new_alerts': [a.to_dict() for a in recent_unhandled],
            'new_actions': [a.to_dict() for a in new_actions],
            'current_emergency_level': self.emergency.current_level.value,
            'can_open_position': self.emergency.can_open_position(),
            'system_running': self.emergency.is_system_running()
        }

    def update_position(self, symbol: str, volume: int, price: float) -> None:
        """Record a position; market value is computed as ``volume * price``."""
        market_value = volume * price
        self.calculator.update_position(symbol, volume, price, market_value)

    def remove_position(self, symbol: str) -> None:
        """Remove the position for ``symbol`` from the calculator."""
        self.calculator.remove_position(symbol)

    def update_net_value(self, timestamp: datetime, net_value: float) -> None:
        """Forward a net-value observation to the calculator."""
        self.calculator.update_net_value(timestamp, net_value)

    def get_current_metrics(self) -> Optional[RiskMetrics]:
        """Return the calculator's ``last_metrics`` (may be ``None``)."""
        return self.calculator.last_metrics

    def get_all_alerts(self) -> List[Dict]:
        """Return every alert known to the monitor, as dicts."""
        return [a.to_dict() for a in self.monitor.get_all_alerts()]

    def get_unhandled_alerts(self) -> List[Dict]:
        """Return the monitor's unhandled alerts, as dicts."""
        return [a.to_dict() for a in self.monitor.get_unhandled_alerts()]

    def get_action_history(self) -> List[Dict]:
        """Return the emergency handler's action history, as dicts."""
        return [a.to_dict() for a in self.emergency.get_action_history()]

    def get_current_level(self) -> str:
        """Return the ``value`` of the handler's current emergency level."""
        return self.emergency.get_current_level().value

    def mark_alert_handled(self, alert_id: str) -> bool:
        """Mark an alert as handled.

        Returns:
            The result of ``monitor.mark_handled``. When it is truthy the
            emergency level is re-assessed.
        """
        success = self.monitor.mark_handled(alert_id)
        if success:
            # Re-assess the emergency level; the returned level is not needed here.
            self.emergency.assess_emergency_level()
        return success

    def clear_all(self) -> None:
        """Clear all alerts (risk resolved) via ``emergency.clear_all_alerts``."""
        self.emergency.clear_all_alerts()

    def get_panel_summary(self) -> Dict:
        """Return a snapshot of counters, the current level, metrics and status flags."""
        metrics = self.get_current_metrics()
        return {
            'update_count': self.update_count,
            'current_emergency_level': self.get_current_level(),
            'total_alerts': len(self.monitor.get_all_alerts()),
            'unhandled_alerts': len(self.monitor.get_unhandled_alerts()),
            'total_actions': len(self.emergency.get_action_history()),
            'current_metrics': metrics.to_dict() if metrics else None,
            'can_open_position': self.emergency.can_open_position(),
            'system_running': self.emergency.is_system_running(),
            'need_emergency_liquidate': self.emergency.need_emergency_liquidate()
        }