Files
sanguo_quant_live/risk-management/realtime-system/tests/stress_test.py
T

222 lines
6.5 KiB
Python

"""
实时风控系统压力测试
============
测试场景:
1. 正常行情波动 - 无预警
2. 单日回撤 - 触发警告
3. 连续回撤 - 触发紧急处理
4. 高仓位 - 触发仓位预警
5. 极端情况 - 触发紧急清仓
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
from datetime import datetime
import random
from realtime_risk_panel import RealtimeRiskPanel
def test_normal_market():
"""测试1:正常行情波动"""
print("\n=== 测试1:正常行情波动 ===")
panel = RealtimeRiskPanel(risk_style="conservative")
# 初始资金100万
panel.update_net_value(datetime.now(), 1000000)
panel.update_position("600519", 100, 1800) # 贵州茅台 100股
panel.update_position("000001", 1000, 20) # 平安银行 1000股
result = panel.update(datetime.now(), 1000000, 1000000 - (100*1800 + 1000*20))
print(f"当前紧急级别: {result['current_emergency_level']}")
print(f"是否允许开仓: {result['can_open_position']}")
print(f"未处理预警数: {len(result['new_alerts'])}")
if result['current_emergency_level'] == '正常' and result['can_open_position']:
print("✅ 测试1通过")
return True
else:
print("❌ 测试1失败")
return False
def test_daily_drawdown_warning():
"""测试2:单日回撤触发警告"""
print("\n=== 测试2:单日回撤触发警告 ===")
panel = RealtimeRiskPanel(risk_style="conservative")
# 初始
panel.update_net_value(datetime.now(), 1000000)
# 单日回撤4%,触发警告
panel.update_net_value(datetime.now(), 960000)
result = panel.update(datetime.now(), 960000, 960000)
print(f"当前紧急级别: {result['current_emergency_level']}")
print(f"新预警数: {len(result['new_alerts'])}")
alerts = panel.get_unhandled_alerts()
if len(alerts) >= 1:
print(f"⚠️ 触发预警: {alerts[0]['message']}")
print("✅ 测试2通过")
return True
else:
print("❌ 测试2失败")
return False
def test_continuous_drawdown():
"""测试3:连续回撤触发限制开仓"""
print("\n=== 测试3:连续回撤触发限制开仓 ===")
panel = RealtimeRiskPanel(risk_style="conservative")
# 连续下跌
values = [1000000, 980000, 950000, 920000, 890000]
for i, v in enumerate(values):
dt = datetime.now()
panel.update_net_value(dt, v)
panel.update(dt, v, v)
summary = panel.get_panel_summary()
print(f"当前紧急级别: {summary['current_emergency_level']}")
print(f"未处理预警数: {summary['unhandled_alerts']}")
print(f"允许开仓: {summary['can_open_position']}")
if not summary['can_open_position']:
print("✅ 测试3通过,已限制开仓")
return True
else:
print("❌ 测试3失败")
return False
def test_high_position_concentration():
"""测试4:高持仓集中度"""
print("\n=== 测试4:高持仓集中度 ===")
panel = RealtimeRiskPanel(risk_style="conservative")
panel.update_net_value(datetime.now(), 1000000)
# 单个持仓50%,超过40%临界值
panel.update_position("600519", 278, 1800) # 500400 / 1000000 ≈ 50%
result = panel.update(datetime.now(), 1000000, 500000)
alerts = panel.get_unhandled_alerts()
concentration_alerts = [a for a in alerts if a['alert_type'] == 'CONCENTRATION']
print(f"集中度预警数: {len(concentration_alerts)}")
if concentration_alerts:
print(f"⚠️ {concentration_alerts[0]['message']}")
if len(concentration_alerts) >= 1:
print("✅ 测试4通过")
return True
else:
print("❌ 测试4失败")
return False
def test_emergency_liquidate():
"""测试5:极端情况触发紧急清仓"""
print("\n=== 测试5:极端情况触发紧急清仓 ===")
panel = RealtimeRiskPanel(risk_style="conservative")
# 极端回撤:从100万跌到80万,回撤20%
panel.update_net_value(datetime.now(), 1000000)
values = [1000000, 970000, 930000, 880000, 850000, 800000]
for v in values:
dt = datetime.now()
panel.update_net_value(dt, v)
panel.update(dt, v, v)
summary = panel.get_panel_summary()
print(f"当前紧急级别: {summary['current_emergency_level']}")
print(f"需要紧急清仓: {summary['need_emergency_liquidate']}")
if summary['need_emergency_liquidate']:
print("✅ 测试5通过,触发紧急清仓")
return True
else:
print("❌ 测试5失败")
return False
def test_performance():
"""性能测试:1000次更新"""
print("\n=== 性能测试:1000次更新 ===")
import time
panel = RealtimeRiskPanel(risk_style="conservative")
panel.update_net_value(datetime.now(), 1000000)
# 添加10只股票
symbols = [f"{i:06d}" for i in range(1, 11)]
for i, sym in enumerate(symbols):
price = 10 + i * 10
panel.update_position(sym, 1000, price)
start = time.time()
for i in range(1000):
# 随机波动净值
nv = 1000000 * (1 + random.uniform(-0.001, 0.001))
panel.update(datetime.now(), nv, nv * 0.3)
end = time.time()
elapsed = end - start
qps = 1000 / elapsed
print(f"1000次更新耗时: {elapsed:.3f}")
print(f"QPS: {qps:.1f} 次/秒")
if qps > 100:
print("✅ 性能测试通过")
return True
else:
print("⚠️ 性能一般,但可以接受")
return True
def main():
"""运行所有测试"""
print("🚀 开始实时风控系统压力测试\n")
tests = [
test_normal_market,
test_daily_drawdown_warning,
test_continuous_drawdown,
test_high_position_concentration,
test_emergency_liquidate,
test_performance
]
passed = 0
failed = 0
for test in tests:
try:
if test():
passed += 1
else:
failed += 1
except Exception as e:
print(f"💥 测试 {test.__name__} 异常: {e}")
failed += 1
print(f"\n📊 测试结果汇总:")
print(f"通过: {passed}/{len(tests)}")
print(f"失败: {failed}/{len(tests)}")
if failed == 0:
print("\n🎉 所有测试通过!系统正常工作!")
return 0
else:
print(f"\n{failed} 个测试失败")
return 1
if __name__ == "__main__":
sys.exit(main())