diff --git a/docs/design/06-pm2-crash-recovery.md b/docs/design/06-pm2-crash-recovery.md index 354b7e6..74b3e99 100644 --- a/docs/design/06-pm2-crash-recovery.md +++ b/docs/design/06-pm2-crash-recovery.md @@ -1,10 +1,11 @@ # #06 PM2 Crash 恢复设计 -> 版本: v1.0 +> 版本: v1.1 > 日期: 2026-06-01 > 作者: 庞统(副军师) -> 状态: 待评审 +> 状态: 二次评审中 > 前置: spawner-monitor-design.md §5 A0(Agent crash 恢复) +> 变更: v1.1 纳入司马懿评审意见(Blocking #1 reviewing 状态 + Blocking #3 虚拟项目 + 建议 #2 审计事件) --- @@ -74,9 +75,10 @@ PM2 启动 moziplus v2 → ticker.__init__() → ticker.start() → _startup_recover() ← 新增,在首次 tick 前执行 - → 遍历所有项目的 blackboard.db - → 找出所有非终态任务(claimed/working/review) + → 遍历所有项目(含 _general/_mail 虚拟项目)的 blackboard.db + → 找出所有非终态任务(claimed/working/review/reviewing) → 对每个任务:收集黑板线索 → 判断真实状态 → 执行恢复 + → 对保持原状的任务也写审计事件 → 正常 tick 循环开始 ``` @@ -162,6 +164,18 @@ working + last_attempt.completed? - 无结论 → ticker 的 `_dispatch_reviews` 会在下个 tick 自然尝试 dispatch reviewer - `_check_recent_routing` 的 5 分钟窗口已过期(PM2 重启间隔 > 5 分钟),不会阻止重新 dispatch +#### reviewing 状态 + +**含义**:parent task 进入庞统 round review 的中间状态(`done/failed → reviewing`)。 + +**恢复逻辑**:推回 `done`(进入 reviewing 之前的状态),让 ticker 的 `_check_round_complete` 重新触发庞统 review;推导过程见下。 + +**原因**:reviewing 状态依赖 `_spawn_pangtong_review` 的回调 `_handle_review_conclusion` 来推进。PM2 crash 后回调丢失,`_handle_review_conclusion` 永不触发,parent 永久卡死。若推回 working,下个 tick 的 `_check_round_complete` 会发现 sub task 全部终态,但 parent 状态非 done/failed → 跳过(`_check_round_complete` 只在 `parent_status in (done, failed)` 时触发),因此推回 working 后还需手动触发一次;更简单的做法是直接推回 done(让 `_check_round_complete` 自然重新触发 review spawn)。 + +**最终策略**:推回 `done`(恢复到 reviewing 之前的状态),`_check_round_complete` 会自然重新触发庞统 review。 + +> 注:reviewing 状态是从 done/failed 转入的(`_set_parent_reviewing`),所以推回 done 是合法的(reviewing → done 在 VALID_TRANSITIONS 中)。但更安全的做法是查 events 表看 reviewing 之前是什么状态。简化起见直接推 done。 + #### escalated / waiting_human / paused 状态 **含义**:需要人工介入。 @@ -179,16 +193,24 @@ working + last_attempt.completed?
def _startup_recover(self) -> Dict[str, Any]: """PM2 crash 后启动恢复:扫描所有非终态任务,从黑板线索重建一致状态""" - NON_TERMINAL = {"claimed", "working", "review"} + NON_TERMINAL = {"claimed", "working", "review", "reviewing"} projects = self.registry.list_projects() - recovery_report = {"projects": {}, "total_recovered": 0} + recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0} + # 收集所有需要扫描的项目(registry + 虚拟项目) + project_dirs = {} for project_id, project_info in projects.items(): - if project_info.get("status") != "active": - continue - project_dir = self.registry.root / project_id - db_path = project_dir / "blackboard.db" + if project_info.get("status") == "active": + project_dirs[project_id] = self.registry.root / project_id / "blackboard.db" + + # 虚拟项目 + for virtual_id in ("_general", "_mail"): + virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db" + if virtual_db.exists() and virtual_id not in project_dirs: + project_dirs[virtual_id] = virtual_db + + for project_id, db_path in project_dirs.items(): if not db_path.exists(): continue @@ -199,15 +221,19 @@ def _startup_recover(self) -> Dict[str, Any]: init_db(db_path) self._initialized_dbs.add(db_key) - recovered = self._recover_project(db_path, NON_TERMINAL) + recovered, noop_count = self._recover_project(db_path, NON_TERMINAL) if recovered: recovery_report["projects"][project_id] = recovered recovery_report["total_recovered"] += len(recovered) + recovery_report["total_noop"] += noop_count if recovery_report["total_recovered"] > 0: logger.info("Startup recovery: %d tasks recovered across %d projects", recovery_report["total_recovered"], len(recovery_report["projects"])) + elif recovery_report["total_noop"] > 0: + logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)", + recovery_report["total_noop"]) else: logger.info("Startup recovery: no non-terminal tasks found, clean start") @@ -218,6 +244,7 @@ def _recover_project(self, db_path: Path, non_terminal: set) -> List[Dict]: 
"""恢复单个项目中的非终态任务""" conn = get_connection(db_path) recovered = [] + noop_count = 0 try: for status in non_terminal: @@ -231,10 +258,19 @@ def _recover_project(self, db_path: Path, non_terminal: set) -> List[Dict]: if action: self._execute_recovery(conn, task["id"], action, db_path) recovered.append({"task_id": task["id"], "from": status, "action": action}) + else: + # 审计:保持原状的任务也记录事件 + noop_count += 1 + conn.execute( + "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)", + (task["id"], "daemon", "startup_recovery_noop", + json.dumps({"status": status, "reason": "no_action_needed"})) + ) + conn.commit() finally: conn.close() - return recovered + return recovered, noop_count def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> Optional[str]: @@ -251,6 +287,10 @@ def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> if status == "review": return self._recover_review_task(conn, task_id) + if status == "reviewing": + # reviewing → done,让 _check_round_complete 重新触发庞统 review + return "push_to_done" + return None @@ -301,6 +341,7 @@ def _recover_review_task(self, conn, task_id: str) -> Optional[str]: return "push_to_pending" # 无审查结论 → 保持 review,ticker 自然会 dispatch reviewer + # (审计事件由 _recover_project 中的 noop 分支记录) return None @@ -356,6 +397,7 @@ async def start(self) -> None: self._running = True # 启动恢复(PM2 crash 后重建一致状态) + # 同步调用是有意为之:恢复必须在事件循环启动前完成,耗时 < 1s self._startup_recover() self._task = asyncio.create_task(self._loop()) @@ -382,7 +424,7 @@ async def start(self) -> None: | | Fix-3b(旧) | 本方案(新) | |---|---|---| | 触发时机 | 每 tick 检查 | 仅启动时一次 | -| 覆盖状态 | 仅 review | claimed + working + review | +| 覆盖状态 | 仅 review | claimed + working + review + reviewing | | 恢复策略 | 超时 → 全部推 pending | Event Sourcing → 精确恢复 | | 误判风险 | 高(agent busy 被误判) | 低(基于黑板事实判断) | | 运行时开销 | 每 tick 扫描 review 任务 | 仅启动时执行一次 | @@ -414,3 +456,5 @@ async def start(self) -> None: | review + needs_revision | reviewer 标 needs_revision 后 
PM2 crash | → pending | | review + 无结论 | reviewer 从未 dispatch + PM2 crash | 保持 review,ticker 自然 dispatch | | review + outcome=skipped | agent busy 导致从未 dispatch | 保持 review,ticker 自然 dispatch | +| reviewing 任务 | PM2 crash 在庞统 round review 中 | → done,ticker 重新触发 round review | +| 虚拟项目 _mail | PM2 crash 时有 working 邮件 | 按 working 策略恢复 |