auto-sync: 2026-05-17 13:40:16

This commit is contained in:
cfdaily
2026-05-17 13:40:16 +08:00
parent f3539aaf60
commit 90a3545df1
+344
View File
@@ -0,0 +1,344 @@
# Agent 编排集成方案 — D1+D2+D4
> 版本: v2.6.1
> 日期: 2026-05-17
> 作者: 庞统 🐦
> 状态: 待评审
## 1. 背景
v2.6 已完成 F1-F18 全部模块编码(301 测试通过),前端 Mock UI 已就绪。当前断层:
- **Ticker** 在跑,但不调 Dispatcher/Spawner(只做依赖推进 + daemon_tick
- **Spawner** 代码有,`spawn_full_agent()``openclaw agent` CLI,但从未被 Ticker 调用过
- **Agent** 不知道 v2.0 API 的存在,无法读写黑板
- **前端** 展示 mock 数据,未对接真实 API
本文档设计:Ticker → Dispatcher → Spawner → Agent → 回写黑板的完整闭环。
## 2. 设计目标
1. **端到端可跑通**:用户通过 API 创建任务 → Agent 被调起 → Agent 执行 → Agent 回写产出 → 审查流水线触发 → 前端实时展示
2. **最小改动**:复用现有 Spawner/Dispatcher 代码,只补 Ticker 的调度触发和 Agent 端的 API 调用
3. **不破坏现有测试**301 测试全部继续通过
4. **渐进式上线**:先跑通 Full Agent(注册角色),Subagent 后续完善
## 3. 架构总览
```
┌──────────────────────────────────────────────────────────────────┐
│ Daemon (uvicorn) │
│ │
│ Ticker (30s) │
│ ├─ 1. 依赖推进 (blocked → pending) ← 已有 │
│ ├─ 2. 扫描 pending 任务 ← 新增 │
│ ├─ 3. Dispatcher.decide() 决策 ← 新增调用 │
│ ├─ 4. Spawner.spawn() 调起 Agent ← 新增调用 │
│ └─ 5. 状态 pending → claimed/working ← 新增 │
│ │
│ API (FastAPI) │
│ ├─ POST /api/projects/{id}/tasks ← 已有 │
│ ├─ GET /api/projects/{id}/tasks/{tid}?expand=all ← 已有 │
│ ├─ POST /api/projects/{id}/tasks/{tid}/outputs ← 已有 │
│ └─ PATCH /api/projects/{id}/tasks/{tid}/status ← 已有 │
│ │
│ SSE (/api/events/stream) ← 已有 │
└──────────────────────────────────────────────────────────────────┘
│ openclaw agent --agent {id} --message "..."
┌──────────────────────────────────────────────────────────────────┐
│ OpenClaw Agent (独立 session) │
│ │
│ 1. 读任务:GET /api/projects/{pid}/tasks/{tid}?expand=all │
│ 2. 执行任务:根据 task_type + description 自主完成 │
│ 3. 写产出:POST /api/projects/{pid}/tasks/{tid}/outputs │
│ 4. 更新状态:PATCH /api/projects/{pid}/tasks/{tid}/status │
│ → working → review(触发审查流水线) │
└──────────────────────────────────────────────────────────────────┘
```
## 4. 详细设计
### 4.1 Ticker 集成调度(D4
**改动文件**: `src/daemon/ticker.py``_tick_project()`
**当前流程**:
```
tick → 依赖推进 → daemon_tick event → 结束
```
**新增流程**:
```
tick → 依赖推进 → 调度扫描 → daemon_tick event → 结束
├─ 扫描所有 pending 任务
├─ 对每个 pending 任务调 Dispatcher.decide()
├─ 如果是 FULL_AGENT → Spawner.spawn_full_agent()
├ 同时将任务状态改为 claimed
└─ 如果是 LOCAL → 直接执行
```
**关键约束**:
- 每个 tick 最多调度 `max_dispatch_per_tick` 个任务(默认 3,可配置)
- 只调度 `pending` 状态的任务(`claimed`/`working` 说明已有 Agent 在处理)
- 检查并发限制(`ActiveAgentCounter.can_acquire()`
- dispatch 失败不阻塞 tick 循环
**代码改动(伪代码)**:
```python
# ticker.py _tick_project() 新增步骤
async def _tick_project(self, project_id, project_info):
# ... 现有: 依赖推进 ...
# 新增: 调度扫描
if self.dispatcher and self.spawner:
await self._dispatch_pending(project_id, db_path)
# ... 现有: daemon_tick event ...
async def _dispatch_pending(self, project_id, db_path):
"""扫描 pending 任务并调度"""
queries = Queries(db_path)
pending = queries.tasks_by_status("pending")
dispatched = 0
for task in pending[:self.max_dispatch_per_tick]:
try:
result = await self.dispatcher.dispatch(task, project_config={...})
if result["status"] == "dispatched" and result["level"] in ("full", "escalate"):
# 标记为 claimed
self._transition_status(conn, task.id, "claimed",
agent="daemon",
detail={"dispatched_to": result["agent_id"],
"session_id": result.get("session_id")})
dispatched += 1
except Exception:
logger.exception("Dispatch failed for %s", task.id)
```
**Ticker 构造器新增参数**:
```python
def __init__(self, ..., dispatcher=None, spawner=None, max_dispatch_per_tick=3):
self.dispatcher = dispatcher
self.spawner = spawner
self.max_dispatch_per_tick = max_dispatch_per_tick
```
### 4.2 Spawner 对接 OpenClawD1
**改动文件**: `src/daemon/spawner.py`
**现状分析**: `spawn_full_agent()` 已经调用 `openclaw agent` CLI,参数完全匹配:
```python
cmd = ["openclaw", "agent",
"--agent", agent_id,
"--session-id", session_id,
"--message", message,
"--json"]
```
`openclaw agent --help` 确认支持 `--agent``--session-id``--message``--json`
**改动点**:
1. **丰富 message 内容** — 当前 `_build_message()` 只传 title + description。需要补充:
- 项目 ID、任务 ID(让 Agent 知道调哪个 API
- API 端点信息(让 Agent 知道怎么回写)
- `--deliver` 标志(可选,让 Agent 回复发送到频道)
2. **新增 prompt 模板** — Agent 收到的 message 需要包含完整的执行指令:
```python
SPAWN_PROMPT_TEMPLATE = """你收到一个 v2.6 黑板任务:
项目: {project_id}
任务ID: {task_id}
标题: {title}
描述: {description}
类型: {task_type}
优先级: {priority}
必要条件: {must_haves}
请按以下步骤执行:
1. 了解任务需求,确认理解无误
2. 执行任务(编码/回测/数据检查等)
3. 完成后,调用以下 API 写入产出:
POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/outputs
Body: {{"agent": "{agent_id}", "content": "<产出内容>", "content_type": "report"}}
4. 更新任务状态为 review
PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status
Body: {{"status": "review", "agent": "{agent_id}"}}
5. 如果遇到问题,更新状态为 failed:
PATCH http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_id}/status
Body: {{"status": "failed", "agent": "{agent_id}", "detail": "<失败原因>"}}
注意:
- API 地址: http://{api_host}:{api_port}
- 你可以通过 GET /api/projects/{project_id}/tasks/{task_id}?expand=all 查看完整任务信息
- 产出写入后,审查流水线会自动触发
"""
```
3. **配置化 API 地址** — 从 `config/default.yaml` 或环境变量读取:
```yaml
# config/default.yaml 新增
daemon:
api_host: "127.0.0.1"
api_port: 8083
```
### 4.3 Agent 执行回写流程(D2
**Agent 端无需代码改动** — Agent 本身已有 `exec` 工具可以调用 `curl`,也有 `web_fetch` 可以调 HTTP API。关键是在 spawn 的 prompt 里告诉 Agent
1. 任务详情(title、description、must_haves
2. API 端点和调用方式
3. 回写产出的具体步骤
**回写 API 已就绪**:
| API | 方法 | 用途 |
|-----|------|------|
| `/api/projects/{id}/tasks/{tid}/outputs` | POST | 写入产出 |
| `/api/projects/{id}/tasks/{tid}/status` | PATCH | 更新状态 |
| `/api/projects/{id}/tasks/{tid}/comments` | POST | 添加评论 |
| `/api/projects/{id}/tasks/{tid}?expand=all` | GET | 查看完整信息 |
**审查触发**: Agent 将状态改为 `review` 后,Ticker 在下次 tick 检测到 `review` 状态的任务,触发审查流水线(F12 + F13)。
### 4.4 审查流水线集成
**改动文件**: `src/daemon/ticker.py``_tick_project()`
**新增逻辑**:
```python
# 检测 review 状态的任务,触发审查
review_tasks = queries.tasks_by_status("review")
for task in review_tasks:
# 检查是否已有 review 记录
reviews = bb.get_reviews(task.id)
if not reviews:
# 调度审查 Agent(司马懿)
await self.dispatcher.dispatch(task, action_type="review")
```
### 4.5 僵尸检测 + 超时处理
**已有基础**: F8 Health Monitor 有 zombie 检测逻辑。
**新增**: Ticker 在每次 tick 检查 `claimed`/`working` 状态超时的任务:
- `claimed` 超过 `claim_timeout`(默认 5 分钟)→ 重置为 `pending`
- `working` 超过 `task_timeout`(从任务 `deadline` 或默认 30 分钟计算)→ 标记 `failed`
这部分复用现有 `_transition_status()` 方法。
## 5. 配置变更
```yaml
# config/default.yaml 新增字段
daemon:
# Agent 调度
max_dispatch_per_tick: 3
claim_timeout_minutes: 5
default_task_timeout_minutes: 30
# API(供 Agent 回写)
api_host: "127.0.0.1"
api_port: 8083
```
## 6. main.py 启动集成
**改动文件**: `src/main.py`
当前 Ticker 只接收 `registry` 参数。需要:
1. 创建 `Dispatcher` 实例(传入 registered_agents、spawner、counter
2. 创建 `AgentSpawner` 实例
3. 创建 `ActiveAgentCounter` 实例
4. 将 dispatcher + spawner 传给 Ticker
```python
# main.py lifespan 中
from src.daemon.spawner import AgentSpawner
from src.daemon.dispatcher import Dispatcher
from src.daemon.counter import ActiveAgentCounter
spawner = AgentSpawner(dry_run=False, agent_timeout=600)
counter = ActiveAgentCounter(max_global=5, max_per_agent=2)
dispatcher = Dispatcher(
registered_agents=["pangtong-fujunshi", "simayi-challenger", ...],
spawner=spawner,
counter=counter,
)
ticker = Ticker(
registry=registry,
dispatcher=dispatcher,
spawner=spawner,
max_dispatch_per_tick=config.get("daemon", {}).get("max_dispatch_per_tick", 3),
)
```
## 7. 改动量评估
| 文件 | 改动类型 | 行数估计 | 风险 |
|------|---------|---------|------|
| `src/daemon/ticker.py` | 新增 `_dispatch_pending()` + 构造器参数 | +80 行 | 低(纯新增方法) |
| `src/daemon/spawner.py` | 新增 prompt 模板 + 配置读取 | +40 行 | 低 |
| `src/main.py` | 注入 dispatcher/spawner | +20 行 | 低 |
| `config/default.yaml` | 新增配置项 | +8 行 | 无 |
| `tests/test_ticker.py` | 新增调度测试 | +60 行 | 无 |
**总改动**: ~210 行,全部是新增代码,不修改现有逻辑。
## 8. 不在本方案范围内
| 项目 | 原因 |
|------|------|
| Subagent spawn (sessions_spawn) | 留后续,先跑通 Full Agent |
| 前端对接真实 API | 依赖本方案完成后才有真实数据 |
| Checkpoint | v2.7 |
| Agent bootstrap prompt 定制 | 当前用通用模板,后续按角色定制 |
## 9. 端到端流程示例
用户通过 API 创建一个回测任务:
```
1. POST /api/projects/quant-demo/tasks
→ 任务入库,status=pendingassignee=zhangfei-dev
2. Ticker tick30s 内)
→ 扫描 pending 任务
→ Dispatcher: assignee 是注册角色 → FULL_AGENT
→ Spawner: openclaw agent --agent zhangfei-dev --message "..."
→ 任务 status → claimed
3. 张飞 Agent 收到 prompt
→ 读取任务详情(GET expand=all
→ 执行回测
→ 写入产出(POST /outputs
→ 更新状态为 review
4. Ticker 下次 tick
→ 检测 review 状态任务
→ 调度司马懿审查
→ 司马懿 Agent 收到 prompt → 查看产出 → 写 review → APPROVE/REJECT
5. 审查通过 → status → done
审查驳回 → status → pending(重做)
6. 前端 SSE 实时收到事件更新
```
## 10. 验收标准
1. 通过 API 创建任务 → 30s 内 Agent 被调起
2. Agent 能读取任务详情、执行、写入产出
3. Agent 更新状态为 review → 审查 Agent 被调起
4. 审查通过 → 任务完成
5. 前端展示真实数据(非 mock
6. 现有 301 测试全部继续通过