diff --git a/logs/auto-sync.log b/logs/auto-sync.log index 749c787c..3acafb90 100644 --- a/logs/auto-sync.log +++ b/logs/auto-sync.log @@ -165,5 +165,6 @@ + diff --git a/scripts/backtest-service/task_queue.py b/scripts/backtest-service/task_queue.py new file mode 100644 index 00000000..54fa94e3 --- /dev/null +++ b/scripts/backtest-service/task_queue.py @@ -0,0 +1,116 @@ +""" +自动化回测服务 - 任务队列 +""" +import os +import uuid +from datetime import datetime +from typing import List, Optional, Dict +from multiprocessing import Pool +from .config import settings +from .models import TaskStatus, BacktestTask, BacktestTaskWithId +from .result_storage import storage + + +class TaskQueue: + """任务队列管理器""" + + def __init__(self): + self.max_workers = settings.max_workers + self.pending_tasks: List[str] = [] + self.running_tasks: List[str] = [] + self.completed_tasks: List[str] = [] + self.failed_tasks: List[str] = [] + self._pool: Optional[Pool] = None + + def _generate_task_id(self) -> str: + """生成唯一任务ID""" + return str(uuid.uuid4()).replace("-", "") + + def submit_task(self, task: BacktestTask) -> BacktestTaskWithId: + """提交新任务到队列""" + task_id = self._generate_task_id() + now = datetime.now().isoformat() + + task_with_id = BacktestTaskWithId( + task_id=task_id, + status=TaskStatus.PENDING, + created_at=now, + **task.model_dump() + ) + + storage.save_task(task_with_id) + self.pending_tasks.append(task_id) + return task_with_id + + def list_tasks(self, page: int = 1, page_size: int = 10, status: Optional[str] = None) -> Dict: + """列出任务,支持分页和状态过滤""" + if status == "pending": + task_ids = self.pending_tasks + elif status == "running": + task_ids = self.running_tasks + elif status == "completed": + task_ids = self.completed_tasks + elif status == "failed": + task_ids = self.failed_tasks + else: + task_ids = ( + self.pending_tasks + + self.running_tasks + + self.completed_tasks + + self.failed_tasks + ) + + total = len(task_ids) + start = (page - 1) * page_size + end = start + page_size + + result = [] + for task_id in task_ids[start:end]: + # 根据状态找任务 + if task_id in self.pending_tasks: + task = storage.load_task(task_id, "pending") + elif task_id in self.running_tasks: + task = storage.load_task(task_id, "running") + elif task_id in self.completed_tasks: + task = storage.load_task(task_id, "completed") + else: + task = storage.load_task(task_id, "failed") + + if task: + result.append(task) + + return { + "total": total, + "page": page, + "page_size": page_size, + "tasks": result + } + + def get_task(self, task_id: str) -> Optional[BacktestTaskWithId]: + """根据ID获取任务""" + # 在各个状态查找 + for status_dir, task_list in [ + ("pending", self.pending_tasks), + ("running", self.running_tasks), + ("completed", self.completed_tasks), + ("failed", self.failed_tasks), + ]: + if task_id in task_list: + return storage.load_task(task_id, status_dir) + + return None + + def start_worker_pool(self): + """启动工作进程池""" + if self._pool is None: + self._pool = Pool(processes=self.max_workers) + + def close_worker_pool(self): + """关闭工作进程池""" + if self._pool is not None: + self._pool.close() + self._pool.join() + self._pool = None + + +task_queue = TaskQueue()