407 lines
15 KiB
Python
407 lines
15 KiB
Python
"""
|
|
实时风险阈值监控系统
|
|
============
|
|
|
|
实时监控风险指标,触发阈值自动预警:
|
|
- 单日回撤预警
|
|
- 累计回撤预警
|
|
- 高仓位预警
|
|
- 集中度预警
|
|
- VaR超限预警
|
|
"""
|
|
from typing import Dict, List, Optional, Callable
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from risk_calculator import RiskMetrics, RealTimeRiskCalculator
|
|
|
|
|
|
class AlertLevel(Enum):
|
|
"""预警级别"""
|
|
INFO = "信息"
|
|
WARNING = "警告"
|
|
CRITICAL = "严重"
|
|
EMERGENCY = "紧急"
|
|
|
|
|
|
@dataclass
|
|
class RiskAlert:
|
|
"""风险预警"""
|
|
alert_id: str
|
|
alert_type: str
|
|
level: AlertLevel
|
|
metric_name: str
|
|
current_value: float
|
|
threshold: float
|
|
timestamp: datetime
|
|
message: str
|
|
handled: bool = False
|
|
|
|
def to_dict(self) -> Dict:
|
|
return {
|
|
'alert_id': self.alert_id,
|
|
'alert_type': self.alert_type,
|
|
'level': self.level.value,
|
|
'metric_name': self.metric_name,
|
|
'current_value': self.current_value,
|
|
'threshold': self.threshold,
|
|
'timestamp': self.timestamp.isoformat(),
|
|
'message': self.message,
|
|
'handled': self.handled
|
|
}
|
|
|
|
|
|
class ThresholdConfig:
|
|
"""阈值配置"""
|
|
|
|
def __init__(self,
|
|
daily_drawdown_warning: float = 0.03,
|
|
daily_drawdown_critical: float = 0.05,
|
|
total_drawdown_warning: float = 0.10,
|
|
total_drawdown_critical: float = 0.15,
|
|
max_drawdown_critical: float = 0.20,
|
|
position_pct_warning: float = 0.80,
|
|
position_pct_critical: float = 0.95,
|
|
concentration_warning: float = 0.30,
|
|
concentration_critical: float = 0.50,
|
|
var_95_warning: float = 0.03,
|
|
var_95_critical: float = 0.05,
|
|
volatility_warning: float = 0.30,
|
|
volatility_critical: float = 0.50):
|
|
"""初始化阈值配置"""
|
|
self.daily_drawdown_warning = daily_drawdown_warning
|
|
self.daily_drawdown_critical = daily_drawdown_critical
|
|
self.total_drawdown_warning = total_drawdown_warning
|
|
self.total_drawdown_critical = total_drawdown_critical
|
|
self.max_drawdown_critical = max_drawdown_critical
|
|
self.position_pct_warning = position_pct_warning
|
|
self.position_pct_critical = position_pct_critical
|
|
self.concentration_warning = concentration_warning
|
|
self.concentration_critical = concentration_critical
|
|
self.var_95_warning = var_95_warning
|
|
self.var_95_critical = var_95_critical
|
|
self.volatility_warning = volatility_warning
|
|
self.volatility_critical = volatility_critical
|
|
|
|
@classmethod
|
|
def conservative(cls) -> 'ThresholdConfig':
|
|
"""保守配置(严格控制)"""
|
|
return cls(
|
|
daily_drawdown_warning=0.02,
|
|
daily_drawdown_critical=0.03,
|
|
total_drawdown_warning=0.08,
|
|
total_drawdown_critical=0.12,
|
|
max_drawdown_critical=0.15,
|
|
position_pct_warning=0.70,
|
|
position_pct_critical=0.85,
|
|
concentration_warning=0.25,
|
|
concentration_critical=0.40,
|
|
var_95_warning=0.02,
|
|
var_95_critical=0.03,
|
|
volatility_warning=0.25,
|
|
volatility_critical=0.40
|
|
)
|
|
|
|
@classmethod
|
|
def aggressive(cls) -> 'ThresholdConfig':
|
|
"""进取配置(风险承受较高)"""
|
|
return cls(
|
|
daily_drawdown_warning=0.05,
|
|
daily_drawdown_critical=0.08,
|
|
total_drawdown_warning=0.15,
|
|
total_drawdown_critical=0.20,
|
|
max_drawdown_critical=0.25,
|
|
position_pct_warning=0.85,
|
|
position_pct_critical=0.95,
|
|
concentration_warning=0.40,
|
|
concentration_critical=0.60,
|
|
var_95_warning=0.04,
|
|
var_95_critical=0.06,
|
|
volatility_warning=0.40,
|
|
volatility_critical=0.60
|
|
)
|
|
|
|
|
|
class RealTimeRiskMonitor:
|
|
"""实时风险监控器"""
|
|
|
|
def __init__(self,
|
|
config: Optional[ThresholdConfig] = None,
|
|
risk_calculator: Optional[RealTimeRiskCalculator] = None):
|
|
"""初始化"""
|
|
self.config = config or ThresholdConfig()
|
|
self.risk_calculator = risk_calculator or RealTimeRiskCalculator()
|
|
|
|
# 预警列表
|
|
self.alerts: List[RiskAlert] = []
|
|
# 未处理预警
|
|
self.unhandled_alerts: List[RiskAlert] = []
|
|
# 预警回调
|
|
self.on_alert_callback: Optional[Callable[[RiskAlert], None]] = None
|
|
|
|
# 告警计数
|
|
self._alert_counter = 0
|
|
|
|
def _generate_alert_id(self) -> str:
|
|
"""生成预警ID"""
|
|
self._alert_counter += 1
|
|
ts = datetime.now().strftime("%Y%m%d%H%M%S")
|
|
return f"ALERT-{ts}-{self._alert_counter:04d}"
|
|
|
|
def check_daily_drawdown(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查单日回撤"""
|
|
dd = metrics.daily_drawdown
|
|
|
|
if dd >= self.config.daily_drawdown_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="DAILY_DRAWDOWN",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="单日回撤",
|
|
current_value=dd,
|
|
threshold=self.config.daily_drawdown_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"单日回撤 {dd:.2%} 超过临界阈值 {self.config.daily_drawdown_critical:.2%}"
|
|
)
|
|
elif dd >= self.config.daily_drawdown_warning:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="DAILY_DRAWDOWN",
|
|
level=AlertLevel.WARNING,
|
|
metric_name="单日回撤",
|
|
current_value=dd,
|
|
threshold=self.config.daily_drawdown_warning,
|
|
timestamp=datetime.now(),
|
|
message=f"单日回撤 {dd:.2%} 超过警告阈值 {self.config.daily_drawdown_warning:.2%}"
|
|
)
|
|
return None
|
|
|
|
def check_total_drawdown(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查累计回撤"""
|
|
dd = metrics.total_drawdown
|
|
|
|
if dd >= self.config.total_drawdown_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="TOTAL_DRAWDOWN",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="累计回撤",
|
|
current_value=dd,
|
|
threshold=self.config.total_drawdown_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"累计回撤 {dd:.2%} 超过临界阈值 {self.config.total_drawdown_critical:.2%}"
|
|
)
|
|
elif dd >= self.config.total_drawdown_warning:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="TOTAL_DRAWDOWN",
|
|
level=AlertLevel.WARNING,
|
|
metric_name="累计回撤",
|
|
current_value=dd,
|
|
threshold=self.config.total_drawdown_warning,
|
|
timestamp=datetime.now(),
|
|
message=f"累计回撤 {dd:.2%} 超过警告阈值 {self.config.total_drawdown_warning:.2%}"
|
|
)
|
|
return None
|
|
|
|
def check_max_drawdown(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查最大回撤"""
|
|
dd = metrics.max_drawdown
|
|
|
|
if dd >= self.config.max_drawdown_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="MAX_DRAWDOWN",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="最大回撤",
|
|
current_value=dd,
|
|
threshold=self.config.max_drawdown_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"最大回撤 {dd:.2%} 超过临界阈值 {self.config.max_drawdown_critical:.2%},建议紧急止损"
|
|
)
|
|
return None
|
|
|
|
def check_position_pct(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查仓位比例"""
|
|
pct = metrics.position_pct
|
|
|
|
if pct >= self.config.position_pct_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="HIGH_POSITION",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="仓位比例",
|
|
current_value=pct,
|
|
threshold=self.config.position_pct_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"仓位比例 {pct:.2%} 超过临界阈值 {self.config.position_pct_critical:.2%}"
|
|
)
|
|
elif pct >= self.config.position_pct_warning:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="HIGH_POSITION",
|
|
level=AlertLevel.WARNING,
|
|
metric_name="仓位比例",
|
|
current_value=pct,
|
|
threshold=self.config.position_pct_warning,
|
|
timestamp=datetime.now(),
|
|
message=f"仓位比例 {pct:.2%} 超过警告阈值 {self.config.position_pct_warning:.2%}"
|
|
)
|
|
return None
|
|
|
|
def check_concentration(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查集中度"""
|
|
pct = metrics.concentration_pct
|
|
|
|
if pct >= self.config.concentration_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="CONCENTRATION",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="持仓集中度",
|
|
current_value=pct,
|
|
threshold=self.config.concentration_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"最大持仓集中度 {pct:.2%} 超过临界阈值 {self.config.concentration_critical:.2%}"
|
|
)
|
|
elif pct >= self.config.concentration_warning:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="CONCENTRATION",
|
|
level=AlertLevel.WARNING,
|
|
metric_name="持仓集中度",
|
|
current_value=pct,
|
|
threshold=self.config.concentration_warning,
|
|
timestamp=datetime.now(),
|
|
message=f"最大持仓集中度 {pct:.2%} 超过警告阈值 {self.config.concentration_warning:.2%}"
|
|
)
|
|
return None
|
|
|
|
def check_var(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查VaR"""
|
|
var = metrics.var_95
|
|
|
|
if var >= self.config.var_95_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="VAR_OVERFLOW",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="VaR(95%)",
|
|
current_value=var,
|
|
threshold=self.config.var_95_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"95%置信度VaR {var:.2%} 超过临界阈值 {self.config.var_95_critical:.2%}"
|
|
)
|
|
elif var >= self.config.var_95_warning:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="VAR_OVERFLOW",
|
|
level=AlertLevel.WARNING,
|
|
metric_name="VaR(95%)",
|
|
current_value=var,
|
|
threshold=self.config.var_95_warning,
|
|
timestamp=datetime.now(),
|
|
message=f"95%置信度VaR {var:.2%} 超过警告阈值 {self.config.var_95_warning:.2%}"
|
|
)
|
|
return None
|
|
|
|
def check_volatility(self, metrics: RiskMetrics) -> Optional[RiskAlert]:
|
|
"""检查波动率"""
|
|
vol = metrics.volatility
|
|
|
|
if vol >= self.config.volatility_critical:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="HIGH_VOLATILITY",
|
|
level=AlertLevel.CRITICAL,
|
|
metric_name="年化波动率",
|
|
current_value=vol,
|
|
threshold=self.config.volatility_critical,
|
|
timestamp=datetime.now(),
|
|
message=f"年化波动率 {vol:.2%} 超过临界阈值 {self.config.volatility_critical:.2%}"
|
|
)
|
|
elif vol >= self.config.volatility_warning:
|
|
return RiskAlert(
|
|
alert_id=self._generate_alert_id(),
|
|
alert_type="HIGH_VOLATILITY",
|
|
level=AlertLevel.WARNING,
|
|
metric_name="年化波动率",
|
|
current_value=vol,
|
|
threshold=self.config.volatility_warning,
|
|
timestamp=datetime.now(),
|
|
message=f"年化波动率 {vol:.2%} 超过警告阈值 {self.config.volatility_warning:.2%}"
|
|
)
|
|
return None
|
|
|
|
def check_all(self, metrics: RiskMetrics) -> List[RiskAlert]:
|
|
"""检查所有阈值"""
|
|
new_alerts = []
|
|
|
|
checks = [
|
|
self.check_daily_drawdown,
|
|
self.check_total_drawdown,
|
|
self.check_max_drawdown,
|
|
self.check_position_pct,
|
|
self.check_concentration,
|
|
self.check_var,
|
|
self.check_volatility
|
|
]
|
|
|
|
for check_func in checks:
|
|
alert = check_func(metrics)
|
|
if alert:
|
|
new_alerts.append(alert)
|
|
|
|
# 添加到列表
|
|
for alert in new_alerts:
|
|
self.alerts.append(alert)
|
|
self.unhandled_alerts.append(alert)
|
|
if self.on_alert_callback:
|
|
self.on_alert_callback(alert)
|
|
|
|
return new_alerts
|
|
|
|
def update_and_check(self, timestamp: datetime,
|
|
total_capital: float, cash: float) -> RiskMetrics:
|
|
"""更新并检查所有风险指标"""
|
|
metrics = self.risk_calculator.calculate_all_metrics(
|
|
timestamp, total_capital, cash
|
|
)
|
|
new_alerts = self.check_all(metrics)
|
|
|
|
if new_alerts:
|
|
print(f"⚠️ 产生 {len(new_alerts)} 个新风险预警")
|
|
for alert in new_alerts:
|
|
print(f" [{alert.level.value}] {alert.message}")
|
|
|
|
return metrics
|
|
|
|
def mark_handled(self, alert_id: str) -> bool:
|
|
"""标记预警已处理"""
|
|
for alert in self.unhandled_alerts:
|
|
if alert.alert_id == alert_id:
|
|
alert.handled = True
|
|
self.unhandled_alerts.remove(alert)
|
|
return True
|
|
return False
|
|
|
|
def get_unhandled_alerts(self) -> List[RiskAlert]:
|
|
"""获取未处理预警"""
|
|
return self.unhandled_alerts
|
|
|
|
def get_all_alerts(self) -> List[RiskAlert]:
|
|
"""获取所有预警"""
|
|
return self.alerts
|
|
|
|
def get_alerts_by_level(self, level: AlertLevel) -> List[RiskAlert]:
|
|
"""按级别获取预警"""
|
|
return [a for a in self.alerts if a.level == level]
|
|
|
|
def has_critical_alerts(self) -> bool:
|
|
"""是否有严重级别的未处理预警"""
|
|
return any(a.level == AlertLevel.CRITICAL for a in self.unhandled_alerts)
|
|
|
|
def has_emergency_alerts(self) -> bool:
|
|
"""是否有紧急级别的未处理预警"""
|
|
return any(a.level == AlertLevel.EMERGENCY for a in self.unhandled_alerts)
|