Files
sanguo_moziplus_v2/docs/design/06-pm2-crash-recovery.md
T
cfdaily eccb4d2723
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
docs: 设计文档编号重排(20→14, 24→15) + 已完成文档状态标注更新
2026-06-13 10:12:39 +08:00

20 KiB
Raw Blame History

#06 PM2 Crash 恢复设计

版本: v1.2
日期: 2026-06-03
作者: 庞统(副军师)
状态: 已完成(_startup_recover 7 个方法已实现)
前置: spawner-monitor-design.md §5 A0Agent crash 恢复)
变更: v1.2 两个关键改进:(1) working→pending 保留 current_agent 让同一 agent 接手;(2) reviewing 精确恢复到前置状态而非硬推 done


一、问题陈述

1.1 两个层面的 crash

层面 触发 现有恢复机制
Agent crash(子进程退出) exit≠0、Gateway timeout、compact 异常 §5 A0spawner monitor 检测 → counter.release → ticker 重新 dispatch
PM2 crashmoziplus 进程重启) PM2 重启、服务器重启、OOM kill 无恢复机制

Agent crash 的设计(§5 A0)是完整的:进程退出 → counter.release → 任务保持当前状态 → ticker 重新 dispatch。这不需要 Fix-3b。

PM2 crash 后,内存状态全部丢失:

  • _sessionsspawner 进程跟踪)→ 空
  • counter(并发控制)→ 重置
  • ticker 不再知道哪些任务有活跃进程

DB 是持久化的——tasks/events/task_attempts/routing_decisions/comments/outputs/reviews 全部完好。

1.2 Fix-3b 的问题

v2.8.1 Fix-3b 试图用"15 分钟超时推 pending"近似 PM2 crash 恢复,但:

  1. Agent crash 不需要它——§5 A0 已覆盖
  2. PM2 crash 后首次 tick 就应该恢复——不应该等 15 分钟
  3. 误判场景——agent busy(未 dispatch)时 Fix-3b 误判为超时,把正在排队的任务推回 pending
  4. 违反"spawner 无状态"原则——spawner 是执行者,不应该有状态判断逻辑

结论:回滚 Fix-3b,用专门的启动恢复机制替代。


二、设计原则

  1. 黑板是唯一真相源——所有恢复线索来自 DB,不依赖内存状态
  2. Event Sourcing 思路——events 表是不可变事件日志,恢复 = 从事件重建最新一致状态
  3. 精确恢复优于粗暴重置——利用黑板线索(outputs/comments/reviews/routing_decisions)判断任务真实进度,尽量让任务从断点继续
  4. 只运行一次——_startup_recover() 在 ticker 首次 tick 前执行一次,不影响运行时性能

三、恢复数据源

每个非终态任务可查询的线索:

