Files
sanguo_moziplus_v2/docs/design/v2.7.2-counter-lifecycle-fix.md
T
2026-05-26 08:20:00 +08:00

14 KiB
Raw Blame History

v2.7.2 Counter 生命周期修复

版本: v1.1
日期: 2026-05-26
作者: 庞统
状态: 评审修订中(v1.1 采纳部分评审意见)


1. 问题背景

1.1 核心原则

每次 agent 调用都是独占的。openclaw 无论成功失败都会返回,最差情况是 timeout 返回。谁占用谁持有,进程退出就 release。

1.2 当前偏差

偏差 当前行为 核心原则要求
counter 生命周期 任务级(贯穿 retry 链) 调用级(spawn acquire,退出 release
retry 绕过 dispatcher _do_retry 直接调 spawn_full_agent,不检查 counter spawn 只有 spawn_full_agent 一个入口,内部统一检查
A5/A6 fallback 也 retry fallback 出现说明 agent 被占用时 spawn 了,根因错误 不应出现 fallback;出现了说明流程有 bug
A10/A12 也 retry compact 失败/未知错误都走 retry 进程退出 = agent 空闲,走正常 dispatch
429 不推回 pending release counter 但任务 still working → 30 分钟超时后才标 failed 应推回 pending,让 ticker 重新调度
429 无冷却 ticker 下次(30秒后)又 dispatch → 又 429 → 循环 per-agent 冷却期
进程意外退出 counter 泄漏 _monitor_process 异常/PM2 重启时 on_complete 不调用 ticker 层兜底:检测进程存活性

1.3 司马懿事件复盘(2026-05-25 20:58-21:00

20:58:12  dispatch mail → counter acquire → spawn 司马懿 (pid=50512)
20:59:50  进程退出 (gateway_timeout, exit=0) → release_counter=False → _do_retry
          → 直接 spawn (pid=50580),不检查 counter → 司马懿被 spawn 第二次
21:00:08  进程退出 → _do_retry → 直接 spawn (pid=50613) → 第三次
21:00:13  API 429 → exit=1 → api_error → release_counter=True → 结束

问题:counter 占用但 retry 不检查,3 次 spawn 在 ~2 分钟内完成
      每次 retry 没有延迟,叠加 API 调用触发 zhipu 429

2. 设计方案

2.1 核心改动:counter 下沉到 spawn_full_agent

原则:spawn 只有 spawn_full_agent 一个入口,acquire/release 统一在内部。

async def spawn_full_agent(self, agent_id, ...):
    # 1. 检查 counter
    if self.counter and not await self.counter.can_acquire(agent_id):
        raise AgentBusyError(agent_id)
    
    # 2. acquire
    if self.counter:
        await self.counter.acquire(agent_id)
    
    # 3. 构建 on_completerelease counter
    original_on_complete = on_complete
    async def _wrapped_on_complete(aid, outcome):
        if self.counter:
            self.counter.release(aid)
        if original_on_complete:
            await original_on_complete(aid, outcome)
    
    # 4. spawn 进程
    proc = await asyncio.create_subprocess_exec(...)
    
    # 5. schedule monitor(传 wrapped_on_complete
    asyncio.create_task(self._monitor_process(..., on_complete=_wrapped_on_complete))

变化

  • dispatcher 不再 acquire/release counter,只负责路由和构建业务回调
  • spawn_full_agent 内部 acquire + 注册 wrapped on_complete(保证 release
  • on_complete 不再包含 counter.release,只做业务逻辑(幻觉门控、状态标记等)

2.2 _classify_outcome 简化

去掉 release_counter 字段。进程退出 = release counter(由 wrapped_on_complete 保证)。

只保留两个维度:

  • should_retry:是否触发续杯(只有 A2/A3 gateway_timeout
  • retry_field:计入哪个计数器
情况 outcome should_retry 说明
A1 正常完成 completed False 任务终态
A4 Agent failed agent_failed False 尊重 Agent 判断
A2/A3 Gateway timeout gateway_timeout True 唯一续杯场景
A5/A6 Fallback fallback_timeout False 不应出现,记录 warning,不 retry
A7 认证失败 auth_failed False 不 retry
A8 Gateway 不可达 gateway_unreachable False 不改任务状态,等 ticker
A9 API 错误/429 api_error False 推回 pending(见 2.4
A10 Compact 失败 compact_failed False 不 retry,推回 pending 等 ticker
A11 Lock 冲突 lock_conflict False 不改任务状态,等 ticker
A12 其他 agent_error False 不 retry,推回 pending 等 ticker

A5/A6 不应出现:如果出现了,说明 spawn 时 agent 被占用(counter 检查失效)。记录 ERROR 级日志,标 failed + escalate,不 retry。

A10/A12 不 retry:进程退出了 = agent 空闲。release counter → 推回 pending → ticker 重新调度。

2.3 _do_retry 重构

async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"):
    # 1. 检查任务终态
    if self._is_terminal(db_path, task_id):
        return  # wrapped_on_complete 已 release counter
    
    # 2. 检查 retry 上限
    count = self._increment_retry(db_path, task_id, retry_field)
    if count >= self.max_retries:
        self._mark_task(db_path, task_id, "failed", {...})
        return  # wrapped_on_complete 已 release counter
    
    # 3. 构建续杯 message
    message = self._build_retry_message(...)
    
    # 4. 通过 spawn_full_agent 重新 spawn(内部会 can_acquire
    #    此时 counter 已由 wrapped_on_complete release
    try:
        await self.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task_id,
            on_complete=on_complete,  # 业务回调(不含 counter
            use_main_session=(session_id is None),
            reuse_session_id=session_id if session_id else None,
            task_db_path=db_path,
        )
    except AgentBusyError:
        # agent 被其他任务占用,release counter,等 ticker
        logger.warning("Retry spawn skipped: %s busy", agent_id)
        # counter 已被 wrapped_on_complete release
        # 任务保持 working,等 ticker 检查

关键变化

  • counter 在进程退出时由 wrapped_on_complete release
  • _do_retryspawn_full_agent,内部 can_acquire 检查
  • 如果 agent 忙(不应该发生,但防御)→ 等 ticker

2.4 429 处理:推回 pending + 冷却机制

推回 pending

# _handle_exit 中 A9 处理
if outcome == "api_error":
    # wrapped_on_complete 已 release counter
    # 推回 pending,让 ticker 重新调度
    self._transition_task_status(db_path, task_id, "pending", {
        "reason": "api_error_retry",
        "api_retry_count": count,
    })

冷却机制

class ActiveAgentCounter:
    def __init__(self):
        self._active: dict[str, int] = {}  # agent_id → count
        self._cooldown_until: dict[str, float] = {}  # agent_id → timestamp
    
    def set_cooldown(self, agent_id: str, seconds: float = 120.0):
        """设置冷却期"""
        self._cooldown_until[agent_id] = time.time() + seconds
    
    def is_cooling_down(self, agent_id: str) -> bool:
        """检查是否在冷却期"""
        until = self._cooldown_until.get(agent_id)
        if until and time.time() < until:
            return True
        # 冷却期已过,清理
        self._cooldown_until.pop(agent_id, None)
        return False
    
    async def can_acquire(self, agent_id: str) -> bool:
        if self.is_cooling_down(agent_id):
            return False
        return self._active.get(agent_id, 0) == 0

冷却时间:120 秒(zhipu 瞬时限流一般 30-60 秒恢复,留余量)。

冷却触发点

  • A9api_error/429):counter.set_cooldown(agent_id, 120)
  • 冷却只阻止新 dispatch,不影响 retry 的 can_acquireretry 时冷却已过或不存在)

2.5 进程意外退出兜底

在 ticker 的 _check_timeouts 中增加进程存活性检查

def _check_timeouts(self, db_path: Path) -> List[str]:
    # ... 现有超时检查 ...
    
    # 新增:检查 counter 占用但进程已死的情况
    if self.counter and self.spawner:
        for agent_id in self.counter.active_agents:
            session_info = self.spawner.get_session_by_agent(agent_id)
            if not session_info:
                continue
            pid = session_info.get("pid")
            if pid and not self._is_pid_alive(pid):
                # 进程已死但 counter 还占着
                crash_count = self._get_crash_count(db_path, task_id) + 1
                self._set_crash_count(db_path, task_id, crash_count)
                
                if crash_count >= 3:
                    # 3 次连续崩溃,标 failed
                    logger.error("Agent %s crashed %d times, marking failed", agent_id, crash_count)
                    self._mark_task(db_path, task_id, "failed", {
                        "reason": "process_crash",
                        "crash_count": crash_count,
                    })
                    self.counter.release(agent_id)
                else:
                    # 释放 counter,让 ticker 下次重新 dispatch
                    logger.warning("Agent %s process dead (crash %d/3), releasing counter",
                                   agent_id, crash_count)
                    self.counter.release(agent_id)
                    # 任务保持 working,下次 tick 会看到 counter 空闲 → 重新 dispatch

crash_count 存储task_attempts.metadatacrash_count 字段。

重置时机:agent 成功完成一次任务后重置为 0。

2.6 dispatcher 简化

dispatcher 不再管 counter

async def dispatch(self, task, ...):
    # 不再 acquire counterspawn_full_agent 内部处理)
    # 不再构建含 counter.release 的 on_complete
    
    # 只做路由和业务回调
    on_complete = None
    if is_mail:
        on_complete = lambda aid, outcome: self._mail_auto_complete(task_id, aid, db_path, must_haves)
    # Task 不需要业务回调(counter release 由 spawn_full_agent 的 wrapped_on_complete 处理)
    
    try:
        session_id = await self.spawner.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task.id,
            on_complete=on_complete,  # 只含业务逻辑
            use_main_session=is_mail,
            task_db_path=db_path,
        )
    except AgentBusyError:
        return {"status": "skipped", "reason": "Agent busy"}

3. 改动范围

文件 改动 行数
spawner.py spawn_full_agent 加 counter acquire/release + wrapped_on_complete ~30
spawner.py _classify_outcome 去掉 release_counter 字段 ~10
spawner.py _handle_exit 简化(A9 推回 pendingA10/A12 不 retry ~20
spawner.py _do_retry 通过 spawn_full_agent 重试 ~15
spawner.py 新增 AgentBusyError ~5
dispatcher.py 去掉 counter acquire/release/on_complete 逻辑 ~-40
counter.py 新增 cooldown 机制 ~25
ticker.py _check_timeouts 加进程存活性检查 ~30
合计 ~95 行净增

4. 续杯语义变化

当前 修复后
counter 生命周期 任务级(贯穿 retry 链) 调用级(spawn acquire,退出 release
retry 路径 直接 spawn,不检查 counter 通过 spawn_full_agent,内部检查
retry 失败 只看 retry_count 上限 can_acquire 失败也会停止(agent 忙)
A9/429 release + 任务 working → 30 分钟超时 release + 推回 pending + 冷却 120s
A10 compact retrycounter 不 release 不 retry,推回 pending
意外退出 counter 泄漏 ticker 检测 → crash_count → 最多 3 次

5. 续杯时间计算

Gateway timeout = 600s10 分钟),3 次续杯 = ~30 分钟。

每次续杯的流程:

进程退出 → release counter → _do_retry → spawn_full_agent(can_acquire) 
→ acquire → spawn → 新进程执行(最多 10 分钟)

30 分钟内 agent 无法完成任务 → 第 3 次续杯后标 failed + escalate。

6. 测试计划

用例 验证
正常完成 counter acquire → release,任务 done
Gateway timeout 续杯 release → re-acquire → spawn,复用 session
续杯时 agent 被占用 can_acquire 失败 → 不 spawn,等 ticker
429 → 冷却 推回 pending120s 内不 dispatch
进程崩溃 ticker 检测,crash < 3 → 重新 dispatch
3 次崩溃 标 failed + escalate
PM2 重启 ticker 检测进程死 → release counter
Mail 幻觉门控 on_complete 业务回调正常执行

7. 司马懿评审意见处理(mail-1779726169654

评审意见 结论 理由
wrapped_on_complete 加 try/finally 采纳 防御性编程,确保 counter release 和业务回调都执行
A5/A6 加 context 日志 采纳 排查方便
per-provider 冷却 ⏭ 延后 低优先级,先做 per-agent
crash_count per-agent 累计,禁用 agent 不采纳 崩溃可能是任务问题不是 agent 问题。保持 per-task 3 次标 failed,通过 escalate 通知用户自行判断
can_acquire 失败推回 claimed 不采纳 retry 路径下 can_acquire 不会失败(asyncio 单线程无竞态)。release → can_acquire → acquire 是内存同步操作,中间无 await
release 和 acquire 之间有竞态窗口 不存在 Python asyncio 单线程,三步都是内存同步操作,无竞态

8. 实现检查清单

  • counter.py:新增 cooldown 机制
  • spawner.pyspawn_full_agent 加 counter acquire/release + wrapped_on_completetry/finally
  • spawner.py_classify_outcome 去掉 release_counter,只有 A2/A3 触发 retry
  • spawner.py_do_retry 通过 spawn_full_agent 重试
  • spawner.py_handle_exit 简化(A9 推回 pendingA5/A6 标 failed + context 日志)
  • dispatcher.py:去掉 counter acquire/release 逻辑
  • ticker.py_check_timeouts 加进程存活性检查(per-task crash_count