diff --git a/docs/design/06-pm2-crash-recovery.md b/docs/design/06-pm2-crash-recovery.md new file mode 100644 index 0000000..354b7e6 --- /dev/null +++ b/docs/design/06-pm2-crash-recovery.md @@ -0,0 +1,416 @@ +# #06 PM2 Crash 恢复设计 + +> 版本: v1.0 +> 日期: 2026-06-01 +> 作者: 庞统(副军师) +> 状态: 待评审 +> 前置: spawner-monitor-design.md §5 A0(Agent crash 恢复) + +--- + +## 一、问题陈述 + +### 1.1 两个层面的 crash + +| 层面 | 触发 | 现有恢复机制 | +|------|------|-------------| +| **Agent crash**(子进程退出) | exit≠0、Gateway timeout、compact 异常 | §5 A0:spawner monitor 检测 → counter.release → ticker 重新 dispatch | +| **PM2 crash**(moziplus 进程重启) | PM2 重启、服务器重启、OOM kill | ❌ **无恢复机制** | + +Agent crash 的设计(§5 A0)是完整的:进程退出 → counter.release → 任务保持当前状态 → ticker 重新 dispatch。这不需要 Fix-3b。 + +PM2 crash 后,内存状态全部丢失: +- `_sessions`(spawner 进程跟踪)→ 空 +- `counter`(并发控制)→ 重置 +- ticker 不再知道哪些任务有活跃进程 + +但 **DB 是持久化的**——tasks/events/task_attempts/routing_decisions/comments/outputs/reviews 全部完好。 + +### 1.2 Fix-3b 的问题 + +v2.8.1 Fix-3b 试图用"15 分钟超时推 pending"近似 PM2 crash 恢复,但: + +1. **Agent crash 不需要它**——§5 A0 已覆盖 +2. **PM2 crash 后首次 tick 就应该恢复**——不应该等 15 分钟 +3. **误判场景**——agent busy(未 dispatch)时 Fix-3b 误判为超时,把正在排队的任务推回 pending +4. **违反"spawner 无状态"原则**——spawner 是执行者,不应该有状态判断逻辑 + +**结论**:回滚 Fix-3b,用专门的启动恢复机制替代。 + +--- + +## 二、设计原则 + +1. **黑板是唯一真相源**——所有恢复线索来自 DB,不依赖内存状态 +2. **Event Sourcing 思路**——events 表是不可变事件日志,恢复 = 从事件重建最新一致状态 +3. **精确恢复优于粗暴重置**——利用黑板线索(outputs/comments/reviews/routing_decisions)判断任务真实进度,尽量让任务从断点继续 +4. **只运行一次**——`_startup_recover()` 在 ticker 首次 tick 前执行一次,不影响运行时性能 + +--- + +## 三、恢复数据源 + +每个非终态任务可查询的线索: + +| 数据源 | 表 | 可回答的问题 | +|--------|-----|-------------| +| 任务当前状态 | tasks | status / assignee / current_agent / updated_at | +| 事件日志 | events | 所有状态变化、产出、广播记录(不可变) | +| 执行记录 | task_attempts | 每次 spawn 的 outcome(completed/crashed/timed_out) | +| 路由记录 | routing_decisions | 每次 dispatch 的结果(dispatched/skipped/error) | +| 产出物 | outputs | agent 产出了什么 | +| 交接记录 | comments (handoff) | 执行者是否完成并交接 | +| 审查结论 | reviews | reviewer 是否已给出结论 | +| 审查评论 | comments (review) | reviewer 的审查意见 | + +--- + +## 四、恢复策略 + +### 4.1 总体流程 + +``` +PM2 启动 moziplus v2 + → ticker.__init__() + → ticker.start() + → _startup_recover() ← 新增,在首次 tick 前执行 + → 遍历所有项目的 blackboard.db + → 找出所有非终态任务(claimed/working/review) + → 对每个任务:收集黑板线索 → 判断真实状态 → 执行恢复 + → 正常 tick 循环开始 +``` + +### 4.2 按状态恢复规则 + +#### claimed 状态 + +**含义**:任务已广播,等待 agent 认领。 + +**恢复逻辑**: + +``` +看 task_attempts 最后一条: + - 无记录 → agent 还没 spawn 过 → 推回 pending 重新 broadcast + - outcome=completed → agent 执行完 NO_REPLY 就走了 → 推回 pending 重新 broadcast + - outcome=crashed → agent crash 了 → 推回 pending 重新 broadcast +``` + +**结论**:claimed 状态统一推回 pending。claimed 语义就是"等待认领",没有需要保留的执行进度。 + +#### working 状态 + +**含义**:agent 正在执行。 + +**恢复逻辑**: + +``` +1. 看 task_attempts 最后一条: + - 无记录 → agent 还没 spawn → 推回 pending + - outcome=crashed / agent_failed → agent crash → 推回 pending + - outcome=completed → agent 正常退出,看进一步线索: + +2. 看 outputs 表:是否有产出物? + - 有产出 → agent 可能已完成工作但没标 review + +3. 看 comments (handoff):是否有 handoff 评论? + - 有 handoff + 有 output → agent 已完成 → 恢复为 review + - 有 handoff + 无 output → 产出可能丢失 → 推回 pending + - 无 handoff → agent 中途完成但没交接 → 推回 pending + +4. 看 comments (review):是否已有审查结论? + - 有 approved → 恢复为 done +``` + +**决策树**: + +``` +working + last_attempt.completed? + ├─ No → pending + └─ Yes + has_output? + ├─ No → pending + └─ Yes + has_handoff? + ├─ No → pending(可能标了 working 就 timeout 退出了,产出残留) + └─ Yes → review(agent 已完成工作并交接) +``` + +**注意**:`pending` 推回后 assignee 被清空,ticker 自然重新 broadcast 或 dispatch。如果原来有 assignee,agent 会看到之前的 outputs 和 comments(黑板数据不丢失),可以选择继续或重新认领。 + +#### review 状态 + +**含义**:执行者已交接,等待审查。 + +**恢复逻辑**: + +``` +1. 看 reviews 表:是否已有审查结论? + - approved → 恢复为 done + - needs_revision → 恢复为 pending(让 dispatcher 重新 dispatch executor) + - rejected → 恢复为 pending + +2. 看 routing_decisions 最后一条(from_status=review): + - outcome=dispatched → reviewer 已被 dispatch 过 + - 看 task_attempts 最后一条(agent=reviewer): + - completed → reviewer 完成 but 结论没写入 reviews 表 → 保持 review,等 ticker 重新 dispatch review + - crashed → reviewer crash → 保持 review,等 ticker 自然 dispatch(Fix-3c crash 计数仍然有效) + - 无记录 → spawn 可能失败 → 保持 review + - outcome=skipped → reviewer 从未成功 dispatch(agent busy)→ 保持 review,ticker 自然会重试 + - 无 routing_decision → review 刚进入,还没 dispatch → 保持 review +``` + +**关键**:review 状态**默认保持 review**,不推 pending。因为: +- reviews 表有结论 → 直接标 done/pending +- 无结论 → ticker 的 `_dispatch_reviews` 会在下个 tick 自然尝试 dispatch reviewer +- `_check_recent_routing` 的 5 分钟窗口已过期(PM2 重启间隔 > 5 分钟),不会阻止重新 dispatch + +#### escalated / waiting_human / paused 状态 + +**含义**:需要人工介入。 + +**恢复逻辑**:不干预。这些状态是人为设定的,保持不变。 + +--- + +## 五、实现设计 + +### 5.1 新增方法 + +```python +# ticker.py + +def _startup_recover(self) -> Dict[str, Any]: + """PM2 crash 后启动恢复:扫描所有非终态任务,从黑板线索重建一致状态""" + NON_TERMINAL = {"claimed", "working", "review"} + + projects = self.registry.list_projects() + recovery_report = {"projects": {}, "total_recovered": 0} + + for project_id, project_info in projects.items(): + if project_info.get("status") != "active": + continue + project_dir = self.registry.root / project_id + db_path = project_dir / "blackboard.db" + if not db_path.exists(): + continue + + # init_db 如果还没初始化 + db_key = str(db_path) + if db_key not in self._initialized_dbs: + from src.blackboard.db import init_db + init_db(db_path) + self._initialized_dbs.add(db_key) + + recovered = self._recover_project(db_path, NON_TERMINAL) + if recovered: + recovery_report["projects"][project_id] = recovered + recovery_report["total_recovered"] += len(recovered) + + if recovery_report["total_recovered"] > 0: + logger.info("Startup recovery: %d tasks recovered across %d projects", + recovery_report["total_recovered"], + len(recovery_report["projects"])) + else: + logger.info("Startup recovery: no non-terminal tasks found, clean start") + + return recovery_report + + +def _recover_project(self, db_path: Path, non_terminal: set) -> List[Dict]: + """恢复单个项目中的非终态任务""" + conn = get_connection(db_path) + recovered = [] + + try: + for status in non_terminal: + rows = conn.execute( + "SELECT id, assignee, current_agent, updated_at FROM tasks WHERE status=?", + (status,) + ).fetchall() + + for task in rows: + action = self._determine_recovery_action(conn, task, status, db_path) + if action: + self._execute_recovery(conn, task["id"], action, db_path) + recovered.append({"task_id": task["id"], "from": status, "action": action}) + finally: + conn.close() + + return recovered + + +def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> Optional[str]: + """根据黑板线索决定恢复动作,返回 None 表示不需要干预""" + task_id = task["id"] + + if status == "claimed": + # claimed 统一推 pending + return "push_to_pending" + + if status == "working": + return self._recover_working_task(conn, task_id) + + if status == "review": + return self._recover_review_task(conn, task_id) + + return None + + +def _recover_working_task(self, conn, task_id: str) -> Optional[str]: + """working 状态恢复:看黑板线索判断 agent 是否实际完成了工作""" + # 最后一次 attempt + last_attempt = conn.execute( + "SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1", + (task_id,) + ).fetchone() + + if not last_attempt or last_attempt["outcome"] != "completed": + return "push_to_pending" + + # agent 正常退出,看是否有产出 + has_output = conn.execute( + "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1", (task_id,) + ).fetchone() is not None + + if not has_output: + return "push_to_pending" + + # 看是否有 handoff + has_handoff = conn.execute( + "SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1", + (task_id,) + ).fetchone() is not None + + if has_handoff: + return "push_to_review" # agent 已完成工作并交接 + + return "push_to_pending" + + +def _recover_review_task(self, conn, task_id: str) -> Optional[str]: + """review 状态恢复:看是否有审查结论""" + # 检查 reviews 表 + review = conn.execute( + "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1", + (task_id,) + ).fetchone() + + if review: + if review["verdict"] == "approved": + return "push_to_done" + else: + # needs_revision / rejected → 推回 pending 重新执行 + return "push_to_pending" + + # 无审查结论 → 保持 review,ticker 自然会 dispatch reviewer + return None + + +def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path): + """执行恢复动作""" + from src.blackboard.db import get_connection as _get_conn + + if action == "push_to_pending": + self._transition_status( + conn, task_id, "pending", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": + conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]}, + ) + # 清空 current_agent + conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,)) + conn.commit() + + elif action == "push_to_review": + # working → review,恢复 assignee 保持不变 + self._transition_status( + conn, task_id, "review", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": "working"}, + ) + conn.commit() + + elif action == "push_to_done": + self._transition_status( + conn, task_id, "done", + agent="daemon", + detail={"reason": "startup_recovery", "original_status": "review"}, + ) + conn.commit() + + # 记录恢复事件 + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", + (task_id, "daemon", "startup_recovery", json.dumps({"action": action})) + ) + conn.commit() + + logger.info("Recovery: task %s → %s (action=%s)", task_id, action, action) +``` + +### 5.2 调用时机 + +```python +# ticker.py start() 方法 +async def start(self) -> None: + if self._running: + return + self._running = True + + # 启动恢复(PM2 crash 后重建一致状态) + self._startup_recover() + + self._task = asyncio.create_task(self._loop()) + # ... InboxWatcher start ... +``` + +### 5.3 回滚 Fix-3b + +删除 `ticker.py` `_check_timeouts` 方法中 `# v2.8.1 Fix-3b` 到 `logger.exception("Failed to reclaim review task %s", task.id)` 的整块代码(约 46 行)。 + +### 5.4 改动总览 + +| 文件 | 改动 | 行数 | +|------|------|------| +| `ticker.py` | 新增 `_startup_recover` / `_recover_project` / `_determine_recovery_action` / `_recover_working_task` / `_recover_review_task` / `_execute_recovery` | ~120 行 | +| `ticker.py` | `start()` 中调用 `_startup_recover()` | 2 行 | +| `ticker.py` | 删除 Fix-3b 代码块 | -46 行 | +| **净增** | | **~76 行** | + +--- + +## 六、设计对比 + +| | Fix-3b(旧) | 本方案(新) | +|---|---|---| +| 触发时机 | 每 tick 检查 | 仅启动时一次 | +| 覆盖状态 | 仅 review | claimed + working + review | +| 恢复策略 | 超时 → 全部推 pending | Event Sourcing → 精确恢复 | +| 误判风险 | 高(agent busy 被误判) | 低(基于黑板事实判断) | +| 运行时开销 | 每 tick 扫描 review 任务 | 仅启动时执行一次 | +| Agent crash 处理 | 混在 Fix-3b 里 | 明确回归 §5 A0 | + +--- + +## 七、优秀实践参考 + +| 实践 | 来源 | 我们的对应 | +|------|------|-----------| +| Postgres-backed checkpointer | LangGraph | events 表 = 天然的 checkpoint 日志 | +| 不可变事件日志,写入即审计 | #04 黑板设计原则 | 恢复基于 events + 关联表 | +| 可预测骨架 + LLM 动态填充 | 调研共识 | 恢复是确定性的骨架逻辑,不调 LLM | +| propose→validate→commit | Network-AI | 恢复动作通过 _transition_status 验证合法性 | + +--- + +## 八、测试计划 + +| 用例 | 模拟方式 | 预期 | +|------|---------|------| +| 干净启动 | 无非终态任务 | 无恢复动作,正常启动 | +| claimed 任务 | PM2 重启时有一个 claimed 任务 | → pending 重新 broadcast | +| working + 无产出 | agent crash 后 PM2 重启 | → pending 重新 dispatch | +| working + 有产出 + 有 handoff | agent 完成但 PM2 crash | → review | +| working + 有产出 + 无 handoff | agent 中途完成退出 | → pending | +| review + approved | reviewer 标 approved 后 PM2 crash | → done | +| review + needs_revision | reviewer 标 needs_revision 后 PM2 crash | → pending | +| review + 无结论 | reviewer 从未 dispatch + PM2 crash | 保持 review,ticker 自然 dispatch | +| review + outcome=skipped | agent busy 导致从未 dispatch | 保持 review,ticker 自然 dispatch |