# Agent 编排集成方案 — D1+D2+D4 > 版本: v2.6.1 > 日期: 2026-05-17 > 作者: 庞统 🐦 > 状态: 待评审 ## 1. 背景 v2.6 已完成 F1-F18 全部模块编码(301 测试通过),前端 Mock UI 已就绪。当前断层: - **Ticker** 在跑,但不调 Dispatcher/Spawner(只做依赖推进 + daemon_tick) - **Spawner** 代码有,`spawn_full_agent()` 走 `openclaw agent` CLI,但从未被 Ticker 调用过 - **Agent** 不知道 v2.0 API 的存在,无法读写黑板 - **前端** 展示 mock 数据,未对接真实 API 本文档设计:Ticker → Dispatcher → Spawner → Agent → 回写黑板的完整闭环。 ## 2. 设计目标 1. **端到端可跑通**:用户通过 API 创建任务 → Agent 被调起 → Agent 执行 → Agent 回写产出 → 审查流水线触发 → 前端实时展示 2. **最小改动**:复用现有 Spawner/Dispatcher 代码,只补 Ticker 的调度触发和 Agent 端的 API 调用 3. **不破坏现有测试**:301 测试全部继续通过 4. **渐进式上线**:先跑通 Full Agent(注册角色),Subagent 后续完善 ## 3. 架构总览 ``` ┌──────────────────────────────────────────────────────────────────┐ │ Daemon (uvicorn) │ │ │ │ Ticker (30s) │ │ ├─ 1. 依赖推进 (blocked → pending) ← 已有 │ │ ├─ 2. 扫描 pending 任务 ← 新增 │ │ ├─ 3. Dispatcher.decide() 决策 ← 新增调用 │ │ ├─ 4. Spawner.spawn() 调起 Agent ← 新增调用 │ │ └─ 5. 状态 pending → claimed/working ← 新增 │ │ │ │ API (FastAPI) │ │ ├─ POST /api/projects/{id}/tasks ← 已有 │ │ ├─ GET /api/projects/{id}/tasks/{tid}?expand=all ← 已有 │ │ ├─ POST /api/projects/{id}/tasks/{tid}/outputs ← 已有 │ │ └─ PATCH /api/projects/{id}/tasks/{tid}/status ← 已有 │ │ │ │ SSE (/api/events/stream) ← 已有 │ └──────────────────────────────────────────────────────────────────┘ │ │ openclaw agent --agent {id} --message "..." ▼ ┌──────────────────────────────────────────────────────────────────┐ │ OpenClaw Agent (独立 session) │ │ │ │ 1. 读任务:GET /api/projects/{pid}/tasks/{tid}?expand=all │ │ 2. 执行任务:根据 task_type + description 自主完成 │ │ 3. 写产出:POST /api/projects/{pid}/tasks/{tid}/outputs │ │ 4. 更新状态:PATCH /api/projects/{pid}/tasks/{tid}/status │ │ → working → review(触发审查流水线) │ └──────────────────────────────────────────────────────────────────┘ ``` ## 4. 详细设计 ### 4.1 Ticker 集成调度(D4) **改动文件**: `src/daemon/ticker.py` 的 `_tick_project()` **当前流程**: ``` tick → 依赖推进 → daemon_tick event → 结束 ``` **新增流程**: ``` tick → 依赖推进 → 调度扫描 → daemon_tick event → 结束 │ ├─ 扫描所有 pending 任务 ├─ 对每个 pending 任务调 Dispatcher.decide() ├─ 如果是 FULL_AGENT → Spawner.spawn_full_agent() ├ 同时将任务状态改为 claimed └─ 如果是 LOCAL → 直接执行 ``` **关键约束**: - 每个 tick 最多调度 `max_dispatch_per_tick` 个任务(默认 3,可配置) - 只调度 `pending` 状态的任务(`claimed`/`working` 说明已有 Agent 在处理) - 检查并发限制(`ActiveAgentCounter.can_acquire()`) - dispatch 失败不阻塞 tick 循环 **代码改动(伪代码)**: ```python # ticker.py _tick_project() 新增步骤 async def _tick_project(self, project_id, project_info): # ... 现有: 依赖推进 ... # 新增: 调度扫描 if self.dispatcher and self.spawner: await self._dispatch_pending(project_id, db_path) # ... 现有: daemon_tick event ... async def _dispatch_pending(self, project_id, db_path): """扫描 pending 任务并调度""" queries = Queries(db_path) pending = queries.tasks_by_status("pending") dispatched = 0 for task in pending[:self.max_dispatch_per_tick]: try: result = await self.dispatcher.dispatch(task, project_config={...}) if result["status"] == "dispatched" and result["level"] in ("full", "escalate"): # 标记为 claimed self._transition_status(conn, task.id, "claimed", agent="daemon", detail={"dispatched_to": result["agent_id"], "session_id": result.get("session_id")}) dispatched += 1 except Exception: logger.exception("Dispatch failed for %s", task.id) ``` **Ticker 构造器新增参数**: ```python def __init__(self, ..., dispatcher=None, spawner=None, max_dispatch_per_tick=3): self.dispatcher = dispatcher self.spawner = spawner self.max_dispatch_per_tick = max_dispatch_per_tick ``` ### 4.2 Spawner 对接 OpenClaw(D1) **改动文件**: `src/daemon/spawner.py` **现状分析**: `spawn_full_agent()` 已经调用 `openclaw agent` CLI,参数完全匹配: ```python cmd = ["openclaw", "agent", "--agent", agent_id, "--session-id", session_id, "--message", message, "--json"] ``` `openclaw agent --help` 确认支持 `--agent`、`--session-id`、`--message`、`--json`。 **改动点**: 1. **丰富 message 内容** — 当前 `_build_message()` 只传 title + description。需要补充: - 项目 ID、任务 ID(让 Agent 知道调哪个 API) - API 端点信息(让 Agent 知道怎么回写) - `--deliver` 标志(可选,让 Agent 回复发送到频道) 2. **新增 prompt 模板** — Agent 收到的 message 需要包含完整的执行指令: ```python SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务: 项目: {project_id} 任务ID: {task_id} 标题: {title} 描述: {description} 类型: {task_type} 优先级: {priority} 必要条件: {must_haves} 请按以下步骤执行: 1. 了解任务需求,确认理解无误 2. 执行任务(编码/回测/数据检查等) 3. 完成后,调用以下 API 写入产出: POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs Body: {{"agent": "{agent_id}", "content": "<产出内容>", "content_type": "report"}} 4. 更新任务状态为 review: PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status Body: {{"status": "review", "agent": "{agent_id}"}} 5. 如果遇到问题,更新状态为 failed: PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status Body: {{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}} 注意: - API 地址: http://{api_host}:{api_port} - 你可以通过 GET /api/projects/{project_id}/tasks/{task_id}?expand=all 查看完整任务信息 - 产出写入后,审查流水线会自动触发 """ ``` 3. **配置化 API 地址** — 从 `config/default.yaml` 或环境变量读取: ```yaml # config/default.yaml 新增 daemon: api_host: "127.0.0.1" api_port: 8083 ``` ### 4.3 Agent 执行回写流程(D2) **Agent 端无需代码改动** — Agent 本身已有 `exec` 工具可以调用 `curl`,也有 `web_fetch` 可以调 HTTP API。关键是在 spawn 的 prompt 里告诉 Agent: 1. 任务详情(title、description、must_haves) 2. API 端点和调用方式 3. 回写产出的具体步骤 **回写 API 已就绪**: | API | 方法 | 用途 | |-----|------|------| | `/api/projects/{id}/tasks/{tid}/outputs` | POST | 写入产出 | | `/api/projects/{id}/tasks/{tid}/status` | PATCH | 更新状态 | | `/api/projects/{id}/tasks/{tid}/comments` | POST | 添加评论 | | `/api/projects/{id}/tasks/{tid}?expand=all` | GET | 查看完整信息 | **审查触发**: Agent 将状态改为 `review` 后,Ticker 在下次 tick 检测到 `review` 状态的任务,触发审查流水线(F12 + F13)。 ### 4.4 审查流水线集成 **改动文件**: `src/daemon/ticker.py` 的 `_tick_project()` **新增逻辑**: ```python # 检测 review 状态的任务,触发审查 review_tasks = queries.tasks_by_status("review") for task in review_tasks: # 检查是否已有 review 记录 reviews = bb.get_reviews(task.id) if not reviews: # 调度审查 Agent(司马懿) await self.dispatcher.dispatch(task, action_type="review") ``` ### 4.5 僵尸检测 + 超时处理 **已有基础**: F8 Health Monitor 有 zombie 检测逻辑。 **新增**: Ticker 在每次 tick 检查 `claimed`/`working` 状态超时的任务: - `claimed` 超过 `claim_timeout`(默认 5 分钟)→ 重置为 `pending` - `working` 超过 `task_timeout`(从任务 `deadline` 或默认 30 分钟计算)→ 标记 `failed` 这部分复用现有 `_transition_status()` 方法。 ## 5. 配置变更 ```yaml # config/default.yaml 新增字段 daemon: # Agent 调度 max_dispatch_per_tick: 3 claim_timeout_minutes: 5 default_task_timeout_minutes: 30 # API(供 Agent 回写) api_host: "127.0.0.1" api_port: 8083 ``` ## 6. main.py 启动集成 **改动文件**: `src/main.py` 当前 Ticker 只接收 `registry` 参数。需要: 1. 创建 `Dispatcher` 实例(传入 registered_agents、spawner、counter) 2. 创建 `AgentSpawner` 实例 3. 创建 `ActiveAgentCounter` 实例 4. 将 dispatcher + spawner 传给 Ticker ```python # main.py lifespan 中 from src.daemon.spawner import AgentSpawner from src.daemon.dispatcher import Dispatcher from src.daemon.counter import ActiveAgentCounter spawner = AgentSpawner(dry_run=False, agent_timeout=600) counter = ActiveAgentCounter(max_global=5, max_per_agent=2) dispatcher = Dispatcher( registered_agents=["pangtong-fujunshi", "simayi-challenger", ...], spawner=spawner, counter=counter, ) ticker = Ticker( registry=registry, dispatcher=dispatcher, spawner=spawner, max_dispatch_per_tick=config.get("daemon", {}).get("max_dispatch_per_tick", 3), ) ``` ## 7. 改动量评估 | 文件 | 改动类型 | 行数估计 | 风险 | |------|---------|---------|------| | `src/daemon/ticker.py` | 新增 `_dispatch_pending()` + 构造器参数 | +80 行 | 低(纯新增方法) | | `src/daemon/spawner.py` | 新增 prompt 模板 + 配置读取 | +40 行 | 低 | | `src/main.py` | 注入 dispatcher/spawner | +20 行 | 低 | | `config/default.yaml` | 新增配置项 | +8 行 | 无 | | `tests/test_ticker.py` | 新增调度测试 | +60 行 | 无 | **总改动**: ~210 行,全部是新增代码,不修改现有逻辑。 ## 8. 不在本方案范围内 | 项目 | 原因 | |------|------| | Subagent spawn (sessions_spawn) | 留后续,先跑通 Full Agent | | 前端对接真实 API | 依赖本方案完成后才有真实数据 | | Checkpoint | v2.7 | | Agent bootstrap prompt 定制 | 当前用通用模板,后续按角色定制 | ## 9. 端到端流程示例 用户通过 API 创建一个回测任务: ``` 1. POST /api/projects/quant-demo/tasks → 任务入库,status=pending,assignee=zhangfei-dev 2. Ticker tick(30s 内) → 扫描 pending 任务 → Dispatcher: assignee 是注册角色 → FULL_AGENT → Spawner: openclaw agent --agent zhangfei-dev --message "..." → 任务 status → claimed 3. 张飞 Agent 收到 prompt → 读取任务详情(GET expand=all) → 执行回测 → 写入产出(POST /outputs) → 更新状态为 review 4. Ticker 下次 tick → 检测 review 状态任务 → 调度司马懿审查 → 司马懿 Agent 收到 prompt → 查看产出 → 写 review → APPROVE/REJECT 5. 审查通过 → status → done 审查驳回 → status → pending(重做) 6. 前端 SSE 实时收到事件更新 ``` ## 10. 验收标准 1. 通过 API 创建任务 → 30s 内 Agent 被调起 2. Agent 能读取任务详情、执行、写入产出 3. Agent 更新状态为 review → 审查 Agent 被调起 4. 审查通过 → 任务完成 5. 前端展示真实数据(非 mock) 6. 现有 301 测试全部继续通过