Files
sanguo_quant_live/guanyu-risk/realtime-system/src/realtime_risk_panel.py
T
cfdaily affcfa0c72 按照工作流规则进行目录整理
**主要调整:**
1. 重命名将军工作区目录:
   - data-engineering → zhaoyun-data (赵云数据工程)
   - risk-management → guanyu-risk (关羽风控管理)
   - platform → jiangwei-platform (姜维平台)
   - technical-strategy → zhangfei-technical (张飞技术策略)

2. 创建新目录:
   - archive/ (归档目录)
   - simayi-quality/ (司马懿质量保证)
   - pangtong-value/ (庞统价值投资)

3. 移动内容:
   - value-investing → pangtong-value/research (庞统价值投资)
   - running_data → zhaoyun-data/data (运行数据)
   - 文件任务管理系统文档 → archive/file-task-system

4. 清理文件:
   - 删除所有日志文件
   - 删除agent脚本
   - 删除knowledge-base (使用统一知识库)

5. 创建标准结构:
   - 各将军目录下创建research/, scripts/, reports/, references/子目录

6. 更新.gitignore:
   - 排除日志文件和临时文件

**依据:** management/workflow-rules.md
**制定:** 庞统(凤雏)
**审核:** 诸葛亮
2026-03-25 17:27:35 +08:00

148 lines
5.6 KiB
Python

