auto-sync: 2026-04-28 18:11:28

This commit is contained in:
cfdaily
2026-04-28 18:11:28 +08:00
parent 52384692d7
commit 0c73273d69
2 changed files with 77 additions and 51 deletions
+76 -51
View File
@@ -1,49 +1,52 @@
"""
自动化回测服务 - 任务队列
简单后台线程调度:submit后自动触发执行,同一时间只跑一个回测
"""
import os
import uuid
import threading
import time
import traceback
from datetime import datetime
from typing import List, Optional, Dict
from multiprocessing import Pool
from .config import settings
from .models import TaskStatus, BacktestTask, BacktestTaskWithId
from .result_storage import storage
class TaskQueue:
"""任务队列管理器"""
"""任务队列管理器 - 后台线程调度"""
def __init__(self):
self.max_workers = settings.max_workers
self.pending_tasks: List[str] = []
self.running_tasks: List[str] = []
self.completed_tasks: List[str] = []
self.failed_tasks: List[str] = []
self._pool: Optional[Pool] = None
self._worker_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
def _generate_task_id(self) -> str:
"""生成唯一任务ID"""
return str(uuid.uuid4()).replace("-", "")
def submit_task(self, task: BacktestTask) -> BacktestTaskWithId:
"""提交新任务到队列"""
task_id = self._generate_task_id()
now = datetime.now().isoformat()
task_with_id = BacktestTaskWithId(
task_id=task_id,
status=TaskStatus.PENDING,
created_at=now,
**task.model_dump()
)
storage.save_task(task_with_id)
self.pending_tasks.append(task_id)
return task_with_id
def list_tasks(self, page: int = 1, page_size: int = 10, status: Optional[str] = None) -> Dict:
"""列出任务,支持分页和状态过滤"""
if status == "pending":
task_ids = self.pending_tasks
elif status == "running":
@@ -54,63 +57,85 @@ class TaskQueue:
task_ids = self.failed_tasks
else:
task_ids = (
self.pending_tasks +
self.running_tasks +
self.completed_tasks +
self.pending_tasks +
self.running_tasks +
self.completed_tasks +
self.failed_tasks
)
total = len(task_ids)
start = (page - 1) * page_size
end = start + page_size
result = []
for task_id in task_ids[start:end]:
# 根据状态找任务
if task_id in self.pending_tasks:
task = storage.load_task(task_id, "pending")
elif task_id in self.running_tasks:
task = storage.load_task(task_id, "running")
elif task_id in self.completed_tasks:
task = storage.load_task(task_id, "completed")
else:
task = storage.load_task(task_id, "failed")
if task:
result.append(task)
for status_dir in ["pending", "running", "completed", "failed"]:
task = storage.load_task(task_id, status_dir)
if task:
result.append(task)
break
return {
"total": total,
"page": page,
"page_size": page_size,
"tasks": result
}
def get_task(self, task_id: str) -> Optional[BacktestTaskWithId]:
"""根据ID获取任务"""
# 在各个状态查找
for status_dir, task_list in [
("pending", self.pending_tasks),
("running", self.running_tasks),
("completed", self.completed_tasks),
("failed", self.failed_tasks),
]:
if task_id in task_list:
return storage.load_task(task_id, status_dir)
for status_dir in ["pending", "running", "completed", "failed"]:
task = storage.load_task(task_id, status_dir)
if task:
return task
return None
def _worker_loop(self):
"""后台工作线程:循环检查pending任务并执行"""
from .executor import executor
while not self._stop_event.is_set():
# 检查是否有pending任务且当前无running任务
if self.pending_tasks and not self.running_tasks:
task_id = self.pending_tasks.pop(0)
self.running_tasks.append(task_id)
# 在后台线程中执行回测
try:
task = storage.load_task(task_id, "pending")
if task:
# 移动到running目录
storage.save_task(task)
result = executor.execute_backtest(task)
# 从running移到completed/failed
self.running_tasks.remove(task_id)
if result.status == TaskStatus.COMPLETED:
self.completed_tasks.append(task_id)
else:
self.failed_tasks.append(task_id)
except Exception as e:
print(f"任务执行异常: {e}\n{traceback.format_exc()}")
if task_id in self.running_tasks:
self.running_tasks.remove(task_id)
self.failed_tasks.append(task_id)
# 等待1秒再检查
self._stop_event.wait(1.0)
def start_worker_pool(self):
"""启动工作进程池"""
if self._pool is None:
self._pool = Pool(processes=self.max_workers)
"""启动后台工作线程"""
if self._worker_thread is None or not self._worker_thread.is_alive():
self._stop_event.clear()
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker_thread.start()
print(f"工作线程已启动 (max_workers={self.max_workers})")
def close_worker_pool(self):
"""关闭工作进程池"""
if self._pool is not None:
self._pool.close()
self._pool.join()
self._pool = None
"""停止工作线程"""
self._stop_event.set()
if self._worker_thread and self._worker_thread.is_alive():
self._worker_thread.join(timeout=5)
self._worker_thread = None
task_queue = TaskQueue()