""" 实时风险阈值监控系统 ============ 实时监控风险指标,触发阈值自动预警: - 单日回撤预警 - 累计回撤预警 - 高仓位预警 - 集中度预警 - VaR超限预警 """ from typing import Dict, List, Optional, Callable from datetime import datetime from dataclasses import dataclass from enum import Enum from risk_calculator import RiskMetrics, RealTimeRiskCalculator class AlertLevel(Enum): """预警级别""" INFO = "信息" WARNING = "警告" CRITICAL = "严重" EMERGENCY = "紧急" @dataclass class RiskAlert: """风险预警""" alert_id: str alert_type: str level: AlertLevel metric_name: str current_value: float threshold: float timestamp: datetime message: str handled: bool = False def to_dict(self) -> Dict: return { 'alert_id': self.alert_id, 'alert_type': self.alert_type, 'level': self.level.value, 'metric_name': self.metric_name, 'current_value': self.current_value, 'threshold': self.threshold, 'timestamp': self.timestamp.isoformat(), 'message': self.message, 'handled': self.handled } class ThresholdConfig: """阈值配置""" def __init__(self, daily_drawdown_warning: float = 0.03, daily_drawdown_critical: float = 0.05, total_drawdown_warning: float = 0.10, total_drawdown_critical: float = 0.15, max_drawdown_critical: float = 0.20, position_pct_warning: float = 0.80, position_pct_critical: float = 0.95, concentration_warning: float = 0.30, concentration_critical: float = 0.50, var_95_warning: float = 0.03, var_95_critical: float = 0.05, volatility_warning: float = 0.30, volatility_critical: float = 0.50): """初始化阈值配置""" self.daily_drawdown_warning = daily_drawdown_warning self.daily_drawdown_critical = daily_drawdown_critical self.total_drawdown_warning = total_drawdown_warning self.total_drawdown_critical = total_drawdown_critical self.max_drawdown_critical = max_drawdown_critical self.position_pct_warning = position_pct_warning self.position_pct_critical = position_pct_critical self.concentration_warning = concentration_warning self.concentration_critical = concentration_critical self.var_95_warning = var_95_warning self.var_95_critical = var_95_critical self.volatility_warning = volatility_warning self.volatility_critical = volatility_critical @classmethod def conservative(cls) -> 'ThresholdConfig': """保守配置(严格控制)""" return cls( daily_drawdown_warning=0.02, daily_drawdown_critical=0.03, total_drawdown_warning=0.08, total_drawdown_critical=0.12, max_drawdown_critical=0.15, position_pct_warning=0.70, position_pct_critical=0.85, concentration_warning=0.25, concentration_critical=0.40, var_95_warning=0.02, var_95_critical=0.03, volatility_warning=0.25, volatility_critical=0.40 ) @classmethod def aggressive(cls) -> 'ThresholdConfig': """进取配置(风险承受较高)""" return cls( daily_drawdown_warning=0.05, daily_drawdown_critical=0.08, total_drawdown_warning=0.15, total_drawdown_critical=0.20, max_drawdown_critical=0.25, position_pct_warning=0.85, position_pct_critical=0.95, concentration_warning=0.40, concentration_critical=0.60, var_95_warning=0.04, var_95_critical=0.06, volatility_warning=0.40, volatility_critical=0.60 ) class RealTimeRiskMonitor: """实时风险监控器""" def __init__(self, config: Optional[ThresholdConfig] = None, risk_calculator: Optional[RealTimeRiskCalculator] = None): """初始化""" self.config = config or ThresholdConfig() self.risk_calculator = risk_calculator or RealTimeRiskCalculator() # 预警列表 self.alerts: List[RiskAlert] = [] # 未处理预警 self.unhandled_alerts: List[RiskAlert] = [] # 预警回调 self.on_alert_callback: Optional[Callable[[RiskAlert], None]] = None # 告警计数 self._alert_counter = 0 def _generate_alert_id(self) -> str: """生成预警ID""" self._alert_counter += 1 ts = datetime.now().strftime("%Y%m%d%H%M%S") return f"ALERT-{ts}-{self._alert_counter:04d}" def check_daily_drawdown(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查单日回撤""" dd = metrics.daily_drawdown if dd >= self.config.daily_drawdown_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="DAILY_DRAWDOWN", level=AlertLevel.CRITICAL, metric_name="单日回撤", current_value=dd, threshold=self.config.daily_drawdown_critical, timestamp=datetime.now(), message=f"单日回撤 {dd:.2%} 超过临界阈值 {self.config.daily_drawdown_critical:.2%}" ) elif dd >= self.config.daily_drawdown_warning: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="DAILY_DRAWDOWN", level=AlertLevel.WARNING, metric_name="单日回撤", current_value=dd, threshold=self.config.daily_drawdown_warning, timestamp=datetime.now(), message=f"单日回撤 {dd:.2%} 超过警告阈值 {self.config.daily_drawdown_warning:.2%}" ) return None def check_total_drawdown(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查累计回撤""" dd = metrics.total_drawdown if dd >= self.config.total_drawdown_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="TOTAL_DRAWDOWN", level=AlertLevel.CRITICAL, metric_name="累计回撤", current_value=dd, threshold=self.config.total_drawdown_critical, timestamp=datetime.now(), message=f"累计回撤 {dd:.2%} 超过临界阈值 {self.config.total_drawdown_critical:.2%}" ) elif dd >= self.config.total_drawdown_warning: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="TOTAL_DRAWDOWN", level=AlertLevel.WARNING, metric_name="累计回撤", current_value=dd, threshold=self.config.total_drawdown_warning, timestamp=datetime.now(), message=f"累计回撤 {dd:.2%} 超过警告阈值 {self.config.total_drawdown_warning:.2%}" ) return None def check_max_drawdown(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查最大回撤""" dd = metrics.max_drawdown if dd >= self.config.max_drawdown_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="MAX_DRAWDOWN", level=AlertLevel.CRITICAL, metric_name="最大回撤", current_value=dd, threshold=self.config.max_drawdown_critical, timestamp=datetime.now(), message=f"最大回撤 {dd:.2%} 超过临界阈值 {self.config.max_drawdown_critical:.2%},建议紧急止损" ) return None def check_position_pct(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查仓位比例""" pct = metrics.position_pct if pct >= self.config.position_pct_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="HIGH_POSITION", level=AlertLevel.CRITICAL, metric_name="仓位比例", current_value=pct, threshold=self.config.position_pct_critical, timestamp=datetime.now(), message=f"仓位比例 {pct:.2%} 超过临界阈值 {self.config.position_pct_critical:.2%}" ) elif pct >= self.config.position_pct_warning: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="HIGH_POSITION", level=AlertLevel.WARNING, metric_name="仓位比例", current_value=pct, threshold=self.config.position_pct_warning, timestamp=datetime.now(), message=f"仓位比例 {pct:.2%} 超过警告阈值 {self.config.position_pct_warning:.2%}" ) return None def check_concentration(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查集中度""" pct = metrics.concentration_pct if pct >= self.config.concentration_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="CONCENTRATION", level=AlertLevel.CRITICAL, metric_name="持仓集中度", current_value=pct, threshold=self.config.concentration_critical, timestamp=datetime.now(), message=f"最大持仓集中度 {pct:.2%} 超过临界阈值 {self.config.concentration_critical:.2%}" ) elif pct >= self.config.concentration_warning: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="CONCENTRATION", level=AlertLevel.WARNING, metric_name="持仓集中度", current_value=pct, threshold=self.config.concentration_warning, timestamp=datetime.now(), message=f"最大持仓集中度 {pct:.2%} 超过警告阈值 {self.config.concentration_warning:.2%}" ) return None def check_var(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查VaR""" var = metrics.var_95 if var >= self.config.var_95_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="VAR_OVERFLOW", level=AlertLevel.CRITICAL, metric_name="VaR(95%)", current_value=var, threshold=self.config.var_95_critical, timestamp=datetime.now(), message=f"95%置信度VaR {var:.2%} 超过临界阈值 {self.config.var_95_critical:.2%}" ) elif var >= self.config.var_95_warning: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="VAR_OVERFLOW", level=AlertLevel.WARNING, metric_name="VaR(95%)", current_value=var, threshold=self.config.var_95_warning, timestamp=datetime.now(), message=f"95%置信度VaR {var:.2%} 超过警告阈值 {self.config.var_95_warning:.2%}" ) return None def check_volatility(self, metrics: RiskMetrics) -> Optional[RiskAlert]: """检查波动率""" vol = metrics.volatility if vol >= self.config.volatility_critical: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="HIGH_VOLATILITY", level=AlertLevel.CRITICAL, metric_name="年化波动率", current_value=vol, threshold=self.config.volatility_critical, timestamp=datetime.now(), message=f"年化波动率 {vol:.2%} 超过临界阈值 {self.config.volatility_critical:.2%}" ) elif vol >= self.config.volatility_warning: return RiskAlert( alert_id=self._generate_alert_id(), alert_type="HIGH_VOLATILITY", level=AlertLevel.WARNING, metric_name="年化波动率", current_value=vol, threshold=self.config.volatility_warning, timestamp=datetime.now(), message=f"年化波动率 {vol:.2%} 超过警告阈值 {self.config.volatility_warning:.2%}" ) return None def check_all(self, metrics: RiskMetrics) -> List[RiskAlert]: """检查所有阈值""" new_alerts = [] checks = [ self.check_daily_drawdown, self.check_total_drawdown, self.check_max_drawdown, self.check_position_pct, self.check_concentration, self.check_var, self.check_volatility ] for check_func in checks: alert = check_func(metrics) if alert: new_alerts.append(alert) # 添加到列表 for alert in new_alerts: self.alerts.append(alert) self.unhandled_alerts.append(alert) if self.on_alert_callback: self.on_alert_callback(alert) return new_alerts def update_and_check(self, timestamp: datetime, total_capital: float, cash: float) -> RiskMetrics: """更新并检查所有风险指标""" metrics = self.risk_calculator.calculate_all_metrics( timestamp, total_capital, cash ) new_alerts = self.check_all(metrics) if new_alerts: print(f"⚠️ 产生 {len(new_alerts)} 个新风险预警") for alert in new_alerts: print(f" [{alert.level.value}] {alert.message}") return metrics def mark_handled(self, alert_id: str) -> bool: """标记预警已处理""" for alert in self.unhandled_alerts: if alert.alert_id == alert_id: alert.handled = True self.unhandled_alerts.remove(alert) return True return False def get_unhandled_alerts(self) -> List[RiskAlert]: """获取未处理预警""" return self.unhandled_alerts def get_all_alerts(self) -> List[RiskAlert]: """获取所有预警""" return self.alerts def get_alerts_by_level(self, level: AlertLevel) -> List[RiskAlert]: """按级别获取预警""" return [a for a in self.alerts if a.level == level] def has_critical_alerts(self) -> bool: """是否有严重级别的未处理预警""" return any(a.level == AlertLevel.CRITICAL for a in self.unhandled_alerts) def has_emergency_alerts(self) -> bool: """是否有紧急级别的未处理预警""" return any(a.level == AlertLevel.EMERGENCY for a in self.unhandled_alerts)