diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index a32938c..0140af5 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -1422,6 +1422,245 @@ Parent Task ID: {parent_task.id} except Exception: return False + # ------------------------------------------------------------------ + # PM2 Crash 启动恢复 (#06) + # ------------------------------------------------------------------ + + def _startup_recover(self) -> Dict[str, Any]: + """PM2 crash 后启动恢复:扫描所有非终态任务,从黑板线索重建一致状态""" + NON_TERMINAL = {"claimed", "working", "review", "reviewing"} + + projects = self.registry.list_projects() + recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0} + + # 收集所有需要扫描的项目(registry + 虚拟项目) + project_dirs = {} + for project_id, project_info in projects.items(): + if project_info.get("status") == "active": + project_dirs[project_id] = self.registry.root / project_id / "blackboard.db" + + # 虚拟项目 + for virtual_id in ("_general", "_mail"): + virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db" + if virtual_db.exists() and virtual_id not in project_dirs: + project_dirs[virtual_id] = virtual_db + + for project_id, db_path in project_dirs.items(): + if not db_path.exists(): + continue + + # init_db 如果还没初始化 + db_key = str(db_path) + if db_key not in self._initialized_dbs: + from src.blackboard.db import init_db + init_db(db_path) + self._initialized_dbs.add(db_key) + + recovered, noop_count = self._recover_project(db_path, NON_TERMINAL) + if recovered: + recovery_report["projects"][project_id] = recovered + recovery_report["total_recovered"] += len(recovered) + recovery_report["total_noop"] += noop_count + + if recovery_report["total_recovered"] > 0: + logger.info("Startup recovery: %d tasks recovered across %d projects", + recovery_report["total_recovered"], + len(recovery_report["projects"])) + elif recovery_report["total_noop"] > 0: + logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)", + recovery_report["total_noop"]) + else: + logger.info("Startup recovery: no non-terminal tasks found, clean start") + + return recovery_report + + def _recover_project(self, db_path: Path, non_terminal: set) -> tuple: + """恢复单个项目中的非终态任务 + + Returns: + (recovered_list, noop_count) + """ + conn = get_connection(db_path) + recovered = [] + noop_count = 0 + + try: + for status in non_terminal: + rows = conn.execute( + "SELECT id, assignee, current_agent, updated_at FROM tasks WHERE status=?", + (status,) + ).fetchall() + + for task in rows: + action = self._determine_recovery_action(conn, task, status, db_path) + if action: + self._execute_recovery(conn, task["id"], action, db_path) + recovered.append({"task_id": task["id"], "from": status, "action": action}) + else: + # 审计:保持原状的任务也记录事件 + noop_count += 1 + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", + (task["id"], "daemon", "startup_recovery_noop", + json.dumps({"status": status, "reason": "no_action_needed"})) + ) + conn.commit() + finally: + conn.close() + + return recovered, noop_count + + def _determine_recovery_action(self, conn, task, status: str, + db_path: Path) -> Optional[str]: + """根据黑板线索决定恢复动作,返回 None 表示不需要干预""" + task_id = task["id"] + + if status == "claimed": + # claimed 统一推 pending + return "push_to_pending" + + if status == "working": + return self._recover_working_task(conn, task_id) + + if status == "review": + return self._recover_review_task(conn, task_id) + + if status == "reviewing": + # reviewing → 查 events 找前置状态(done 或 failed),精确恢复 + prev_status = self._find_pre_reviewing_status(conn, task_id) + if prev_status == "failed": + return "push_to_failed" + return "push_to_done" # 兜底 done + + return None + + def _recover_working_task(self, conn, task_id: str) -> Optional[str]: + """working 状态恢复:看黑板线索判断 agent 是否实际完成了工作""" + # 最后一次 attempt + last_attempt = conn.execute( + "SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1", + (task_id,) + ).fetchone() + + if not last_attempt or last_attempt["outcome"] != "completed": + return "push_to_pending_keep_agent" # 保留 current_agent + + # agent 正常退出,看是否有产出 + has_output = conn.execute( + "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1", (task_id,) + ).fetchone() is not None + + if not has_output: + return "push_to_pending_keep_agent" + + # 看是否有 handoff + has_handoff = conn.execute( + "SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1", + (task_id,) + ).fetchone() is not None + + if has_handoff: + return "push_to_review" # agent 已完成工作并交接 + + return "push_to_pending_keep_agent" + + def _recover_review_task(self, conn, task_id: str) -> Optional[str]: + """review 状态恢复:看是否有审查结论""" + # 检查 reviews 表 + review = conn.execute( + "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1", + (task_id,) + ).fetchone() + + if review: + if review["verdict"] == "approved": + return "push_to_done" + else: + # needs_revision / rejected → 推回 pending 重新执行 + return "push_to_pending" + + # 无审查结论 → 保持 review,ticker 自然会 dispatch reviewer + return None + + def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): + """执行恢复动作""" + # 获取原始状态(用于审计) + orig_row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + orig_status = orig_row["status"] if orig_row else "unknown" + + if action == "push_to_pending": + self._transition_status( + conn, task_id, "pending", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": orig_status}, + ) + # 清空 current_agent(常规推 pending,无特定 agent 接手) + conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,)) + conn.commit() + + elif action == "push_to_pending_keep_agent": + self._transition_status( + conn, task_id, "pending", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": orig_status}, + ) + # 保留 current_agent,让同一 agent 重新接手 + conn.commit() + + elif action == "push_to_review": + self._transition_status( + conn, task_id, "review", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": "working"}, + ) + conn.commit() + + elif action == "push_to_done": + self._transition_status( + conn, task_id, "done", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": orig_status}, + ) + conn.commit() + + elif action == "push_to_failed": + self._transition_status( + conn, task_id, "failed", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": orig_status}, + ) + conn.commit() + + # 记录恢复审计事件 + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", + (task_id, "daemon", "startup_recovery", json.dumps({"action": action})) + ) + conn.commit() + + logger.info("Recovery: task %s → %s (action=%s)", task_id, action, action) + + def _find_pre_reviewing_status(self, conn, task_id: str) -> str: + """查 events 表找到 reviewing 之前的状态(done 或 failed)""" + rows = conn.execute( + """SELECT detail FROM events + WHERE task_id=? AND event_type='task_status_changed' + ORDER BY id DESC LIMIT 10""", + (task_id,) + ).fetchall() + + for event in rows: + try: + detail = json.loads(event["detail"]) + if detail.get("new_status") == "reviewing": + return detail.get("old_status", "done") # 兜底 done + except (json.JSONDecodeError, KeyError): + continue + + return "done" # 找不到则兜底 done(reviewing 只从 done/failed 转入) + # ------------------------------------------------------------------ # 手动 tick(API 端点触发) # ------------------------------------------------------------------