diff --git a/logs/auto-sync.log b/logs/auto-sync.log index f4a0b18c..98df58c0 100644 --- a/logs/auto-sync.log +++ b/logs/auto-sync.log @@ -5486,5 +5486,6 @@ + diff --git a/scripts/backtest-service/task_queue.py b/scripts/backtest-service/task_queue.py index 54fa94e3..2e91800d 100755 --- a/scripts/backtest-service/task_queue.py +++ b/scripts/backtest-service/task_queue.py @@ -1,49 +1,52 @@ """ 自动化回测服务 - 任务队列 +简单后台线程调度:submit后自动触发执行,同一时间只跑一个回测 """ import os import uuid +import threading +import time +import traceback from datetime import datetime from typing import List, Optional, Dict -from multiprocessing import Pool + from .config import settings from .models import TaskStatus, BacktestTask, BacktestTaskWithId from .result_storage import storage class TaskQueue: - """任务队列管理器""" - + """任务队列管理器 - 后台线程调度""" + def __init__(self): self.max_workers = settings.max_workers self.pending_tasks: List[str] = [] self.running_tasks: List[str] = [] self.completed_tasks: List[str] = [] self.failed_tasks: List[str] = [] - self._pool: Optional[Pool] = None - + self._worker_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + def _generate_task_id(self) -> str: - """生成唯一任务ID""" return str(uuid.uuid4()).replace("-", "") - + def submit_task(self, task: BacktestTask) -> BacktestTaskWithId: """提交新任务到队列""" task_id = self._generate_task_id() now = datetime.now().isoformat() - + task_with_id = BacktestTaskWithId( task_id=task_id, status=TaskStatus.PENDING, created_at=now, **task.model_dump() ) - + storage.save_task(task_with_id) self.pending_tasks.append(task_id) return task_with_id - + def list_tasks(self, page: int = 1, page_size: int = 10, status: Optional[str] = None) -> Dict: - """列出任务,支持分页和状态过滤""" if status == "pending": task_ids = self.pending_tasks elif status == "running": @@ -54,63 +57,85 @@ class TaskQueue: task_ids = self.failed_tasks else: task_ids = ( - self.pending_tasks + - self.running_tasks + - self.completed_tasks + + self.pending_tasks + + self.running_tasks + + self.completed_tasks + self.failed_tasks ) - + total = len(task_ids) start = (page - 1) * page_size end = start + page_size - + result = [] for task_id in task_ids[start:end]: - # 根据状态找任务 - if task_id in self.pending_tasks: - task = storage.load_task(task_id, "pending") - elif task_id in self.running_tasks: - task = storage.load_task(task_id, "running") - elif task_id in self.completed_tasks: - task = storage.load_task(task_id, "completed") - else: - task = storage.load_task(task_id, "failed") - - if task: - result.append(task) - + for status_dir in ["pending", "running", "completed", "failed"]: + task = storage.load_task(task_id, status_dir) + if task: + result.append(task) + break + return { "total": total, "page": page, "page_size": page_size, "tasks": result } - + def get_task(self, task_id: str) -> Optional[BacktestTaskWithId]: - """根据ID获取任务""" - # 在各个状态查找 - for status_dir, task_list in [ - ("pending", self.pending_tasks), - ("running", self.running_tasks), - ("completed", self.completed_tasks), - ("failed", self.failed_tasks), - ]: - if task_id in task_list: - return storage.load_task(task_id, status_dir) - + for status_dir in ["pending", "running", "completed", "failed"]: + task = storage.load_task(task_id, status_dir) + if task: + return task return None - + + def _worker_loop(self): + """后台工作线程:循环检查pending任务并执行""" + from .executor import executor + + while not self._stop_event.is_set(): + # 检查是否有pending任务且当前无running任务 + if self.pending_tasks and not self.running_tasks: + task_id = self.pending_tasks.pop(0) + self.running_tasks.append(task_id) + + # 在后台线程中执行回测 + try: + task = storage.load_task(task_id, "pending") + if task: + # 移动到running目录 + storage.save_task(task) + result = executor.execute_backtest(task) + + # 从running移到completed/failed + self.running_tasks.remove(task_id) + if result.status == TaskStatus.COMPLETED: + self.completed_tasks.append(task_id) + else: + self.failed_tasks.append(task_id) + except Exception as e: + print(f"任务执行异常: {e}\n{traceback.format_exc()}") + if task_id in self.running_tasks: + self.running_tasks.remove(task_id) + self.failed_tasks.append(task_id) + + # 等待1秒再检查 + self._stop_event.wait(1.0) + def start_worker_pool(self): - """启动工作进程池""" - if self._pool is None: - self._pool = Pool(processes=self.max_workers) - + """启动后台工作线程""" + if self._worker_thread is None or not self._worker_thread.is_alive(): + self._stop_event.clear() + self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True) + self._worker_thread.start() + print(f"工作线程已启动 (max_workers={self.max_workers})") + def close_worker_pool(self): - """关闭工作进程池""" - if self._pool is not None: - self._pool.close() - self._pool.join() - self._pool = None + """停止工作线程""" + self._stop_event.set() + if self._worker_thread and self._worker_thread.is_alive(): + self._worker_thread.join(timeout=5) + self._worker_thread = None task_queue = TaskQueue()