From b186771e6f4b57e96e63b78a9fec29cc3c75bd16 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 05:45:47 +0800 Subject: [PATCH] auto-sync: 2026-05-17 05:45:47 --- src/daemon/ticker.py | 226 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 src/daemon/ticker.py diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py new file mode 100644 index 0000000..e3450be --- /dev/null +++ b/src/daemon/ticker.py @@ -0,0 +1,226 @@ +"""Daemon Ticker — 30s 状态扫描 + 依赖推进 + 事件驱动 + +Tick 循环是整个 Daemon 的核心驱动: +1. 遍历所有 active 项目 +2. 每个 tick 扫描黑板状态 +3. 推进依赖链(blocked → pending) +4. 写入 daemon_tick 事件 +5. 调度 pending 任务(F9 实现) +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from pathlib import Path +from typing import Any, Callable, Coroutine, Dict, List, Optional + +from src.blackboard.blackboard import Blackboard +from src.blackboard.db import get_connection +from src.blackboard.models import Task +from src.blackboard.queries import Queries +from src.blackboard.registry import ProjectRegistry + +logger = logging.getLogger("moziplus-v2.ticker") + + +class Ticker: + """Daemon ticker 主循环""" + + def __init__( + self, + registry: ProjectRegistry, + tick_interval: float = 30.0, + max_ticks: Optional[int] = None, + on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None, + ): + """ + Args: + registry: 项目注册表 + tick_interval: tick 间隔秒数 + max_ticks: 测试用,限制最大 tick 次数 + on_tick_complete: 每次 tick 完成后的回调(用于通知等) + """ + self.registry = registry + self.tick_interval = tick_interval + self.max_ticks = max_ticks + self.on_tick_complete = on_tick_complete + + self._tick_count: int = 0 + self._running: bool = False + self._task: Optional[asyncio.Task] = None + + # 每个项目上次 tick 的 event count(用于僵尸检测 F8) + self._last_event_counts: Dict[str, int] = {} + + # ------------------------------------------------------------------ + # 生命周期 + # ------------------------------------------------------------------ + + async def start(self) -> None: + """启动 tick 循环""" + if self._running: + return + self._running = True + self._task = asyncio.create_task(self._loop()) + logger.info("Ticker started (interval=%.1fs)", self.tick_interval) + + async def stop(self) -> None: + """停止 tick 循环""" + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("Ticker stopped (ticks=%d)", self._tick_count) + + @property + def tick_count(self) -> int: + return self._tick_count + + @property + def is_running(self) -> bool: + return self._running + + # ------------------------------------------------------------------ + # 主循环 + # ------------------------------------------------------------------ + + async def _loop(self) -> None: + """Tick 主循环""" + while self._running: + try: + await self.tick() + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Tick %d error", self._tick_count + 1) + + if self.max_ticks and self._tick_count >= self.max_ticks: + logger.info("Max ticks reached (%d), stopping", self.max_ticks) + self._running = False + break + + try: + await asyncio.sleep(self.tick_interval) + except asyncio.CancelledError: + return + + # ------------------------------------------------------------------ + # 单次 tick + # ------------------------------------------------------------------ + + async def tick(self) -> Dict[str, Any]: + """执行一次 tick,遍历所有 active 项目""" + self._tick_count += 1 + tick_num = self._tick_count + results: Dict[str, Any] = { + "tick": tick_num, + "projects": {}, + } + + projects = self.registry.list_projects() + active_projects = { + pid: info for pid, info in projects.items() + if info.get("status") == "active" + } + + for project_id, project_info in active_projects.items(): + try: + pr = await self._tick_project(project_id, project_info) + results["projects"][project_id] = pr + except Exception as e: + logger.exception("Tick %d project %s error", tick_num, project_id) + results["projects"][project_id] = {"error": str(e)} + + logger.debug("Tick %d complete: %d projects", tick_num, len(active_projects)) + + if self.on_tick_complete: + try: + await self.on_tick_complete() + except Exception: + logger.exception("on_tick_complete callback error") + + return results + + async def _tick_project(self, project_id: str, + project_info: Dict[str, Any]) -> Dict[str, Any]: + """单项目的 tick 处理""" + project_dir = self.registry.root / project_id + db_path = project_dir / "blackboard.db" + + if not db_path.exists(): + return {"status": "no_db"} + + queries = Queries(db_path) + result: Dict[str, Any] = { + "status": "ok", + "summary_before": {}, + "summary_after": {}, + "advanced": [], + } + + # 1. 扫描当前状态 + result["summary_before"] = queries.task_summary() + + # 2. 依赖推进 + advanced = self._advance_dependencies(db_path) + result["advanced"] = advanced + + # 3. 写 daemon_tick 事件 + bb = Blackboard(db_path) + bb.add_event( + event_type="daemon_tick", + detail={"tick": self._tick_count, "advanced_count": len(advanced)}, + ) + + # 4. 扫描后状态 + result["summary_after"] = queries.task_summary() + + return result + + # ------------------------------------------------------------------ + # 依赖推进 + # ------------------------------------------------------------------ + + def _advance_dependencies(self, db_path: Path) -> List[str]: + """检查 blocked 任务,若所有依赖已完成则推进为 pending + + Returns: + 被推进的 task_id 列表 + """ + queries = Queries(db_path) + blocked = queries.blocked_tasks_with_deps() + advanced: List[str] = [] + + if not blocked: + return advanced + + bb = Blackboard(db_path) + for item in blocked: + if item["all_deps_done"]: + task_id = item["task_id"] + ok = bb.update_task_status( + task_id, "pending", + agent="daemon", + detail={"reason": "all_dependencies_done"}, + ) + if ok: + advanced.append(task_id) + logger.info("Advanced %s: blocked → pending", task_id) + + return advanced + + # ------------------------------------------------------------------ + # 手动 tick(API 端点触发) + # ------------------------------------------------------------------ + + async def manual_tick(self) -> Dict[str, Any]: + """手动触发一次 tick""" + logger.info("Manual tick triggered") + result = await self.tick() + result["manual"] = True + return result