13 KiB
Agent 编排集成方案 — D1+D2+D4
版本: v2.6.1 日期: 2026-05-17 作者: 庞统 🐦 状态: 待评审
1. 背景
v2.6 已完成 F1-F18 全部模块编码(301 测试通过),前端 Mock UI 已就绪。当前断层:
- Ticker 在跑,但不调 Dispatcher/Spawner(只做依赖推进 + daemon_tick)
- Spawner 代码有,
spawn_full_agent()走openclaw agentCLI,但从未被 Ticker 调用过 - Agent 不知道 v2.0 API 的存在,无法读写黑板
- 前端 展示 mock 数据,未对接真实 API
本文档设计:Ticker → Dispatcher → Spawner → Agent → 回写黑板的完整闭环。
2. 设计目标
- 端到端可跑通:用户通过 API 创建任务 → Agent 被调起 → Agent 执行 → Agent 回写产出 → 审查流水线触发 → 前端实时展示
- 最小改动:复用现有 Spawner/Dispatcher 代码,只补 Ticker 的调度触发和 Agent 端的 API 调用
- 不破坏现有测试:301 测试全部继续通过
- 渐进式上线:先跑通 Full Agent(注册角色),Subagent 后续完善
3. 架构总览
┌──────────────────────────────────────────────────────────────────┐
│ Daemon (uvicorn) │
│ │
│ Ticker (30s) │
│ ├─ 1. 依赖推进 (blocked → pending) ← 已有 │
│ ├─ 2. 扫描 pending 任务 ← 新增 │
│ ├─ 3. Dispatcher.decide() 决策 ← 新增调用 │
│ ├─ 4. Spawner.spawn() 调起 Agent ← 新增调用 │
│ └─ 5. 状态 pending → claimed/working ← 新增 │
│ │
│ API (FastAPI) │
│ ├─ POST /api/projects/{id}/tasks ← 已有 │
│ ├─ GET /api/projects/{id}/tasks/{tid}?expand=all ← 已有 │
│ ├─ POST /api/projects/{id}/tasks/{tid}/outputs ← 已有 │
│ └─ PATCH /api/projects/{id}/tasks/{tid}/status ← 已有 │
│ │
│ SSE (/api/events/stream) ← 已有 │
└──────────────────────────────────────────────────────────────────┘
│
│ openclaw agent --agent {id} --message "..."
▼
┌──────────────────────────────────────────────────────────────────┐
│ OpenClaw Agent (独立 session) │
│ │
│ 1. 读任务:GET /api/projects/{pid}/tasks/{tid}?expand=all │
│ 2. 执行任务:根据 task_type + description 自主完成 │
│ 3. 写产出:POST /api/projects/{pid}/tasks/{tid}/outputs │
│ 4. 更新状态:PATCH /api/projects/{pid}/tasks/{tid}/status │
│ → working → review(触发审查流水线) │
└──────────────────────────────────────────────────────────────────┘
4. 详细设计
4.1 Ticker 集成调度(D4)
改动文件: src/daemon/ticker.py 的 _tick_project()
当前流程:
tick → 依赖推进 → daemon_tick event → 结束
新增流程:
tick → 依赖推进 → 调度扫描 → daemon_tick event → 结束
│
├─ 扫描所有 pending 任务
├─ 对每个 pending 任务调 Dispatcher.decide()
├─ 如果是 FULL_AGENT → Spawner.spawn_full_agent()
├ 同时将任务状态改为 claimed
└─ 如果是 LOCAL → 直接执行
关键约束:
- 每个 tick 最多调度
max_dispatch_per_tick个任务(默认 3,可配置) - 只调度
pending状态的任务(claimed/working说明已有 Agent 在处理) - 检查并发限制(
ActiveAgentCounter.can_acquire()) - dispatch 失败不阻塞 tick 循环
代码改动(伪代码):
# ticker.py _tick_project() 新增步骤
async def _tick_project(self, project_id, project_info):
# ... 现有: 依赖推进 ...
# 新增: 调度扫描
if self.dispatcher and self.spawner:
await self._dispatch_pending(project_id, db_path)
# ... 现有: daemon_tick event ...
async def _dispatch_pending(self, project_id, db_path):
"""扫描 pending 任务并调度"""
queries = Queries(db_path)
pending = queries.tasks_by_status("pending")
dispatched = 0
for task in pending[:self.max_dispatch_per_tick]:
try:
result = await self.dispatcher.dispatch(task, project_config={...})
if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
# 标记为 claimed
self._transition_status(conn, task.id, "claimed",
agent="daemon",
detail={"dispatched_to": result["agent_id"],
"session_id": result.get("session_id")})
dispatched += 1
except Exception:
logger.exception("Dispatch failed for %s", task.id)
Ticker 构造器新增参数:
def __init__(self, ..., dispatcher=None, spawner=None, max_dispatch_per_tick=3):
self.dispatcher = dispatcher
self.spawner = spawner
self.max_dispatch_per_tick = max_dispatch_per_tick
4.2 Spawner 对接 OpenClaw(D1)
改动文件: src/daemon/spawner.py
现状分析: spawn_full_agent() 已经调用 openclaw agent CLI,参数完全匹配:
cmd = ["openclaw", "agent",
"--agent", agent_id,
"--session-id", session_id,
"--message", message,
"--json"]
openclaw agent --help 确认支持 --agent、--session-id、--message、--json。
改动点:
-
丰富 message 内容 — 当前
_build_message()只传 title + description。需要补充:- 项目 ID、任务 ID(让 Agent 知道调哪个 API)
- API 端点信息(让 Agent 知道怎么回写)
--deliver标志(可选,让 Agent 回复发送到频道)
-
新增 prompt 模板 — Agent 收到的 message 需要包含完整的执行指令:
SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务:
项目: {project_id}
任务ID: {task_id}
标题: {title}
描述: {description}
类型: {task_type}
优先级: {priority}
必要条件: {must_haves}
请按以下步骤执行:
1. 了解任务需求,确认理解无误
2. 执行任务(编码/回测/数据检查等)
3. 完成后,调用以下 API 写入产出:
POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs
Body: {{"agent": "{agent_id}", "content": "<产出内容>", "content_type": "report"}}
4. 更新任务状态为 review:
PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status
Body: {{"status": "review", "agent": "{agent_id}"}}
5. 如果遇到问题,更新状态为 failed:
PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status
Body: {{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}
注意:
- API 地址: http://{api_host}:{api_port}
- 你可以通过 GET /api/projects/{project_id}/tasks/{task_id}?expand=all 查看完整任务信息
- 产出写入后,审查流水线会自动触发
"""
- 配置化 API 地址 — 从
config/default.yaml或环境变量读取:
# config/default.yaml 新增
daemon:
api_host: "127.0.0.1"
api_port: 8083
4.3 Agent 执行回写流程(D2)
Agent 端无需代码改动 — Agent 本身已有 exec 工具可以调用 curl,也有 web_fetch 可以调 HTTP API。关键是在 spawn 的 prompt 里告诉 Agent:
- 任务详情(title、description、must_haves)
- API 端点和调用方式
- 回写产出的具体步骤
回写 API 已就绪:
| API | 方法 | 用途 |
|---|---|---|
/api/projects/{id}/tasks/{tid}/outputs |
POST | 写入产出 |
/api/projects/{id}/tasks/{tid}/status |
PATCH | 更新状态 |
/api/projects/{id}/tasks/{tid}/comments |
POST | 添加评论 |
/api/projects/{id}/tasks/{tid}?expand=all |
GET | 查看完整信息 |
审查触发: Agent 将状态改为 review 后,Ticker 在下次 tick 检测到 review 状态的任务,触发审查流水线(F12 + F13)。
4.4 审查流水线集成
改动文件: src/daemon/ticker.py 的 _tick_project()
新增逻辑:
# 检测 review 状态的任务,触发审查
review_tasks = queries.tasks_by_status("review")
for task in review_tasks:
# 检查是否已有 review 记录
reviews = bb.get_reviews(task.id)
if not reviews:
# 调度审查 Agent(司马懿)
await self.dispatcher.dispatch(task, action_type="review")
4.5 僵尸检测 + 超时处理
已有基础: F8 Health Monitor 有 zombie 检测逻辑。
新增: Ticker 在每次 tick 检查 claimed/working 状态超时的任务:
claimed超过claim_timeout(默认 5 分钟)→ 重置为pendingworking超过task_timeout(从任务deadline或默认 30 分钟计算)→ 标记failed
这部分复用现有 _transition_status() 方法。
5. 配置变更
# config/default.yaml 新增字段
daemon:
# Agent 调度
max_dispatch_per_tick: 3
claim_timeout_minutes: 5
default_task_timeout_minutes: 30
# API(供 Agent 回写)
api_host: "127.0.0.1"
api_port: 8083
6. main.py 启动集成
改动文件: src/main.py
当前 Ticker 只接收 registry 参数。需要:
- 创建
Dispatcher实例(传入 registered_agents、spawner、counter) - 创建
AgentSpawner实例 - 创建
ActiveAgentCounter实例 - 将 dispatcher + spawner 传给 Ticker
# main.py lifespan 中
from src.daemon.spawner import AgentSpawner
from src.daemon.dispatcher import Dispatcher
from src.daemon.counter import ActiveAgentCounter
spawner = AgentSpawner(dry_run=False, agent_timeout=600)
counter = ActiveAgentCounter(max_global=5, max_per_agent=2)
dispatcher = Dispatcher(
registered_agents=["pangtong-fujunshi", "simayi-challenger", ...],
spawner=spawner,
counter=counter,
)
ticker = Ticker(
registry=registry,
dispatcher=dispatcher,
spawner=spawner,
max_dispatch_per_tick=config.get("daemon", {}).get("max_dispatch_per_tick", 3),
)
7. 改动量评估
| 文件 | 改动类型 | 行数估计 | 风险 |
|---|---|---|---|
src/daemon/ticker.py |
新增 _dispatch_pending() + 构造器参数 |
+80 行 | 低(纯新增方法) |
src/daemon/spawner.py |
新增 prompt 模板 + 配置读取 | +40 行 | 低 |
src/main.py |
注入 dispatcher/spawner | +20 行 | 低 |
config/default.yaml |
新增配置项 | +8 行 | 无 |
tests/test_ticker.py |
新增调度测试 | +60 行 | 无 |
总改动: ~210 行,全部是新增代码,不修改现有逻辑。
8. 不在本方案范围内
| 项目 | 原因 |
|---|---|
| Subagent spawn (sessions_spawn) | 留后续,先跑通 Full Agent |
| 前端对接真实 API | 依赖本方案完成后才有真实数据 |
| Checkpoint | v2.7 |
| Agent bootstrap prompt 定制 | 当前用通用模板,后续按角色定制 |
9. 端到端流程示例
用户通过 API 创建一个回测任务:
1. POST /api/projects/quant-demo/tasks
→ 任务入库,status=pending,assignee=zhangfei-dev
2. Ticker tick(30s 内)
→ 扫描 pending 任务
→ Dispatcher: assignee 是注册角色 → FULL_AGENT
→ Spawner: openclaw agent --agent zhangfei-dev --message "..."
→ 任务 status → claimed
3. 张飞 Agent 收到 prompt
→ 读取任务详情(GET expand=all)
→ 执行回测
→ 写入产出(POST /outputs)
→ 更新状态为 review
4. Ticker 下次 tick
→ 检测 review 状态任务
→ 调度司马懿审查
→ 司马懿 Agent 收到 prompt → 查看产出 → 写 review → APPROVE/REJECT
5. 审查通过 → status → done
审查驳回 → status → pending(重做)
6. 前端 SSE 实时收到事件更新
10. 验收标准
- 通过 API 创建任务 → 30s 内 Agent 被调起
- Agent 能读取任务详情、执行、写入产出
- Agent 更新状态为 review → 审查 Agent 被调起
- 审查通过 → 任务完成
- 前端展示真实数据(非 mock)
- 现有 301 测试全部继续通过