78 lines
2.6 KiB
Python
Executable File
78 lines
2.6 KiB
Python
Executable File
"""
|
|
自动化回测服务 - 结果存储
|
|
"""
|
|
import json
|
|
import os
|
|
from typing import Optional
|
|
from .models import BacktestTaskWithId, BacktestResult
|
|
from .config import settings
|
|
|
|
|
|
class ResultStorage:
|
|
"""结果存储管理器"""
|
|
|
|
def __init__(self):
|
|
self.base_dir = settings.base_dir
|
|
self._ensure_dirs()
|
|
|
|
def _ensure_dirs(self):
|
|
"""确保目录结构存在"""
|
|
for status_dir in ["pending", "running", "completed", "failed"]:
|
|
path = os.path.join(self.base_dir, status_dir)
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
def _task_dir(self, task_id: str, status: str) -> str:
|
|
"""获取任务目录"""
|
|
return os.path.join(self.base_dir, status, task_id)
|
|
|
|
def save_task(self, task: BacktestTaskWithId) -> None:
|
|
"""保存任务信息"""
|
|
task_dir = self._task_dir(task.task_id, task.status)
|
|
os.makedirs(task_dir, exist_ok=True)
|
|
|
|
info_file = os.path.join(task_dir, "task.json")
|
|
with open(info_file, "w", encoding="utf-8") as f:
|
|
json.dump(task.model_dump(), f, indent=2, ensure_ascii=False)
|
|
|
|
def load_task(self, task_id: str, status: str) -> Optional[BacktestTaskWithId]:
|
|
"""加载任务信息"""
|
|
task_dir = self._task_dir(task_id, status)
|
|
info_file = os.path.join(task_dir, "task.json")
|
|
|
|
if not os.path.exists(info_file):
|
|
return None
|
|
|
|
with open(info_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
return BacktestTaskWithId(**data)
|
|
|
|
def save_result(self, result: BacktestResult) -> None:
|
|
"""保存回测结果"""
|
|
task_dir = self._task_dir(result.task_id, result.status)
|
|
os.makedirs(task_dir, exist_ok=True)
|
|
|
|
result_file = os.path.join(task_dir, "result.json")
|
|
with open(result_file, "w", encoding="utf-8") as f:
|
|
json.dump(result.model_dump(), f, indent=2, ensure_ascii=False)
|
|
|
|
def load_result(self, task_id: str, status: str) -> Optional[BacktestResult]:
|
|
"""加载回测结果"""
|
|
task_dir = self._task_dir(task_id, status)
|
|
result_file = os.path.join(task_dir, "result.json")
|
|
|
|
if not os.path.exists(result_file):
|
|
return None
|
|
|
|
with open(result_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
|
|
return BacktestResult(**data)
|
|
|
|
def get_task_path(self, task_id: str, status: str, filename: str) -> str:
|
|
"""获取任务文件路径"""
|
|
return os.path.join(self._task_dir(task_id, status), filename)
|
|
|
|
|
|
storage = ResultStorage()
|