17 KiB
#08 Classify Outcome 优化 + Registry 清理
版本:v1.0 | 日期:2026-06-02 | 作者:庞统 | 状态:设计中
1. 背景与动机
1.1 classify outcome 问题
_classify_outcome 的 A0 行(status=None, stdout 空, exit≠0 → crashed)把所有「进程没输出 JSON 就退了」的情况一视同仁,无法区分:
| 实际场景 | exit_code | 当前判定 | 理想判定 |
|---|---|---|---|
| openclaw CLI 连不上 Gateway | 1 | crashed | 可恢复 |
| 进程被 SIGINT/SIGTERM 杀(compact 期间等) | 130/143 | crashed | 可恢复 |
| Agent 代码 bug 崩溃 | 1 | crashed | 不可恢复(但要等 ticker 兜底) |
此外:
- fallback_used 直接判死刑,不允许 retry
- compact_failed 不可恢复——但 compact 结束后应该可以续杯
- network/rate_limit/lock 推回 pending 而非 retry——counter 在手里,retry 更快
- stderr 没记录到 task_attempts,排查靠猜
1.2 Registry 孤儿问题
测试产生的 e2e-* 项目 data 目录被删,但 registry.db 记录还在(128 条)。每次 tick 遍历 100+ 个 no_db 项目,浪费 I/O。
1.3 并发控制一致性
architecture-v3.0.md §8.2 描述三层并发控制,代码已实现但存在一处偏差:
AgentProfile.max_concurrent(config 中每个 agent 的max_concurrent)定义了但没被使用- 实际 per-agent 限制统一用
counter.max_concurrent_sessions(全局值 3) - 不影响功能(因为当前 max_concurrent_sessions=3 已是宽松上限),但是设计↔代码不一致
2. 设计目标
- classify outcome:区分可恢复 vs 不可恢复,可恢复走 retry(续杯),不可恢复标 failed + 原因写黑板
- Registry 清理:删除项目时同步清理 registry,discover 时同步清理孤儿
- 并发控制对齐:标记 AgentProfile.max_concurrent 为 TODO 或接入 counter
2.1 Phase -1: Gateway 存活检查(新增)
问题
spawner Phase 0-4 只检查 agent 本地文件状态(sessions.json、lock),不检查 Gateway 进程是否活着。如果 Gateway 挂了或重启中,openclaw agent 命令会 hang 10+ 分钟后失败,浪费资源。
方案
在 Phase 0 之前增加 Gateway liveness check:
async def _probe_gateway(self, timeout: float = 3.0) -> bool:
"""TCP + WebSocket Upgrade 握手探测 Gateway"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection('127.0.0.1', 18789), timeout=timeout
)
writer.write(
b'GET /ws HTTP/1.1\r\n'
b'Host: 127.0.0.1:18789\r\n'
b'Upgrade: websocket\r\n'
b'Connection: Upgrade\r\n'
b'Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n'
b'Sec-WebSocket-Version: 13\r\n\r\n'
)
await writer.drain()
resp = await asyncio.wait_for(reader.read(256), timeout=timeout)
writer.close()
return b'101' in resp
except Exception:
return False
处理
Gateway 不可达 → AgentBusyError(reason="gateway_down") → 任务留 pending → ticker 30s 后自动重试。
性能
- 延迟:~1-10ms(正常情况)
- 无新依赖(原生 asyncio)
- 对 Gateway 零负担(等同一次普通 HTTP 请求)
- 验证脚本:
scripts/gateway_monitor.py(每 10s 探测 + 日志记录)
3. Classify Outcome 判定树 v2.0
3.1 核心分类原则
| 分类 | 处理 | 说明 |
|---|---|---|
| 可恢复 → retry | _do_retry(续杯) |
counter 已在手里,重新 spawn 比推回 pending 更快 |
| 不确定 → crashed(等 ticker 兜底) | cooldown + 保持 working | ticker 超时回收 or crash_limit 标 failed |
| Agent 自己判的 | 尊重,不干预 | A4: task_status=failed |
| 完成 | 结束 | 正常完成 |
| 不可恢复 → failed | 标 failed + 原因写黑板 | 确定性错误,重试不会改变结果 |
3.2 输入因素
| 因素 | 来源 | 说明 |
|---|---|---|
| exit_code | 进程退出码 | 0=正常, 1=通用错误, 128+N=信号 |
| json_result.status | openclaw stdout JSON | ok / timeout / error / None |
| json_result.summary | 同上 | completed 等 |
| json_result.fallback_used | 同上 | Gateway 降级标志 |
| stderr_text | 进程 stderr | 关键字匹配辅助分类 |
| stdout_text | 进程 stdout | 是否有输出 |
| task_status | 黑板 DB 任务实际状态 | done / failed / working 等 |
| exit_code 信号类型 (新增) | exit_code ≥ 128 | SIGINT(130) / SIGTERM(143) = 外部中断 |
| stderr 关键字(无 JSON 时) (新增) | stderr_text | 无 JSON 输出时也检查 network/compact 等 |
3.3 判定表
第一层:有 JSON 输出(status ≠ None)
| 编号 | 条件 | outcome | 可恢复? | 处理 |
|---|---|---|---|---|
| A1 | status=ok, summary=completed | completed | — | 结束 |
| A2 | status=timeout | gateway_timeout | ✅ | retry(续杯) |
| A3 | status=ok, fallback_used, fallback_count ≥ 2 | fallback_exhausted | ❌ | 标 failed + 原因写黑板 |
| A3b | status=ok, fallback_used, fallback_count < 2 | fallback_retry | ✅ | retry + cooldown 30s |
| A4 | task_status=failed | agent_failed | — | 尊重,不干预 |
| A5 | status=ok(其他组合) | completed | — | 结束 |
| A6 | status=error + stderr 含 auth 关键字 | auth_failed | ❌ | 标 failed + 原因写黑板 |
| A7 | status=error + stderr 含 compact 关键字 | compact_interrupted | ✅ | retry + cooldown 60s(等 compact 完成) |
| A8 | status=error + stderr 含 network 关键字 | gateway_unreachable | ✅ | retry + cooldown 30s |
| A9 | status=error + stderr 含 rate_limit 关键字 | api_error | ✅ | retry + cooldown 60s |
| A10 | status=error + stderr 含 lock 关键字 | lock_conflict | ✅ | retry + cooldown 10s |
| A11 | status=error(其他) | agent_error | ❌ | 标 failed + 原因写黑板 |
第二层:无 JSON 输出(status = None, stdout 空)
| 编号 | 条件 | outcome | 可恢复? | 处理 |
|---|---|---|---|---|
| A12 | exit=0 + task_status ∈ {done, review} | completed | — | 正常完成 |
| A13 | exit=0 + task_status ∉ {done, review} | agent_error | ❌ | 标 failed + 原因写黑板 |
| A14 | exit=130 (SIGINT) 或 exit=143 (SIGTERM) | interrupted | ✅ | retry |
| A15 | exit≠0 + stderr 含 network 关键字 | gateway_unreachable | ✅ | retry + cooldown 30s |
| A16 | exit≠0 + stderr 含 compact 关键字 | compact_interrupted | ✅ | retry + cooldown 60s |
| A17 | exit≠0(其他,排除 A14-A16) | crashed | 不确定 | cooldown 300s + 保持 working → ticker 兜底 |
3.4 各编号详细说明
A2: gateway_timeout(续杯)
- 唯一由 Gateway 官方信号触发的续杯
- 走
_do_retry,用同一 session-id 续杯 - retry_count 上限 3 次
A3/A3b: fallback 分级处理(新)
- A3b(fallback_count < 2):fallback 说明任务本身没问题,是模型/网络抖了,retry
- A3(fallback_count ≥ 2):连续两次 fallback,说明模型质量不可靠,标 failed
- fallback_count 存储位置:task_attempts.metadata 中的
fallback_count字段 - 每次检测到 fallback_used=true 时递增,retry 成功(非 fallback 结束)时重置
A4: Agent 自己标 failed
- Agent 通过 API
POST /tasks/{id}/status主动标 failed - 含义:Agent 有意识地判断任务无法完成(需求矛盾、资源不足等)
- 处理:尊重 Agent 的判断,记录 outcome=agent_failed,不干预
A6: auth_failed(不可恢复)
- 401/403/unauthorized → 认证/授权失败
- 重试不会改变结果,需要人工介入
- 标 failed +
{"reason": "auth_failed", "stderr_preview": "..."}
A7: compact_interrupted(可恢复,新逻辑)
- compact 是 Gateway 的上下文压缩过程
- compact 结束后 session 状态恢复,可以续杯
- retry + cooldown 60s(给 compact 足够时间完成)
- 续杯时 spawn_full_agent 会重新检查 session state(Phase 2),如果 compact 还在进行会 AgentBusyError → 等 ticker
A8/A9/A10: 暂时性错误(可恢复,改为 retry)
- 原逻辑:release counter + 推回 pending → 等 ticker 重新分配
- 新逻辑:counter 已在手里,直接 retry 更快
- 各自设 cooldown 避免高频轮询:
- A8 network: 30s
- A9 rate_limit: 60s
- A10 lock: 10s
A11: agent_error(不可恢复)
- status=error 但 stderr 不匹配任何已知关键字
- 标 failed +
{"reason": "agent_error", "stderr_preview": "前500字"}
A14: interrupted(可恢复,新增)
- exit=130 (SIGINT) 或 exit=143 (SIGTERM)
- 原因:进程被外部中断(PM2 restart、compact 期间被杀、手动 Ctrl+C)
- 不是代码 bug,retry 很可能成功
- 直接 retry,不设额外 cooldown(进程已退出,session 应该是干净的)
A15/A16: 无 JSON 输出 + stderr 关键字(可恢复,新增)
- 原逻辑 A0 把这些统判 crashed
- 新逻辑:stderr 含 network/compact 关键字 → 对应的可恢复分类
- 和 A8/A7 处理一致
A17: crashed(不确定,等 ticker 兜底)
- 排除所有可恢复场景后,真正的"不知道什么原因崩了"
- 不立刻标 failed(保持现有逻辑)
- 设 cooldown 300s,任务保持 working
- ticker
_check_timeouts检测超时 → 标 failed - 或
_check_crash_limit连续 3 次 → 标 failed - 两种兜底机制都写原因到黑板
3.5 cooldown 参数汇总
| 场景 | cooldown | 理由 |
|---|---|---|
| A3b fallback | 30s | 模型抖动,短冷却 |
| A7/A16 compact | 60s | 等 compact 完成 |
| A8/A15 network | 30s | 网络暂时性问题 |
| A9 rate_limit | 60s | API 限流需要等待 |
| A10 lock | 10s | lock 冲突很快释放 |
| A14 interrupted | 0s | 进程已退出,session 干净 |
| A17 crashed | 300s | 给 session 恢复时间 |
3.6 retry 时 cooldown 的实现
retry 走 _do_retry → spawn_full_agent(skip_counter=True)。
cooldown 在 retry 前设:
if cls.get("cooldown_seconds"):
self.counter.set_cooldown(agent_id, seconds=cls["cooldown_seconds"])
await asyncio.sleep(cls["cooldown_seconds"])
或者更优雅:cooldown 由 can_acquire 检查,retry 时 spawn_full_agent 内部 Phase 1 会调 can_acquire,cooldown 期间自动等待。但当前 _do_retry 用 skip_counter=True 跳过了 counter 检查。
方案:在 _do_retry 开始时设 cooldown,cooldown 期间 asyncio.sleep,然后继续 spawn。这样不需要改 counter 逻辑。
3.7 stderr 记录增强
_record_attempt metadata 新增:
metadata = {
# 现有
"status": ...,
"summary": ...,
"fallback_used": ...,
"fallback_reason": ...,
"task_status_at_exit": ...,
# 新增
"stderr_preview": stderr_text[:500] if stderr_text else None,
"exit_signal": "SIGINT" if exit_code == 130 else "SIGTERM" if exit_code == 143 else None,
"fallback_count": ..., # 累计 fallback 次数
}
4. Registry 清理
4.1 现有 delete_project 的问题
DELETE /api/projects/{id}只逻辑删除(status→deleted),不清理 registry 记录discover_projects()只注册新项目,不清理 data 已删的记录- 无批量操作 API
4.2 修复方案
delete_project 增强
删除项目时同步清理 registry:
# src/blackboard/registry.py
def delete_project(self, project_id: str) -> bool:
"""物理删除项目:registry 记录 + data 目录"""
# 1. 删除 data 目录
project_dir = self.root / project_id
if project_dir.exists():
shutil.rmtree(project_dir, ignore_errors=True)
# 2. 删除 registry 记录
conn = self._connect()
conn.execute("DELETE FROM projects WHERE id=?", (project_id,))
conn.commit()
conn.close()
return True
安全性:保持 DELETE /api/projects/{id} 需要 project_id 精确匹配,不支持通配符。
discover_projects 增强
扫描时同步清理 data 不存在的 registry 记录:
def discover_projects(self, scan_dir=None):
# ... 现有注册逻辑 ...
# 清理孤儿:registry 有但 data 没目录的
conn = self._connect()
all_registered = conn.execute("SELECT id FROM projects WHERE status='active'").fetchall()
for (pid,) in all_registered:
if not (self.root / pid).is_dir():
conn.execute("UPDATE projects SET status='deleted' WHERE id=?", (pid,))
cleaned.append(pid)
conn.commit()
conn.close()
批量清理 API
新增运维端点(src/api/admin_routes.py):
POST /api/admin/cleanup
body: {
"action": "purge_deleted", # 清理所有 status=deleted 的项目和 data
"confirm": true # 安全确认
}
body: {
"action": "purge_prefix", # 按前缀批量删除
"prefix": "e2e-",
"confirm": true
}
body: {
"action": "sync_registry", # registry ↔ data 双向同步
"confirm": true
}
安全机制:
- 所有操作需要
confirm: true confirm: false时返回 dry-run 预览(将删除什么)- 可选:admin token 验证
5. 并发控制一致性检查
5.1 现状
| 设计层 | 配置/代码 | 实际生效 |
|---|---|---|
| per session key | max_per_session=1 |
✅ 生效 |
| per agent | max_concurrent_sessions=3 |
✅ 生效 |
| global | max_global_agents=5 |
✅ 生效 |
| per tick | max_dispatch_per_tick=3 |
✅ 生效 |
| per agent profile | AgentProfile.max_concurrent |
❌ 定义了未使用 |
5.2 问题
config/default.yaml 中每个 agent 有 max_concurrent(张飞=1, 司马懿=2, 庞统=3),但 Router 加载 profile 后 max_concurrent 从未被引用。实际 per-agent 限制统一用 counter.max_concurrent_sessions=3。
5.3 方案(标记为 TODO,不在本次实施)
两种对齐方向:
- 方向 A:删掉
AgentProfile.max_concurrent,统一用max_concurrent_sessions - 方向 B:counter.can_acquire() 读取 agent profile 的 max_concurrent 替代全局值
当前所有 agent 的 max_concurrent 都 ≤ 3,且 max_concurrent_sessions=3 已是宽松上限,不影响运行。标记为 TODO,在 #02 Main Session 实施时统一处理。
6. 影响范围
| 文件 | 改动 | 风险 |
|---|---|---|
src/daemon/spawner.py |
_classify_outcome 重写 + _handle_exit 增加 retry 分支 |
中 — 核心逻辑 |
src/daemon/spawner.py |
_record_attempt metadata 增强 |
低 — 只增加字段 |
src/daemon/spawner.py |
_do_retry cooldown 逻辑 |
低 — 在现有 retry 前加 sleep |
src/blackboard/registry.py |
delete_project 增强 + discover_projects 孤儿清理 | 低 — 边缘逻辑 |
src/api/admin_routes.py |
新增批量清理 API | 低 — 新文件 |
src/api/project_routes.py |
delete_project 调用 registry 清理 | 低 |
config/default.yaml |
无改动 | — |
docs/design/spawner-monitor-design.md |
A0 章节更新 | 无 |
tests/test_spawner.py |
新增 A14-A17 + A3b + A7 retry 测试 | 无 |
7. 测试计划
7.1 单元测试(mock)
| 测试 | 覆盖 |
|---|---|
| test_A14_interrupted_sigint | exit=130 → retry |
| test_A14_interrupted_sigterm | exit=143 → retry |
| test_A15_no_json_network | exit=1 + stderr network → retry |
| test_A16_no_json_compact | exit=1 + stderr compact → retry |
| test_A17_crashed | exit=1 + 空 stderr → crashed + cooldown |
| test_A3b_fallback_retry | fallback_count=0 → retry |
| test_A3_fallback_exhausted | fallback_count=2 → failed |
| test_A7_compact_retry | status=error + compact → retry + 60s cooldown |
| test_A8_network_retry | status=error + network → retry + 30s cooldown |
| test_A9_rate_limit_retry | status=error + rate_limit → retry + 60s cooldown |
| test_A10_lock_retry | status=error + lock → retry + 10s cooldown |
| test_A11_agent_error_reason | status=error + 未知 stderr → failed + reason |
| test_A6_auth_failed_reason | status=error + auth → failed + reason |
| test_stderr_recorded | metadata 包含 stderr_preview |
| test_fallback_count_tracking | fallback_count 跨 retry 累计 |
7.2 E2E 测试(需 RUN_INTEGRATION=1)
- 模拟 compact 场景:spawn → compact → retry → 成功
- 模拟 network 中断:spawn → network error → retry → 成功
8. 与现有设计的关系
本文档是 spawner-monitor-design.md 的增量更新,不替代原文档。改动点:
- §5 A0 章节更新为 A14-A17 四种细分
- §5 A5/A6 (fallback) 改为 A3/A3b 分级处理
- §5 A7-A12 中 compact/network/rate_limit/lock 改为 retry
- 新增 §3.6 cooldown 参数汇总
- 新增 §4 Registry 清理
- 新增 §5 并发控制一致性检查(TODO 标记)