From 9722e843abaf0ce86ce10a7e32e38367797e9907 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 26 May 2026 00:22:34 +0800 Subject: [PATCH] auto-sync: 2026-05-26 00:22:34 --- docs/design/v2.7.2-counter-lifecycle-fix.md | 322 ++++++++++++++++++++ 1 file changed, 322 insertions(+) create mode 100644 docs/design/v2.7.2-counter-lifecycle-fix.md diff --git a/docs/design/v2.7.2-counter-lifecycle-fix.md b/docs/design/v2.7.2-counter-lifecycle-fix.md new file mode 100644 index 0000000..c61c55a --- /dev/null +++ b/docs/design/v2.7.2-counter-lifecycle-fix.md @@ -0,0 +1,322 @@ +# v2.7.2 Counter 生命周期修复 + +**版本**: v1.0 +**日期**: 2026-05-26 +**作者**: 庞统 +**状态**: 待评审 + +--- + +## 1. 问题背景 + +### 1.1 核心原则 + +> 每次 agent 调用都是独占的。openclaw 无论成功失败都会返回,最差情况是 timeout 返回。谁占用谁持有,进程退出就 release。 + +### 1.2 当前偏差 + +| 偏差 | 当前行为 | 核心原则要求 | +|------|---------|-------------| +| counter 生命周期 | 任务级(贯穿 retry 链) | 调用级(spawn acquire,退出 release) | +| retry 绕过 dispatcher | `_do_retry` 直接调 `spawn_full_agent`,不检查 counter | spawn 只有 `spawn_full_agent` 一个入口,内部统一检查 | +| A5/A6 fallback 也 retry | fallback 出现说明 agent 被占用时 spawn 了,根因错误 | 不应出现 fallback;出现了说明流程有 bug | +| A10/A12 也 retry | compact 失败/未知错误都走 retry | 进程退出 = agent 空闲,走正常 dispatch | +| 429 不推回 pending | release counter 但任务 still working → 30 分钟超时后才标 failed | 应推回 pending,让 ticker 重新调度 | +| 429 无冷却 | ticker 下次(30秒后)又 dispatch → 又 429 → 循环 | per-agent 冷却期 | +| 进程意外退出 counter 泄漏 | `_monitor_process` 异常/PM2 重启时 on_complete 不调用 | ticker 层兜底:检测进程存活性 | + +### 1.3 司马懿事件复盘(2026-05-25 20:58-21:00) + +``` +20:58:12 dispatch mail → counter acquire → spawn 司马懿 (pid=50512) +20:59:50 进程退出 (gateway_timeout, exit=0) → release_counter=False → _do_retry + → 直接 spawn (pid=50580),不检查 counter → 司马懿被 spawn 第二次 +21:00:08 进程退出 → _do_retry → 直接 spawn (pid=50613) → 第三次 +21:00:13 API 429 → exit=1 → api_error → release_counter=True → 结束 + +问题:counter 占用但 retry 不检查,3 次 spawn 在 ~2 分钟内完成 + 每次 retry 没有延迟,叠加 API 调用触发 zhipu 429 +``` + +--- + +## 2. 设计方案 + +### 2.1 核心改动:counter 下沉到 spawn_full_agent + +**原则:spawn 只有 `spawn_full_agent` 一个入口,acquire/release 统一在内部。** + +```python +async def spawn_full_agent(self, agent_id, ...): + # 1. 检查 counter + if self.counter and not await self.counter.can_acquire(agent_id): + raise AgentBusyError(agent_id) + + # 2. acquire + if self.counter: + await self.counter.acquire(agent_id) + + # 3. 构建 on_complete(release counter) + original_on_complete = on_complete + async def _wrapped_on_complete(aid, outcome): + if self.counter: + self.counter.release(aid) + if original_on_complete: + await original_on_complete(aid, outcome) + + # 4. spawn 进程 + proc = await asyncio.create_subprocess_exec(...) + + # 5. schedule monitor(传 wrapped_on_complete) + asyncio.create_task(self._monitor_process(..., on_complete=_wrapped_on_complete)) +``` + +**变化**: +- dispatcher 不再 acquire/release counter,只负责路由和构建业务回调 +- spawn_full_agent 内部 acquire + 注册 wrapped on_complete(保证 release) +- on_complete 不再包含 counter.release,只做业务逻辑(幻觉门控、状态标记等) + +### 2.2 _classify_outcome 简化 + +去掉 `release_counter` 字段。**进程退出 = release counter**(由 wrapped_on_complete 保证)。 + +只保留两个维度: +- `should_retry`:是否触发续杯(只有 A2/A3 gateway_timeout) +- `retry_field`:计入哪个计数器 + +| 情况 | outcome | should_retry | 说明 | +|------|---------|-------------|------| +| A1 正常完成 | completed | False | 任务终态 | +| A4 Agent failed | agent_failed | False | 尊重 Agent 判断 | +| A2/A3 Gateway timeout | gateway_timeout | True | **唯一续杯场景** | +| A5/A6 Fallback | fallback_timeout | False | **不应出现**,记录 warning,不 retry | +| A7 认证失败 | auth_failed | False | 不 retry | +| A8 Gateway 不可达 | gateway_unreachable | False | 不改任务状态,等 ticker | +| A9 API 错误/429 | api_error | False | **推回 pending**(见 2.4) | +| A10 Compact 失败 | compact_failed | False | **不 retry**,推回 pending 等 ticker | +| A11 Lock 冲突 | lock_conflict | False | 不改任务状态,等 ticker | +| A12 其他 | agent_error | False | **不 retry**,推回 pending 等 ticker | + +**A5/A6 不应出现**:如果出现了,说明 spawn 时 agent 被占用(counter 检查失效)。记录 ERROR 级日志,标 failed + escalate,不 retry。 + +**A10/A12 不 retry**:进程退出了 = agent 空闲。release counter → 推回 pending → ticker 重新调度。 + +### 2.3 _do_retry 重构 + +```python +async def _do_retry(self, session_id, agent_id, task_id, on_complete, db_path, retry_field="retry_count"): + # 1. 检查任务终态 + if self._is_terminal(db_path, task_id): + return # wrapped_on_complete 已 release counter + + # 2. 检查 retry 上限 + count = self._increment_retry(db_path, task_id, retry_field) + if count >= self.max_retries: + self._mark_task(db_path, task_id, "failed", {...}) + return # wrapped_on_complete 已 release counter + + # 3. 构建续杯 message + message = self._build_retry_message(...) + + # 4. 通过 spawn_full_agent 重新 spawn(内部会 can_acquire) + # 此时 counter 已由 wrapped_on_complete release + try: + await self.spawn_full_agent( + agent_id=agent_id, + message=message, + task_id=task_id, + on_complete=on_complete, # 业务回调(不含 counter) + use_main_session=(session_id is None), + reuse_session_id=session_id if session_id else None, + task_db_path=db_path, + ) + except AgentBusyError: + # agent 被其他任务占用,release counter,等 ticker + logger.warning("Retry spawn skipped: %s busy", agent_id) + # counter 已被 wrapped_on_complete release + # 任务保持 working,等 ticker 检查 +``` + +**关键变化**: +- counter 在进程退出时由 wrapped_on_complete release +- `_do_retry` 调 `spawn_full_agent`,内部 can_acquire 检查 +- 如果 agent 忙(不应该发生,但防御)→ 等 ticker + +### 2.4 429 处理:推回 pending + 冷却机制 + +#### 推回 pending + +```python +# _handle_exit 中 A9 处理 +if outcome == "api_error": + # wrapped_on_complete 已 release counter + # 推回 pending,让 ticker 重新调度 + self._transition_task_status(db_path, task_id, "pending", { + "reason": "api_error_retry", + "api_retry_count": count, + }) +``` + +#### 冷却机制 + +```python +class ActiveAgentCounter: + def __init__(self): + self._active: dict[str, int] = {} # agent_id → count + self._cooldown_until: dict[str, float] = {} # agent_id → timestamp + + def set_cooldown(self, agent_id: str, seconds: float = 120.0): + """设置冷却期""" + self._cooldown_until[agent_id] = time.time() + seconds + + def is_cooling_down(self, agent_id: str) -> bool: + """检查是否在冷却期""" + until = self._cooldown_until.get(agent_id) + if until and time.time() < until: + return True + # 冷却期已过,清理 + self._cooldown_until.pop(agent_id, None) + return False + + async def can_acquire(self, agent_id: str) -> bool: + if self.is_cooling_down(agent_id): + return False + return self._active.get(agent_id, 0) == 0 +``` + +**冷却时间**:120 秒(zhipu 瞬时限流一般 30-60 秒恢复,留余量)。 + +**冷却触发点**: +- A9(api_error/429):`counter.set_cooldown(agent_id, 120)` +- 冷却只阻止新 dispatch,不影响 retry 的 can_acquire(retry 时冷却已过或不存在) + +### 2.5 进程意外退出兜底 + +在 ticker 的 `_check_timeouts` 中增加**进程存活性检查**: + +```python +def _check_timeouts(self, db_path: Path) -> List[str]: + # ... 现有超时检查 ... + + # 新增:检查 counter 占用但进程已死的情况 + if self.counter and self.spawner: + for agent_id in self.counter.active_agents: + session_info = self.spawner.get_session_by_agent(agent_id) + if not session_info: + continue + pid = session_info.get("pid") + if pid and not self._is_pid_alive(pid): + # 进程已死但 counter 还占着 + crash_count = self._get_crash_count(db_path, task_id) + 1 + self._set_crash_count(db_path, task_id, crash_count) + + if crash_count >= 3: + # 3 次连续崩溃,标 failed + logger.error("Agent %s crashed %d times, marking failed", agent_id, crash_count) + self._mark_task(db_path, task_id, "failed", { + "reason": "process_crash", + "crash_count": crash_count, + }) + self.counter.release(agent_id) + else: + # 释放 counter,让 ticker 下次重新 dispatch + logger.warning("Agent %s process dead (crash %d/3), releasing counter", + agent_id, crash_count) + self.counter.release(agent_id) + # 任务保持 working,下次 tick 会看到 counter 空闲 → 重新 dispatch +``` + +**crash_count 存储**:`task_attempts.metadata` 的 `crash_count` 字段。 + +**重置时机**:agent 成功完成一次任务后重置为 0。 + +### 2.6 dispatcher 简化 + +dispatcher 不再管 counter: + +```python +async def dispatch(self, task, ...): + # 不再 acquire counter(spawn_full_agent 内部处理) + # 不再构建含 counter.release 的 on_complete + + # 只做路由和业务回调 + on_complete = None + if is_mail: + on_complete = lambda aid, outcome: self._mail_auto_complete(task_id, aid, db_path, must_haves) + # Task 不需要业务回调(counter release 由 spawn_full_agent 的 wrapped_on_complete 处理) + + try: + session_id = await self.spawner.spawn_full_agent( + agent_id=agent_id, + message=message, + task_id=task.id, + on_complete=on_complete, # 只含业务逻辑 + use_main_session=is_mail, + task_db_path=db_path, + ) + except AgentBusyError: + return {"status": "skipped", "reason": "Agent busy"} +``` + +--- + +## 3. 改动范围 + +| 文件 | 改动 | 行数 | +|------|------|------| +| `spawner.py` | spawn_full_agent 加 counter acquire/release + wrapped_on_complete | ~30 | +| `spawner.py` | _classify_outcome 去掉 release_counter 字段 | ~10 | +| `spawner.py` | _handle_exit 简化(A9 推回 pending,A10/A12 不 retry) | ~20 | +| `spawner.py` | _do_retry 通过 spawn_full_agent 重试 | ~15 | +| `spawner.py` | 新增 AgentBusyError | ~5 | +| `dispatcher.py` | 去掉 counter acquire/release/on_complete 逻辑 | ~-40 | +| `counter.py` | 新增 cooldown 机制 | ~25 | +| `ticker.py` | _check_timeouts 加进程存活性检查 | ~30 | +| **合计** | | ~95 行净增 | + +## 4. 续杯语义变化 + +| | 当前 | 修复后 | +|---|------|--------| +| counter 生命周期 | 任务级(贯穿 retry 链) | 调用级(spawn acquire,退出 release) | +| retry 路径 | 直接 spawn,不检查 counter | 通过 spawn_full_agent,内部检查 | +| retry 失败 | 只看 retry_count 上限 | can_acquire 失败也会停止(agent 忙) | +| A9/429 | release + 任务 working → 30 分钟超时 | release + 推回 pending + 冷却 120s | +| A10 compact | retry(counter 不 release) | 不 retry,推回 pending | +| 意外退出 | counter 泄漏 | ticker 检测 → crash_count → 最多 3 次 | + +## 5. 续杯时间计算 + +Gateway timeout = 600s(10 分钟),3 次续杯 = ~30 分钟。 + +每次续杯的流程: +``` +进程退出 → release counter → _do_retry → spawn_full_agent(can_acquire) +→ acquire → spawn → 新进程执行(最多 10 分钟) +``` + +30 分钟内 agent 无法完成任务 → 第 3 次续杯后标 failed + escalate。 + +## 6. 测试计划 + +| 用例 | 验证 | +|------|------| +| 正常完成 | counter acquire → release,任务 done | +| Gateway timeout 续杯 | release → re-acquire → spawn,复用 session | +| 续杯时 agent 被占用 | can_acquire 失败 → 不 spawn,等 ticker | +| 429 → 冷却 | 推回 pending,120s 内不 dispatch | +| 进程崩溃 | ticker 检测,crash < 3 → 重新 dispatch | +| 3 次崩溃 | 标 failed + escalate | +| PM2 重启 | ticker 检测进程死 → release counter | +| Mail 幻觉门控 | on_complete 业务回调正常执行 | + +--- + +## 7. 请评审 + +1. counter 下沉到 spawn_full_agent 的方案是否合理? +2. A5/A6 不应出现,出现时标 failed + escalate 的处理是否过重? +3. 冷却时间 120s 是否合适? +4. 进程存活性检查放在 ticker 还是独立机制? +5. dispatcher 简化后,Mail 的 on_complete(幻觉门控)是否受影响? +6. 遗漏风险?