数据源 可回答的问题
任务当前状态 tasks status / assignee / current_agent / updated_at
事件日志 events 所有状态变化、产出、广播记录(不可变)
执行记录 task_attempts 每次 spawn 的 outcomecompleted/crashed/timed_out
路由记录 routing_decisions 每次 dispatch 的结果(dispatched/skipped/error
产出物 outputs agent 产出了什么
交接记录 comments (handoff) 执行者是否完成并交接
审查结论 reviews reviewer 是否已给出结论
审查评论 comments (review) reviewer 的审查意见

四、恢复策略

4.1 总体流程

PM2 启动 moziplus v2
  → ticker.__init__()
  → ticker.start()
  → _startup_recover()  ← 新增,在首次 tick 前执行
    → 遍历所有项目(含 _general/_mail 虚拟项目)的 blackboard.db
    → 找出所有非终态任务(claimed/working/review/reviewing
    → 对每个任务:收集黑板线索 → 判断真实状态 → 执行恢复
    → 对保持原状的任务也写审计事件
  → 正常 tick 循环开始

4.2 按状态恢复规则

claimed 状态

含义:任务已广播,等待 agent 认领。

恢复逻辑

看 task_attempts 最后一条:
  - 无记录 → agent 还没 spawn 过 → 推回 pending 重新 broadcast
  - outcome=completed → agent 执行完 NO_REPLY 就走了 → 推回 pending 重新 broadcast
  - outcome=crashed → agent crash 了 → 推回 pending 重新 broadcast

结论claimed 状态统一推回 pending。claimed 语义就是"等待认领",没有需要保留的执行进度。

working 状态

含义agent 正在执行。

恢复逻辑

1. 看 task_attempts 最后一条:
   - 无记录 → agent 还没 spawn → 推回 pending
   - outcome=crashed / agent_failed → agent crash → 推回 pending
   - outcome=completed → agent 正常退出,看进一步线索:

2. 看 outputs 表:是否有产出物?
   - 有产出 → agent 可能已完成工作但没标 review

3. 看 comments (handoff):是否有 handoff 评论?
   - 有 handoff + 有 output → agent 已完成 → 恢复为 review
   - 有 handoff + 无 output → 产出可能丢失 → 推回 pending
   - 无 handoff → agent 中途完成但没交接 → 推回 pending

4. 看 comments (review):是否已有审查结论?
   - 有 approved → 恢复为 done

决策树

working + last_attempt.completed?
  ├─ No → pending(保留 current_agent
  └─ Yes + has_output?
       ├─ No → pending(保留 current_agent
       └─ Yes + has_handoff?
            ├─ No → pending(保留 current_agent
            └─ Yes → reviewagent 已完成工作并交接)

保留 current_agent 原则:推回 pending 时不清空 current_agent,让同一 agent 重新接手。原因:

  1. 该 agent 之前有对话上下文(现已丢失),但黑板上的 outputs/comments 是它写的,它最了解上下文
  2. Dispatcher 路由时如果 current_agent 仍在,直接复用,省去重新路由
  3. Agent 被 bootstrap 注入完整任务上下文(含之前的产出),自行决定继续还是重来
  4. 这是冗余操作,但保证状态一致

review 状态

含义:执行者已交接,等待审查。

恢复逻辑

1. 看 reviews 表:是否已有审查结论?
   - approved → 恢复为 done
   - needs_revision → 恢复为 pending(让 dispatcher 重新 dispatch executor
   - rejected → 恢复为 pending

2. 看 routing_decisions 最后一条(from_status=review):
   - outcome=dispatched → reviewer 已被 dispatch 过
     - 看 task_attempts 最后一条(agent=reviewer):
       - completed → reviewer 完成 but 结论没写入 reviews 表 → 保持 review,等 ticker 重新 dispatch review
       - crashed → reviewer crash → 保持 review,等 ticker 自然 dispatchFix-3c crash 计数仍然有效)
       - 无记录 → spawn 可能失败 → 保持 review
   - outcome=skipped → reviewer 从未成功 dispatchagent busy)→ 保持 reviewticker 自然会重试
   - 无 routing_decision → review 刚进入,还没 dispatch → 保持 review

关键review 状态默认保持 review,不推 pending。因为:

  • reviews 表有结论 → 直接标 done/pending
  • 无结论 → ticker 的 _dispatch_reviews 会在下个 tick 自然尝试 dispatch reviewer
  • _check_recent_routing 的 5 分钟窗口已过期(PM2 重启间隔 > 5 分钟),不会阻止重新 dispatch

reviewing 状态

含义parent task 进入庞统 round review 的中间状态(done/failed → reviewing)。

恢复逻辑:查 events 表找到转入 reviewing 之前的状态(done 或 failed),精确推回该状态。

决策树

reviewing → 查 events 表最后一条 task_status_changed 到 reviewing 的记录
  ├─ 找到前置状态 done → 推回 done
  ├─ 找到前置状态 failed → 推回 failed
  └─ 找不到 → 兜底推 donedone/failed→reviewing 是唯一合法路径,大概率是 done)

原因reviewing 状态依赖 _spawn_pangtong_review 的回调 _handle_review_conclusion 来推进。PM2 crash 后回调丢失,_handle_review_conclusion 永不触发,parent 永久卡死。推回 done/failed 后,_check_round_complete 会在下个 tick 自然重新触发庞统 review spawn。

精确恢复优于硬推 done:因为 parent 可能是从 failed 转入 reviewing 的(子任务失败触发 round review),推回 done 语义错误。查 events 表是确定性操作,成本可忽略(单条 SQL)。

escalated / waiting_human / paused 状态

含义:需要人工介入。

恢复逻辑:不干预。这些状态是人为设定的,保持不变。


五、实现设计

5.1 新增方法

# ticker.py

def _startup_recover(self) -> Dict[str, Any]:
    """PM2 crash 后启动恢复:扫描所有非终态任务,从黑板线索重建一致状态"""
    NON_TERMINAL = {"claimed", "working", "review", "reviewing"}
    
    projects = self.registry.list_projects()
    recovery_report = {"projects": {}, "total_recovered": 0, "total_noop": 0}
    
    # 收集所有需要扫描的项目(registry + 虚拟项目)
    project_dirs = {}
    for project_id, project_info in projects.items():
        if project_info.get("status") == "active":
            project_dirs[project_id] = self.registry.root / project_id / "blackboard.db"
    
    # 虚拟项目
    for virtual_id in ("_general", "_mail"):
        virtual_db = Path(self.registry.root) / virtual_id / "blackboard.db"
        if virtual_db.exists() and virtual_id not in project_dirs:
            project_dirs[virtual_id] = virtual_db
    
    for project_id, db_path in project_dirs.items():
        if not db_path.exists():
            continue
        
        # init_db 如果还没初始化
        db_key = str(db_path)
        if db_key not in self._initialized_dbs:
            from src.blackboard.db import init_db
            init_db(db_path)
            self._initialized_dbs.add(db_key)
        
        recovered, noop_count = self._recover_project(db_path, NON_TERMINAL)
        if recovered:
            recovery_report["projects"][project_id] = recovered
            recovery_report["total_recovered"] += len(recovered)
        recovery_report["total_noop"] += noop_count
    
    if recovery_report["total_recovered"] > 0:
        logger.info("Startup recovery: %d tasks recovered across %d projects",
                     recovery_report["total_recovered"],
                     len(recovery_report["projects"]))
    elif recovery_report["total_noop"] > 0:
        logger.info("Startup recovery: %d tasks kept as-is (no recovery needed)",
                     recovery_report["total_noop"])
    else:
        logger.info("Startup recovery: no non-terminal tasks found, clean start")
    
    return recovery_report


def _recover_project(self, db_path: Path, non_terminal: set) -> List[Dict]:
    """恢复单个项目中的非终态任务"""
    conn = get_connection(db_path)
    recovered = []
    noop_count = 0
    
    try:
        for status in non_terminal:
            rows = conn.execute(
                "SELECT id, assignee, current_agent, updated_at FROM tasks WHERE status=?",
                (status,)
            ).fetchall()
            
            for task in rows:
                action = self._determine_recovery_action(conn, task, status, db_path)
                if action:
                    self._execute_recovery(conn, task["id"], action, db_path)
                    recovered.append({"task_id": task["id"], "from": status, "action": action})
                else:
                    # 审计:保持原状的任务也记录事件
                    noop_count += 1
                    conn.execute(
                        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
                        (task["id"], "daemon", "startup_recovery_noop",
                         json.dumps({"status": status, "reason": "no_action_needed"}))
                    )
                    conn.commit()
    finally:
        conn.close()
    
    return recovered, noop_count


def _determine_recovery_action(self, conn, task, status: str, db_path: Path) -> Optional[str]:
    """根据黑板线索决定恢复动作,返回 None 表示不需要干预"""
    task_id = task["id"]
    
    if status == "claimed":
        # claimed 统一推 pending
        return "push_to_pending"
    
    if status == "working":
        return self._recover_working_task(conn, task_id)
    
    if status == "review":
        return self._recover_review_task(conn, task_id)
    
    if status == "reviewing":
        # reviewing → 查 events 找前置状态(done 或 failed),精确恢复
        prev_status = self._find_pre_reviewing_status(conn, task_id)
        if prev_status == "failed":
            return "push_to_failed"
        return "push_to_done"  # 兜底 done
    
    return None


def _recover_working_task(self, conn, task_id: str) -> Optional[str]:
    """working 状态恢复:看黑板线索判断 agent 是否实际完成了工作"""
    # 最后一次 attempt
    last_attempt = conn.execute(
        "SELECT outcome, agent FROM task_attempts WHERE task_id=? ORDER BY id DESC LIMIT 1",
        (task_id,)
    ).fetchone()
    
    if not last_attempt or last_attempt["outcome"] != "completed":
        return "push_to_pending_keep_agent"  # 保留 current_agent,让同一 agent 接手
    
    # agent 正常退出,看是否有产出
    has_output = conn.execute(
        "SELECT 1 FROM outputs WHERE task_id=? LIMIT 1" (task_id,)
    ).fetchone() is not None
    
    if not has_output:
        return "push_to_pending_keep_agent"
    
    # 看是否有 handoff
    has_handoff = conn.execute(
        "SELECT 1 FROM comments WHERE task_id=? AND comment_type='handoff' LIMIT 1",
        (task_id,)
    ).fetchone() is not None
    
    if has_handoff:
        return "push_to_review"  # agent 已完成工作并交接
    
    return "push_to_pending_keep_agent"


def _recover_review_task(self, conn, task_id: str) -> Optional[str]:
    """review 状态恢复:看是否有审查结论"""
    # 检查 reviews 表
    review = conn.execute(
        "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
        (task_id,)
    ).fetchone()
    
    if review:
        if review["verdict"] == "approved":
            return "push_to_done"
        else:
            # needs_revision / rejected → 推回 pending 重新执行
            return "push_to_pending"
    
    # 无审查结论 → 保持 reviewticker 自然会 dispatch reviewer
    # (审计事件由 _recover_project 中的 noop 分支记录)
    return None


def _execute_recovery(self, conn, task_id: str, action: str, db_path: Path):
    """执行恢复动作"""
    from src.blackboard.db import get_connection as _get_conn
    
    if action == "push_to_pending":
        self._transition_status(
            conn, task_id, "pending",
            agent="daemon",
            detail={"reason": "startup_recovery", "original_status": 
                    conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]},
        )
        # 清空 current_agent(常规推 pending,无特定 agent 接手)
        conn.execute("UPDATE tasks SET current_agent=NULL WHERE id=?", (task_id,))
        conn.commit()
    
    elif action == "push_to_pending_keep_agent":
        self._transition_status(
            conn, task_id, "pending",
            agent="daemon",
            detail={"reason": "startup_recovery", "original_status":
                    conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]},
        )
        # 保留 current_agent,让同一 agent 重新接手
        conn.commit()
    
    elif action == "push_to_review":
        # working → review,恢复 assignee 保持不变
        self._transition_status(
            conn, task_id, "review",
            agent="daemon",
            detail={"reason": "startup_recovery", "original_status": "working"},
        )
        conn.commit()
    
    elif action == "push_to_done":
        self._transition_status(
            conn, task_id, "done",
            agent="daemon",
            detail={"reason": "startup_recovery", "original_status":
                    conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]},
        )
        conn.commit()
    
    elif action == "push_to_failed":
        self._transition_status(
            conn, task_id, "failed",
            agent="daemon",
            detail={"reason": "startup_recovery", "original_status":
                    conn.execute("SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()["status"]},
        )
        conn.commit()
    
    # 记录恢复事件
    conn.execute(
        "INSERT INTO events (task_id, agent, event_type, detail) VALUES (?, ?, ?, ?)",
        (task_id, "daemon", "startup_recovery", json.dumps({"action": action}))
    )
    conn.commit()
    
    logger.info("Recovery: task %s%s (action=%s)", task_id, action, action)

# ── 辅助方法 ──

def _find_pre_reviewing_status(self, conn, task_id: str) -> str:
    """查 events 表找到 reviewing 之前的状态(done 或 failed"""
    row = conn.execute(
        """SELECT detail FROM events 
           WHERE task_id=? AND event_type='task_status_changed'
           ORDER BY id DESC LIMIT 10""",
        (task_id,)
    ).fetchall()
    
    for event in row:
        try:
            detail = json.loads(event["detail"])
            if detail.get("new_status") == "reviewing":
                return detail.get("old_status", "done")  # 兜底 done
        except (json.JSONDecodeError, KeyError):
            continue
    
    return "done"  # 找不到则兜底 donereviewing 只从 done/failed 转入)

5.2 调用时机

# ticker.py start() 方法
async def start(self) -> None:
    if self._running:
        return
    self._running = True
    
    # 启动恢复(PM2 crash 后重建一致状态)
    # 同步调用是有意为之:恢复必须在事件循环启动前完成,耗时 < 1s
    self._startup_recover()
    
    self._task = asyncio.create_task(self._loop())
    # ... InboxWatcher start ...

5.3 回滚 Fix-3b

删除 ticker.py _check_timeouts 方法中 # v2.8.1 Fix-3blogger.exception("Failed to reclaim review task %s", task.id) 的整块代码(约 46 行)。

5.4 改动总览

文件 改动 行数
ticker.py 新增 _startup_recover / _recover_project / _determine_recovery_action / _recover_working_task / _recover_review_task / _execute_recovery / _find_pre_reviewing_status ~135 行
ticker.py start() 中调用 _startup_recover() 2 行
净增 ~137 行

注:Fix-3b 已在 #07.2 中删除,无需回滚。


六、设计对比

Fix-3b(旧) 本方案(新)
触发时机 每 tick 检查 仅启动时一次
覆盖状态 仅 review claimed + working + review + reviewing
恢复策略 超时 → 全部推 pending Event Sourcing → 精确恢复
误判风险 高(agent busy 被误判) 低(基于黑板事实判断)
运行时开销 每 tick 扫描 review 任务 仅启动时执行一次
Agent crash 处理 混在 Fix-3b 里 明确回归 §5 A0

七、优秀实践参考

实践 来源 我们的对应
Postgres-backed checkpointer LangGraph events 表 = 天然的 checkpoint 日志
不可变事件日志,写入即审计 #04 黑板设计原则 恢复基于 events + 关联表
可预测骨架 + LLM 动态填充 调研共识 恢复是确定性的骨架逻辑,不调 LLM
propose→validate→commit Network-AI 恢复动作通过 _transition_status 验证合法性

八、测试计划

用例 模拟方式 预期
干净启动 无非终态任务 无恢复动作,正常启动
claimed 任务 PM2 重启时有一个 claimed 任务 → pending 重新 broadcast
working + 无产出 agent crash 后 PM2 重启 → pending 重新 dispatch
working + 有产出 + 有 handoff agent 完成但 PM2 crash → review
working + 有产出 + 无 handoff agent 中途完成退出 → pending
review + approved reviewer 标 approved 后 PM2 crash → done
review + needs_revision reviewer 标 needs_revision 后 PM2 crash → pending
review + 无结论 reviewer 从未 dispatch + PM2 crash 保持 reviewticker 自然 dispatch
review + outcome=skipped agent busy 导致从未 dispatch 保持 reviewticker 自然 dispatch
reviewing 任务 PM2 crash 在庞统 round review 中 → doneticker 重新触发 round review
虚拟项目 _mail PM2 crash 时有 working 邮件 按 working 策略恢复