Files
sanguo_moziplus_v2/docs/design/08-classify-outcome-optimization.md
T
2026-06-02 14:44:53 +08:00

17 KiB
Raw Blame History

#08 Classify Outcome 优化 + Registry 清理

版本:v1.0 | 日期:2026-06-02 | 作者:庞统 | 状态:设计中

1. 背景与动机

1.1 classify outcome 问题

_classify_outcome 的 A0 行(status=None, stdout 空, exit≠0 → crashed)把所有「进程没输出 JSON 就退了」的情况一视同仁,无法区分:

实际场景 exit_code 当前判定 理想判定
openclaw CLI 连不上 Gateway 1 crashed 可恢复
进程被 SIGINT/SIGTERM 杀(compact 期间等) 130/143 crashed 可恢复
Agent 代码 bug 崩溃 1 crashed 不可恢复(但要等 ticker 兜底)

此外:

  • fallback_used 直接判死刑,不允许 retry
  • compact_failed 不可恢复——但 compact 结束后应该可以续杯
  • network/rate_limit/lock 推回 pending 而非 retry——counter 在手里,retry 更快
  • stderr 没记录到 task_attempts,排查靠猜

1.2 Registry 孤儿问题

测试产生的 e2e-* 项目 data 目录被删,但 registry.db 记录还在(128 条)。每次 tick 遍历 100+ 个 no_db 项目,浪费 I/O。

1.3 并发控制一致性

architecture-v3.0.md §8.2 描述三层并发控制,代码已实现但存在一处偏差:

  • AgentProfile.max_concurrentconfig 中每个 agent 的 max_concurrent定义了但没被使用
  • 实际 per-agent 限制统一用 counter.max_concurrent_sessions(全局值 3
  • 不影响功能(因为当前 max_concurrent_sessions=3 已是宽松上限),但是设计↔代码不一致

2. 设计目标

  1. classify outcome:区分可恢复 vs 不可恢复,可恢复走 retry(续杯),不可恢复标 failed + 原因写黑板
  2. Registry 清理:删除项目时同步清理 registry,discover 时同步清理孤儿
  3. 并发控制对齐:标记 AgentProfile.max_concurrent 为 TODO 或接入 counter

2.1 Phase -1: Gateway 存活检查(新增)

问题

spawner Phase 0-4 只检查 agent 本地文件状态(sessions.json、lock),不检查 Gateway 进程是否活着。如果 Gateway 挂了或重启中,openclaw agent 命令会 hang 10+ 分钟后失败,浪费资源。

方案

在 Phase 0 之前增加 Gateway liveness check

async def _probe_gateway(self, timeout: float = 3.0) -> bool:
    """TCP + WebSocket Upgrade 握手探测 Gateway"""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection('127.0.0.1', 18789), timeout=timeout
        )
        writer.write(
            b'GET /ws HTTP/1.1\r\n'
            b'Host: 127.0.0.1:18789\r\n'
            b'Upgrade: websocket\r\n'
            b'Connection: Upgrade\r\n'
            b'Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n'
            b'Sec-WebSocket-Version: 13\r\n\r\n'
        )
        await writer.drain()
        resp = await asyncio.wait_for(reader.read(256), timeout=timeout)
        writer.close()
        return b'101' in resp
    except Exception:
        return False

处理

Gateway 不可达 → AgentBusyError(reason="gateway_down") → 任务留 pending → ticker 30s 后自动重试。

