auto-sync: 2026-06-01 11:42:51

This commit is contained in:
cfdaily
2026-06-01 11:42:51 +08:00
parent 88312db0e6
commit ccd69b21c3
+57 -13
View File
@@ -1,10 +1,11 @@
# #06 PM2 Crash 恢复设计
> 版本: v1.0
> 版本: v1.1
> 日期: 2026-06-01
> 作者: 庞统(副军师)
> 状态: 评审
> 状态: 二次评审
> 前置: spawner-monitor-design.md §5 A0Agent crash 恢复)
> 变更: v1.1 纳入司马懿评审意见(Blocking #1 reviewing 状态 + Blocking #3 虚拟项目 + 建议 #2 审计事件)
---
@@ -74,9 +75,10 @@ PM2 启动 moziplus v2
→ ticker.__init__()
→ ticker.start()
→ _startup_recover() ← 新增,在首次 tick 前执行
→ 遍历所有项目的 blackboard.db
→ 找出所有非终态任务(claimed/working/review
→ 遍历所有项目(含 _general/_mail 虚拟项目)的 blackboard.db
→ 找出所有非终态任务(claimed/working/review/reviewing
→ 对每个任务:收集黑板线索 → 判断真实状态 → 执行恢复
→ 对保持原状的任务也写审计事件
→ 正常 tick 循环开始
```
@@ -162,6 +164,18 @@ working + last_attempt.completed?
- 无结论 → ticker 的 `_dispatch_reviews` 会在下个 tick 自然尝试 dispatch reviewer
- `_check_recent_routing` 的 5 分钟窗口已过期(PM2 重启间隔 > 5 分钟),不会阻止重新 dispatch
#### reviewing 状态
**含义**parent task 进入庞统 round review 的中间状态(`done/failed → reviewing`)。
**恢复逻辑**:推回 `working`,让 ticker 重新触发 `_check_round_complete`
**原因**reviewing 状态依赖 `_spawn_pangtong_review` 的回调 `_handle_review_conclusion` 来推进。PM2 crash 后回调丢失,`_handle_review_conclusion` 永不触发,parent 永久卡死。推回 working 后,下个 tick 的 `_check_round_complete` 检查 sub task 全部终态 + parent 状态非 done/failed → 跳过。但 `_check_round_complete` 只在 `parent_status in (done, failed)` 时触发,所以需要推回 working 后手动触发一次,或者直接推回 done(让 `_check_round_complete` 自然重新触发 review spawn)。
**最终策略**:推回 `done`(恢复到 reviewing 之前的状态),`_check_round_complete` 会自然重新触发庞统 review。
> 注:reviewing 状态是从 done/failed 转入的(`_set_parent_reviewing`),所以推回 done 是合法的(reviewing → done 在 VALID_TRANSITIONS 中)。但更安全的做法是查 events 表看 reviewing 之前是什么状态。简化起见直接推 done。
#### escalated / waiting_human / paused 状态
**含义**:需要人工介入。
@@ -179,16 +193,24 @@ working + last_attempt.completed?
def _startup_recover(self) -> Dict[str, Any]:
"""PM2 crash 后启动恢复:扫描所有非终态任务,从黑板线索重建一致状态"""
NON_TERMINAL = {"claimed", "working", "review"}
NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
projects = self.registry.list_projects()
recovery_report = {"projects": {}, "total_recovered": 0}
recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0}
# 收集所有需要扫描的项目(registry + 虚拟项目)
project_dirs = {}
for project_id, project_info in projects.items():
if project_info.get("status") != "active":
continue
project_dir = self.registry.root / project_id
db_path = project_dir / "blackboard.db"
if project_info.get("status") == "active":
project_dirs[project_id] = self.registry.root / project_id / "blackboard.db"
# 虚拟项目
for virtual_id in ("_general", "_mail"):
virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
if virtual_db.exists() and virtual_id not in project_dirs:
project_dirs[virtual_id] = virtual_db
for project_id, db_path in project_dirs.items():
if not db_path.exists():
continue
@@ -199,15 +221,19 @@ def _startup_recover(self) -> Dict[str, Any]:
init_db(db_path)
self._initialized_dbs.add(db_key)
recovered = self._recover_project(db_path, NON_TERMINAL)
recovered, noop_count = self._recover_project(db_path, NON_TERMINAL)
if recovered:
recovery_report["projects"][project_id] = recovered
recovery_report["total_recovered"] += len(recovered)
recovery_report["total_noop"] += noop_count
if recovery_report["total_recovered"] > 0:
logger.info("Startup recovery: %d tasks recovered across %d projects",
recovery_report["total_recovered"],
len(recovery_report["projects"]))
elif recovery_report["total_noop"] > 0:
logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
recovery_report["total_noop"])
else:
logger.info("Startup recovery: no non-terminal tasks found, clean start")
@@ -218,6 +244,7 @@ def _recover_project(self, db_path: Path, non_terminal: set) -> List[Dict]:
"""恢复单个项目中的非终态任务"""
conn = get_connection(db_path)
recovered = []
noop_count = 0
try:
for status in non_terminal:
@@ -231,10 +258,19 @@ def _recover_project(self, db_path: Path, non_terminal: set) -> List[Dict]:
if action:
self._execute_recovery(conn, task["id"], action, db_path)
recovered.append({"task_id": task["id"], "from": status, "action": action})
else:
# 审计:保持原状的任务也记录事件
noop_count += 1
conn.execute(
"INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
(task["id"], "daemon", "startup_recovery_noop",
json.dumps({"status": status, "reason": "no_action_needed"}))
)
conn.commit()
finally:
conn.close()
return recovered
return recovered, noop_count
def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> Optional[str]:
@@ -251,6 +287,10 @@ def _determine_recovery_action(self, conn, task, status: str, db_path: Path) ->
if status == "review":
return self._recover_review_task(conn, task_id)
if status == "reviewing":
# reviewing → done,让 _check_round_complete 重新触发庞统 review
return "push_to_done"
return None
@@ -301,6 +341,7 @@ def _recover_review_task(self, conn, task_id: str) -> Optional[str]:
return "push_to_pending"
# 无审查结论 → 保持 reviewticker 自然会 dispatch reviewer
# (审计事件由 _recover_project 中的 noop 分支记录)
return None
@@ -356,6 +397,7 @@ async def start(self) -> None:
self._running = True
# 启动恢复(PM2 crash 后重建一致状态)
# 同步调用是有意为之:恢复必须在事件循环启动前完成,耗时 < 1s
self._startup_recover()
self._task = asyncio.create_task(self._loop())
@@ -382,7 +424,7 @@ async def start(self) -> None:
| | Fix-3b(旧) | 本方案(新) |
|---|---|---|
| 触发时机 | 每 tick 检查 | 仅启动时一次 |
| 覆盖状态 | 仅 review | claimed + working + review |
| 覆盖状态 | 仅 review | claimed + working + review + reviewing |
| 恢复策略 | 超时 → 全部推 pending | Event Sourcing → 精确恢复 |
| 误判风险 | 高(agent busy 被误判) | 低(基于黑板事实判断) |
| 运行时开销 | 每 tick 扫描 review 任务 | 仅启动时执行一次 |
@@ -414,3 +456,5 @@ async def start(self) -> None:
| review + needs_revision | reviewer 标 needs_revision 后 PM2 crash | → pending |
| review + 无结论 | reviewer 从未 dispatch + PM2 crash | 保持 reviewticker 自然 dispatch |
| review + outcome=skipped | agent busy 导致从未 dispatch | 保持 reviewticker 自然 dispatch |
| reviewing 任务 | PM2 crash 在庞统 round review 中 | → doneticker 重新触发 round review |
| 虚拟项目 _mail | PM2 crash 时有 working 邮件 | 按 working 策略恢复 |