"""
实时风控面板(主入口)
============
整合风险计算、监控、紧急处理,提供统一接口
"""
from typing import Dict, List, Optional, Callable, Tuple
from datetime import datetime
from risk_calculator import RealTimeRiskCalculator, RiskMetrics
from risk_monitor import RealTimeRiskMonitor, ThresholdConfig, RiskAlert, AlertLevel
from emergency_handler import EmergencyHandler, EmergencyConfig, EmergencyAction, EmergencyLevel
class RealtimeRiskPanel:
    """Real-time risk-control panel (main entry point).

    Composes a risk calculator, a risk monitor and an emergency handler and
    exposes a single facade over them.  The panel registers itself as the
    monitor's ``on_alert_callback`` and as the handler's
    ``on_emergency_action`` hook, and relays whatever it receives there to
    the optional ``on_alert_external`` / ``on_emergency_external`` callbacks.

    Project types in method signatures are written as string forward
    references so that defining the class does not evaluate them.
    """

    # The only values ``risk_style`` may take when it is used to pick the
    # default threshold configuration.
    _RISK_STYLES = ("conservative", "aggressive")

    def __init__(self,
                 threshold_config: Optional["ThresholdConfig"] = None,
                 emergency_config: Optional["EmergencyConfig"] = None,
                 risk_style: str = "conservative"):
        """Build the calculator, monitor and emergency handler.

        Args:
            threshold_config: Thresholds handed to the monitor.  When
                ``None``, one is created from ``risk_style``.
            emergency_config: Configuration handed to the emergency handler.
                When ``None``, a default ``EmergencyConfig()`` is used.
            risk_style: ``"conservative"`` or ``"aggressive"``.  Only
                consulted when ``threshold_config`` is ``None``.

        Raises:
            ValueError: If ``threshold_config`` is ``None`` and ``risk_style``
                is not one of the two supported values.
        """
        # Fail fast on an unknown style.  Previously anything other than
        # "conservative" (including typos) silently selected the aggressive
        # thresholds, which is the unsafe direction for a risk system.
        if threshold_config is None and risk_style not in self._RISK_STYLES:
            raise ValueError(
                f"risk_style must be one of {self._RISK_STYLES}, "
                f"got {risk_style!r}"
            )

        # Risk calculator
        self.calculator = RealTimeRiskCalculator()

        # Threshold configuration
        if threshold_config is None:
            if risk_style == "conservative":
                threshold_config = ThresholdConfig.conservative()
            else:
                threshold_config = ThresholdConfig.aggressive()

        # Risk monitor
        self.monitor = RealTimeRiskMonitor(threshold_config, self.calculator)

        # Emergency handler
        if emergency_config is None:
            emergency_config = EmergencyConfig()
        self.emergency = EmergencyHandler(emergency_config, self.monitor)

        # Route the collaborators' callbacks through this panel.
        self.monitor.on_alert_callback = self._on_alert
        self.emergency.on_emergency_action = self._on_emergency

        # Optional callbacks supplied by the panel's user.
        self.on_alert_external: Optional[Callable[[RiskAlert], None]] = None
        self.on_emergency_external: Optional[Callable[[EmergencyAction], None]] = None

        # Number of completed update() calls.
        self.update_count = 0

    def _on_alert(self, alert: "RiskAlert") -> None:
        """Relay an alert to ``on_alert_external`` when one is set."""
        if self.on_alert_external is not None:
            self.on_alert_external(alert)

    def _on_emergency(self, action: "EmergencyAction") -> None:
        """Relay an emergency action to ``on_emergency_external`` when set."""
        if self.on_emergency_external is not None:
            self.on_emergency_external(action)

    def update(self, timestamp: datetime,
               total_capital: float,
               cash: float,
               liquidate_func: Optional[Callable[[], Tuple[bool, str]]] = None) -> Dict:
        """Run one risk-control cycle and return a snapshot of the result.

        Args:
            timestamp: Passed to the monitor's ``update_and_check``.
            total_capital: Passed to the monitor's ``update_and_check``.
            cash: Passed to the monitor's ``update_and_check``.
            liquidate_func: Optional callable forwarded unchanged to the
                emergency handler's ``check_and_handle``.  Presumably it
                returns ``(success, message)`` -- confirm against
                ``EmergencyHandler``.

        Returns:
            A dict with the keys ``metrics``, ``new_alerts``, ``new_actions``,
            ``current_emergency_level``, ``can_open_position`` and
            ``system_running``.  ``new_alerts`` holds the last five entries
            of the monitor's unhandled-alert list, which are not necessarily
            alerts raised during this call.
        """
        # Compute all metrics and let the monitor check them.
        metrics = self.monitor.update_and_check(timestamp, total_capital, cash)

        # Let the emergency handler react to the current state.
        new_actions = self.emergency.check_and_handle(liquidate_func)

        self.update_count += 1

        # NOTE(review): this reads monitor.unhandled_alerts and
        # emergency.current_level directly while the other methods go through
        # get_unhandled_alerts() / get_current_level(); kept as-is because the
        # collaborators are not visible here -- confirm they are equivalent.
        return {
            'metrics': metrics.to_dict(),
            'new_alerts': [a.to_dict() for a in self.monitor.unhandled_alerts[-5:]],
            'new_actions': [a.to_dict() for a in new_actions],
            'current_emergency_level': self.emergency.current_level.value,
            'can_open_position': self.emergency.can_open_position(),
            'system_running': self.emergency.is_system_running()
        }

    def update_position(self, symbol: str, volume: int, price: float) -> None:
        """Record a position; its market value is ``volume * price``."""
        market_value = volume * price
        self.calculator.update_position(symbol, volume, price, market_value)

    def remove_position(self, symbol: str) -> None:
        """Remove the position for ``symbol`` from the calculator."""
        self.calculator.remove_position(symbol)

    def update_net_value(self, timestamp: datetime, net_value: float) -> None:
        """Forward a net-value observation to the calculator."""
        self.calculator.update_net_value(timestamp, net_value)

    def get_current_metrics(self) -> Optional["RiskMetrics"]:
        """Return the calculator's ``last_metrics`` (may be ``None``)."""
        return self.calculator.last_metrics

    def get_all_alerts(self) -> List[Dict]:
        """Return every alert known to the monitor, as dicts."""
        return [a.to_dict() for a in self.monitor.get_all_alerts()]

    def get_unhandled_alerts(self) -> List[Dict]:
        """Return the monitor's unhandled alerts, as dicts."""
        return [a.to_dict() for a in self.monitor.get_unhandled_alerts()]

    def get_action_history(self) -> List[Dict]:
        """Return the emergency handler's action history, as dicts."""
        return [a.to_dict() for a in self.emergency.get_action_history()]

    def get_current_level(self) -> str:
        """Return the ``value`` of the handler's current emergency level."""
        return self.emergency.get_current_level().value

    def mark_alert_handled(self, alert_id: str) -> bool:
        """Mark an alert as handled and return whether the monitor accepted it.

        When the monitor reports success, the emergency level is re-assessed;
        the re-assessment's return value is not used.
        """
        success = self.monitor.mark_handled(alert_id)
        if success:
            # Re-assess the emergency level now that one alert is handled.
            self.emergency.assess_emergency_level()
        return success

    def clear_all(self) -> None:
        """Clear all alerts via the emergency handler (risk resolved)."""
        self.emergency.clear_all_alerts()

    def get_panel_summary(self) -> Dict:
        """Return a summary dict of counters, level, metrics and status flags.

        ``current_metrics`` is ``None`` when no metrics are available yet.
        """
        metrics = self.get_current_metrics()
        return {
            'update_count': self.update_count,
            'current_emergency_level': self.get_current_level(),
            'total_alerts': len(self.monitor.get_all_alerts()),
            'unhandled_alerts': len(self.monitor.get_unhandled_alerts()),
            'total_actions': len(self.emergency.get_action_history()),
            'current_metrics': metrics.to_dict() if metrics else None,
            'can_open_position': self.emergency.can_open_position(),
            'system_running': self.emergency.is_system_running(),
            'need_emergency_liquidate': self.emergency.need_emergency_liquidate()
        }