Files
sanguo_moziplus_v2/docs/design/v3.0-router-refactor.md
T
cfdaily 0d7425b88c
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions
auto-sync: 2026-06-07 01:35:53
2026-06-07 01:35:53 +08:00

19 KiB
Raw Blame History

v3.0 调度重构方案:去掉独立 LLM,改为广播认领 + 确定性交接

日期: 2026-05-21 状态: 待司马懿评审 影响文件: router.py, dispatcher.py, ticker.py, main.py, config/default.yaml


1. 问题

1.1 LLMDriver 是设计之外的野路子

当前 Router 的 LLMDriver 用独立 OpenAI() 客户端直连 zhipu API 做路由决策:

  • 不属于 L1(调了 LLM)、不属于 L2(不是 openclaw agent)、不属于 L3(不是完整 Agent)
  • 不走 Gateway(无模型路由、无 fallback、无计费)
  • 需要单独维护 api_base/api_key(和 Gateway 配置重复)

已导致实际故障general-20260521-0004 因凭据为空反复调度失败。

1.2 _dispatch_pending 绕过了认领机制

设计文档核心原则(§1.2):

"编排是 AI Agent 在黑板上自主领活(动态协作)" "Daemon 是投递员,不是决策者"

_dispatch_pending 的实际行为是:

pending → Router 决定 agent → 强制 claimed + spawn → Agent 被动执行

代码里认领的全部基础设施(API + CAS + inbox 事件)都已经实现,但被跳过了。

1.3 设计违背

architecture-v2.6.md §1.1

"Agent 决策,Daemon 执行" — 庞统做 plan、张飞领任务、关羽发现风险,都写在黑板上。

Daemon 替 Agent 做决策(Router 决定分给谁),违反了"Agent 决策"原则。


2. 设计文档中已有的完整方案

2.1 认领基础设施(已实现)

组件 代码位置 说明
claim API POST /tasks/{id}/claim 原子 CAS 认领
claim_task() operations.py:155 WHERE status='pending' AND (assignee IS NULL OR assignee=?)
inbox agent_claim inbox.py:264 Agent 通过 JSONL 通知 Daemon 认领
claim_timeout ticker.py:530 claimed 超时 5 分钟 → 重置为 pending(续杯)

