Files
sanguo_vnpy/scripts/backtest-service/api.py
T
2026-04-28 23:41:37 +08:00

93 lines
3.0 KiB
Python
Executable File

"""
自动化回测服务 - API路由
"""
from typing import Optional
from fastapi import APIRouter, Query
from .models import (
BacktestTask,
BacktestTaskWithId,
BacktestResult,
TaskListResponse,
ApiResponse,
HealthCheckResponse,
TaskStatus,
)
from .task_queue import task_queue
from .result_storage import storage
from .executor import executor
# Router instance on which the endpoint functions below are registered
# through the @router.<method>(...) decorators.
router = APIRouter()
@router.post("/submit", summary="提交回测任务")
def submit_task(task: BacktestTask) -> ApiResponse[BacktestTaskWithId]:
    """Submit a new backtest task.

    The task is passed to ``task_queue.submit_task``; the object it returns
    is handed to ``storage.save_task`` and then sent back to the caller as
    the response payload.
    """
    queued = task_queue.submit_task(task)
    storage.save_task(queued)
    response = ApiResponse(code=0, msg="任务提交成功", data=queued)
    return response
@router.get("/list", summary="列出回测任务")
def list_tasks(
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(10, ge=1, le=100, description="每页数量"),
    status: Optional[str] = Query(None, description="状态过滤 pending/running/completed/failed")
) -> ApiResponse[TaskListResponse]:
    """List backtest tasks with pagination and an optional status filter.

    ``page``, ``page_size`` and ``status`` are forwarded unchanged to
    ``task_queue.list_tasks``; its return value is unpacked as keyword
    arguments into ``TaskListResponse``.
    """
    listing = task_queue.list_tasks(page, page_size, status)
    payload = TaskListResponse(**listing)
    return ApiResponse(code=0, msg="success", data=payload)
@router.get("/status/{task_id}", summary="查询任务状态")
def get_status(task_id: str) -> ApiResponse[Optional[BacktestTaskWithId]]:
    """Look up a single task by id via ``storage.find_task``.

    Returns the task with code 0 when the lookup yields a truthy value;
    otherwise returns code 404 with no data.
    """
    found = storage.find_task(task_id)
    if found:
        return ApiResponse(code=0, msg="success", data=found)
    # Lookup came back empty: report the task as unknown.
    return ApiResponse(code=404, msg="任务不存在", data=None)
@router.get("/result/{task_id}", summary="获取回测结果")
def get_result(task_id: str) -> ApiResponse[Optional[BacktestResult]]:
    """Fetch the full backtest result for a task via ``storage.find_result``.

    Three outcomes are possible:
    * a result was found -> code 0 with the result as data;
    * no result, but ``storage.find_task`` finds the task -> code 0 with
      a "not finished yet" message and no data;
    * neither a result nor a task -> code 404 with no data.
    """
    outcome = storage.find_result(task_id)
    if outcome:
        return ApiResponse(code=0, msg="success", data=outcome)

    # No result available; the task lookup decides which message to return.
    if storage.find_task(task_id):
        return ApiResponse(code=0, msg="任务尚未完成", data=None)
    return ApiResponse(code=404, msg="任务不存在", data=None)
@router.delete("/delete/{task_id}", summary="删除回测任务")
def delete_task(task_id: str) -> ApiResponse[None]:
    """Delete a backtest task (physical deletion is not implemented yet).

    The task id is checked with ``storage.find_task`` first, so an unknown
    id yields code 404 like the status and result endpoints, instead of a
    misleading success. For a known id the placeholder success response is
    returned; nothing is actually removed.
    """
    # Reject ids that do not correspond to a known task.
    if not storage.find_task(task_id):
        return ApiResponse(code=404, msg="任务不存在", data=None)
    # TODO: implement physical deletion.
    # For now this only reports success; the removal itself comes later.
    return ApiResponse(code=0, msg="删除成功(待实现物理删除)", data=None)
@router.get("/health", summary="健康检查")
def health_check() -> ApiResponse[HealthCheckResponse]:
    """Service health check that reports task statistics.

    The counts are the lengths of the ``pending_tasks``, ``running_tasks``,
    ``completed_tasks`` and ``failed_tasks`` collections on ``task_queue``;
    ``max_workers`` is read from the same object.
    """
    stats = HealthCheckResponse(
        pending_count=len(task_queue.pending_tasks),
        running_count=len(task_queue.running_tasks),
        completed_count=len(task_queue.completed_tasks),
        failed_count=len(task_queue.failed_tasks),
        max_workers=task_queue.max_workers,
    )
    return ApiResponse(code=0, msg="ok", data=stats)