#!/usr/bin/env python3
"""Run a backtest directly inside the Docker container, bypassing the HTTP API."""

# Load the strategy source code
import os
import sys

# Paths of the strategy files to load. They are merged in this order, so the
# risk-control module is defined before the main strategy file is executed.
strategy_files = [
    "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/strategies/factors-dynamic-weight-timing-20260327/risk_control.py",
    "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/strategies/factors-dynamic-weight-timing-20260327/main_strategy_single_file.py",
]

# Read every file and merge the sources into one string. Each file's text is
# followed by a commented separator line so the merged source stays readable.
code_parts = []
for f in strategy_files:
    print(f"读取: {f}")
    with open(f, 'r', encoding='utf-8') as file:
        code = file.read()
    code_parts.append(code + "\n\n# " + "=" * 80 + "\n\n")
# Join once instead of growing the string with `+=` on every iteration.
full_code = "".join(code_parts)

# Execute the merged strategy code so the strategy classes become defined in
# this module's global namespace.
print("=" * 80)
print("执行策略代码,定义策略类...")
print("=" * 80)

# SECURITY NOTE: exec() runs the file contents with this process's full
# privileges. Only point `strategy_files` at trusted, reviewed sources.
exec(full_code, globals())

# Run the backtest the same way the RPC server does
print("\n" + "=" * 80)
print("导入回测引擎...")
print("=" * 80)

# Core code copied from final_rpc_correct.py
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy_ctabacktester import BacktesterEngine
from vnpy.trader.constant import Exchange, Interval, Direction, Offset
from datetime import datetime


def str_to_interval(interval_str: str):
    """Convert an interval string to the matching ``Interval`` enum member.

    The lookup is case-insensitive and ignores surrounding whitespace.

    Args:
        interval_str: Interval name such as ``"1m"``, ``"1h"``, ``"d"`` or
            ``"weekly"``. ``None`` and the empty string are accepted and
            treated as "not specified".

    Returns:
        The corresponding ``Interval`` member, or ``Interval.DAILY`` when the
        value is missing or not one of the recognised names.
    """
    mapping = {
        "1m": Interval.MINUTE,
        "min": Interval.MINUTE,
        "hour": Interval.HOUR,
        "1h": Interval.HOUR,
        "d": Interval.DAILY,
        "1d": Interval.DAILY,
        "daily": Interval.DAILY,
        "w": Interval.WEEKLY,
        "1w": Interval.WEEKLY,
        "weekly": Interval.WEEKLY,
    }
    # A missing value falls back to the same default as an unknown name,
    # instead of raising AttributeError on None.
    if not interval_str:
        return Interval.DAILY
    return mapping.get(interval_str.strip().lower(), Interval.DAILY)


# NOTE(review): the visible source ends mid-definition at "def parse"; that
# definition is outside this view and is intentionally not reproduced here.