""" 自动化回测服务 - 任务队列 简单后台线程调度:submit后自动触发执行,同一时间只跑一个回测 """ import os import uuid import threading import time import traceback from datetime import datetime from typing import List, Optional, Dict from .config import settings from .models import TaskStatus, BacktestTask, BacktestTaskWithId from .result_storage import storage class TaskQueue: """任务队列管理器 - 后台线程调度""" def __init__(self): self.max_workers = settings.max_workers self.pending_tasks: List[str] = [] self.running_tasks: List[str] = [] self.completed_tasks: List[str] = [] self.failed_tasks: List[str] = [] self._worker_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() def _generate_task_id(self) -> str: return str(uuid.uuid4()).replace("-", "") def submit_task(self, task: BacktestTask) -> BacktestTaskWithId: """提交新任务到队列""" task_id = self._generate_task_id() now = datetime.now().isoformat() task_with_id = BacktestTaskWithId( task_id=task_id, status=TaskStatus.PENDING, created_at=now, **task.model_dump() ) storage.save_task(task_with_id) self.pending_tasks.append(task_id) return task_with_id def list_tasks(self, page: int = 1, page_size: int = 10, status: Optional[str] = None) -> Dict: if status == "pending": task_ids = self.pending_tasks elif status == "running": task_ids = self.running_tasks elif status == "completed": task_ids = self.completed_tasks elif status == "failed": task_ids = self.failed_tasks else: task_ids = ( self.pending_tasks + self.running_tasks + self.completed_tasks + self.failed_tasks ) total = len(task_ids) start = (page - 1) * page_size end = start + page_size result = [] for task_id in task_ids[start:end]: for status_dir in ["pending", "running", "completed", "failed"]: task = storage.load_task(task_id, status_dir) if task: result.append(task) break return { "total": total, "page": page, "page_size": page_size, "tasks": result } def get_task(self, task_id: str) -> Optional[BacktestTaskWithId]: for status_dir in ["pending", "running", "completed", "failed"]: task = storage.load_task(task_id, status_dir) if task: return task return None def _worker_loop(self): """后台工作线程:循环检查pending任务并执行""" from .executor import executor while not self._stop_event.is_set(): # 检查是否有pending任务且当前无running任务 if self.pending_tasks and not self.running_tasks: task_id = self.pending_tasks.pop(0) self.running_tasks.append(task_id) # 在后台线程中执行回测 try: task = storage.load_task(task_id, "pending") if task: # 移动到running目录 storage.save_task(task) result = executor.execute_backtest(task) # 从running移到completed/failed self.running_tasks.remove(task_id) if result.status == TaskStatus.COMPLETED: self.completed_tasks.append(task_id) else: self.failed_tasks.append(task_id) except Exception as e: print(f"任务执行异常: {e}\n{traceback.format_exc()}") if task_id in self.running_tasks: self.running_tasks.remove(task_id) self.failed_tasks.append(task_id) # 等待1秒再检查 self._stop_event.wait(1.0) def start_worker_pool(self): """启动后台工作线程""" if self._worker_thread is None or not self._worker_thread.is_alive(): self._stop_event.clear() self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True) self._worker_thread.start() print(f"工作线程已启动 (max_workers={self.max_workers})") def close_worker_pool(self): """停止工作线程""" self._stop_event.set() if self._worker_thread and self._worker_thread.is_alive(): self._worker_thread.join(timeout=5) self._worker_thread = None task_queue = TaskQueue()