From b6c8710eee499f49e1d995fb539affcdf30f4a23 Mon Sep 17 00:00:00 2001
From: cfdaily
Date: Sun, 17 May 2026 13:47:08 +0800
Subject: [PATCH] auto-sync: 2026-05-17 13:47:08

---
 src/daemon/ticker.py | 286 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 284 insertions(+), 2 deletions(-)

diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py
index 92834e4..1f7ff8c 100644
--- a/src/daemon/ticker.py
+++ b/src/daemon/ticker.py
@@ -34,6 +34,11 @@ class Ticker:
         tick_interval: float = 30.0,
         max_ticks: Optional[int] = None,
         on_tick_complete: Optional[Callable[[], Coroutine[Any, Any, None]]] = None,
+        dispatcher: Optional[Any] = None,
+        spawner: Optional[Any] = None,
+        max_dispatch_per_tick: int = 3,
+        claim_timeout_minutes: float = 5.0,
+        default_task_timeout_minutes: float = 30.0,
     ):
         """
         Args:
@@ -41,11 +46,21 @@ class Ticker:
             tick_interval: tick 间隔秒数
             max_ticks: 测试用,限制最大 tick 次数
             on_tick_complete: 每次 tick 完成后的回调(用于通知等)
+            dispatcher: Dispatcher instance (agent dispatch)
+            spawner: AgentSpawner instance (agent spawn)
+            max_dispatch_per_tick: maximum number of pending tasks dispatched per tick
+            claim_timeout_minutes: minutes after which a claimed task is reset to pending
+            default_task_timeout_minutes: minutes after which a working task is marked failed
         """
         self.registry = registry
         self.tick_interval = tick_interval
         self.max_ticks = max_ticks
         self.on_tick_complete = on_tick_complete
+        self.dispatcher = dispatcher
+        self.spawner = spawner
+        self.max_dispatch_per_tick = max_dispatch_per_tick
+        self.claim_timeout_minutes = claim_timeout_minutes
+        self.default_task_timeout_minutes = default_task_timeout_minutes
 
         self._tick_count: int = 0
         self._running: bool = False
@@ -57,6 +72,9 @@ class Ticker:
         # 每个项目上次 tick 的 event count(用于僵尸检测 F8)
         self._last_event_counts: Dict[str, int] = {}
 
+        # Project currently being ticked (NOTE(review): nothing in this patch reads it)
+        self._current_project_id: Optional[str] = None
+
     # ------------------------------------------------------------------
     # 生命周期
     # ------------------------------------------------------------------
@@ -170,8 +188,14 @@ class Ticker:
             "summary_before": {},
             "summary_after": {},
             "advanced": [],
+            "dispatched": [],
+            "review_dispatched": [],
+            "zombie_reclaimed": [],
         }
 
+        # Remember which project is being ticked
+        self._current_project_id = project_id
+
         # 1. 扫描当前状态
         queries = Queries(db_path)
         result["summary_before"] = queries.task_summary()
@@ -180,7 +204,21 @@ class Ticker:
         advanced = self._advance_dependencies(db_path)
         result["advanced"] = advanced
 
-        # 3. 写 daemon_tick 事件(直接用 get_connection,不再 Blackboard() → init_db)
+        # 3. Zombie/timeout handling (after dependency advance, before dispatch)
+        zombie_reclaimed = self._check_timeouts(db_path)
+        result["zombie_reclaimed"] = zombie_reclaimed
+
+        # 4. Dispatch pending tasks
+        if self.dispatcher and self.spawner:
+            dispatched = await self._dispatch_pending(db_path, project_id)
+            result["dispatched"] = dispatched
+
+        # 5. Dispatch review tasks
+        if self.dispatcher and self.spawner:
+            review_dispatched = await self._dispatch_reviews(db_path, project_id)
+            result["review_dispatched"] = review_dispatched
+
+        # 6. Write the daemon_tick event (get_connection directly, not Blackboard() -> init_db)
         conn = get_connection(db_path)
         try:
             conn.execute("BEGIN IMMEDIATE")
@@ -193,7 +231,7 @@ class Ticker:
         finally:
             conn.close()
 
-        # 4. 扫描后状态
+        # 7. Scan the state after the tick
         result["summary_after"] = queries.task_summary()
 
         return result
