#!/usr/bin/env python3 """ 最终修复版 - 包含: 1. vnpy.app兼容性修复 2. 正确的BacktesterEngine初始化(需要传入main_engine和event_engine) 3. 510300.SSE数据已导入 """ import sys import os # ============================================ # 🔥 关键修复1:加载vnpy.app兼容性模块 # 在执行任何策略代码前先创建兼容性层 # ============================================ print("🔧 加载vnpy.app兼容性模块...") # 创建vnpy.app兼容性模块 import types # 创建顶级模块 vnpy_app_module = types.ModuleType('vnpy.app') sys.modules['vnpy.app'] = vnpy_app_module # 创建子模块 submodules = ['cta_strategy', 'cta_backtester', 'data_manager'] for name in submodules: full_name = f'vnpy.app.{name}' submodule = types.ModuleType(full_name) sys.modules[full_name] = submodule setattr(vnpy_app_module, name, submodule) # 从实际模块映射类 from vnpy_ctastrategy import CtaTemplate, CtaStrategyApp sys.modules['vnpy.app.cta_strategy'].CtaTemplate = CtaTemplate sys.modules['vnpy.app.cta_strategy'].CtaStrategyApp = CtaStrategyApp vnpy_app_module.CtaTemplate = CtaTemplate vnpy_app_module.CtaStrategyApp = CtaStrategyApp from vnpy_ctabacktester import BacktesterEngine sys.modules['vnpy.app.cta_backtester'].BacktesterEngine = BacktesterEngine vnpy_app_module.BacktesterEngine = BacktesterEngine print("✅ vnpy.app兼容性模块加载完成!") print(" 现在支持: from vnpy.app.cta_strategy import CtaTemplate") # ============================================ # 兼容性修复完成,继续导入其他模块 # ============================================ from vnpy.event import EventEngine from vnpy.trader.engine import MainEngine import traceback import zmq import time def run_strategy_backtest(strategy_code: str, symbol: str, interval: str, start: int, end: int, **kwargs): """RPC方法:运行策略回测""" try: print(f"开始回测: {symbol} [{start} - {end}]") # 动态加载策略 - 由于兼容性模块已创建,vnpy.app导入会成功 local_vars = {} exec(strategy_code, globals(), local_vars) # 查找CtaTemplate子类 strategy_classes = [ v for k, v in local_vars.items() if isinstance(v, type) and issubclass(v, CtaTemplate) and v != CtaTemplate ] if not strategy_classes: return { "error": "策略代码中未找到CtaTemplate子类", "hint": "请确保策略继承自CtaTemplate" } StrategyClass = strategy_classes[0] print(f"找到策略类: {StrategyClass.__name__}") # 🔥 关键修复2:正确初始化BacktesterEngine # BacktesterEngine.__init__需要传入main_engine和event_engine event_engine = EventEngine() main_engine = MainEngine(event_engine) # 正确初始化:传入main_engine backtester_engine = BacktesterEngine(main_engine) # 设置回测参数 - vn.py 4.x格式 # 将整数日期转换为字符串 start_str = str(start) if len(start_str) == 8: start_str = f"{start_str[:4]}-{start_str[4:6]}-{start_str[6:8]}" end_str = str(end) if len(end_str) == 8: end_str = f"{end_str[:4]}-{end_str[4:6]}-{end_str[6:8]}" setting = { "vt_symbol": symbol, "interval": interval, "start_date": start_str, "end_date": end_str, "rate": kwargs.get("rate", 0.00003), "slippage": kwargs.get("slippage", 0.2), "size": kwargs.get("size", 1), "pricetick": kwargs.get("pricetick", 0.2), "capital": kwargs.get("capital", 1000000.0), } print(f"回测参数: {setting}") # 初始化引擎 backtester_engine.init_engine() # 运行回测 result = backtester_engine.run_backtesting( strategy_class=StrategyClass, setting=setting ) # 获取回测结果统计 statistics = backtester_engine.get_result_statistics() print(f"回测完成,获取统计: {list(statistics.keys()) if statistics else '无'}") # 获取每日结果 daily_df = backtester_engine.get_daily_df() # 转换为可序列化的字典 if daily_df is not None and hasattr(daily_df, 'to_dict'): daily_data = daily_df.to_dict(orient="records") else: daily_data = [] # 获取交易记录 trades = backtester_engine.get_all_trades() return { "statistics": statistics, "trades": [t.__dict__ for t in trades] if trades else [], "daily_data": daily_data } except Exception as e: error_info = { "error": str(e), "traceback": traceback.format_exc() } print(f"回测错误: {error_info['error']}") print(error_info['traceback']) return error_info def main(): """主函数""" print('🚀 启动最终修复版 WebTrader RPC 服务(包含所有修复)') print(' 修复1: vnpy.app兼容性 ✅') print(' 修复2: BacktesterEngine初始化 ✅') print(' 数据: 510300.SSE 3361行 ✅') # 创建ZMQ REP socket context = zmq.Context() rep_socket = context.socket(zmq.REP) # 使用8002端口 bind_addr = "tcp://0.0.0.0:8002" rep_socket.bind(bind_addr) print('✅ RPC服务已启动') print(f' 监听地址: {bind_addr}') print(f' 外部访问: tcp://192.168.2.154:8002') print(' 等待请求...') # 处理请求 while True: try: # 接收请求 req = rep_socket.recv_pyobj() print(f"收到请求: {req.get('function', 'unknown')}") function_name = req.get("function") args = req.get("args", []) kwargs = req.get("kwargs", {}) if function_name == "run_strategy_backtest": result = run_strategy_backtest(*args, **kwargs) else: result = {"error": f"未知函数: {function_name}"} # 发送响应 rep_socket.send_pyobj(result) print(f"请求处理完成") except Exception as e: error_result = { "error": str(e), "traceback": traceback.format_exc() } rep_socket.send_pyobj(error_result) print(f"处理请求时出错: {e}") if __name__ == '__main__': main()