Files
sanguo_moziplus_v2/src/daemon/ticker.py
T
2026-05-18 23:27:09 +08:00

581 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动
Tick 循环是整个 Daemon 的核心驱动:
1. 遍历所有 active 项目
2. 每个 tick 扫描黑板状态
3. 推进依赖链(blocked → pending
4. 写入 daemon_tick 事件
5. 调度 pending 任务(F9 实现)
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
logger = logging.getLogger("moziplus-v2.ticker")
class Ticker:
"""Daemon ticker 主循环"""
def __init__(
self,
registry: ProjectRegistry,
tick_interval: float = 30.0,
max_ticks: Optional[int] = None,
on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
dispatcher: Optional[Any] = None,
spawner: Optional[Any] = None,
max_dispatch_per_tick: int = 3,
claim_timeout_minutes: float = 5.0,
default_task_timeout_minutes: float = 30.0,
):
"""
Args:
registry: 项目注册表
tick_interval: tick 间隔秒数
max_ticks: 测试用,限制最大 tick 次数
on_tick_complete: 每次 tick 完成后的回调(用于通知等)
dispatcher: Dispatcher 实例(Agent 调度)
spawner: AgentSpawner 实例(Agent spawn
max_dispatch_per_tick: 每个 tick 最多调度多少个 pending 任务
claim_timeout_minutes: claimed 超时重置为 pending 的分钟数
default_task_timeout_minutes: working 超时标记 failed 的分钟数
"""
self.registry = registry
self.tick_interval = tick_interval
self.max_ticks = max_ticks
self.on_tick_complete = on_tick_complete
self.dispatcher = dispatcher
self.spawner = spawner
self.max_dispatch_per_tick = max_dispatch_per_tick
self.claim_timeout_minutes = claim_timeout_minutes
self.default_task_timeout_minutes = default_task_timeout_minutes
self._tick_count: int = 0
self._running: bool = False
self._task: Optional[asyncio.Task] = None
# 已初始化的 db_path 集合,避免每次 tick 重复 init_db
self._initialized_dbs: set = set()
# 每个项目上次 tick 的 event count(用于僵尸检测 F8
self._last_event_counts: Dict[str, int] = {}
# 当前项目 ID_dispatch_pending 需要)
self._current_project_id: Optional[str] = None
# ------------------------------------------------------------------
# 生命周期
# ------------------------------------------------------------------
async def start(self) -> None:
"""启动 tick 循环"""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._loop())
logger.info("Ticker started (interval=%.1fs)", self.tick_interval)
async def stop(self) -> None:
"""停止 tick 循环"""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Ticker stopped (ticks=%d)", self._tick_count)
@property
def tick_count(self) -> int:
return self._tick_count
@property
def is_running(self) -> bool:
return self._running
# ------------------------------------------------------------------
# 主循环
# ------------------------------------------------------------------
async def _loop(self) -> None:
"""Tick 主循环"""
while self._running:
try:
await self.tick()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Tick %d error", self._tick_count + 1)
if self.max_ticks and self._tick_count >= self.max_ticks:
logger.info("Max ticks reached (%d), stopping", self.max_ticks)
self._running = False
break
try:
await asyncio.sleep(self.tick_interval)
except asyncio.CancelledError:
return
# ------------------------------------------------------------------
# 单次 tick
# ------------------------------------------------------------------
async def tick(self) -> Dict[str, Any]:
"""执行一次 tick,遍历所有 active 项目"""
self._tick_count += 1
tick_num = self._tick_count
results: Dict[str, Any] = {
"tick": tick_num,
"projects": {},
}
projects = self.registry.list_projects()
active_projects = {
pid: info for pid, info in projects.items()
if info.get("status") == "active"
}
for project_id, project_info in active_projects.items():
try:
pr = await self._tick_project(project_id, project_info)
results["projects"][project_id] = pr
except Exception as e:
logger.exception("Tick %d project %s error", tick_num, project_id)
results["projects"][project_id] = {"error": str(e)}
logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects))
if self.on_tick_complete:
try:
await self.on_tick_complete()
except Exception:
logger.exception("on_tick_complete callback error")
return results
async def _tick_project(self, project_id: str,
project_info: Dict[str, Any]) -> Dict[str, Any]:
"""单项目的 tick 处理"""
project_dir = self.registry.root / project_id
db_path = project_dir / "blackboard.db"
if not db_path.exists():
return {"status": "no_db"}
# 只在首次遇到该 db_path 时执行 init_db
db_key = str(db_path)
if db_key not in self._initialized_dbs:
from src.blackboard.db import init_db
init_db(db_path)
self._initialized_dbs.add(db_key)
result: Dict[str, Any] = {
"status": "ok",
"summary_before": {},
"summary_after": {},
"advanced": [],
"dispatched": [],
"review_dispatched": [],
"zombie_reclaimed": [],
"parents_refreshed": [],
}
# 保存当前 project_id(供 _dispatch_pending 使用)
self._current_project_id = project_id
# 1. 扫描当前状态
queries = Queries(db_path)
result["summary_before"] = queries.task_summary()
# 2. 依赖推进
advanced = self._advance_dependencies(db_path)
result["advanced"] = advanced
# 3. 僵尸/超时处理(在依赖推进后、调度前)
zombie_reclaimed = self._check_timeouts(db_path)
result["zombie_reclaimed"] = zombie_reclaimed
# 4. 调度 pending 任务
if self.dispatcher and self.spawner:
dispatched = await self._dispatch_pending(db_path, project_id)
result["dispatched"] = dispatched
# 5. 调度审查任务
if self.dispatcher and self.spawner:
review_dispatched = await self._dispatch_reviews(db_path, project_id)
result["review_dispatched"] = review_dispatched
# 6. 聚合父 Task 状态(v2.7
parents_refreshed = self._refresh_parent_statuses(db_path)
result["parents_refreshed"] = parents_refreshed
# 7. 写 daemon_tick 事件
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(None, "daemon", "daemon_tick",
json.dumps({"tick": self._tick_count, "advanced_count": len(advanced),
"parents_refreshed": len(parents_refreshed)})),
)
conn.commit()
finally:
conn.close()
# 8. 扫描后状态
result["summary_after"] = queries.task_summary()
return result
# ------------------------------------------------------------------
# 父 Task 状态聚合
# ------------------------------------------------------------------
def _refresh_parent_statuses(self, db_path: Path) -> List[str]:
"""全量扫描有子 Task 的父 Task,刷新聚合状态
跳过手动状态(cancelled, paused)的父 Task。使用单连接批量处理。
"""
queries = Queries(db_path)
refreshed: List[str] = []
conn = get_connection(db_path)
try:
# 找出所有有子 Task 的父 Task ID
parent_rows = conn.execute(
"SELECT DISTINCT parent_task FROM tasks WHERE parent_task IS NOT NULL"
).fetchall()
parent_ids = [r["parent_task"] for r in parent_rows]
for pid in parent_ids:
computed = queries.compute_parent_status(pid)
if computed is None:
continue
parent = conn.execute(
"SELECT status FROM tasks WHERE id=?", (pid,)
).fetchone()
if parent and parent["status"] != computed:
conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
(computed, pid),
)
refreshed.append(pid)
logger.info("Parent %s status aggregated: → %s", pid, computed)
if refreshed:
conn.commit()
finally:
conn.close()
return refreshed
return refreshed
# ------------------------------------------------------------------
# 依赖推进
# ------------------------------------------------------------------
def _advance_dependencies(self, db_path: Path) -> List[str]:
"""检查 blocked 任务,若所有依赖已完成则推进为 pending
Returns:
被推进的 task_id 列表
"""
queries = Queries(db_path)
blocked = queries.blocked_tasks_with_deps()
advanced: List[str] = []
if not blocked:
return advanced
conn = get_connection(db_path)
try:
for item in blocked:
if item["all_deps_done"]:
task_id = item["task_id"]
ok = self._transition_status(conn, task_id, "pending",
agent="daemon",
detail={"reason": "all_dependencies_done"})
if ok:
advanced.append(task_id)
logger.info("Advanced %s: blocked → pending", task_id)
finally:
conn.close()
return advanced
def _transition_status(self, conn, task_id: str, new_status: str,
agent: str = "daemon",
detail: Optional[Dict] = None) -> bool:
"""轻量状态转换(不走 Blackboard 类,避免 init_db"""
from src.blackboard.db import VALID_TRANSITIONS, TERMINAL_STATUSES, EVENT_TYPES
from datetime import datetime
conn.execute("BEGIN IMMEDIATE")
row = conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
return False
old_status = row["status"]
if old_status in TERMINAL_STATUSES or old_status == new_status:
return old_status == new_status
if new_status not in VALID_TRANSITIONS.get(old_status, set()):
return False
now = datetime.utcnow().isoformat()
conn.execute(
"UPDATE tasks SET status=?, updated_at=? WHERE id=?",
(new_status, now, task_id),
)
event_type = f"task_{new_status}"
if event_type not in EVENT_TYPES:
event_type = "daemon_tick"
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
(task_id, agent, event_type, json.dumps({"from": old_status, "to": new_status, **(detail or {})})),
)
conn.commit()
return True
# ------------------------------------------------------------------
# Agent 调度
# ------------------------------------------------------------------
async def _dispatch_pending(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 pending 任务并调度"""
queries = Queries(db_path)
pending = queries.pending_dispatchable()
dispatched: List[str] = []
if not pending:
return dispatched
for task in pending[:self.max_dispatch_per_tick]:
try:
result = await self.dispatcher.dispatch(
task,
project_config={"project_id": project_id, "db_path": db_path},
)
if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
# 标记为 claimed + 更新 current_agent
conn = get_connection(db_path)
try:
ok = self._transition_status(
conn, task.id, "claimed",
agent="daemon",
detail={"dispatched_to": result["agent_id"],
"session_id": result.get("session_id")},
)
if ok:
# 更新 current_agentRouter 审查时用)
conn.execute(
"UPDATE tasks SET current_agent=? WHERE id=?",
(result["agent_id"], task.id),
)
conn.commit()
dispatched.append(task.id)
logger.info("Dispatched %s to %s (session=%s)",
task.id, result["agent_id"],
result.get("session_id"))
finally:
conn.close()
except Exception:
logger.exception("Dispatch failed for %s", task.id)
return dispatched
async def _dispatch_reviews(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 review 状态任务,检查是否有产出,调度审查 Agent"""
queries = Queries(db_path)
bb = Blackboard(db_path)
review_tasks = queries.tasks_by_status("review")
dispatched: List[str] = []
for task in review_tasks:
# 检查是否已有 review 记录
reviews = bb.get_reviews(task.id)
if reviews:
continue # 已有审查,跳过
# 检查是否最近已 dispatch 过 review(防重复 dispatch
existing = self._check_recent_routing(db_path, task.id, "review")
if existing:
continue # 已有活跃 review dispatch
# 检查是否有产出(司马懿建议:无产出直接标 failed)
outputs = bb.get_outputs(task.id)
if not outputs:
conn = get_connection(db_path)
try:
self._transition_status(
conn, task.id, "failed",
agent="daemon",
detail={"reason": "no_output_for_review"},
)
bb.add_observation(
task.id, "daemon",
"任务进入 review 状态但没有产出物,自动标记为 failed",
)
logger.warning("Task %s in review but no output, marking failed",
task.id)
finally:
conn.close()
continue
# 调度审查 Agent(司马懿)
try:
result = await self.dispatcher.dispatch(
task,
action_type="review",
project_config={"project_id": project_id, "db_path": db_path},
)
if result["status"] == "dispatched":
dispatched.append(task.id)
# 更新 current_agent 为审查者
conn = get_connection(db_path)
try:
conn.execute(
"UPDATE tasks SET current_agent=? WHERE id=?",
(result["agent_id"], task.id),
)
conn.commit()
finally:
conn.close()
logger.info("Dispatched review for %s to %s",
task.id, result["agent_id"])
except Exception:
logger.exception("Review dispatch failed for %s", task.id)
return dispatched
# ------------------------------------------------------------------
# 僵尸/超时处理
# ------------------------------------------------------------------
def _check_timeouts(self, db_path: Path) -> List[str]:
"""检查 claimed/working 超时的任务"""
queries = Queries(db_path)
reclaimed: List[str] = []
now = datetime.utcnow() # UTC,与 SQLite datetime('now') 一致
# claimed 超时 → 重置为 pending
claimed = queries.tasks_by_status("claimed")
for task in claimed:
if not task.claimed_at:
continue
try:
claimed_time = datetime.fromisoformat(task.claimed_at)
elapsed = (now - claimed_time).total_seconds() / 60.0
if elapsed > self.claim_timeout_minutes:
conn = get_connection(db_path)
try:
ok = self._transition_status(
conn, task.id, "pending",
agent="daemon",
detail={"reason": "claim_timeout",
"elapsed_minutes": round(elapsed, 1)},
)
if ok:
reclaimed.append(task.id)
logger.info("Reclaimed %s: claimed → pending (timeout %.1fm)",
task.id, elapsed)
finally:
conn.close()
except (ValueError, TypeError):
pass
# working 超时 → 标记为 failed
working = queries.tasks_by_status("working")
for task in working:
start_time_str = task.started_at or task.claimed_at
if not start_time_str:
continue
try:
start_time = datetime.fromisoformat(start_time_str)
# per-task timeout: deadline 优先,否则用默认值
if task.deadline:
deadline_time = datetime.fromisoformat(task.deadline)
timeout_minutes = (deadline_time - start_time).total_seconds() / 60.0
if timeout_minutes < 1:
timeout_minutes = self.default_task_timeout_minutes
else:
timeout_minutes = self.default_task_timeout_minutes
elapsed = (now - start_time).total_seconds() / 60.0
if elapsed > timeout_minutes:
conn = get_connection(db_path)
try:
ok = self._transition_status(
conn, task.id, "failed",
agent="daemon",
detail={"reason": "task_timeout",
"elapsed_minutes": round(elapsed, 1),
"timeout_minutes": round(timeout_minutes, 1)},
)
if ok:
reclaimed.append(task.id)
logger.warning("Task %s timed out (working %.1fm > %.1fm)",
task.id, elapsed, timeout_minutes)
finally:
conn.close()
except (ValueError, TypeError):
pass
return reclaimed
def _check_recent_routing(self, db_path: Path, task_id: str,
action_type: str) -> bool:
"""检查最近 5 分钟内是否已 dispatch 过指定类型的路由(防重复)"""
try:
conn = get_connection(db_path)
try:
# 检查是否有 from_status=review 的 dispatched 记录(防止重复 review dispatch
if action_type == "review":
row = conn.execute(
"SELECT COUNT(*) as cnt FROM routing_decisions "
"WHERE task_id=? AND outcome='dispatched' "
"AND from_status='review' "
"AND created_at > datetime('now', '-5 minutes')",
(task_id,),
).fetchone()
else:
row = conn.execute(
"SELECT COUNT(*) as cnt FROM routing_decisions "
"WHERE task_id=? AND outcome='dispatched' "
"AND created_at > datetime('now', '-5 minutes')",
(task_id,),
).fetchone()
return row["cnt"] > 0 if row else False
finally:
conn.close()
except Exception:
return False
# ------------------------------------------------------------------
# 手动 tickAPI 端点触发)
# ------------------------------------------------------------------
async def manual_tick(self) -> Dict[str, Any]:
"""手动触发一次 tick"""
logger.info("Manual tick triggered")
result = await self.tick()
result["manual"] = True
return result