Files
sanguo_vnpy/scripts/backtest-service/task_queue.py
T
2026-04-28 18:11:28 +08:00

142 lines
4.8 KiB
Python
Executable File

"""
自动化回测服务 - 任务队列
简单后台线程调度:submit后自动触发执行,同一时间只跑一个回测
"""
import os
import uuid
import threading
import time
import traceback
from datetime import datetime
from typing import List, Optional, Dict
from .config import settings
from .models import TaskStatus, BacktestTask, BacktestTaskWithId
from .result_storage import storage
class TaskQueue:
"""任务队列管理器 - 后台线程调度"""
def __init__(self):
self.max_workers = settings.max_workers
self.pending_tasks: List[str] = []
self.running_tasks: List[str] = []
self.completed_tasks: List[str] = []
self.failed_tasks: List[str] = []
self._worker_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
def _generate_task_id(self) -> str:
return str(uuid.uuid4()).replace("-", "")
def submit_task(self, task: BacktestTask) -> BacktestTaskWithId:
"""提交新任务到队列"""
task_id = self._generate_task_id()
now = datetime.now().isoformat()
task_with_id = BacktestTaskWithId(
task_id=task_id,
status=TaskStatus.PENDING,
created_at=now,
**task.model_dump()
)
storage.save_task(task_with_id)
self.pending_tasks.append(task_id)
return task_with_id
def list_tasks(self, page: int = 1, page_size: int = 10, status: Optional[str] = None) -> Dict:
if status == "pending":
task_ids = self.pending_tasks
elif status == "running":
task_ids = self.running_tasks
elif status == "completed":
task_ids = self.completed_tasks
elif status == "failed":
task_ids = self.failed_tasks
else:
task_ids = (
self.pending_tasks +
self.running_tasks +
self.completed_tasks +
self.failed_tasks
)
total = len(task_ids)
start = (page - 1) * page_size
end = start + page_size
result = []
for task_id in task_ids[start:end]:
for status_dir in ["pending", "running", "completed", "failed"]:
task = storage.load_task(task_id, status_dir)
if task:
result.append(task)
break
return {
"total": total,
"page": page,
"page_size": page_size,
"tasks": result
}
def get_task(self, task_id: str) -> Optional[BacktestTaskWithId]:
for status_dir in ["pending", "running", "completed", "failed"]:
task = storage.load_task(task_id, status_dir)
if task:
return task
return None
def _worker_loop(self):
"""后台工作线程:循环检查pending任务并执行"""
from .executor import executor
while not self._stop_event.is_set():
# 检查是否有pending任务且当前无running任务
if self.pending_tasks and not self.running_tasks:
task_id = self.pending_tasks.pop(0)
self.running_tasks.append(task_id)
# 在后台线程中执行回测
try:
task = storage.load_task(task_id, "pending")
if task:
# 移动到running目录
storage.save_task(task)
result = executor.execute_backtest(task)
# 从running移到completed/failed
self.running_tasks.remove(task_id)
if result.status == TaskStatus.COMPLETED:
self.completed_tasks.append(task_id)
else:
self.failed_tasks.append(task_id)
except Exception as e:
print(f"任务执行异常: {e}\n{traceback.format_exc()}")
if task_id in self.running_tasks:
self.running_tasks.remove(task_id)
self.failed_tasks.append(task_id)
# 等待1秒再检查
self._stop_event.wait(1.0)
def start_worker_pool(self):
"""启动后台工作线程"""
if self._worker_thread is None or not self._worker_thread.is_alive():
self._stop_event.clear()
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker_thread.start()
print(f"工作线程已启动 (max_workers={self.max_workers})")
def close_worker_pool(self):
"""停止工作线程"""
self._stop_event.set()
if self._worker_thread and self._worker_thread.is_alive():
self._worker_thread.join(timeout=5)
self._worker_thread = None
task_queue = TaskQueue()