diff --git a/docs/design/agent-integration-v2.6.md b/docs/design/agent-integration-v2.6.md new file mode 100644 index 0000000..a8afa36 --- /dev/null +++ b/docs/design/agent-integration-v2.6.md @@ -0,0 +1,344 @@ +# Agent 编排集成方案 — D1+D2+D4 + +> 版本: v2.6.1 +> 日期: 2026-05-17 +> 作者: 庞统 🐦 +> 状态: 待评审 + +## 1. 背景 + +v2.6 已完成 F1-F18 全部模块编码(301 测试通过),前端 Mock UI 已就绪。当前断层: + +- **Ticker** 在跑,但不调 Dispatcher/Spawner(只做依赖推进 + daemon_tick) +- **Spawner** 代码有,`spawn_full_agent()` 走 `openclaw agent` CLI,但从未被 Ticker 调用过 +- **Agent** 不知道 v2.0 API 的存在,无法读写黑板 +- **前端** 展示 mock 数据,未对接真实 API + +本文档设计:Ticker → Dispatcher → Spawner → Agent → 回写黑板的完整闭环。 + +## 2. 设计目标 + +1. **端到端可跑通**:用户通过 API 创建任务 → Agent 被调起 → Agent 执行 → Agent 回写产出 → 审查流水线触发 → 前端实时展示 +2. **最小改动**:复用现有 Spawner/Dispatcher 代码,只补 Ticker 的调度触发和 Agent 端的 API 调用 +3. **不破坏现有测试**:301 测试全部继续通过 +4. **渐进式上线**:先跑通 Full Agent(注册角色),Subagent 后续完善 + +## 3. 架构总览 + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ Daemon (uvicorn) │ +│ │ +│ Ticker (30s) │ +│ ├─ 1. 依赖推进 (blocked → pending) ← 已有 │ +│ ├─ 2. 扫描 pending 任务 ← 新增 │ +│ ├─ 3. Dispatcher.decide() 决策 ← 新增调用 │ +│ ├─ 4. Spawner.spawn() 调起 Agent ← 新增调用 │ +│ └─ 5. 状态 pending → claimed/working ← 新增 │ +│ │ +│ API (FastAPI) │ +│ ├─ POST /api/projects/{id}/tasks ← 已有 │ +│ ├─ GET /api/projects/{id}/tasks/{tid}?expand=all ← 已有 │ +│ ├─ POST /api/projects/{id}/tasks/{tid}/outputs ← 已有 │ +│ └─ PATCH /api/projects/{id}/tasks/{tid}/status ← 已有 │ +│ │ +│ SSE (/api/events/stream) ← 已有 │ +└──────────────────────────────────────────────────────────────────┘ + │ + │ openclaw agent --agent {id} --message "..." + ▼ +┌──────────────────────────────────────────────────────────────────┐ +│ OpenClaw Agent (独立 session) │ +│ │ +│ 1. 读任务:GET /api/projects/{pid}/tasks/{tid}?expand=all │ +│ 2. 执行任务:根据 task_type + description 自主完成 │ +│ 3. 写产出:POST /api/projects/{pid}/tasks/{tid}/outputs │ +│ 4. 更新状态:PATCH /api/projects/{pid}/tasks/{tid}/status │ +│ → working → review(触发审查流水线) │ +└──────────────────────────────────────────────────────────────────┘ +``` + +## 4. 详细设计 + +### 4.1 Ticker 集成调度(D4) + +**改动文件**: `src/daemon/ticker.py` 的 `_tick_project()` + +**当前流程**: +``` +tick → 依赖推进 → daemon_tick event → 结束 +``` + +**新增流程**: +``` +tick → 依赖推进 → 调度扫描 → daemon_tick event → 结束 + │ + ├─ 扫描所有 pending 任务 + ├─ 对每个 pending 任务调 Dispatcher.decide() + ├─ 如果是 FULL_AGENT → Spawner.spawn_full_agent() + ├ 同时将任务状态改为 claimed + └─ 如果是 LOCAL → 直接执行 +``` + +**关键约束**: +- 每个 tick 最多调度 `max_dispatch_per_tick` 个任务(默认 3,可配置) +- 只调度 `pending` 状态的任务(`claimed`/`working` 说明已有 Agent 在处理) +- 检查并发限制(`ActiveAgentCounter.can_acquire()`) +- dispatch 失败不阻塞 tick 循环 + +**代码改动(伪代码)**: +```python +# ticker.py _tick_project() 新增步骤 +async def _tick_project(self, project_id, project_info): + # ... 现有: 依赖推进 ... + + # 新增: 调度扫描 + if self.dispatcher and self.spawner: + await self._dispatch_pending(project_id, db_path) + + # ... 现有: daemon_tick event ... + +async def _dispatch_pending(self, project_id, db_path): + """扫描 pending 任务并调度""" + queries = Queries(db_path) + pending = queries.tasks_by_status("pending") + + dispatched = 0 + for task in pending[:self.max_dispatch_per_tick]: + try: + result = await self.dispatcher.dispatch(task, project_config={...}) + if result["status"] == "dispatched" and result["level"] in ("full", "escalate"): + # 标记为 claimed + self._transition_status(conn, task.id, "claimed", + agent="daemon", + detail={"dispatched_to": result["agent_id"], + "session_id": result.get("session_id")}) + dispatched += 1 + except Exception: + logger.exception("Dispatch failed for %s", task.id) +``` + +**Ticker 构造器新增参数**: +```python +def __init__(self, ..., dispatcher=None, spawner=None, max_dispatch_per_tick=3): + self.dispatcher = dispatcher + self.spawner = spawner + self.max_dispatch_per_tick = max_dispatch_per_tick +``` + +### 4.2 Spawner 对接 OpenClaw(D1) + +**改动文件**: `src/daemon/spawner.py` + +**现状分析**: `spawn_full_agent()` 已经调用 `openclaw agent` CLI,参数完全匹配: +```python +cmd = ["openclaw", "agent", + "--agent", agent_id, + "--session-id", session_id, + "--message", message, + "--json"] +``` + +`openclaw agent --help` 确认支持 `--agent`、`--session-id`、`--message`、`--json`。 + +**改动点**: + +1. **丰富 message 内容** — 当前 `_build_message()` 只传 title + description。需要补充: + - 项目 ID、任务 ID(让 Agent 知道调哪个 API) + - API 端点信息(让 Agent 知道怎么回写) + - `--deliver` 标志(可选,让 Agent 回复发送到频道) + +2. **新增 prompt 模板** — Agent 收到的 message 需要包含完整的执行指令: + +```python +SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务: + +项目: {project_id} +任务ID: {task_id} +标题: {title} +描述: {description} +类型: {task_type} +优先级: {priority} +必要条件: {must_haves} + +请按以下步骤执行: + +1. 了解任务需求,确认理解无误 +2. 执行任务(编码/回测/数据检查等) +3. 完成后,调用以下 API 写入产出: + POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs + Body: {{"agent": "{agent_id}", "content": "<产出内容>", "content_type": "report"}} +4. 更新任务状态为 review: + PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status + Body: {{"status": "review", "agent": "{agent_id}"}} +5. 如果遇到问题,更新状态为 failed: + PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status + Body: {{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}} + +注意: +- API 地址: http://{api_host}:{api_port} +- 你可以通过 GET /api/projects/{project_id}/tasks/{task_id}?expand=all 查看完整任务信息 +- 产出写入后,审查流水线会自动触发 +""" +``` + +3. **配置化 API 地址** — 从 `config/default.yaml` 或环境变量读取: +```yaml +# config/default.yaml 新增 +daemon: + api_host: "127.0.0.1" + api_port: 8083 +``` + +### 4.3 Agent 执行回写流程(D2) + +**Agent 端无需代码改动** — Agent 本身已有 `exec` 工具可以调用 `curl`,也有 `web_fetch` 可以调 HTTP API。关键是在 spawn 的 prompt 里告诉 Agent: + +1. 任务详情(title、description、must_haves) +2. API 端点和调用方式 +3. 回写产出的具体步骤 + +**回写 API 已就绪**: + +| API | 方法 | 用途 | +|-----|------|------| +| `/api/projects/{id}/tasks/{tid}/outputs` | POST | 写入产出 | +| `/api/projects/{id}/tasks/{tid}/status` | PATCH | 更新状态 | +| `/api/projects/{id}/tasks/{tid}/comments` | POST | 添加评论 | +| `/api/projects/{id}/tasks/{tid}?expand=all` | GET | 查看完整信息 | + +**审查触发**: Agent 将状态改为 `review` 后,Ticker 在下次 tick 检测到 `review` 状态的任务,触发审查流水线(F12 + F13)。 + +### 4.4 审查流水线集成 + +**改动文件**: `src/daemon/ticker.py` 的 `_tick_project()` + +**新增逻辑**: +```python +# 检测 review 状态的任务,触发审查 +review_tasks = queries.tasks_by_status("review") +for task in review_tasks: + # 检查是否已有 review 记录 + reviews = bb.get_reviews(task.id) + if not reviews: + # 调度审查 Agent(司马懿) + await self.dispatcher.dispatch(task, action_type="review") +``` + +### 4.5 僵尸检测 + 超时处理 + +**已有基础**: F8 Health Monitor 有 zombie 检测逻辑。 + +**新增**: Ticker 在每次 tick 检查 `claimed`/`working` 状态超时的任务: +- `claimed` 超过 `claim_timeout`(默认 5 分钟)→ 重置为 `pending` +- `working` 超过 `task_timeout`(从任务 `deadline` 或默认 30 分钟计算)→ 标记 `failed` + +这部分复用现有 `_transition_status()` 方法。 + +## 5. 配置变更 + +```yaml +# config/default.yaml 新增字段 +daemon: + # Agent 调度 + max_dispatch_per_tick: 3 + claim_timeout_minutes: 5 + default_task_timeout_minutes: 30 + + # API(供 Agent 回写) + api_host: "127.0.0.1" + api_port: 8083 +``` + +## 6. main.py 启动集成 + +**改动文件**: `src/main.py` + +当前 Ticker 只接收 `registry` 参数。需要: +1. 创建 `Dispatcher` 实例(传入 registered_agents、spawner、counter) +2. 创建 `AgentSpawner` 实例 +3. 创建 `ActiveAgentCounter` 实例 +4. 将 dispatcher + spawner 传给 Ticker + +```python +# main.py lifespan 中 +from src.daemon.spawner import AgentSpawner +from src.daemon.dispatcher import Dispatcher +from src.daemon.counter import ActiveAgentCounter + +spawner = AgentSpawner(dry_run=False, agent_timeout=600) +counter = ActiveAgentCounter(max_global=5, max_per_agent=2) +dispatcher = Dispatcher( + registered_agents=["pangtong-fujunshi", "simayi-challenger", ...], + spawner=spawner, + counter=counter, +) + +ticker = Ticker( + registry=registry, + dispatcher=dispatcher, + spawner=spawner, + max_dispatch_per_tick=config.get("daemon", {}).get("max_dispatch_per_tick", 3), +) +``` + +## 7. 改动量评估 + +| 文件 | 改动类型 | 行数估计 | 风险 | +|------|---------|---------|------| +| `src/daemon/ticker.py` | 新增 `_dispatch_pending()` + 构造器参数 | +80 行 | 低(纯新增方法) | +| `src/daemon/spawner.py` | 新增 prompt 模板 + 配置读取 | +40 行 | 低 | +| `src/main.py` | 注入 dispatcher/spawner | +20 行 | 低 | +| `config/default.yaml` | 新增配置项 | +8 行 | 无 | +| `tests/test_ticker.py` | 新增调度测试 | +60 行 | 无 | + +**总改动**: ~210 行,全部是新增代码,不修改现有逻辑。 + +## 8. 不在本方案范围内 + +| 项目 | 原因 | +|------|------| +| Subagent spawn (sessions_spawn) | 留后续,先跑通 Full Agent | +| 前端对接真实 API | 依赖本方案完成后才有真实数据 | +| Checkpoint | v2.7 | +| Agent bootstrap prompt 定制 | 当前用通用模板,后续按角色定制 | + +## 9. 端到端流程示例 + +用户通过 API 创建一个回测任务: + +``` +1. POST /api/projects/quant-demo/tasks + → 任务入库,status=pending,assignee=zhangfei-dev + +2. Ticker tick(30s 内) + → 扫描 pending 任务 + → Dispatcher: assignee 是注册角色 → FULL_AGENT + → Spawner: openclaw agent --agent zhangfei-dev --message "..." + → 任务 status → claimed + +3. 张飞 Agent 收到 prompt + → 读取任务详情(GET expand=all) + → 执行回测 + → 写入产出(POST /outputs) + → 更新状态为 review + +4. Ticker 下次 tick + → 检测 review 状态任务 + → 调度司马懿审查 + → 司马懿 Agent 收到 prompt → 查看产出 → 写 review → APPROVE/REJECT + +5. 审查通过 → status → done + 审查驳回 → status → pending(重做) + +6. 前端 SSE 实时收到事件更新 +``` + +## 10. 验收标准 + +1. 通过 API 创建任务 → 30s 内 Agent 被调起 +2. Agent 能读取任务详情、执行、写入产出 +3. Agent 更新状态为 review → 审查 Agent 被调起 +4. 审查通过 → 任务完成 +5. 前端展示真实数据(非 mock) +6. 现有 301 测试全部继续通过