Files
sanguo_moziplus_v2/docs/design/agent-integration-v2.6.md
T
cfdaily 0d7425b88c
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions
auto-sync: 2026-06-07 01:35:53
2026-06-07 01:35:53 +08:00

13 KiB
Raw Blame History

Agent 编排集成方案 — D1+D2+D4

版本: v2.6.1 日期: 2026-05-17 作者: 庞统 🐦 状态: 待评审

1. 背景

v2.6 已完成 F1-F18 全部模块编码(301 测试通过),前端 Mock UI 已就绪。当前断层:

  • Ticker 在跑,但不调 Dispatcher/Spawner(只做依赖推进 + daemon_tick
  • Spawner 代码有,spawn_full_agent()openclaw agent CLI,但从未被 Ticker 调用过
  • Agent 不知道 v2.0 API 的存在,无法读写黑板
  • 前端 展示 mock 数据,未对接真实 API

本文档设计:Ticker → Dispatcher → Spawner → Agent → 回写黑板的完整闭环。

2. 设计目标

  1. 端到端可跑通:用户通过 API 创建任务 → Agent 被调起 → Agent 执行 → Agent 回写产出 → 审查流水线触发 → 前端实时展示
  2. 最小改动:复用现有 Spawner/Dispatcher 代码,只补 Ticker 的调度触发和 Agent 端的 API 调用
  3. 不破坏现有测试301 测试全部继续通过
  4. 渐进式上线:先跑通 Full Agent(注册角色),Subagent 后续完善

3. 架构总览

┌──────────────────────────────────────────────────────────────────┐
│                        Daemon (uvicorn)                          │
│                                                                  │
│  Ticker (30s)                                                    │
│    ├─ 1. 依赖推进 (blocked → pending)     ← 已有               │
│    ├─ 2. 扫描 pending 任务                 ← 新增               │
│    ├─ 3. Dispatcher.decide() 决策          ← 新增调用           │
│    ├─ 4. Spawner.spawn() 调起 Agent        ← 新增调用           │
│    └─ 5. 状态 pending → claimed/working    ← 新增               │
│                                                                  │
│  API (FastAPI)                                                   │
│    ├─ POST /api/projects/{id}/tasks         ← 已有              │
│    ├─ GET  /api/projects/{id}/tasks/{tid}?expand=all  ← 已有    │
│    ├─ POST /api/projects/{id}/tasks/{tid}/outputs  ← 已有       │
│    └─ PATCH /api/projects/{id}/tasks/{tid}/status  ← 已有       │
│                                                                  │
│  SSE (/api/events/stream)                ← 已有                 │
└──────────────────────────────────────────────────────────────────┘
          │
          │ openclaw agent --agent {id} --message "..."
          ▼
┌──────────────────────────────────────────────────────────────────┐
│                  OpenClaw Agent (独立 session)                    │
│                                                                  │
│  1. 读任务:GET /api/projects/{pid}/tasks/{tid}?expand=all      │
│  2. 执行任务:根据 task_type + description 自主完成              │
│  3. 写产出:POST /api/projects/{pid}/tasks/{tid}/outputs        │
│  4. 更新状态:PATCH /api/projects/{pid}/tasks/{tid}/status      │
│     → working → review(触发审查流水线)                          │
└──────────────────────────────────────────────────────────────────┘

4. 详细设计

4.1 Ticker 集成调度(D4

改动文件: src/daemon/ticker.py_tick_project()

当前流程:

tick → 依赖推进 → daemon_tick event → 结束

新增流程:

tick → 依赖推进 → 调度扫描 → daemon_tick event → 结束
                     │
                     ├─ 扫描所有 pending 任务
                     ├─ 对每个 pending 任务调 Dispatcher.decide()
                     ├─ 如果是 FULL_AGENT → Spawner.spawn_full_agent()
                     ├   同时将任务状态改为 claimed
                     └─ 如果是 LOCAL → 直接执行

关键约束:

  • 每个 tick 最多调度 max_dispatch_per_tick 个任务(默认 3,可配置)
  • 只调度 pending 状态的任务(claimed/working 说明已有 Agent 在处理)
  • 检查并发限制(ActiveAgentCounter.can_acquire()
  • dispatch 失败不阻塞 tick 循环

代码改动(伪代码):

# ticker.py _tick_project() 新增步骤
async def _tick_project(self, project_id, project_info):
    # ... 现有: 依赖推进 ...

    # 新增: 调度扫描
    if self.dispatcher and self.spawner:
        await self._dispatch_pending(project_id, db_path)

    # ... 现有: daemon_tick event ...

async def _dispatch_pending(self, project_id, db_path):
    """扫描 pending 任务并调度"""
    queries = Queries(db_path)
    pending = queries.tasks_by_status("pending")

    dispatched = 0
    for task in pending[:self.max_dispatch_per_tick]:
        try:
            result = await self.dispatcher.dispatch(task, project_config={...})
            if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
                # 标记为 claimed
                self._transition_status(conn, task.id, "claimed",
                    agent="daemon",
                    detail={"dispatched_to": result["agent_id"],
                            "session_id": result.get("session_id")})
                dispatched += 1
        except Exception:
            logger.exception("Dispatch failed for %s", task.id)

Ticker 构造器新增参数:

def __init__(self, ..., dispatcher=None, spawner=None, max_dispatch_per_tick=3):
    self.dispatcher = dispatcher
    self.spawner = spawner
    self.max_dispatch_per_tick = max_dispatch_per_tick

4.2 Spawner 对接 OpenClawD1

改动文件: src/daemon/spawner.py

现状分析: spawn_full_agent() 已经调用 openclaw agent CLI,参数完全匹配:

cmd = ["openclaw", "agent",
       "--agent", agent_id,
       "--session-id", session_id,
       "--message", message,
       "--json"]

openclaw agent --help 确认支持 --agent--session-id--message--json

改动点:

  1. 丰富 message 内容 — 当前 _build_message() 只传 title + description。需要补充:

    • 项目 ID、任务 ID(让 Agent 知道调哪个 API
    • API 端点信息(让 Agent 知道怎么回写)
    • --deliver 标志(可选,让 Agent 回复发送到频道)
  2. 新增 prompt 模板 — Agent 收到的 message 需要包含完整的执行指令:

SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务:

项目: {project_id}
任务ID: {task_id}
标题: {title}
描述: {description}
类型: {task_type}
优先级: {priority}
必要条件: {must_haves}

请按以下步骤执行:

1. 了解任务需求,确认理解无误
2. 执行任务(编码/回测/数据检查等)
3. 完成后,调用以下 API 写入产出:
   POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs
   Body: {{"agent": "{agent_id}", "content": "<产出内容>", "content_type": "report"}}
4. 更新任务状态为 review
   PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status
   Body: {{"status": "review", "agent": "{agent_id}"}}
5. 如果遇到问题,更新状态为 failed:
   PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status
   Body: {{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}

注意:
- API 地址: http://{api_host}:{api_port}
- 你可以通过 GET /api/projects/{project_id}/tasks/{task_id}?expand=all 查看完整任务信息
- 产出写入后,审查流水线会自动触发
"""
  1. 配置化 API 地址 — 从 config/default.yaml 或环境变量读取:
# config/default.yaml 新增
daemon:
  api_host: "127.0.0.1"
  api_port: 8083

4.3 Agent 执行回写流程(D2

Agent 端无需代码改动 — Agent 本身已有 exec 工具可以调用 curl,也有 web_fetch 可以调 HTTP API。关键是在 spawn 的 prompt 里告诉 Agent

  1. 任务详情(title、description、must_haves
  2. API 端点和调用方式
  3. 回写产出的具体步骤

回写 API 已就绪:

API 方法 用途
/api/projects/{id}/tasks/{tid}/outputs POST 写入产出
/api/projects/{id}/tasks/{tid}/status PATCH 更新状态
/api/projects/{id}/tasks/{tid}/comments POST 添加评论
/api/projects/{id}/tasks/{tid}?expand=all GET 查看完整信息

审查触发: Agent 将状态改为 review 后,Ticker 在下次 tick 检测到 review 状态的任务,触发审查流水线(F12 + F13)。

4.4 审查流水线集成

改动文件: src/daemon/ticker.py_tick_project()

新增逻辑:

# 检测 review 状态的任务,触发审查
review_tasks = queries.tasks_by_status("review")
for task in review_tasks:
    # 检查是否已有 review 记录
    reviews = bb.get_reviews(task.id)
    if not reviews:
        # 调度审查 Agent(司马懿)
        await self.dispatcher.dispatch(task, action_type="review")

4.5 僵尸检测 + 超时处理

已有基础: F8 Health Monitor 有 zombie 检测逻辑。

新增: Ticker 在每次 tick 检查 claimed/working 状态超时的任务:

  • claimed 超过 claim_timeout(默认 5 分钟)→ 重置为 pending
  • working 超过 task_timeout(从任务 deadline 或默认 30 分钟计算)→ 标记 failed

这部分复用现有 _transition_status() 方法。

5. 配置变更

# config/default.yaml 新增字段
daemon:
  # Agent 调度
  max_dispatch_per_tick: 3
  claim_timeout_minutes: 5
  default_task_timeout_minutes: 30
  
  # API(供 Agent 回写)
  api_host: "127.0.0.1"
  api_port: 8083

6. main.py 启动集成

改动文件: src/main.py

当前 Ticker 只接收 registry 参数。需要:

  1. 创建 Dispatcher 实例(传入 registered_agents、spawner、counter
  2. 创建 AgentSpawner 实例
  3. 创建 ActiveAgentCounter 实例
  4. 将 dispatcher + spawner 传给 Ticker
# main.py lifespan 中
from src.daemon.spawner import AgentSpawner
from src.daemon.dispatcher import Dispatcher
from src.daemon.counter import ActiveAgentCounter

spawner = AgentSpawner(dry_run=False, agent_timeout=600)
counter = ActiveAgentCounter(max_global=5, max_per_agent=2)
dispatcher = Dispatcher(
    registered_agents=["pangtong-fujunshi", "simayi-challenger", ...],
    spawner=spawner,
    counter=counter,
)

ticker = Ticker(
    registry=registry,
    dispatcher=dispatcher,
    spawner=spawner,
    max_dispatch_per_tick=config.get("daemon", {}).get("max_dispatch_per_tick", 3),
)

7. 改动量评估

文件 改动类型 行数估计 风险
src/daemon/ticker.py 新增 _dispatch_pending() + 构造器参数 +80 行 低(纯新增方法)
src/daemon/spawner.py 新增 prompt 模板 + 配置读取 +40 行
src/main.py 注入 dispatcher/spawner +20 行
config/default.yaml 新增配置项 +8 行
tests/test_ticker.py 新增调度测试 +60 行

总改动: ~210 行,全部是新增代码,不修改现有逻辑。

8. 不在本方案范围内

项目 原因
Subagent spawn (sessions_spawn) 留后续,先跑通 Full Agent
前端对接真实 API 依赖本方案完成后才有真实数据
Checkpoint v2.7
Agent bootstrap prompt 定制 当前用通用模板,后续按角色定制

9. 端到端流程示例

用户通过 API 创建一个回测任务:

1. POST /api/projects/quant-demo/tasks
   → 任务入库,status=pendingassignee=zhangfei-dev

2. Ticker tick30s 内)
   → 扫描 pending 任务
   → Dispatcher: assignee 是注册角色 → FULL_AGENT
   → Spawner: openclaw agent --agent zhangfei-dev --message "..."
   → 任务 status → claimed

3. 张飞 Agent 收到 prompt
   → 读取任务详情(GET expand=all)
   → 执行回测
   → 写入产出(POST /outputs
   → 更新状态为 review

4. Ticker 下次 tick
   → 检测 review 状态任务
   → 调度司马懿审查
   → 司马懿 Agent 收到 prompt → 查看产出 → 写 review → APPROVE/REJECT

5. 审查通过 → status → done
   审查驳回 → status → pending(重做)

6. 前端 SSE 实时收到事件更新

10. 验收标准

  1. 通过 API 创建任务 → 30s 内 Agent 被调起
  2. Agent 能读取任务详情、执行、写入产出
  3. Agent 更新状态为 review → 审查 Agent 被调起
  4. 审查通过 → 任务完成
  5. 前端展示真实数据(非 mock
  6. 现有 301 测试全部继续通过