@@ -263,6 +301,250 @@ class Ticker:
         conn.commit()
         return True
 
+    # ------------------------------------------------------------------
+    # Agent dispatch
+    # ------------------------------------------------------------------
+
+    async def _dispatch_pending(self, db_path: Path,
+                                project_id: str) -> List[str]:
+        """Dispatch up to ``max_dispatch_per_tick`` pending tasks.
+
+        A task is moved to ``claimed`` when the dispatcher reports status
+        ``dispatched`` at level ``full`` or ``escalate``.
+
+        Args:
+            db_path: database of the project being ticked.
+            project_id: passed to the dispatcher inside ``project_config``.
+
+        Returns:
+            IDs of the tasks that were dispatched and marked ``claimed``.
+        """
+        # No Blackboard is built here: nothing in this method uses one, and
+        # the tick path avoids Blackboard() because it runs init_db.
+        pending = Queries(db_path).tasks_by_status("pending")
+        dispatched: List[str] = []
+
+        if not pending:
+            return dispatched
+
+        for task in pending[:self.max_dispatch_per_tick]:
+            try:
+                result = await self.dispatcher.dispatch(
+                    task,
+                    project_config={"project_id": project_id},
+                )
+                if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
+                    # Mark the task as claimed
+                    conn = get_connection(db_path)
+                    try:
+                        ok = self._transition_status(
+                            conn, task.id, "claimed",
+                            agent="daemon",
+                            detail={"dispatched_to": result["agent_id"],
+                                    "session_id": result.get("session_id")},
+                        )
+                        if ok:
+                            dispatched.append(task.id)
+                            logger.info("Dispatched %s to %s (session=%s)",
+                                        task.id, result["agent_id"],
+                                        result.get("session_id"))
+                    finally:
+                        conn.close()
+            except Exception:
+                logger.exception("Dispatch failed for %s", task.id)
+
+        return dispatched
+
+    async def _dispatch_reviews(self, db_path: Path,
+                                project_id: str) -> List[str]:
+        """Dispatch a review agent for ``review`` tasks that have output.
+
+        Tasks that already have a review record are skipped; tasks with no
+        output are transitioned to ``failed``.
+
+        NOTE(review): a task remains eligible until a review record exists,
+        so it can be dispatched again on later ticks; confirm that the
+        dispatcher de-duplicates.
+
+        Args:
+            db_path: database of the project being ticked.
+            project_id: passed to the dispatcher inside ``project_config``.
+
+        Returns:
+            IDs of the tasks for which a review was dispatched.
+        """
+        review_tasks = Queries(db_path).tasks_by_status("review")
+        dispatched: List[str] = []
+        if not review_tasks:
+            # Nothing to review: do not construct a Blackboard at all
+            return dispatched
+
+        bb = Blackboard(db_path)
+        for task in review_tasks:
+            # Already reviewed: skip
+            if bb.get_reviews(task.id):
+                continue
+
+            # No output to review: fail the task instead of dispatching
+            if not bb.get_outputs(task.id):
+                conn = get_connection(db_path)
+                try:
+                    ok = self._transition_status(
+                        conn, task.id, "failed",
+                        agent="daemon",
+                        detail={"reason": "no_output_for_review"},
+                    )
+                finally:
+                    conn.close()
+                # Record the failure only when the transition succeeded
+                if ok:
+                    bb.add_observation(
+                        task.id, "daemon",
+                        "任务进入 review 状态但没有产出物,自动标记为 failed",
+                    )
+                    logger.warning("Task %s in review but no output, marking failed",
+                                   task.id)
+                continue
+
+            # Dispatch the review agent (司马懿)
+            try:
+                result = await self.dispatcher.dispatch(
+                    task,
+                    action_type="review",
+                    project_config={"project_id": project_id},
+                )
+                if result["status"] == "dispatched":
+                    dispatched.append(task.id)
+                    logger.info("Dispatched review for %s to %s",
+                                task.id, result.get("agent_id"))
+            except Exception:
+                logger.exception("Review dispatch failed for %s", task.id)
+
+        return dispatched
+
+    # ------------------------------------------------------------------
+    # Zombie / timeout handling
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def _parse_utc_timestamp(value: str) -> datetime:
+        """Parse an ISO-8601 string into a timezone-aware UTC datetime.
+
+        A value without an offset is taken to be UTC, matching the UTC clock
+        it is compared against.
+
+        Raises:
+            ValueError, TypeError: if ``value`` cannot be parsed.
+        """
+        # Local import: only ``datetime`` itself is known to be in scope.
+        from datetime import timezone
+
+        parsed = datetime.fromisoformat(value)
+        if parsed.tzinfo is None:
+            return parsed.replace(tzinfo=timezone.utc)
+        return parsed.astimezone(timezone.utc)
+
+    def _check_timeouts(self, db_path: Path) -> List[str]:
+        """Reset stale ``claimed`` tasks and fail overdue ``working`` tasks.
+
+        * ``claimed`` longer than ``claim_timeout_minutes`` -> ``pending``.
+        * ``working`` longer than its budget -> ``failed``.  The budget is
+          ``deadline - start`` when that is at least one minute, otherwise
+          ``default_task_timeout_minutes``.
+
+        Timestamps are compared as timezone-aware UTC values, so stored
+        values with and without an offset are both handled.
+
+        Args:
+            db_path: database of the project being ticked.
+
+        Returns:
+            IDs of the tasks whose status was changed.
+        """
+        from datetime import timezone
+
+        queries = Queries(db_path)
+        reclaimed: List[str] = []
+        now = datetime.now(timezone.utc)
+
+        # claimed for too long -> reset to pending
+        for task in queries.tasks_by_status("claimed"):
+            if not task.claimed_at:
+                continue
+            try:
+                claimed_time = self._parse_utc_timestamp(task.claimed_at)
+            except (ValueError, TypeError):
+                logger.warning("Task %s has unparseable claimed_at %r",
+                               task.id, task.claimed_at)
+                continue
+
+            elapsed = (now - claimed_time).total_seconds() / 60.0
+            if elapsed <= self.claim_timeout_minutes:
+                continue
+
+            conn = get_connection(db_path)
+            try:
+                ok = self._transition_status(
+                    conn, task.id, "pending",
+                    agent="daemon",
+                    detail={"reason": "claim_timeout",
+                            "elapsed_minutes": round(elapsed, 1)},
+                )
+            finally:
+                conn.close()
+            if ok:
+                reclaimed.append(task.id)
+                logger.info("Reclaimed %s: claimed → pending (timeout %.1fm)",
+                            task.id, elapsed)
+
+        # working for too long -> mark failed
+        for task in queries.tasks_by_status("working"):
+            start_time_str = task.started_at or task.claimed_at
+            if not start_time_str:
+                continue
+            try:
+                start_time = self._parse_utc_timestamp(start_time_str)
+            except (ValueError, TypeError):
+                logger.warning("Task %s has unparseable start time %r",
+                               task.id, start_time_str)
+                continue
+
+            # Per-task budget: a usable deadline wins, otherwise the default
+            timeout_minutes = self.default_task_timeout_minutes
+            if task.deadline:
+                try:
+                    deadline_time = self._parse_utc_timestamp(task.deadline)
+                except (ValueError, TypeError):
+                    logger.warning("Task %s has unparseable deadline %r; "
+                                   "using the default timeout",
+                                   task.id, task.deadline)
+                else:
+                    budget = (deadline_time - start_time).total_seconds() / 60.0
+                    if budget >= 1:
+                        timeout_minutes = budget
+
+            elapsed = (now - start_time).total_seconds() / 60.0
+            if elapsed <= timeout_minutes:
+                continue
+
+            conn = get_connection(db_path)
+            try:
+                ok = self._transition_status(
+                    conn, task.id, "failed",
+                    agent="daemon",
+                    detail={"reason": "task_timeout",
+                            "elapsed_minutes": round(elapsed, 1),
+                            "timeout_minutes": round(timeout_minutes, 1)},
+                )
+            finally:
+                conn.close()
+            if ok:
+                reclaimed.append(task.id)
+                logger.warning("Task %s timed out (working %.1fm > %.1fm)",
+                               task.id, elapsed, timeout_minutes)
+
+        return reclaimed
+
     # ------------------------------------------------------------------
     # 手动 tick(API 端点触发)
     # ------------------------------------------------------------------