2.2 竞态解决(已设计,§3.6

  1. 默认:先到先得 — SQLite CAS,谁先 claim 谁做
  2. 升级:庞统仲裁 — 争议时 @庞统 请求仲裁
  3. 最终:用户拍板 — @user 请求用户决定

2.3 续杯兜底(已实现)

pending → 广播 → 无人认领 → claim_timeout(5min) → pending → 再广播 → ... → 庞统兜底

claim_timeout_minutes = 5.0_check_timeouts 自动把超时的 claimed 重置为 pending。

2.4 三层执行模型(§4.2

层级 方式 命令 适用场景
L1 Daemon 直接操作 SQLite/文件 状态更新、机械验证
L2 spawn sub 隔离 session openclaw agent --agent <id> --session-id <uuid> scope guard、格式校验
L3 run agent 完整黑板参与者 openclaw agent --agent <id> 编码、审查、策略

核心原则:系统只有两种 LLM 调用方式,都通过 Gateway,没有第三种。


3. 方案:广播认领 + 确定性交接

3.1 核心思路

任务调度分两条路径:

确定性路径:已明确知道下一步该谁做 → 直接 spawn,不需要广播

  • retry → 原执行者
  • Agent handoffnext_capability)→ 能力匹配
  • 有 assignee → 直接分
  • review 生命周期 → 能力匹配

广播认领路径:首次分配 / 不确定场景 → spawn 所有空闲 Agent,自主 claim

  • Agent 读黑板 → 自己判断是否适合 → claim
  • 无人认领 → 续杯 → 庞统兜底

3.2 完整调度流程

Ticker._tick_project()
  │
  ├─ 1. 扫描状态
  ├─ 2. 依赖推进
  ├─ 3. 超时检测(claimed 5min → pendingworking 30min → failed
  │
  ├─ 4. 调度 pending 任务:_dispatch_pending()
  │     for each pending task:
  │       ├─ 有 next_capability?→ 能力匹配 → 直接 spawn 对应 Agent
  │       ├─ 有 assignee?→ 直接 spawn 该 Agent
  │       ├─ retry?→ spawn 原执行者
  │       └─ 都没有?→ _broadcast_claim()
  │            → spawn 每个空闲 Agent,传入"黑板上有新任务,请认领"的 prompt
  │            → Agent 读黑板(GET /tasks?status=pending
  │            → Agent 判断是否适合自己
  │            → Agent claimPOST /tasks/{id}/claim)或退出
  │
  ├─ 5. 调度 review 任务:_dispatch_reviews()(不变)
  └─ 6-10. 其他处理(不变)

3.3 广播认领的 Spawn Prompt

广播 spawn 的 Agent 收到的 prompt

你是 {agent_id}。黑板上有新的待认领任务。

## 操作
1. 读黑板查看待认领任务:
   curl http://{api_host}:{api_port}/api/projects/{project_id}/tasks?status=pending

2. 分析每个 pending 任务,判断是否适合你:
   - 你的能力:{capabilities}
   - 任务类型、描述、优先级是否匹配你的专长

3. 如果有适合你的任务,立即认领:
   curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/claim \
     -H 'Content-Type: application/json' \
     -d '{"agent": "{agent_id}"}'

4. 认领成功后,开始执行(状态改为 working)

5. 如果没有适合你的任务,直接退出

3.4 Claim 竞争处理

多个 Agent 同时 claim 同一个任务:

  • SQLite CASWHERE status='pending' — 只有第一个成功,其余 rowcount=0
  • Agent 行为:claim 失败 → 检查其他 pending 任务 → 没有适合的 → 退出
  • 自然负载均衡:不同 Agent 倾向认领不同类型的任务(张飞→coding,司马懿→review

3.5 无人认领兜底

广播 → 5 分钟内无人 claim → claim_timeout → pending → 下轮 ticker 再广播
→ 连续 3 轮无人认领 → spawn 庞统 → 庞统决定分配或自己执行

连续无人认领的检测:在 events 表中记录 broadcast_sent 事件,_check_timeouts 中统计广播轮次,超过阈值 escalate to 庞统。


4. 改动清单

4.1 删除 LLMDriver 类(router.py

删除整个 LLMDriver 类(~120 行)。AgentRouter.route() 末尾改为返回 delegate。

4.2 AgentRouter.__init__ 去掉 llm_driver 参数

4.3 新增 _broadcast_claimticker.py

司马懿建议:攒一批任务,每轮 ticker 最多广播一次(而非每个 pending 任务触发一次广播)。 5 个任务只需 spawn 5 个 Agent,而不是 25 个。

广播前检查全局并发(司马懿建议 1):接近上限时跳过本轮广播。

async def _broadcast_claim(self, tasks, db_path, project_id):
    """广播一批待认领任务给所有空闲 Agent,每轮最多广播一次"""
    # 全局并发检查
    if self.counter and self.counter.global_active >= self.counter._max_global - 1:
        logger.info("Skipping broadcast: global concurrent near limit")
        return []

    idle_agents = self._get_idle_agents()
    if not idle_agents:
        return []

    spawned = []
    for agent_id in idle_agents:
        if not await self.counter.can_acquire(agent_id):
            continue
        prompt = self._build_claim_prompt(agent_id, tasks, project_id)
        await self.counter.acquire(agent_id)
        session_id = await self.spawner.spawn_full_agent(
            agent_id=agent_id,
            message=prompt,
            on_complete=lambda aid, _: self.counter.release(aid),
        )
        spawned.append(agent_id)
    return spawned

广播 prompt 包含所有 pending 任务列表:

def _build_claim_prompt(self, agent_id, tasks, project_id):
    task_list = "\n".join([
        f"- ID: {t.id}, 标题: {t.title}, 类型: {t.task_type}, 优先级: {t.priority}"
        for t in tasks
    ])
    return f"""你是 {agent_id}。黑板上有 {len(tasks)} 个待认领任务。

## 待认领任务
{task_list}

## 操作
1. 读黑板查看详情:
   curl http://{api}/api/projects/{project_id}/tasks?status=pending

2. 选择适合你的任务并认领:
   curl -X POST http://{api}/api/projects/{project_id}/tasks/{task_id}/claim \
     -H 'Content-Type: application/json' -d '{{"agent": "{agent_id}"}}'

3. 认领成功后开始执行(状态改为 working)
4. 没有适合你的任务则退出
"""

4.4 Dispatcher 增加 delegate 模式(dispatcher.py

_build_spawn_message 增加 delegate 分支(庞统兜底 prompt),和广播认领共存:

  • 广播认领失败 → 任务回到 pending → 多轮后 → spawn 庞统 delegate

4.5 main.py 去掉 LLMDriver 初始化

4.6 config/default.yaml 去掉 routing 节

4.7 无人认领检测(复用 retry_count

司马懿建议:不需要新增 broadcast_sent 事件。无人认领重置 pending 时 retry_count 已在递增,直接用它判断阈值。当 retry_count >= 3 时 escalate to 庞统。


5. 广播认领完整生命周期

本节记录从任务创建到最终闭环的完整链路,包括所有已实现但此前未在设计文档中记录的机制。

5.1 完整流程图

用户创建任务 → pending
  ↓
Ticker 检测到 pending 任务
  ↓
有确定性路径?(retry/handoff/assignee/生命周期)
  ├─ 是 → 直接 spawn 指定 Agent
  └─ 否 → 进入广播认领
           ↓
       广播前检查全局并发(counter.is_near_limit()
       → 接近上限?跳过本轮
           ↓
       获取空闲 Agent 列表(从 config agents 列表取)
       → 无空闲?log warning,不递增 retry_count,等下轮
           ↓
       spawn 所有空闲 Agent,传入 claim prompt
       retry_count++(本轮广播计数)
           ↓
       ┌─ 有人 claim → 正常执行流程
       └─ 无人 claim
           ↓
       claimed 超时(5min)→ _check_timeouts 重置为 pending
       retry_count 已递增
           ↓
       retry_count < 3?→ 下轮 Ticker 继续广播
       retry_count >= 3?→ escalate to 庞统

5.2 retry_count 的三种递增场景

场景 触发位置 retry_count++ 说明
广播后无人认领 _broadcast_claim L536 每轮广播递增,记录广播轮次
claimed 超时回收 _check_timeouts L724 Agent 认领后未执行,视为失败
无空闲 Agent _broadcast_claim L527 系统容量问题,不是任务问题

5.3 超时阈值(与 working 状态一致)

状态 超时 处理
claimed claim_timeout_minutes = 5.0 → pendingretry_count < 3)或 escalatedretry_count >= 3
working default_task_timeout_minutes = 30.0 → failed

超时后 retry_count >= 3 时的处理与 working 超时一致:标记 escalated,触发用户介入流程。

5.4 庞统兜底(delegate 模式)

当任务进入以下状态时,Router 路由到 FALLBACK_AGENT = "pangtong-fujunshi"mode="delegate"

  • 广播 3 轮无人认领(retry_count >= 3
  • 确定性路由无法匹配(模糊场景)

庞统收到的 delegate prompt 包含:任务信息 + 团队能力列表 + API 端点。庞统可以:

  1. 直接分配给指定 Agent(通过 claim API
  2. 自己执行
  3. 判断任务不合理 → cancel

5.5 用户介入链路(已完整实现)

当任务 escalated 后,用户介入的完整链路:

Agent 超时/广播无人认领
  → Ticker 标记 status="escalated"
  → SSE broker.publish_sync("task_updated", {...})
  → 前端 EventSource 实时收到事件
  → 通知中心推送 "{old_status} → escalated"
  → 卡片红色高亮 + ⚠️ 图标
  → 用户点开 TaskModal
  → 看到 escalated 按钮:
     · ▶ 继续执行 (→ working)
     · 🔄 重新分配 (→ pending, retry_count 重置)
     · 🚫 取消 (→ cancelled)
     ▸ 高级操作:
     · (可扩展更多手动干预)
  → 用户点击 → POST /tasks/{id}/status → SSE 推送 → 状态更新

已实现代码对应表:

环节 代码文件 具体实现
标记 escalated ticker.py _check_timeouts / _broadcast_claim _transition_status(conn, task_id, "escalated", ...)
SSE 推送 blackboard_routes.py update_status broker.publish_sync("task_updated", ...)
SSE 基础设施 sse.py / sse_routes.py SSEBroker + EventSource endpoint
前端 SSE 监听 store.ts startSSE() addEventListener('task_updated', ...)
前端通知中心 store.ts _pushSseEvent() 通知列表 + 未读计数
escalated 按钮 TaskModal.tsx PRIMARY_ACTIONS[escalated] ▶ 继续执行 + 🔄 重新分配
escalated 高级操作 TaskModal.tsx ADVANCED_ACTIONS 🚫 取消
状态转换 API blackboard_routes.py POST /tasks/{id}/status VALID_TRANSITIONS 校验
转换矩阵 db.py VALID_TRANSITIONS escalated → {working, pending, cancelled}

5.6 前端按钮完整矩阵(v3.1 用户确认版)

设计原则:

  • 暂停和取消是用户的通用权利,所有非终态都有
  • AI 自动流转的操作(认领、开始、提审、通过、打回)不需要按钮
  • escalated / waiting_human 是 AI 发起后等待用户决策,不是通用按钮
  • blocked 任务本身没在动,暂停无意义
  • failed 的升级是 AI 发起不是按钮

状态定义

状态 含义 谁触发 颜色
pending 等待 AI 认领 用户创建/AI流转
claimed AI 已认领 AI 认领
working AI 执行中 AI 开始
paused 用户暂停 用户
review AI 审查中 AI 提交审查
failed 失败 AI 或 daemon
blocked 缺条件卡住 Agent 遇到外部阻塞
escalated AI 解决不了,升级求助 Agent
waiting_human 流程验收点等用户确认 Agent Checkpoint
done 完成 审查通过/用户确认 绿
cancelled 取消 用户

escalated vs waiting_human 区别:

  • escalatedAgent 遇到意外问题解决不了(如技术选型不确定)→ 红色 → 严重
  • waiting_human:执行到计划中的验收点(如 Checkpoint)→ 橙色 → 正常进度

按钮矩阵

状态 用户按钮 说明
pending ⏸ 暂停、🚫 取消 等 AI 认领时用户可暂停或取消
claimed ⏸ 暂停、🚫 取消 AI 已认领未开始
working ⏸ 暂停、🚫 取消 AI 执行中
paused ▶ 恢复、🚫 取消 恢复回暂停前状态
review ⏸ 暂停、🚫 取消 AI 审查中
failed 🔄 重试、🚫 取消 重试回 pending
blocked 🔓 解除(→pending)、🚫 取消 任务本身没在动,暂停无意义
escalated ▶ 继续(→working)、🔄 重分配(→pending)、🚫 取消 AI 发起升级后用户决策
waiting_human 确认、🔄 拒绝(→working)、🚫 取消 流程验收点
done 📦 归档 终态
cancelled 🔄 重新启动(→pending)、📦 归档 可重启保留历史

状态机变更

暂停是通用权利,需要在更多状态允许 → paused 转换:

# 当前
VALID_TRANSITIONS = {
    "pending":       {"claimed", "cancelled"},
    "claimed":       {"working", "paused", "pending", "cancelled"},
    "working":       {"review", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled"},
    "paused":        {"working", "cancelled"},
    "review":        {"done", "pending", "failed", "escalated", "waiting_human", "cancelled"},
    "blocked":       {"pending", "escalated", "cancelled"},
    "failed":        {"pending", "escalated", "cancelled"},
    "escalated":     {"working", "pending", "cancelled"},
    "waiting_human": {"working", "done", "cancelled"},
    "done":          set(),
    "cancelled":     set(),
}

# 变更后
VALID_TRANSITIONS = {
    "pending":       {"claimed", "paused", "cancelled"},            # +paused
    "claimed":       {"working", "paused", "pending", "cancelled"},  # 不变
    "working":       {"review", "blocked", "failed", "paused", "escalated", "waiting_human", "cancelled"},  # 不变
    "paused":        {"pending", "cancelled"},                       # 恢复统一回 pending
    "review":        {"done", "pending", "failed", "paused", "escalated", "waiting_human", "cancelled"},  # +paused
    "blocked":       {"pending", "escalated", "cancelled"},  # 不变(blocked暂停无意义)
    "failed":        {"pending", "escalated", "cancelled"},  # 不变(暂停语义不清晰)
    "escalated":     {"working", "pending", "paused", "cancelled"},  # +paused
    "waiting_human": {"working", "done", "paused", "cancelled"},  # +paused
    "done":          {"cancelled"},                             # +cancelled(取消已完成任务)
    "cancelled":     {"pending"},                               # +pending(重新启动)
}

paused 恢复机制: paused 记录 resumed_from 字段,恢复时回到暂停前状态,从暂停的节点继续执行(允许一个 stage 的冗余重跑)。

  • working → paused → ▶恢复 → working
  • review → paused → ▶恢复 → review
  • claimed → paused → ▶恢复 → claimed

取消 ADVANCED_ACTIONS 折叠区

v3.1 不再使用 ADVANCED_ACTIONS 折叠区。所有用户操作直接展示在主操作区。 原因:暂停和取消是通用权利,不应隐藏在折叠区里。


6. 场景对比

场景 改前(独立 LLM 分配) 改后(广播认领 + 确定性交接)
retry 原执行者 不变(确定性)
Agent handoff 能力匹配 不变(确定性)
有 assignee 直接分 不变(确定性)
review 生命周期 能力匹配 不变(确定性)
首次分配 独立 LLM 决定分给谁 广播所有空闲 Agent,自主认领
无人认领 无此场景(强制分配) 续杯 → 庞统兜底

7. 代码量

  • ~130 行(LLMDriver + routing config 初始化 + config.yaml routing 节)
  • ~30 行(Router.route() 末尾 + Dispatcher._build_spawn_message() delegate 分支)
  • 新增~60 行(_broadcast_claim + _build_claim_prompt
  • 净减~40 行

8. 风险与缓解

# 风险 评估 缓解
1 广播 spawn 消耗资源(每个 pending 任务都 spawn 所有空闲 Agent 只有"无确定性路径"的任务才广播;且 Agent 读黑板后无适合任务会快速退出
2 多 Agent 竞争 claim SQLite CAS 先到先得,已实现
3 无人认领 续杯机制兜底,多轮后庞统接管
4 Agent 认领了不适合的任务 Agent 有完整上下文(SOUL+AGENTS+能力),比 LLM 判断更准确
5 广播速度比直接分配慢 首次分配不需要快,准确比快重要

9. 实施步骤

  1. router.py:删除 LLMDriver 类 + AgentRouter 去掉 llm_driver + route() 末尾改 delegate
  2. ticker.py:新增 _broadcast_claim + _build_claim_prompt,修改 _dispatch_pending 增加 广播路径
  3. dispatcher.py_build_spawn_message() 增加 delegate 分支(庞统兜底)
  4. main.py:删除 llm_driver 初始化块
  5. config/default.yaml:删除 routing
  6. 测试:创建 pending 任务 → 观察广播 spawn → Agent claim → 执行