initial-import: 2026-04-11 21:18:55
This commit is contained in:
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
验证司马懿将军的 vnpy.app 问题是否已解决
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
def test_vnpy_installation():
|
||||
"""测试 vnpy 安装"""
|
||||
print("1. 测试 vnpy 安装...")
|
||||
|
||||
# 测试1: 检查 vnpy 版本
|
||||
cmd1 = "ssh admin@192.168.2.154 \"export PATH=\\\$PATH:/var/packages/Docker/target/usr/bin && docker exec sanguo_vnpy python3 -c \\\"import vnpy; print('版本:', vnpy.__version__ if hasattr(vnpy, '__version__') else '未知')\\\"\""
|
||||
|
||||
print(f" 执行: python -c \"import vnpy; print(vnpy.__version__)\"")
|
||||
result = subprocess.run(cmd1, shell=True, capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
print(f" ✅ {result.stdout.strip()}")
|
||||
else:
|
||||
print(f" ❌ 失败: {result.stderr.strip()}")
|
||||
|
||||
# 测试2: 检查 vnpy.app.cta_strategy 导入
|
||||
cmd2 = "ssh admin@192.168.2.154 \"export PATH=\\\$PATH:/var/packages/Docker/target/usr/bin && docker exec sanguo_vnpy python3 -c \\\"from vnpy.app.cta_strategy import CtaTemplate; print('导入成功')\\\"\""
|
||||
|
||||
print(f"\n2. 测试: from vnpy.app.cta_strategy import CtaTemplate")
|
||||
result = subprocess.run(cmd2, shell=True, capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
print(f" ✅ {result.stdout.strip()}")
|
||||
return True
|
||||
else:
|
||||
print(f" ❌ 失败: {result.stderr.strip()}")
|
||||
|
||||
# 测试备用方案: 使用 vnpy_ctastrategy
|
||||
cmd3 = "ssh admin@192.168.2.154 \"export PATH=\\\$PATH:/var/packages/Docker/target/usr/bin && docker exec sanguo_vnpy python3 -c \\\"from vnpy_ctastrategy import CtaTemplate; print('备用导入成功')\\\"\""
|
||||
|
||||
print(f"\n3. 测试备用方案: from vnpy_ctastrategy import CtaTemplate")
|
||||
result = subprocess.run(cmd3, shell=True, capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
print(f" ✅ {result.stdout.strip()}")
|
||||
print(" 💡 建议: 将代码中的 'vnpy.app.cta_strategy' 改为 'vnpy_ctastrategy'")
|
||||
return True
|
||||
else:
|
||||
print(f" ❌ 备用方案也失败: {result.stderr.strip()}")
|
||||
return False
|
||||
|
||||
def test_api_service():
|
||||
"""测试 API 服务"""
|
||||
print("\n4. 测试 API 服务...")
|
||||
|
||||
import requests
|
||||
try:
|
||||
# 测试 API 文档
|
||||
response = requests.get("http://192.168.2.154:8088/docs", timeout=5)
|
||||
if response.status_code == 200:
|
||||
print(" ✅ API 文档可访问")
|
||||
else:
|
||||
print(f" ❌ API 文档不可访问: {response.status_code}")
|
||||
|
||||
# 测试回测 API
|
||||
url = "http://192.168.2.154:8088/api/backtest/run"
|
||||
|
||||
# 使用兼容性导入的策略
|
||||
strategy_code = '''
|
||||
# 使用 vnpy.app.cta_strategy 导入
|
||||
from vnpy.app.cta_strategy import CtaTemplate
|
||||
|
||||
class SimayiTestStrategy(CtaTemplate):
|
||||
author = "司马懿测试"
|
||||
|
||||
def on_init(self):
|
||||
self.write_log("✅ 使用 vnpy.app.cta_strategy 导入成功")
|
||||
|
||||
def on_bar(self, bar):
|
||||
self.write_log(f"收到K线: {bar.datetime}")
|
||||
'''
|
||||
|
||||
payload = {
|
||||
"strategy_code": strategy_code,
|
||||
"symbol": "rb8888.SHFE",
|
||||
"start": 20240101,
|
||||
"end": 20240101,
|
||||
"capital": 100000,
|
||||
}
|
||||
|
||||
response = requests.post(url, json=payload, timeout=10)
|
||||
print(f" 回测API响应: {response.status_code}")
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
print(f" ✅ 回测成功: {result.get('msg')}")
|
||||
return True
|
||||
else:
|
||||
print(f" ❌ 回测失败: {response.text[:200]}")
|
||||
return False
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
print(" ❌ API 请求超时")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f" ❌ 其他错误: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
print("🚀 验证司马懿将军的 vnpy.app 问题修复")
|
||||
print("="*60)
|
||||
|
||||
# 测试 vnpy 安装
|
||||
vnpy_ok = test_vnpy_installation()
|
||||
|
||||
# 测试 API 服务
|
||||
api_ok = test_api_service()
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("验证结果:")
|
||||
print(f" vnpy 安装: {'✅ 通过' if vnpy_ok else '❌ 失败'}")
|
||||
print(f" API 服务: {'✅ 通过' if api_ok else '❌ 失败'}")
|
||||
|
||||
if vnpy_ok and api_ok:
|
||||
print("\n🎉 所有问题已修复!")
|
||||
print("请通知司马懿将军:")
|
||||
print("1. vnpy.app.cta_strategy 导入问题已解决")
|
||||
print("2. 回测API可以正常使用")
|
||||
print("3. 可以运行测试脚本了")
|
||||
else:
|
||||
print("\n⚠️ 仍有问题需要修复")
|
||||
print("请检查:")
|
||||
print("1. Docker容器状态")
|
||||
print("2. vn.py安装情况")
|
||||
print("3. 服务启动日志")
|
||||
|
||||
print("="*60)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user