Files
2026-04-29 20:15:25 +08:00

59 lines
1.7 KiB
Python

#!/usr/bin/env python3
"""
在Docker容器内直接运行回测,不经过HTTP API
"""
# Load the strategy source code (read from the file paths listed below)
import os
import sys
# Strategy source files to load, in the order they are merged.
strategy_files = [
    "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/strategies/factors-dynamic-weight-timing-20260327/risk_control.py",
    "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/strategies/factors-dynamic-weight-timing-20260327/main_strategy_single_file.py",
]

# Read each file and merge everything into a single source string.
# Every file's text is followed by a separator: a blank line, a comment line
# made of 80 "=" characters, and another blank line.
_banner = "=" * 80
_file_separator = "\n\n# " + _banner + "\n\n"
_source_chunks = []
for strategy_path in strategy_files:
    print(f"读取: {strategy_path}")
    with open(strategy_path, 'r', encoding='utf-8') as source_file:
        source_text = source_file.read()
    _source_chunks.append(source_text + _file_separator)
full_code = "".join(_source_chunks)

# Execute the merged source so the names it defines land in this module's
# global namespace.
print(_banner, "执行策略代码,定义策略类...", _banner, sep="\n")
# NOTE(review): exec() runs whatever the files above contain with full
# privileges of this process; only point strategy_files at trusted code.
exec(full_code, globals())

# Run the backtest the way the RPC server does.
print("\n" + _banner, "导入回测引擎...", _banner, sep="\n")
# Core code copied from final_rpc_correct.py
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy_ctabacktester import BacktesterEngine
from vnpy.trader.constant import Exchange, Interval, Direction, Offset
from datetime import datetime
def str_to_interval(interval_str: str):
    """Convert an interval string to the matching ``Interval`` enum member.

    The lookup is case-insensitive (the input is lower-cased first).

    Args:
        interval_str: Textual interval such as ``"1m"``, ``"1h"``, ``"d"``
            or ``"weekly"``.

    Returns:
        The ``Interval`` member for a recognised alias; ``Interval.DAILY``
        for any string that is not recognised.
    """
    # Each enum member paired with every spelling accepted for it.
    alias_groups = (
        (Interval.MINUTE, ("1m", "min")),
        (Interval.HOUR, ("hour", "1h")),
        (Interval.DAILY, ("d", "1d", "daily")),
        (Interval.WEEKLY, ("w", "1w", "weekly")),
    )

    normalized = interval_str.lower()
    for interval, aliases in alias_groups:
        if normalized in aliases:
            return interval

    # Unknown spellings fall back to daily bars.
    return Interval.DAILY
def parse