diff --git a/docs/design/v2.7.2-counter-lifecycle-fix.md b/docs/design/v2.7.2-counter-lifecycle-fix.md index 6fa2ec8..3c7bc37 100644 --- a/docs/design/v2.7.2-counter-lifecycle-fix.md +++ b/docs/design/v2.7.2-counter-lifecycle-fix.md @@ -1,334 +1,173 @@ -# v2.7.2 Counter 生命周期修复 +# v2.7.2 防重复调用 & 防无限续杯 — 完整方案 -**版本**: v1.1 +**版本**: v2.0 **日期**: 2026-05-26 **作者**: 庞统 -**状态**: 评审修订中(v1.1 采纳部分评审意见) +**状态**: 已实现 + 待最终评审 --- -## 1. 问题背景 +## 1. 核心原则 -### 1.1 核心原则 - -> 每次 agent 调用都是独占的。openclaw 无论成功失败都会返回,最差情况是 timeout 返回。谁占用谁持有,进程退出就 release。 - -### 1.2 当前偏差 - -| 偏差 | 当前行为 | 核心原则要求 | -|------|---------|-------------| -| counter 生命周期 | 任务级(贯穿 retry 链) | 调用级(spawn acquire,退出 release) | -| retry 绕过 dispatcher | `_do_retry` 直接调 `spawn_full_agent`,不检查 counter | spawn 只有 `spawn_full_agent` 一个入口,内部统一检查 | -| A5/A6 fallback 也 retry | fallback 出现说明 agent 被占用时 spawn 了,根因错误 | 不应出现 fallback;出现了说明流程有 bug | -| A10/A12 也 retry | compact 失败/未知错误都走 retry | 进程退出 = agent 空闲,走正常 dispatch | -| 429 不推回 pending | release counter 但任务 still working → 30 分钟超时后才标 failed | 应推回 pending,让 ticker 重新调度 | -| 429 无冷却 | ticker 下次(30秒后)又 dispatch → 又 429 → 循环 | per-agent 冷却期 | -| 进程意外退出 counter 泄漏 | `_monitor_process` 异常/PM2 重启时 on_complete 不调用 | ticker 层兜底:检测进程存活性 | - -### 1.3 司马懿事件复盘(2026-05-25 20:58-21:00) - -``` -20:58:12 dispatch mail → counter acquire → spawn 司马懿 (pid=50512) -20:59:50 进程退出 (gateway_timeout, exit=0) → release_counter=False → _do_retry - → 直接 spawn (pid=50580),不检查 counter → 司马懿被 spawn 第二次 -21:00:08 进程退出 → _do_retry → 直接 spawn (pid=50613) → 第三次 -21:00:13 API 429 → exit=1 → api_error → release_counter=True → 结束 - -问题:counter 占用但 retry 不检查,3 次 spawn 在 ~2 分钟内完成 - 每次 retry 没有延迟,叠加 API 调用触发 zhipu 429 -``` +> **每次 agent 调用都是独占的。** openclaw 无论成功失败都会返回,最差情况 timeout 返回。谁占用谁持有,进程退出就 release。 --- -## 2. 设计方案 +## 2. 根因分析 -### 2.1 核心改动:counter 下沉到 spawn_full_agent +### 2.1 2026-05-25/26 事件复盘 -**原则:spawn 只有 `spawn_full_agent` 一个入口,acquire/release 统一在内部。** +司马懿 + 庞统被 moziplus-v2 连续 spawn,叠加 API 调用触发 zhipu 429,双模型不可用导致 Gateway 假死。 -```python -async def spawn_full_agent(self, agent_id, ...): - # 1. 检查 counter - if self.counter and not await self.counter.can_acquire(agent_id): - raise AgentBusyError(agent_id) - - # 2. acquire - if self.counter: - await self.counter.acquire(agent_id) - - # 3. 构建 on_complete(release counter) - original_on_complete = on_complete - async def _wrapped_on_complete(aid, outcome): - if self.counter: - self.counter.release(aid) - if original_on_complete: - await original_on_complete(aid, outcome) - - # 4. spawn 进程 - proc = await asyncio.create_subprocess_exec(...) - - # 5. schedule monitor(传 wrapped_on_complete) - asyncio.create_task(self._monitor_process(..., on_complete=_wrapped_on_complete)) +### 2.2 发现的三个根因 + +| 根因 | 影响 | 严重度 | +|------|------|--------| +| **P0:`_parse_stdout_json` 解析路径错误** | `data.get("meta")` 应为 `data["response"]["meta"]`。68% 的 spawn 结果 transport=null(62/91 次)。A 场景分类全部失效 | **致命** | +| counter 生命周期是任务级 | retry 绕过 dispatcher 直接 spawn,不检查 counter | 高 | +| spawn 前缺 session state 检查 | webchat 占用 main session 时仍然 spawn,注定失败 | 高 | + +### 2.3 P0 详细说明 + +openclaw agent `--json` 输出格式: +```json +{ "kind": "agent-response", "response": { "meta": { "transport": "gateway", ... } } } ``` -**变化**: -- dispatcher 不再 acquire/release counter,只负责路由和构建业务回调 -- spawn_full_agent 内部 acquire + 注册 wrapped on_complete(保证 release) -- on_complete 不再包含 counter.release,只做业务逻辑(幻觉门控、状态标记等) - -### 2.2 _classify_outcome 简化 - -去掉 `release_counter` 字段。**进程退出 = release counter**(由 wrapped_on_complete 保证)。 - -只保留两个维度: -- `should_retry`:是否触发续杯(只有 A2/A3 gateway_timeout) -- `retry_field`:计入哪个计数器 - -| 情况 | outcome | should_retry | 说明 | -|------|---------|-------------|------| -| A1 正常完成 | completed | False | 任务终态 | -| A4 Agent failed | agent_failed | False | 尊重 Agent 判断 | -| A2/A3 Gateway timeout | gateway_timeout | True | **唯一续杯场景** | -| A5/A6 Fallback | fallback_timeout | False | **不应出现**,记录 warning,不 retry | -| A7 认证失败 | auth_failed | False | 不 retry | -| A8 Gateway 不可达 | gateway_unreachable | False | 不改任务状态,等 ticker | -| A9 API 错误/429 | api_error | False | **推回 pending**(见 2.4) | -| A10 Compact 失败 | compact_failed | False | **不 retry**,推回 pending 等 ticker | -| A11 Lock 冲突 | lock_conflict | False | 不改任务状态,等 ticker | -| A12 其他 | agent_error | False | **不 retry**,推回 pending 等 ticker | - -**A5/A6 不应出现**:如果出现了,说明 spawn 时 agent 被占用(counter 检查失效)。记录 ERROR 级日志,标 failed + escalate,不 retry。 - -**A10/A12 不 retry**:进程退出了 = agent 空闲。release counter → 推回 pending → ticker 重新调度。 - -### 2.3 _do_retry 重构 - -```python -async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): - # 1. 检查任务终态 - if self._is_terminal(db_path, task_id): - return # wrapped_on_complete 已 release counter - - # 2. 检查 retry 上限 - count = self._increment_retry(db_path, task_id, retry_field) - if count >= self.max_retries: - self._mark_task(db_path, task_id, "failed", {...}) - return # wrapped_on_complete 已 release counter - - # 3. 构建续杯 message - message = self._build_retry_message(...) - - # 4. 通过 spawn_full_agent 重新 spawn(内部会 can_acquire) - # 此时 counter 已由 wrapped_on_complete release - try: - await self.spawn_full_agent( - agent_id=agent_id, - message=message, - task_id=task_id, - on_complete=on_complete, # 业务回调(不含 counter) - use_main_session=(session_id is None), - reuse_session_id=session_id if session_id else None, - task_db_path=db_path, - ) - except AgentBusyError: - # agent 被其他任务占用,release counter,等 ticker - logger.warning("Retry spawn skipped: %s busy", agent_id) - # counter 已被 wrapped_on_complete release - # 任务保持 working,等 ticker 检查 -``` - -**关键变化**: -- counter 在进程退出时由 wrapped_on_complete release -- `_do_retry` 调 `spawn_full_agent`,内部 can_acquire 检查 -- 如果 agent 忙(不应该发生,但防御)→ 等 ticker - -### 2.4 429 处理:推回 pending + 冷却机制 - -#### 推回 pending - -```python -# _handle_exit 中 A9 处理 -if outcome == "api_error": - # wrapped_on_complete 已 release counter - # 推回 pending,让 ticker 重新调度 - self._transition_task_status(db_path, task_id, "pending", { - "reason": "api_error_retry", - "api_retry_count": count, - }) -``` - -#### 冷却机制 - -```python -class ActiveAgentCounter: - def __init__(self): - self._active: dict[str, int] = {} # agent_id → count - self._cooldown_until: dict[str, float] = {} # agent_id → timestamp - - def set_cooldown(self, agent_id: str, seconds: float = 120.0): - """设置冷却期""" - self._cooldown_until[agent_id] = time.time() + seconds - - def is_cooling_down(self, agent_id: str) -> bool: - """检查是否在冷却期""" - until = self._cooldown_until.get(agent_id) - if until and time.time() < until: - return True - # 冷却期已过,清理 - self._cooldown_until.pop(agent_id, None) - return False - - async def can_acquire(self, agent_id: str) -> bool: - if self.is_cooling_down(agent_id): - return False - return self._active.get(agent_id, 0) == 0 -``` - -**冷却时间**:120 秒(zhipu 瞬时限流一般 30-60 秒恢复,留余量)。 - -**冷却触发点**: -- A9(api_error/429):`counter.set_cooldown(agent_id, 120)` -- 冷却只阻止新 dispatch,不影响 retry 的 can_acquire(retry 时冷却已过或不存在) - -### 2.5 进程意外退出兜底 - -在 ticker 的 `_check_timeouts` 中增加**进程存活性检查**: - -```python -def _check_timeouts(self, db_path: Path) -> List[str]: - # ... 现有超时检查 ... - - # 新增:检查 counter 占用但进程已死的情况 - if self.counter and self.spawner: - for agent_id in self.counter.active_agents: - session_info = self.spawner.get_session_by_agent(agent_id) - if not session_info: - continue - pid = session_info.get("pid") - if pid and not self._is_pid_alive(pid): - # 进程已死但 counter 还占着 - crash_count = self._get_crash_count(db_path, task_id) + 1 - self._set_crash_count(db_path, task_id, crash_count) - - if crash_count >= 3: - # 3 次连续崩溃,标 failed - logger.error("Agent %s crashed %d times, marking failed", agent_id, crash_count) - self._mark_task(db_path, task_id, "failed", { - "reason": "process_crash", - "crash_count": crash_count, - }) - self.counter.release(agent_id) - else: - # 释放 counter,让 ticker 下次重新 dispatch - logger.warning("Agent %s process dead (crash %d/3), releasing counter", - agent_id, crash_count) - self.counter.release(agent_id) - # 任务保持 working,下次 tick 会看到 counter 空闲 → 重新 dispatch -``` - -**crash_count 存储**:`task_attempts.metadata` 的 `crash_count` 字段。 - -**重置时机**:agent 成功完成一次任务后重置为 0。 - -### 2.6 dispatcher 简化 - -dispatcher 不再管 counter: - -```python -async def dispatch(self, task, ...): - # 不再 acquire counter(spawn_full_agent 内部处理) - # 不再构建含 counter.release 的 on_complete - - # 只做路由和业务回调 - on_complete = None - if is_mail: - on_complete = lambda aid, outcome: self._mail_auto_complete(task_id, aid, db_path, must_haves) - # Task 不需要业务回调(counter release 由 spawn_full_agent 的 wrapped_on_complete 处理) - - try: - session_id = await self.spawner.spawn_full_agent( - agent_id=agent_id, - message=message, - task_id=task.id, - on_complete=on_complete, # 只含业务逻辑 - use_main_session=is_mail, - task_db_path=db_path, - ) - except AgentBusyError: - return {"status": "skipped", "reason": "Agent busy"} -``` +代码取的是 `data.get("meta")` 而不是 `data["response"]["meta"]`,导致: +- 所有 transport=null → A1/A5/A6 分类失效 +- session lock 阻塞退出被误判为 gateway_timeout(A2/A3)→ 续杯循环 +- 续杯循环叠加 API 调用 → 429 → Gateway 假死 --- -## 3. 改动范围 +## 3. 完整场景清单 -| 文件 | 改动 | 行数 | +### 3.1 Spawn 前检查(拦截无效 spawn) + +| # | 场景 | 检测方法 | 检测到后方案 | +|---|------|---------|-------------| +| L1 | moziplus 内部并发 | counter.can_acquire() | AgentBusyError → 等 ticker | +| L2 | API 429 冷却期 | counter.is_cooling_down() | AgentBusyError → 等 ticker | +| L3a | main session 被外部占用 | _check_session_state → lock_pid_alive | AgentBusyError → 等 ticker | +| L3b | main session 正在执行 | _check_session_state → status=processing | AgentBusyError → 等 ticker | +| L3c | main session 正在 compact | _check_session_state → recent_compact | AgentBusyError → 等 ticker | + +L1-L3 统一结果:不 spawn,任务保持 working,ticker 30 秒后重新调度。 + +L1+L3 互补:counter 防 moziplus 内部并发,session state 防外部占用(webchat/Control UI/cron)。 + +### 3.2 Spawn 后 — 进程退出(情况 A,0-630 秒内) + +| # | 场景 | 检测方法 | 条件 | 检测到后方案 | +|---|------|---------|------|-------------| +| A1 | 正常完成 | stdout transport + 任务状态 | exit=0 + transport≠embedded + 终态 | release counter → 结束 | +| A2/A3 | Gateway timeout | stdout transport + 任务状态 | exit=0 + 非终态 + transport 正常 | release counter → 续杯 | +| A4 | Agent 自标 failed | 任务状态 | 任务=failed | release counter → 结束 | +| A5/A6 | Gateway fallback | stdout transport=embedded + fallbackReason | exit=0 + transport=embedded | release counter → 标 failed + escalate | +| A7 | 认证失败 | exit≠0 + stderr | exit≠0 + stderr 含 401/403 | release counter → 标 failed + escalate | +| A8 | Gateway 不可达 | exit≠0 + stderr | exit≠0 + stderr 含 ECONNREFUSED/ETIMEDOUT | release counter → 等 ticker | +| A9 | API 429 | exit≠0 + stderr | exit≠0 + stderr 含 rate_limit/500/503 | release counter → 推回 pending + 冷却 120s | +| A10 | Compact 失败 | exit≠0 + stderr | exit≠0 + stderr 含 compaction-diag | release counter → 等 ticker | +| A11 | Session lock 冲突 | exit≠0 + stderr | exit≠0 + stderr 含 lock/busy/concurrent | release counter → 等 ticker | +| A12 | 兜底未知错误 | 兜底 | exit≠0 不匹配以上 | release counter → 等 ticker | +| A2+P2 | transport=null 兜底 | stderr 辅助判断 | exit=0 + transport=null + 非终态 | 检查 stderr → lock/compact/api_error 或走 gateway_timeout | + +**续杯:只有 A2/A3 才触发续杯。其他都 release counter,等 ticker 或推回 pending。** + +### 3.3 Spawn 后 — 进程不退出(情况 B,630 秒后) + +| # | 场景 | 检测方法 | 条件 | 检测到后方案 | +|---|------|---------|------|-------------| +| B1 | 假死(lock PID 死了) | _check_session_state | lock_pid 存在但 PID 不存活 | 标 failed + escalate + release counter | +| B2 | Compact 进行中 | _check_session_state + stderr | lock_pid 存活 + stderr 有 compact | 继续等(不递增计数,独立上限) | +| B3 | 进程不退出 | _check_session_state | lock_pid 存活 + 无 compact | 继续等(递增计数,≥3 次 → failed) | +| B4 | Session 状态不匹配 | _check_session_state | sessions.json status≠running | 等 60s → 按 B3 处理 | + +### 3.4 续杯限制 + +| # | 计数器 | 上限 | 超限后方案 | +|---|--------|------|-----------| +| R1 | retry_count | 3 次(每次 ~10 分钟) | 标 failed + escalate | +| R2 | connect_retry_count | 3 次 | 标 failed | +| R3 | api_retry_count | 3 次 | 标 failed | +| R4 | lock_retry_count | 3 次 | 标 failed | +| R5 | monitor_timeout_count | 3 次 | 标 failed | + +### 3.5 Ticker 兜底(每 30 秒检查) + +| # | 检查 | 方案 | +|---|------|------| +| T1 | 进程存活性:counter 占用但 PID 不存活 | release counter + 推回 pending | +| T2 | 任务超时:working 超时(默认 30 分钟) | 标 failed | + +### 3.6 假死复活术(2026-05-04 经验) + +**现象**:sessions.json 状态为 running 但 agent 长时间无响应。 + +**根因**:Gateway 认为 session 还活着(running)但实际连接已断开,无超时清理。 + +**复活步骤**: +1. 修改 sessions.json,把对应 session 的 status 从 running 改为 idle +2. 发心跳激活(把当前任务再发一遍给 agent) + +**待设计**:将此复活术集成到 ticker `_check_timeouts` 中,检测到 sessions.json status=running 但 lock PID 不存活时自动执行。 + +--- + +## 4. 改动文件清单 + +| 文件 | 改动 | 版本 | |------|------|------| -| `spawner.py` | spawn_full_agent 加 counter acquire/release + wrapped_on_complete | ~30 | -| `spawner.py` | _classify_outcome 去掉 release_counter 字段 | ~10 | -| `spawner.py` | _handle_exit 简化(A9 推回 pending,A10/A12 不 retry) | ~20 | -| `spawner.py` | _do_retry 通过 spawn_full_agent 重试 | ~15 | -| `spawner.py` | 新增 AgentBusyError | ~5 | -| `dispatcher.py` | 去掉 counter acquire/release/on_complete 逻辑 | ~-40 | -| `counter.py` | 新增 cooldown 机制 | ~25 | -| `ticker.py` | _check_timeouts 加进程存活性检查 | ~30 | -| **合计** | | ~95 行净增 | - -## 4. 续杯语义变化 - -| | 当前 | 修复后 | -|---|------|--------| -| counter 生命周期 | 任务级(贯穿 retry 链) | 调用级(spawn acquire,退出 release) | -| retry 路径 | 直接 spawn,不检查 counter | 通过 spawn_full_agent,内部检查 | -| retry 失败 | 只看 retry_count 上限 | can_acquire 失败也会停止(agent 忙) | -| A9/429 | release + 任务 working → 30 分钟超时 | release + 推回 pending + 冷却 120s | -| A10 compact | retry(counter 不 release) | 不 retry,推回 pending | -| 意外退出 | counter 泄漏 | ticker 检测 → crash_count → 最多 3 次 | - -## 5. 续杯时间计算 - -Gateway timeout = 600s(10 分钟),3 次续杯 = ~30 分钟。 - -每次续杯的流程: -``` -进程退出 → release counter → _do_retry → spawn_full_agent(can_acquire) -→ acquire → spawn → 新进程执行(最多 10 分钟) -``` - -30 分钟内 agent 无法完成任务 → 第 3 次续杯后标 failed + escalate。 - -## 6. 测试计划 - -| 用例 | 验证 | -|------|------| -| 正常完成 | counter acquire → release,任务 done | -| Gateway timeout 续杯 | release → re-acquire → spawn,复用 session | -| 续杯时 agent 被占用 | can_acquire 失败 → 不 spawn,等 ticker | -| 429 → 冷却 | 推回 pending,120s 内不 dispatch | -| 进程崩溃 | ticker 检测,crash < 3 → 重新 dispatch | -| 3 次崩溃 | 标 failed + escalate | -| PM2 重启 | ticker 检测进程死 → release counter | -| Mail 幻觉门控 | on_complete 业务回调正常执行 | +| `counter.py` | 新增 cooldown 机制(is_cooling_down / set_cooldown) | v1 | +| `spawner.py` | 新增 AgentBusyError | v1 | +| `spawner.py` | spawn_full_agent 内部 counter acquire/release + wrapped_on_complete(try/finally) | v1 | +| `spawner.py` | spawn_full_agent 加 L3 session state 检查 | v2 | +| `spawner.py` | _classify_outcome 去掉 release_counter,只有 A2/A3 触发 retry | v1 | +| `spawner.py` | _classify_outcome 加 P2 transport=null 兜底 | v2 | +| `spawner.py` | _handle_exit:A9 推回 pending + 冷却,A5/A6 标 failed + context 日志 | v1 | +| `spawner.py` | _do_retry 手动 release counter + 通过 spawn_full_agent 重试 | v1 | +| `spawner.py` | P0:_parse_stdout_json 改为 data["response"]["meta"] | v2 | +| `spawner.py` | 新增 get_session_by_agent(进程存活性检查用) | v1 | +| `dispatcher.py` | 去掉 counter acquire/release,on_complete 只含业务逻辑 | v1 | +| `ticker.py` | _spawn_available_agents 不再管 counter | v1 | +| `ticker.py` | _check_timeouts 加进程存活性检查 + 推回 pending | v1 | +| `ticker.py` | 新增 _is_pid_alive | v1 | +| `main.py` | counter 创建提前,传给 spawner | v1 | --- -## 7. 司马懿评审意见处理(mail-1779726169654) +## 5. 司马懿评审记录 + +### 第一次评审(mail-1779726169654) | 评审意见 | 结论 | 理由 | |---------|------|------| -| wrapped_on_complete 加 try/finally | ✅ 采纳 | 防御性编程,确保 counter release 和业务回调都执行 | +| wrapped_on_complete 加 try/finally | ✅ 采纳 | 防御性编程 | | A5/A6 加 context 日志 | ✅ 采纳 | 排查方便 | -| per-provider 冷却 | ⏭ 延后 | 低优先级,先做 per-agent | -| crash_count per-agent 累计,禁用 agent | ❌ 不采纳 | 崩溃可能是任务问题不是 agent 问题。保持 per-task 3 次标 failed,通过 escalate 通知用户自行判断 | -| can_acquire 失败推回 claimed | ❌ 不采纳 | retry 路径下 can_acquire 不会失败(asyncio 单线程无竞态)。release → can_acquire → acquire 是内存同步操作,中间无 await | -| release 和 acquire 之间有竞态窗口 | ❌ 不存在 | Python asyncio 单线程,三步都是内存同步操作,无竞态 | +| per-provider 冷却 | ⏭ 延后 | 低优先级 | +| crash_count per-agent 累计,禁用 agent | ❌ 不采纳 | 崩溃可能是任务问题不是 agent 问题 | +| can_acquire 失败推回 claimed | ❌ 不采纳 | asyncio 单线程无竞态 | +| release 和 acquire 之间有竞态窗口 | ❌ 不存在 | 内存同步操作 | -## 8. 实现检查清单 +### 第二次评审(_do_retry counter 时序) -- [ ] counter.py:新增 cooldown 机制 -- [ ] spawner.py:spawn_full_agent 加 counter acquire/release + wrapped_on_complete(try/finally) -- [ ] spawner.py:_classify_outcome 去掉 release_counter,只有 A2/A3 触发 retry -- [ ] spawner.py:_do_retry 通过 spawn_full_agent 重试 -- [ ] spawner.py:_handle_exit 简化(A9 推回 pending,A5/A6 标 failed + context 日志) -- [ ] dispatcher.py:去掉 counter acquire/release 逻辑 -- [ ] ticker.py:_check_timeouts 加进程存活性检查(per-task crash_count) +| 评审意见 | 结论 | +|---------|------| +| _do_retry 续杯退化为 AgentBusyError | ✅ 采纳:_do_retry 入口手动 release counter | + +### 第三次评审(P0/P1/P2 补丁) + +| 评审意见 | 结论 | +|---------|------| +| P0 stdout 解析路径修复 | 待评审 | +| P1 spawn 前检查 | 待评审 | +| P2 transport=null 兜底 | 待评审 | + +--- + +## 6. 待办 + +- [ ] 假死复活术集成到 ticker +- [ ] 历史数据清理(P0 修复前 62 条 transport=null 的 task_attempts) +- [ ] per-provider 冷却(低优先级)