#!/usr/bin/env python3 """ 运行司马懿将军的测试脚本 """ import os import sys import subprocess def check_simayi_scripts(): """检查司马懿将军的测试脚本""" print("📁 检查司马懿将军的测试脚本...") task_dir = "pangtong-value/research/task-20260329-strategy-backtest/simayi" if os.path.exists(task_dir): print(f"✅ 任务目录存在: {task_dir}") files = os.listdir(task_dir) print(f" 目录包含 {len(files)} 个文件:") for file in files: print(f" - {file}") # 查找测试脚本 test_scripts = [f for f in files if 'backtest' in f or 'test' in f] if test_scripts: print(f"\n📄 找到测试脚本:") for script in test_scripts: script_path = os.path.join(task_dir, script) print(f" {script_path}") # 检查脚本内容 try: with open(script_path, 'r') as f: first_lines = f.readlines()[:5] print(f" 前5行:") for line in first_lines: print(f" {line.rstrip()}") except Exception as e: print(f" 无法读取: {e}") else: print("\n❌ 未找到测试脚本") else: print(f"❌ 任务目录不存在: {task_dir}") print(" 请检查路径或创建目录") create_simayi_test_script() def create_simayi_test_script(): """创建司马懿将军的测试脚本""" print("\n📝 创建测试脚本...") test_code = '''#!/usr/bin/env python3 """ 司马懿将军的回测工作流测试脚本 验证回测API修复后状态 """ import requests import json import time def test_backtest_api(): """测试回测API""" print("🚀 开始测试回测API...") # 测试1: 健康检查 print("1. 测试API健康检查...") try: response = requests.get("http://192.168.2.154:8088/health", timeout=5) print(f" 状态码: {response.status_code}") print(f" 响应: {response.text[:100]}") except Exception as e: print(f" 无法访问: {e}") # 测试2: Swagger UI print("\\n2. 测试Swagger UI...") try: response = requests.get("http://192.168.2.154:8088/docs", timeout=5) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(" ✅ Swagger UI可访问") else: print(f" ❌ 不可访问: {response.status_code}") except Exception as e: print(f" 无法访问: {e}") # 测试3: 回测API print("\\n3. 测试回测功能...") url = "http://192.168.2.154:8088/api/backtest/run" # 简单测试策略 simple_strategy = ''' from vnpy_ctastrategy import CtaTemplate class TestStrategy(CtaTemplate): author = "司马懿测试" def on_init(self): self.write_log("✅ 测试策略初始化完成") def on_bar(self, bar): self.write_log(f"收到K线: {bar.datetime}") ''' payload = { "strategy_code": simple_strategy, "symbol": "rb8888.SHFE", "start": 20240101, "end": 20240101, "capital": 100000, } try: start_time = time.time() response = requests.post(url, json=payload, timeout=10) elapsed = time.time() - start_time print(f" 响应时间: {elapsed:.2f}秒") print(f" 状态码: {response.status_code}") if response.status_code == 200: result = response.json() print(f" ✅ 回测成功!") print(f" 消息: {result.get('msg')}") print(f" 返回码: {result.get('code')}") if result.get('data'): data = result['data'] print(f" 数据包含: {list(data.keys())}") else: print(f" ❌ 回测失败: {response.text[:200]}") except requests.exceptions.Timeout: print(" ❌ 请求超时") except Exception as e: print(f" ❌ 其他错误: {e}") def test_environment(): """测试Python环境""" print("\\n🐍 测试Python环境...") import importlib modules_to_test = [ 'vnpy', 'vnpy.trader', 'vnpy.event', 'vnpy_ctastrategy', 'vnpy_ctabacktester', 'zmq', 'fastapi', ] for module in modules_to_test: try: importlib.import_module(module) print(f" ✅ {module}") except ImportError: print(f" ❌ {module}") if __name__ == "__main__": print("="*60) print("司马懿将军 - 回测API测试脚本") print("="*60) # 测试环境 test_environment() # 测试回测API test_backtest_api() print("\\n" + "="*60) print("测试完成") print("="*60) ''' # 创建目录和脚本 task_dir = "pangtong-value/research/task-20260329-strategy-backtest/simayi" os.makedirs(task_dir, exist_ok=True) script_path = os.path.join(task_dir, "backtest_workflow.py") with open(script_path, 'w', encoding='utf-8') as f: f.write(test_code) print(f"✅ 已创建测试脚本: {script_path}") print("\n使用方法:") print(f" cd {task_dir}") print(" python3 backtest_workflow.py") def main(): print("🚀 启动司马懿将军测试脚本...") # 检查现有脚本 check_simayi_scripts() # 如果没有脚本,则创建 task_dir = "pangtong-value/research/task-20260329-strategy-backtest/simayi" test_script = os.path.join(task_dir, "backtest_workflow.py") if not os.path.exists(test_script): create_simayi_test_script() print("\n📋 已创建测试脚本") print(f" 位置: {test_script}") print(" 运行: python3 backtest_workflow.py") else: print(f"\n✅ 测试脚本已存在: {test_script}") print(" 运行: python3 backtest_workflow.py") print("\n============================================================") print("测试脚本准备完成,请通知司马懿将军运行测试") print("============================================================") if __name__ == "__main__": main()