#!/usr/bin/env python3 """ 直接调用RPC服务执行回测 - 不经过HTTP """ import zmq import json from datetime import datetime print("=" * 80) print("🚀 通过RPC执行回测") print("=" * 80) # ZMQ配置 ZMQ_HOST = "127.0.0.1" ZMQ_PORT = 8008 ZMQ_TIMEOUT = 30000 # 30秒 # 读取策略代码 strategy_file = "/Users/chufeng/.openclaw/workspace-guanyu/pangtong-value/research/task-20260329-strategy-backtest/guanyu/single_stock_stop_loss_final_correct.py" with open(strategy_file, 'r', encoding='utf-8') as f: strategy_code = f.read() print(f"✅ 策略代码: {len(strategy_code)} 字符") # 创建请求 request = { "strategy_code": strategy_code, "symbol": "510300.SSE", "interval": "1d", "start": 1609459200, "end": 1772515200, "capital": 1000000, "rate": 3e-5, "slippage": 0.002, "size": 10000, "pricetick": 0.001, "data_source": "sqlite" } print("\n请求配置:") print(f" 标的: {request['symbol']}") print(f" 时间: 2021-01-01 ~ 2026-03-01") print(f" 资金: {request['capital']:,}") print(f" 止损: 15%") # 连接到RPC print(f"\n连接RPC: {ZMQ_HOST}:{ZMQ_PORT}") context = zmq.Context() socket = context.socket(zmq.REQ) socket.setsockopt(zmq.LINGER, 0) # 不等待未发送的消息 socket.connect(f"tcp://{ZMQ_HOST}:{ZMQ_PORT}") # 设置超时 socket.setsockopt(zmq.RCVTIMEO, ZMQ_TIMEOUT) socket.setsockopt(zmq.SNDTIMEO, ZMQ_TIMEOUT) # 发送请求 print("\n发送请求...") request_json = json.dumps(request) socket.send_string(request_json) print("✅ 请求已发送,等待响应...") # 接收响应 try: response_json = socket.recv_string() response = json.loads(response_json) print("✅ 收到响应") if "error" in response: print(f"\n❌ 回测失败: {response['error']}") if "traceback" in response: print("\n错误堆栈:") print(response["traceback"]) else: print("\n" + "=" * 80) print("回测结果:") print("=" * 80) if "statistics" in response: stats = response["statistics"] print(f"\n📊 绩效指标:") print(f" 总收益率: {stats.get('total_return', 0):.2%}") print(f" 年化收益率: {stats.get('annual_return', 0):.2%}") print(f" 最大回撤: {stats.get('max_drawdown', 0):.2%}") print(f" 夏普比率: {stats.get('sharpe_ratio', 0):.2f}") print(f" 卡玛比率: {stats.get('calmar_ratio', 0):.2f}") print(f" 总交易次数: {stats.get('total_trades', 0)}") print(f" 胜率: {stats.get('win_rate', 0):.2%}") print(f" 盈亏比: {stats.get('profit_loss_ratio', 0):.2f}") if "trades" in response: trades = response["trades"] print(f"\n📝 交易记录: 共 {len(trades)} 笔") for idx, trade in enumerate(trades, 1): print(f" {idx}. {trade.get('datetime')} {trade.get('direction')} {trade.get('symbol')} @ {trade.get('price'):.2f} × {trade.get('volume')}") print("\n" + "=" * 80) print("✅ 回测执行完成!") print("=" * 80) except zmq.error.Again: print("❌ 请求超时: RPC服务响应时间过长") except Exception as e: print(f"❌ 接收响应失败: {e}") import traceback traceback.print_exc() finally: socket.close() context.term()