142 lines
4.8 KiB
Python
Executable File
142 lines
4.8 KiB
Python
Executable File
"""
|
|
自动化回测服务 - 任务队列
|
|
简单后台线程调度:submit后自动触发执行,同一时间只跑一个回测
|
|
"""
|
|
import os
|
|
import uuid
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from datetime import datetime
|
|
from typing import List, Optional, Dict
|
|
|
|
from .config import settings
|
|
from .models import TaskStatus, BacktestTask, BacktestTaskWithId
|
|
from .result_storage import storage
|
|
|
|
|
|
class TaskQueue:
|
|
"""任务队列管理器 - 后台线程调度"""
|
|
|
|
def __init__(self):
|
|
self.max_workers = settings.max_workers
|
|
self.pending_tasks: List[str] = []
|
|
self.running_tasks: List[str] = []
|
|
self.completed_tasks: List[str] = []
|
|
self.failed_tasks: List[str] = []
|
|
self._worker_thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
|
|
def _generate_task_id(self) -> str:
|
|
return str(uuid.uuid4()).replace("-", "")
|
|
|
|
def submit_task(self, task: BacktestTask) -> BacktestTaskWithId:
|
|
"""提交新任务到队列"""
|
|
task_id = self._generate_task_id()
|
|
now = datetime.now().isoformat()
|
|
|
|
task_with_id = BacktestTaskWithId(
|
|
task_id=task_id,
|
|
status=TaskStatus.PENDING,
|
|
created_at=now,
|
|
**task.model_dump()
|
|
)
|
|
|
|
storage.save_task(task_with_id)
|
|
self.pending_tasks.append(task_id)
|
|
return task_with_id
|
|
|
|
def list_tasks(self, page: int = 1, page_size: int = 10, status: Optional[str] = None) -> Dict:
|
|
if status == "pending":
|
|
task_ids = self.pending_tasks
|
|
elif status == "running":
|
|
task_ids = self.running_tasks
|
|
elif status == "completed":
|
|
task_ids = self.completed_tasks
|
|
elif status == "failed":
|
|
task_ids = self.failed_tasks
|
|
else:
|
|
task_ids = (
|
|
self.pending_tasks +
|
|
self.running_tasks +
|
|
self.completed_tasks +
|
|
self.failed_tasks
|
|
)
|
|
|
|
total = len(task_ids)
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
|
|
result = []
|
|
for task_id in task_ids[start:end]:
|
|
for status_dir in ["pending", "running", "completed", "failed"]:
|
|
task = storage.load_task(task_id, status_dir)
|
|
if task:
|
|
result.append(task)
|
|
break
|
|
|
|
return {
|
|
"total": total,
|
|
"page": page,
|
|
"page_size": page_size,
|
|
"tasks": result
|
|
}
|
|
|
|
def get_task(self, task_id: str) -> Optional[BacktestTaskWithId]:
|
|
for status_dir in ["pending", "running", "completed", "failed"]:
|
|
task = storage.load_task(task_id, status_dir)
|
|
if task:
|
|
return task
|
|
return None
|
|
|
|
def _worker_loop(self):
|
|
"""后台工作线程:循环检查pending任务并执行"""
|
|
from .executor import executor
|
|
|
|
while not self._stop_event.is_set():
|
|
# 检查是否有pending任务且当前无running任务
|
|
if self.pending_tasks and not self.running_tasks:
|
|
task_id = self.pending_tasks.pop(0)
|
|
self.running_tasks.append(task_id)
|
|
|
|
# 在后台线程中执行回测
|
|
try:
|
|
task = storage.load_task(task_id, "pending")
|
|
if task:
|
|
# 移动到running目录
|
|
storage.save_task(task)
|
|
result = executor.execute_backtest(task)
|
|
|
|
# 从running移到completed/failed
|
|
self.running_tasks.remove(task_id)
|
|
if result.status == TaskStatus.COMPLETED:
|
|
self.completed_tasks.append(task_id)
|
|
else:
|
|
self.failed_tasks.append(task_id)
|
|
except Exception as e:
|
|
print(f"任务执行异常: {e}\n{traceback.format_exc()}")
|
|
if task_id in self.running_tasks:
|
|
self.running_tasks.remove(task_id)
|
|
self.failed_tasks.append(task_id)
|
|
|
|
# 等待1秒再检查
|
|
self._stop_event.wait(1.0)
|
|
|
|
def start_worker_pool(self):
|
|
"""启动后台工作线程"""
|
|
if self._worker_thread is None or not self._worker_thread.is_alive():
|
|
self._stop_event.clear()
|
|
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
|
|
self._worker_thread.start()
|
|
print(f"工作线程已启动 (max_workers={self.max_workers})")
|
|
|
|
def close_worker_pool(self):
|
|
"""停止工作线程"""
|
|
self._stop_event.set()
|
|
if self._worker_thread and self._worker_thread.is_alive():
|
|
self._worker_thread.join(timeout=5)
|
|
self._worker_thread = None
|
|
|
|
|
|
task_queue = TaskQueue()
|