性能

  • 延迟:~1-10ms(正常情况)
  • 无新依赖(原生 asyncio
  • 对 Gateway 零负担(等同一次普通 HTTP 请求)
  • 验证脚本:scripts/gateway_monitor.py(每 10s 探测 + 日志记录)

3. Classify Outcome 判定树 v2.0

3.1 核心分类原则

分类 处理 说明
可恢复 → retry _do_retry(续杯) counter 已在手里,重新 spawn 比推回 pending 更快
不确定 → crashed(等 ticker 兜底) cooldown + 保持 working ticker 超时回收 or crash_limit 标 failed
Agent 自己判的 尊重,不干预 A4: task_status=failed
完成 结束 正常完成
不可恢复 → failed 标 failed + 原因写黑板 确定性错误,重试不会改变结果

3.2 输入因素

因素 来源 说明
exit_code 进程退出码 0=正常, 1=通用错误, 128+N=信号
json_result.status openclaw stdout JSON ok / timeout / error / None
json_result.summary 同上 completed 等
json_result.fallback_used 同上 Gateway 降级标志
stderr_text 进程 stderr 关键字匹配辅助分类
stdout_text 进程 stdout 是否有输出
task_status 黑板 DB 任务实际状态 done / failed / working 等
exit_code 信号类型 (新增) exit_code ≥ 128 SIGINT(130) / SIGTERM(143) = 外部中断
stderr 关键字(无 JSON 时) (新增) stderr_text 无 JSON 输出时也检查 network/compact 等

3.3 判定表

第一层:有 JSON 输出(status ≠ None

编号 条件 outcome 可恢复? 处理
A1 status=ok, summary=completed completed 结束
A2 status=timeout gateway_timeout retry(续杯)
A3 status=ok, fallback_used, fallback_count ≥ 2 fallback_exhausted 标 failed + 原因写黑板
A3b status=ok, fallback_used, fallback_count < 2 fallback_retry retry + cooldown 30s
A4 task_status=failed agent_failed 尊重,不干预
A5 status=ok(其他组合) completed 结束
A6 status=error + stderr 含 auth 关键字 auth_failed 标 failed + 原因写黑板
A7 status=error + stderr 含 compact 关键字 compact_interrupted retry + cooldown 60s(等 compact 完成)
A8 status=error + stderr 含 network 关键字 gateway_unreachable retry + cooldown 30s
A9 status=error + stderr 含 rate_limit 关键字 api_error retry + cooldown 60s
A10 status=error + stderr 含 lock 关键字 lock_conflict retry + cooldown 10s
A11 status=error(其他) agent_error 标 failed + 原因写黑板

第二层:无 JSON 输出(status = None, stdout 空)

编号 条件 outcome 可恢复? 处理
A12 exit=0 + task_status ∈ {done, review} completed 正常完成
A13 exit=0 + task_status ∉ {done, review} agent_error 标 failed + 原因写黑板
A14 exit=130 (SIGINT) 或 exit=143 (SIGTERM) interrupted retry
A15 exit≠0 + stderr 含 network 关键字 gateway_unreachable retry + cooldown 30s
A16 exit≠0 + stderr 含 compact 关键字 compact_interrupted retry + cooldown 60s
A17 exit≠0(其他,排除 A14-A16 crashed 不确定 cooldown 300s + 保持 working → ticker 兜底

3.4 各编号详细说明

A2: gateway_timeout(续杯)

  • 唯一由 Gateway 官方信号触发的续杯
  • _do_retry,用同一 session-id 续杯
  • retry_count 上限 3 次

A3/A3b: fallback 分级处理(新)

  • A3bfallback_count < 2):fallback 说明任务本身没问题,是模型/网络抖了,retry
  • A3fallback_count ≥ 2):连续两次 fallback,说明模型质量不可靠,标 failed
  • fallback_count 存储位置:task_attempts.metadata 中的 fallback_count 字段
  • 每次检测到 fallback_used=true 时递增,retry 成功(非 fallback 结束)时重置

A4: Agent 自己标 failed

  • Agent 通过 API POST /tasks/{id}/status 主动标 failed
  • 含义:Agent 有意识地判断任务无法完成(需求矛盾、资源不足等)
  • 处理:尊重 Agent 的判断,记录 outcome=agent_failed,不干预

A6: auth_failed(不可恢复)

  • 401/403/unauthorized → 认证/授权失败
  • 重试不会改变结果,需要人工介入
  • 标 failed + {"reason": "auth_failed", "stderr_preview": "..."}

A7: compact_interrupted(可恢复,新逻辑)

  • compact 是 Gateway 的上下文压缩过程
  • compact 结束后 session 状态恢复,可以续杯
  • retry + cooldown 60s(给 compact 足够时间完成)
  • 续杯时 spawn_full_agent 会重新检查 session statePhase 2),如果 compact 还在进行会 AgentBusyError → 等 ticker

A8/A9/A10: 暂时性错误(可恢复,改为 retry)

  • 原逻辑:release counter + 推回 pending → 等 ticker 重新分配
  • 新逻辑:counter 已在手里,直接 retry 更快
  • 各自设 cooldown 避免高频轮询:
    • A8 network: 30s
    • A9 rate_limit: 60s
    • A10 lock: 10s

