Files
sanguo_quant_live/technical-strategy/02-algorithms/benchmark_test.py
T

167 lines
4.7 KiB
Python

"""
高频算法性能基准测试
测试不同实现方式的性能
"""
import time
import numpy as np
import pandas as pd
from datetime import datetime
from high_frequency_signal import (
TechnicalFactorCalculator,
PerformanceMonitor,
BarData,
)
def generate_test_data(n: int = 10000) -> pd.DataFrame:
"""生成随机测试数据"""
np.random.seed(42)
# 生成随机游走价格
returns = np.random.normal(0.0001, 0.01, n)
price = 100 * np.exp(np.cumsum(returns))
# 生成OHLCV
high = price * (1 + np.random.uniform(0, 0.01, n))
low = price * (1 - np.random.uniform(0, 0.01, n))
open_ = price * (1 + np.random.normal(0, 0.005, n))
volume = np.random.randint(100000, 10000000, n)
df = pd.DataFrame({
'open': open_,
'high': high,
'low': low,
'close': price,
'volume': volume,
})
return df
def benchmark_technical_calculator():
"""测试技术因子计算器性能"""
print("=" * 60)
print("技术因子计算器性能基准测试")
print("=" * 60)
calc = TechnicalFactorCalculator()
monitor = PerformanceMonitor()
# 测试不同数据大小
for size in [100, 1000, 10000]:
df = generate_test_data(size)
close = df['close'].values
high = df['high'].values
low = df['low'].values
volume = df['volume'].values
n_runs = 1000 if size <= 1000 else 100
# 测试MA
start_time = time.time()
for _ in range(n_runs):
_ = calc.calculate_ma(close, 20)
elapsed = (time.time() - start_time) * 1000
print(f"MA {size} points x{n_runs} runs: {elapsed:.2f} ms "
f"({elapsed/n_runs:.3f} ms/run)")
# 测试MACD
start_time = time.time()
for _ in range(n_runs):
_ = calc.calculate_macd(close, 12, 26, 9)
elapsed = (time.time() - start_time) * 1000
print(f"MACD {size} points x{n_runs} runs: {elapsed:.2f} ms "
f"({elapsed/n_runs:.3f} ms/run)")
# 测试RSI
start_time = time.time()
for _ in range(n_runs):
_ = calc.calculate_rsi(close, 14)
elapsed = (time.time() - start_time) * 1000
print(f"RSI {size} points x{n_runs} runs: {elapsed:.2f} ms "
f"({elapsed/n_runs:.3f} ms/run)")
# 测试布林带
start_time = time.time()
for _ in range(n_runs):
_ = calc.calculate_bollinger_bands(close, 20, 2.0)
elapsed = (time.time() - start_time) * 1000
print(f"Bollinger {size} points x{n_runs} runs: {elapsed:.2f} ms "
f"({elapsed/n_runs:.3f} ms/run)")
# 测试ATR
start_time = time.time()
for _ in range(n_runs):
_ = calc.calculate_atr(high, low, close, 14)
elapsed = (time.time() - start_time) * 1000
print(f"ATR {size} points x{n_runs} runs: {elapsed:.2f} ms "
f"({elapsed/n_runs:.3f} ms/run)")
print()
def benchmark_full_pipeline():
"""测试完整信号生成流水线"""
print("=" * 60)
print("完整信号生成流水线性能基准测试")
print("=" * 60)
from high_frequency_signal import HighFrequencySignalGenerator
calc = TechnicalFactorCalculator()
generator = HighFrequencySignalGenerator()
monitor = PerformanceMonitor()
# 生成测试数据
n_days = 252 * 5
df = generate_test_data(n_days)
start_time = time.time()
# 模拟逐日处理
signals = []
for i in range(20, len(df)):
data = df.iloc[:i+1]
bar = BarData(
code="TEST",
datetime=datetime.now(),
open=data['open'].iloc[-1],
high=data['high'].iloc[-1],
low=data['low'].iloc[-1],
close=data['close'].iloc[-1],
volume=data['volume'].iloc[-1],
amount=data['volume'].iloc[-1] * data['close'].iloc[-1]
)
monitor.on_process_start()
signal = generator.generate_signal_from_bar(bar, data)
monitor.on_process_end()
if signal:
signals.append(signal)
elapsed = (time.time() - start_time) * 1000
print(f"完整回测 {n_days} 根K线,生成 {len(signals)} 个信号")
print(f"总耗时: {elapsed:.2f} ms")
print(f"平均每根K线: {elapsed/n_days:.3f} ms")
print()
monitor.print_stats()
stats = generator.get_performance_stats()
print(f"\n信号统计: {stats}")
def main():
"""主函数"""
benchmark_technical_calculator()
print()
benchmark_full_pipeline()
print()
print("=" * 60)
print("基准测试完成")
print("=" * 60)
if __name__ == '__main__':
main()