Files
sanguo_moziplus_v2/src/daemon/ticker.py
T
2026-05-17 05:51:41 +08:00

276 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动
Tick 循环是整个 Daemon 的核心驱动:
1. 遍历所有 active 项目
2. 每个 tick 扫描黑板状态
3. 推进依赖链(blocked → pending
4. 写入 daemon_tick 事件
5. 调度 pending 任务(F9 实现)
"""
from __future__ import annotations
import asyncio
import json
import logging
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, List, Optional
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.blackboard.models import Task
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
logger = logging.getLogger("moziplus-v2.ticker")


class Ticker:
    """Main loop of the daemon ticker.

    On every tick the ticker walks all projects whose registry entry has
    ``status == "active"``, records a task summary, promotes blocked tasks
    whose dependencies are all done to ``pending``, and writes a
    ``daemon_tick`` event into the project's ``blackboard.db``.
    """

    def __init__(
        self,
        registry: ProjectRegistry,
        tick_interval: float = 30.0,
        max_ticks: Optional[int] = None,
        on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
    ):
        """Create a ticker; the loop does not run until ``start()`` is awaited.

        Args:
            registry: Project registry. This class uses its
                ``list_projects()`` method and its ``root`` path.
            tick_interval: Seconds to sleep between two ticks.
            max_ticks: For tests: stop the loop once this many ticks have
                run. A falsy value (``None`` or ``0``) means no limit.
            on_tick_complete: Optional async callback awaited after every
                tick (for notifications and similar).
        """
        self.registry = registry
        self.tick_interval = tick_interval
        self.max_ticks = max_ticks
        self.on_tick_complete = on_tick_complete
        self._tick_count: int = 0
        self._running: bool = False
        self._task: Optional[asyncio.Task] = None
        # db paths that were already initialised, so init_db is not repeated
        # on every tick.
        self._initialized_dbs: set = set()
        # Event count per project at the previous tick (for zombie
        # detection, F8). Not read anywhere in this class yet.
        self._last_event_counts: Dict[str, int] = {}

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def start(self) -> None:
        """Start the tick loop as a background task; no-op if already running."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._loop())
        logger.info("Ticker started (interval=%.1fs)", self.tick_interval)

    async def stop(self) -> None:
        """Stop the tick loop and wait for the background task to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                # Expected when the loop was cancelled in the middle of a tick.
                pass
        logger.info("Ticker stopped (ticks=%d)", self._tick_count)

    @property
    def tick_count(self) -> int:
        """Number of ticks started so far (loop ticks and manual ticks)."""
        return self._tick_count

    @property
    def is_running(self) -> bool:
        """Whether the tick loop is currently flagged as running."""
        return self._running

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    async def _loop(self) -> None:
        """Run ``tick()`` repeatedly until stopped or ``max_ticks`` is reached.

        A failing tick is logged and does not end the loop. Cancellation
        during a tick propagates; cancellation during the sleep ends the
        loop quietly.
        """
        while self._running:
            try:
                await self.tick()
            except asyncio.CancelledError:
                raise
            except Exception:
                # tick() increments the counter as its very first statement,
                # so the counter already holds the number of the failed tick.
                logger.exception("Tick %d error", self._tick_count)

            if self.max_ticks and self._tick_count >= self.max_ticks:
                logger.info("Max ticks reached (%d), stopping", self.max_ticks)
                self._running = False
                break

            try:
                await asyncio.sleep(self.tick_interval)
            except asyncio.CancelledError:
                return

    # ------------------------------------------------------------------
    # Single tick
    # ------------------------------------------------------------------
    async def tick(self) -> Dict[str, Any]:
        """Run one tick over all active projects.

        Returns:
            ``{"tick": <tick number>, "projects": {project_id: result}}``
            where each result is what ``_tick_project`` returned, or
            ``{"error": "<message>"}`` if that project raised.
        """
        self._tick_count += 1
        tick_num = self._tick_count
        results: Dict[str, Any] = {
            "tick": tick_num,
            "projects": {},
        }

        projects = self.registry.list_projects()
        active_projects = {
            pid: info for pid, info in projects.items()
            if info.get("status") == "active"
        }

        for project_id, project_info in active_projects.items():
            try:
                pr = await self._tick_project(project_id, project_info)
                results["projects"][project_id] = pr
            except Exception as e:
                # One broken project must not prevent the others from ticking.
                logger.exception("Tick %d project %s error", tick_num, project_id)
                results["projects"][project_id] = {"error": str(e)}

        logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects))

        if self.on_tick_complete:
            try:
                await self.on_tick_complete()
            except Exception:
                # Best effort: a failing callback never fails the tick.
                logger.exception("on_tick_complete callback error")

        return results

    async def _tick_project(self, project_id: str,
                            project_info: Dict[str, Any]) -> Dict[str, Any]:
        """Process one project for the current tick.

        Args:
            project_id: Name of the project directory below ``registry.root``.
            project_info: Registry entry of the project (unused here).

        Returns:
            ``{"status": "no_db"}`` when the project has no ``blackboard.db``;
            otherwise a dict with ``status`` (``"ok"``), ``summary_before``,
            ``summary_after`` and ``advanced`` (ids of promoted tasks).
        """
        project_dir = self.registry.root / project_id
        db_path = project_dir / "blackboard.db"
        if not db_path.exists():
            return {"status": "no_db"}

        # Run init_db only the first time this db_path is seen.
        db_key = str(db_path)
        if db_key not in self._initialized_dbs:
            from src.blackboard.db import init_db
            init_db(db_path)
            self._initialized_dbs.add(db_key)

        result: Dict[str, Any] = {
            "status": "ok",
            "summary_before": {},
            "summary_after": {},
            "advanced": [],
        }

        # 1. Scan the current state.
        queries = Queries(db_path)
        result["summary_before"] = queries.task_summary()

        # 2. Advance dependencies.
        advanced = self._advance_dependencies(db_path)
        result["advanced"] = advanced

        # 3. Write the daemon_tick event (uses get_connection directly
        #    instead of Blackboard() → init_db).
        conn = get_connection(db_path)
        try:
            conn.execute("BEGIN IMMEDIATE")
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (None, "daemon", "daemon_tick",
                 json.dumps({"tick": self._tick_count, "advanced_count": len(advanced)})),
            )
            conn.commit()
        finally:
            conn.close()

        # 4. State after the tick.
        result["summary_after"] = queries.task_summary()
        return result

    # ------------------------------------------------------------------
    # Dependency advancement
    # ------------------------------------------------------------------
    def _advance_dependencies(self, db_path: Path) -> List[str]:
        """Promote blocked tasks whose dependencies are all done to pending.

        Each item returned by ``Queries.blocked_tasks_with_deps()`` is read
        through its ``"all_deps_done"`` and ``"task_id"`` keys.

        Returns:
            The task ids for which ``_transition_status`` returned True.
        """
        queries = Queries(db_path)
        blocked = queries.blocked_tasks_with_deps()
        advanced: List[str] = []
        if not blocked:
            return advanced

        conn = get_connection(db_path)
        try:
            for item in blocked:
                if item["all_deps_done"]:
                    task_id = item["task_id"]
                    ok = self._transition_status(conn, task_id, "pending",
                                                 agent="daemon",
                                                 detail={"reason": "all_dependencies_done"})
                    if ok:
                        advanced.append(task_id)
                        logger.info("Advanced %s: blocked → pending", task_id)
        finally:
            conn.close()
        return advanced

    def _transition_status(self, conn, task_id: str, new_status: str,
                           agent: str = "daemon",
                           detail: Optional[Dict] = None) -> bool:
        """Lightweight status transition (bypasses Blackboard to avoid init_db).

        The read, the update and the event insert run inside a single
        ``BEGIN IMMEDIATE`` transaction. The transaction is always closed
        before returning: committed after a successful write, rolled back on
        every other path. This lets the caller reuse ``conn`` for the next
        task without hitting an error for a still-open transaction.

        Args:
            conn: Open connection from ``get_connection``; presumably a
                ``sqlite3`` connection whose rows support ``row["status"]``
                (confirm against ``src.blackboard.db``).
            task_id: Id of the task row to update.
            new_status: Target status.
            agent: Agent name stored on the event row.
            detail: Extra keys merged into the event's JSON detail.

        Returns:
            True if the task was updated, or if it already had
            ``new_status`` (nothing is written in that case). False if the
            task does not exist, is in a terminal status, or the transition
            is not listed in ``VALID_TRANSITIONS``.
        """
        from datetime import datetime, timezone

        from src.blackboard.db import VALID_TRANSITIONS, TERMINAL_STATUSES, EVENT_TYPES

        conn.execute("BEGIN IMMEDIATE")
        committed = False
        try:
            row = conn.execute(
                "SELECT status FROM tasks WHERE id=?", (task_id,)
            ).fetchone()
            if not row:
                return False

            old_status = row["status"]
            if old_status in TERMINAL_STATUSES or old_status == new_status:
                return old_status == new_status
            if new_status not in VALID_TRANSITIONS.get(old_status, set()):
                return False

            # Naive UTC timestamp, same text format datetime.utcnow() gave,
            # without relying on the deprecated utcnow().
            now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            conn.execute(
                "UPDATE tasks SET status=?, updated_at=? WHERE id=?",
                (new_status, now, task_id),
            )

            event_type = f"task_{new_status}"
            if event_type not in EVENT_TYPES:
                # No dedicated event type for this status: fall back.
                event_type = "daemon_tick"
            conn.execute(
                "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?,?,?,?)",
                (task_id, agent, event_type,
                 json.dumps({"from": old_status, "to": new_status, **(detail or {})})),
            )
            conn.commit()
            committed = True
            return True
        finally:
            if not committed:
                # Early return or exception: release the write lock and end
                # the transaction opened above.
                conn.rollback()

    # ------------------------------------------------------------------
    # Manual tick (triggered by the API endpoint)
    # ------------------------------------------------------------------
    async def manual_tick(self) -> Dict[str, Any]:
        """Trigger one tick by hand; the result carries ``"manual": True``."""
        logger.info("Manual tick triggered")
        result = await self.tick()
        result["manual"] = True
        return result