A11: agent_error(不可恢复)

  • status=error 但 stderr 不匹配任何已知关键字
  • 标 failed + {"reason": "agent_error", "stderr_preview": "前500字"}

A14: interrupted(可恢复,新增)

  • exit=130 (SIGINT) 或 exit=143 (SIGTERM)
  • 原因:进程被外部中断(PM2 restart、compact 期间被杀、手动 Ctrl+C
  • 不是代码 bug,retry 很可能成功
  • 直接 retry,不设额外 cooldown(进程已退出,session 应该是干净的)

A15/A16: 无 JSON 输出 + stderr 关键字(可恢复,新增)

  • 原逻辑 A0 把这些统判 crashed
  • 新逻辑:stderr 含 network/compact 关键字 → 对应的可恢复分类
  • 和 A8/A7 处理一致

A17: crashed(不确定,等 ticker 兜底)

  • 排除所有可恢复场景后,真正的"不知道什么原因崩了"
  • 不立刻标 failed(保持现有逻辑)
  • 设 cooldown 300s,任务保持 working
  • ticker _check_timeouts 检测超时 → 标 failed
  • _check_crash_limit 连续 3 次 → 标 failed
  • 两种兜底机制都写原因到黑板

3.5 cooldown 参数汇总

场景 cooldown 理由
A3b fallback 30s 模型抖动,短冷却
A7/A16 compact 60s 等 compact 完成
A8/A15 network 30s 网络暂时性问题
A9 rate_limit 60s API 限流需要等待
A10 lock 10s lock 冲突很快释放
A14 interrupted 0s 进程已退出,session 干净
A17 crashed 300s 给 session 恢复时间

3.6 retry 时 cooldown 的实现

retry 走 _do_retryspawn_full_agent(skip_counter=True)。 cooldown 在 retry 前设:

if cls.get("cooldown_seconds"):
    self.counter.set_cooldown(agent_id, seconds=cls["cooldown_seconds"])
    await asyncio.sleep(cls["cooldown_seconds"])

或者更优雅:cooldown 由 can_acquire 检查,retry 时 spawn_full_agent 内部 Phase 1 会调 can_acquire,cooldown 期间自动等待。但当前 _do_retryskip_counter=True 跳过了 counter 检查。

方案:在 _do_retry 开始时设 cooldowncooldown 期间 asyncio.sleep,然后继续 spawn。这样不需要改 counter 逻辑。

3.7 stderr 记录增强

_record_attempt metadata 新增:

metadata = {
    # 现有
    "status": ...,
    "summary": ...,
    "fallback_used": ...,
    "fallback_reason": ...,
    "task_status_at_exit": ...,
    # 新增
    "stderr_preview": stderr_text[:500] if stderr_text else None,
    "exit_signal": "SIGINT" if exit_code == 130 else "SIGTERM" if exit_code == 143 else None,
    "fallback_count": ...,  # 累计 fallback 次数
}

4. Registry 清理

4.1 现有 delete_project 的问题

  • DELETE /api/projects/{id} 只逻辑删除(status→deleted),不清理 registry 记录
  • discover_projects() 只注册新项目,不清理 data 已删的记录
  • 无批量操作 API

4.2 修复方案

delete_project 增强

删除项目时同步清理 registry

# src/blackboard/registry.py
def delete_project(self, project_id: str) -> bool:
    """物理删除项目:registry 记录 + data 目录"""
    # 1. 删除 data 目录
    project_dir = self.root / project_id
    if project_dir.exists():
        shutil.rmtree(project_dir, ignore_errors=True)
    # 2. 删除 registry 记录
    conn = self._connect()
    conn.execute("DELETE FROM projects WHERE id=?", (project_id,))
    conn.commit()
    conn.close()
    return True

安全性:保持 DELETE /api/projects/{id} 需要 project_id 精确匹配,不支持通配符。

discover_projects 增强

扫描时同步清理 data 不存在的 registry 记录:

def discover_projects(self, scan_dir=None):
    # ... 现有注册逻辑 ...
    
    # 清理孤儿:registry 有但 data 没目录的
    conn = self._connect()
    all_registered = conn.execute("SELECT id FROM projects WHERE status='active'").fetchall()
    for (pid,) in all_registered:
        if not (self.root / pid).is_dir():
            conn.execute("UPDATE projects SET status='deleted' WHERE id=?", (pid,))
            cleaned.append(pid)
    conn.commit()
    conn.close()

批量清理 API

新增运维端点(src/api/admin_routes.py):

POST /api/admin/cleanup
  body: {
    "action": "purge_deleted",     # 清理所有 status=deleted 的项目和 data
    "confirm": true                # 安全确认
  }
  body: {
    "action": "purge_prefix",      # 按前缀批量删除
    "prefix": "e2e-",
    "confirm": true
  }
  body: {
    "action": "sync_registry",     # registry ↔ data 双向同步
    "confirm": true
  }

安全机制:

  • 所有操作需要 confirm: true
  • confirm: false 时返回 dry-run 预览(将删除什么)
  • 可选:admin token 验证

5. 并发控制一致性检查

5.1 现状

设计层 配置/代码 实际生效
per session key max_per_session=1 生效
per agent max_concurrent_sessions=3 生效
global max_global_agents=5 生效
per tick max_dispatch_per_tick=3 生效
per agent profile AgentProfile.max_concurrent 定义了未使用

5.2 问题

config/default.yaml 中每个 agent 有 max_concurrent(张飞=1, 司马懿=2, 庞统=3),但 Router 加载 profile 后 max_concurrent 从未被引用。实际 per-agent 限制统一用 counter.max_concurrent_sessions=3

5.3 方案(标记为 TODO,不在本次实施)

两种对齐方向:

  • 方向 A:删掉 AgentProfile.max_concurrent,统一用 max_concurrent_sessions
  • 方向 Bcounter.can_acquire() 读取 agent profile 的 max_concurrent 替代全局值

当前所有 agent 的 max_concurrent 都 ≤ 3,且 max_concurrent_sessions=3 已是宽松上限,不影响运行。标记为 TODO,在 #02 Main Session 实施时统一处理。

6. 影响范围

文件 改动 风险
src/daemon/spawner.py _classify_outcome 重写 + _handle_exit 增加 retry 分支 中 — 核心逻辑
src/daemon/spawner.py _record_attempt metadata 增强 低 — 只增加字段
src/daemon/spawner.py _do_retry cooldown 逻辑 低 — 在现有 retry 前加 sleep
src/blackboard/registry.py delete_project 增强 + discover_projects 孤儿清理 低 — 边缘逻辑
src/api/admin_routes.py 新增批量清理 API 低 — 新文件
src/api/project_routes.py delete_project 调用 registry 清理
config/default.yaml 无改动
docs/design/spawner-monitor-design.md A0 章节更新
tests/test_spawner.py 新增 A14-A17 + A3b + A7 retry 测试

7. 测试计划

7.1 单元测试(mock

测试 覆盖
test_A14_interrupted_sigint exit=130 → retry
test_A14_interrupted_sigterm exit=143 → retry
test_A15_no_json_network exit=1 + stderr network → retry
test_A16_no_json_compact exit=1 + stderr compact → retry
test_A17_crashed exit=1 + 空 stderr → crashed + cooldown
test_A3b_fallback_retry fallback_count=0 → retry
test_A3_fallback_exhausted fallback_count=2 → failed
test_A7_compact_retry status=error + compact → retry + 60s cooldown
test_A8_network_retry status=error + network → retry + 30s cooldown
test_A9_rate_limit_retry status=error + rate_limit → retry + 60s cooldown
test_A10_lock_retry status=error + lock → retry + 10s cooldown
test_A11_agent_error_reason status=error + 未知 stderr → failed + reason
test_A6_auth_failed_reason status=error + auth → failed + reason
test_stderr_recorded metadata 包含 stderr_preview
test_fallback_count_tracking fallback_count 跨 retry 累计

7.2 E2E 测试(需 RUN_INTEGRATION=1

  • 模拟 compact 场景:spawn → compact → retry → 成功
  • 模拟 network 中断:spawn → network error → retry → 成功

8. 与现有设计的关系

本文档是 spawner-monitor-design.md 的增量更新,不替代原文档。改动点:

  • §5 A0 章节更新为 A14-A17 四种细分
  • §5 A5/A6 (fallback) 改为 A3/A3b 分级处理
  • §5 A7-A12 中 compact/network/rate_limit/lock 改为 retry
  • 新增 §3.6 cooldown 参数汇总
  • 新增 §4 Registry 清理
  • 新增 §5 并发控制一致性检查(TODO 标记)