""" 高频算法性能基准测试 测试不同实现方式的性能 """ import time import numpy as np import pandas as pd from datetime import datetime from high_frequency_signal import ( TechnicalFactorCalculator, PerformanceMonitor, BarData, ) def generate_test_data(n: int = 10000) -> pd.DataFrame: """生成随机测试数据""" np.random.seed(42) # 生成随机游走价格 returns = np.random.normal(0.0001, 0.01, n) price = 100 * np.exp(np.cumsum(returns)) # 生成OHLCV high = price * (1 + np.random.uniform(0, 0.01, n)) low = price * (1 - np.random.uniform(0, 0.01, n)) open_ = price * (1 + np.random.normal(0, 0.005, n)) volume = np.random.randint(100000, 10000000, n) df = pd.DataFrame({ 'open': open_, 'high': high, 'low': low, 'close': price, 'volume': volume, }) return df def benchmark_technical_calculator(): """测试技术因子计算器性能""" print("=" * 60) print("技术因子计算器性能基准测试") print("=" * 60) calc = TechnicalFactorCalculator() monitor = PerformanceMonitor() # 测试不同数据大小 for size in [100, 1000, 10000]: df = generate_test_data(size) close = df['close'].values high = df['high'].values low = df['low'].values volume = df['volume'].values n_runs = 1000 if size <= 1000 else 100 # 测试MA start_time = time.time() for _ in range(n_runs): _ = calc.calculate_ma(close, 20) elapsed = (time.time() - start_time) * 1000 print(f"MA {size} points x{n_runs} runs: {elapsed:.2f} ms " f"({elapsed/n_runs:.3f} ms/run)") # 测试MACD start_time = time.time() for _ in range(n_runs): _ = calc.calculate_macd(close, 12, 26, 9) elapsed = (time.time() - start_time) * 1000 print(f"MACD {size} points x{n_runs} runs: {elapsed:.2f} ms " f"({elapsed/n_runs:.3f} ms/run)") # 测试RSI start_time = time.time() for _ in range(n_runs): _ = calc.calculate_rsi(close, 14) elapsed = (time.time() - start_time) * 1000 print(f"RSI {size} points x{n_runs} runs: {elapsed:.2f} ms " f"({elapsed/n_runs:.3f} ms/run)") # 测试布林带 start_time = time.time() for _ in range(n_runs): _ = calc.calculate_bollinger_bands(close, 20, 2.0) elapsed = (time.time() - start_time) * 1000 print(f"Bollinger {size} points x{n_runs} runs: {elapsed:.2f} ms " f"({elapsed/n_runs:.3f} ms/run)") # 测试ATR start_time = time.time() for _ in range(n_runs): _ = calc.calculate_atr(high, low, close, 14) elapsed = (time.time() - start_time) * 1000 print(f"ATR {size} points x{n_runs} runs: {elapsed:.2f} ms " f"({elapsed/n_runs:.3f} ms/run)") print() def benchmark_full_pipeline(): """测试完整信号生成流水线""" print("=" * 60) print("完整信号生成流水线性能基准测试") print("=" * 60) from high_frequency_signal import HighFrequencySignalGenerator calc = TechnicalFactorCalculator() generator = HighFrequencySignalGenerator() monitor = PerformanceMonitor() # 生成测试数据 n_days = 252 * 5 df = generate_test_data(n_days) start_time = time.time() # 模拟逐日处理 signals = [] for i in range(20, len(df)): data = df.iloc[:i+1] bar = BarData( code="TEST", datetime=datetime.now(), open=data['open'].iloc[-1], high=data['high'].iloc[-1], low=data['low'].iloc[-1], close=data['close'].iloc[-1], volume=data['volume'].iloc[-1], amount=data['volume'].iloc[-1] * data['close'].iloc[-1] ) monitor.on_process_start() signal = generator.generate_signal_from_bar(bar, data) monitor.on_process_end() if signal: signals.append(signal) elapsed = (time.time() - start_time) * 1000 print(f"完整回测 {n_days} 根K线,生成 {len(signals)} 个信号") print(f"总耗时: {elapsed:.2f} ms") print(f"平均每根K线: {elapsed/n_days:.3f} ms") print() monitor.print_stats() stats = generator.get_performance_stats() print(f"\n信号统计: {stats}") def main(): """主函数""" benchmark_technical_calculator() print() benchmark_full_pipeline() print() print("=" * 60) print("基准测试完成") print("=" * 60) if __name__ == '__main__': main()