add: 关羽完成已调研结果和实时风控系统上传
This commit is contained in:
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
高频算法性能基准测试
|
||||
测试不同实现方式的性能
|
||||
"""
|
||||
|
||||
import time
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
from high_frequency_signal import (
|
||||
TechnicalFactorCalculator,
|
||||
PerformanceMonitor,
|
||||
BarData,
|
||||
)
|
||||
|
||||
|
||||
def generate_test_data(n: int = 10000) -> pd.DataFrame:
|
||||
"""生成随机测试数据"""
|
||||
np.random.seed(42)
|
||||
|
||||
# 生成随机游走价格
|
||||
returns = np.random.normal(0.0001, 0.01, n)
|
||||
price = 100 * np.exp(np.cumsum(returns))
|
||||
|
||||
# 生成OHLCV
|
||||
high = price * (1 + np.random.uniform(0, 0.01, n))
|
||||
low = price * (1 - np.random.uniform(0, 0.01, n))
|
||||
open_ = price * (1 + np.random.normal(0, 0.005, n))
|
||||
volume = np.random.randint(100000, 10000000, n)
|
||||
|
||||
df = pd.DataFrame({
|
||||
'open': open_,
|
||||
'high': high,
|
||||
'low': low,
|
||||
'close': price,
|
||||
'volume': volume,
|
||||
})
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def benchmark_technical_calculator():
|
||||
"""测试技术因子计算器性能"""
|
||||
print("=" * 60)
|
||||
print("技术因子计算器性能基准测试")
|
||||
print("=" * 60)
|
||||
|
||||
calc = TechnicalFactorCalculator()
|
||||
monitor = PerformanceMonitor()
|
||||
|
||||
# 测试不同数据大小
|
||||
for size in [100, 1000, 10000]:
|
||||
df = generate_test_data(size)
|
||||
close = df['close'].values
|
||||
high = df['high'].values
|
||||
low = df['low'].values
|
||||
volume = df['volume'].values
|
||||
|
||||
n_runs = 1000 if size <= 1000 else 100
|
||||
|
||||
# 测试MA
|
||||
start_time = time.time()
|
||||
for _ in range(n_runs):
|
||||
_ = calc.calculate_ma(close, 20)
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
print(f"MA {size} points x{n_runs} runs: {elapsed:.2f} ms "
|
||||
f"({elapsed/n_runs:.3f} ms/run)")
|
||||
|
||||
# 测试MACD
|
||||
start_time = time.time()
|
||||
for _ in range(n_runs):
|
||||
_ = calc.calculate_macd(close, 12, 26, 9)
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
print(f"MACD {size} points x{n_runs} runs: {elapsed:.2f} ms "
|
||||
f"({elapsed/n_runs:.3f} ms/run)")
|
||||
|
||||
# 测试RSI
|
||||
start_time = time.time()
|
||||
for _ in range(n_runs):
|
||||
_ = calc.calculate_rsi(close, 14)
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
print(f"RSI {size} points x{n_runs} runs: {elapsed:.2f} ms "
|
||||
f"({elapsed/n_runs:.3f} ms/run)")
|
||||
|
||||
# 测试布林带
|
||||
start_time = time.time()
|
||||
for _ in range(n_runs):
|
||||
_ = calc.calculate_bollinger_bands(close, 20, 2.0)
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
print(f"Bollinger {size} points x{n_runs} runs: {elapsed:.2f} ms "
|
||||
f"({elapsed/n_runs:.3f} ms/run)")
|
||||
|
||||
# 测试ATR
|
||||
start_time = time.time()
|
||||
for _ in range(n_runs):
|
||||
_ = calc.calculate_atr(high, low, close, 14)
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
print(f"ATR {size} points x{n_runs} runs: {elapsed:.2f} ms "
|
||||
f"({elapsed/n_runs:.3f} ms/run)")
|
||||
|
||||
print()
|
||||
|
||||
|
||||
def benchmark_full_pipeline():
|
||||
"""测试完整信号生成流水线"""
|
||||
print("=" * 60)
|
||||
print("完整信号生成流水线性能基准测试")
|
||||
print("=" * 60)
|
||||
|
||||
from high_frequency_signal import HighFrequencySignalGenerator
|
||||
|
||||
calc = TechnicalFactorCalculator()
|
||||
generator = HighFrequencySignalGenerator()
|
||||
monitor = PerformanceMonitor()
|
||||
|
||||
# 生成测试数据
|
||||
n_days = 252 * 5
|
||||
df = generate_test_data(n_days)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# 模拟逐日处理
|
||||
signals = []
|
||||
for i in range(20, len(df)):
|
||||
data = df.iloc[:i+1]
|
||||
bar = BarData(
|
||||
code="TEST",
|
||||
datetime=datetime.now(),
|
||||
open=data['open'].iloc[-1],
|
||||
high=data['high'].iloc[-1],
|
||||
low=data['low'].iloc[-1],
|
||||
close=data['close'].iloc[-1],
|
||||
volume=data['volume'].iloc[-1],
|
||||
amount=data['volume'].iloc[-1] * data['close'].iloc[-1]
|
||||
)
|
||||
monitor.on_process_start()
|
||||
signal = generator.generate_signal_from_bar(bar, data)
|
||||
monitor.on_process_end()
|
||||
if signal:
|
||||
signals.append(signal)
|
||||
|
||||
elapsed = (time.time() - start_time) * 1000
|
||||
print(f"完整回测 {n_days} 根K线,生成 {len(signals)} 个信号")
|
||||
print(f"总耗时: {elapsed:.2f} ms")
|
||||
print(f"平均每根K线: {elapsed/n_days:.3f} ms")
|
||||
print()
|
||||
|
||||
monitor.print_stats()
|
||||
|
||||
stats = generator.get_performance_stats()
|
||||
print(f"\n信号统计: {stats}")
|
||||
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
benchmark_technical_calculator()
|
||||
print()
|
||||
benchmark_full_pipeline()
|
||||
print()
|
||||
print("=" * 60)
|
||||
print("基准测试完成")
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user