Files
sanguo_vnpy/scripts/backtest-service/result_storage.py
T
2026-04-28 23:39:46 +08:00

102 lines
3.5 KiB
Python
Executable File

"""
自动化回测服务 - 结果存储
"""
import json
import os
from datetime import date, datetime
from typing import Optional
from .models import BacktestTaskWithId, BacktestResult
from .config import settings
def _json_serial(obj):
"""JSON序列化辅助:处理date/datetime"""
if isinstance(obj, (date, datetime)):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
class ResultStorage:
"""结果存储管理器"""
def __init__(self):
self.base_dir = settings.base_dir
self._ensure_dirs()
def _ensure_dirs(self):
"""确保目录结构存在"""
for status_dir in ["pending", "running", "completed", "failed"]:
path = os.path.join(self.base_dir, status_dir)
os.makedirs(path, exist_ok=True)
def _task_dir(self, task_id: str, status: str) -> str:
"""获取任务目录"""
return os.path.join(self.base_dir, status, task_id)
def save_task(self, task: BacktestTaskWithId) -> None:
"""保存任务信息"""
task_dir = self._task_dir(task.task_id, task.status)
os.makedirs(task_dir, exist_ok=True)
info_file = os.path.join(task_dir, "task.json")
with open(info_file, "w", encoding="utf-8") as f:
json.dump(task.model_dump(), f, indent=2, ensure_ascii=False, default=_json_serial)
def load_task(self, task_id: str, status: str) -> Optional[BacktestTaskWithId]:
"""加载任务信息"""
task_dir = self._task_dir(task_id, status)
info_file = os.path.join(task_dir, "task.json")
if not os.path.exists(info_file):
return None
with open(info_file, "r", encoding="utf-8") as f:
data = json.load(f)
return BacktestTaskWithId(**data)
def save_result(self, result: BacktestResult) -> None:
"""保存回测结果"""
task_dir = self._task_dir(result.task_id, result.status)
os.makedirs(task_dir, exist_ok=True)
result_file = os.path.join(task_dir, "result.json")
with open(result_file, "w", encoding="utf-8") as f:
json.dump(result.model_dump(), f, indent=2, ensure_ascii=False, default=_json_serial)
def load_result(self, task_id: str, status: str) -> Optional[BacktestResult]:
"""加载回测结果"""
task_dir = self._task_dir(task_id, status)
result_file = os.path.join(task_dir, "result.json")
if not os.path.exists(result_file):
return None
with open(result_file, "r", encoding="utf-8") as f:
data = json.load(f)
return BacktestResult(**data)
def find_task(self, task_id: str) -> Optional[BacktestTaskWithId]:
"""在所有状态目录中查找任务"""
for status_dir in ["running", "failed", "completed", "pending"]:
task = self.load_task(task_id, status_dir)
if task:
return task
return None
def find_result(self, task_id: str) -> Optional[BacktestResult]:
"""在所有状态目录中查找结果"""
for status_dir in ["failed", "completed", "running", "pending"]:
result = self.load_result(task_id, status_dir)
if result:
return result
return None
def get_task_path(self, task_id: str, status: str, filename: str) -> str:
"""获取任务文件路径"""
return os.path.join(self._task_dir(task_id, status), filename)
storage = ResultStorage()