""" 实时风险指标计算引擎 ============ 实时计算各类风险指标,支持: - 持仓风险指标计算 - 动态VaR计算 - 实时回撤监控 - 流动性风险计算 """ from typing import Dict, List, Optional, Tuple from datetime import datetime from dataclasses import dataclass import pandas as pd import numpy as np @dataclass class RiskMetrics: """风险指标结果""" timestamp: datetime total_value: float # 总资产 position_value: float # 持仓市值 cash_value: float # 现金 position_pct: float # 仓位比例 daily_return: float # 日收益率 daily_drawdown: float # 日回撤 total_drawdown: float # 总回撤 max_drawdown: float # 最大回撤 var_95: float # 95%置信度VaR var_99: float # 99%置信度VaR volatility: float # 波动率 concentration_pct: float # 最大持仓集中度 def to_dict(self) -> Dict: """转换为字典""" return { 'timestamp': self.timestamp.isoformat(), 'total_value': self.total_value, 'position_value': self.position_value, 'cash_value': self.cash_value, 'position_pct': self.position_pct, 'daily_return': self.daily_return, 'daily_drawdown': self.daily_drawdown, 'total_drawdown': self.total_drawdown, 'max_drawdown': self.max_drawdown, 'var_95': self.var_95, 'var_99': self.var_99, 'volatility': self.volatility, 'concentration_pct': self.concentration_pct, } class RealTimeRiskCalculator: """实时风险指标计算引擎""" def __init__(self, window_size: int = 252, confidence_levels: List[float] = [0.95, 0.99]): """初始化""" self.window_size = window_size self.confidence_levels = confidence_levels # 历史净值记录 self.net_values: List[Tuple[datetime, float]] = [] # 历史收益率记录 self.returns: List[float] = [] # 最高净值 self.max_net_value: float = 0.0 # 持仓信息 self.positions: Dict[str, Dict] = {} # 价格缓存 self.prices: Dict[str, float] = {} # 计算结果缓存 self.last_metrics: Optional[RiskMetrics] = None def update_net_value(self, timestamp: datetime, net_value: float) -> None: """更新净值""" if self.net_values: last_value = self.net_values[-1][1] if last_value > 0: ret = (net_value - last_value) / last_value self.returns.append(ret) self.net_values.append((timestamp, net_value)) if net_value > self.max_net_value: self.max_net_value = net_value # 保持窗口大小 if len(self.returns) > self.window_size: self.returns.pop(0) def update_position(self, symbol: str, volume: int, price: float, market_value: float) -> None: """更新持仓""" self.positions[symbol] = { 'volume': volume, 'price': price, 'market_value': market_value } self.prices[symbol] = price def remove_position(self, symbol: str) -> None: """移除持仓""" if symbol in self.positions: del self.positions[symbol] if symbol in self.prices: del self.prices[symbol] def calculate_var(self, returns: List[float], confidence: float) -> float: """计算VaR(风险价值)- 历史模拟法""" if not returns: return 0.0 returns_sorted = sorted(returns) index = int(len(returns_sorted) * (1 - confidence)) if index >= len(returns_sorted): index = len(returns_sorted) - 1 return -returns_sorted[index] def calculate_volatility(self, returns: List[float]) -> float: """计算波动率(年化)""" if len(returns) < 2: return 0.0 std = np.std(returns, ddof=1) # 年化(252个交易日) return std * np.sqrt(252) def calculate_concentration(self, total_position: float) -> float: """计算最大持仓集中度""" if not self.positions or total_position <= 0: return 0.0 max_value = max(p['market_value'] for p in self.positions.values()) return max_value / total_position def calculate_drawdowns(self, current_net_value: float) -> Tuple[float, float, float]: """计算各类回撤""" if not self.net_values or self.max_net_value <= 0: return (0.0, 0.0, 0.0) if len(self.net_values) < 2: return (0.0, 0.0, 0.0) prev_net_value = self.net_values[-2][1] # 日回撤 daily_drawdown = (prev_net_value - current_net_value) / prev_net_value if prev_net_value > 0 else 0 daily_drawdown = max(daily_drawdown, 0) # 总回撤 total_drawdown = (self.max_net_value - current_net_value) / self.max_net_value total_drawdown = max(total_drawdown, 0) # 最大回撤(从历史记录计算) if len(self.net_values) >= 2: peak = self.net_values[0][1] max_dd = 0.0 for _, nv in self.net_values[1:]: if nv > peak: peak = nv else: dd = (peak - nv) / peak if dd > max_dd: max_dd = dd return (daily_drawdown, total_drawdown, max_dd) return (daily_drawdown, total_drawdown, 0.0) def calculate_all_metrics(self, timestamp: datetime, total_capital: float, cash: float) -> RiskMetrics: """计算所有风险指标""" current_total = cash + sum(p['market_value'] for p in self.positions.values()) position_total = sum(p['market_value'] for p in self.positions.values()) # 计算仓位比例 position_pct = position_total / current_total if current_total > 0 else 0 # 当前净值 if not self.net_values: self.update_net_value(timestamp, current_total) # 计算VaR var_95 = self.calculate_var(self.returns, 0.95) var_99 = self.calculate_var(self.returns, 0.99) # 计算波动率 volatility = self.calculate_volatility(self.returns) # 计算集中度 concentration = self.calculate_concentration(position_total) # 计算回撤 daily_dd, total_dd, max_dd = self.calculate_drawdowns(current_total) # 日收益率 if len(self.returns) > 0: daily_ret = self.returns[-1] if self.returns else 0 else: daily_ret = 0 metrics = RiskMetrics( timestamp=timestamp, total_value=current_total, position_value=position_total, cash_value=cash, position_pct=position_pct, daily_return=daily_ret, daily_drawdown=daily_dd, total_drawdown=total_dd, max_drawdown=max_dd, var_95=var_95, var_99=var_99, volatility=volatility, concentration_pct=concentration ) self.last_metrics = metrics return metrics