#12 Pipeline 设计专题 + 广播定义修正

- 新建 12-pipeline-design.md: Pipeline/自由模式二选一、硬约束工作流、@mention优先、广播轮次定义
- 更新 architecture-v3.0.md §8.3: 一轮广播=所有Agent反馈、§19.2: 已知问题标注
This commit is contained in:
cfdaily
2026-06-04 19:58:26 +08:00
parent 03600a69f5
commit dee0dae0be
2 changed files with 184 additions and 5 deletions
+163
View File
@@ -0,0 +1,163 @@
# Pipeline 设计专题
版本: v1.0
日期: 2026-06-04
作者: 庞统(副军师)
状态: **待确认**
## §1 背景
当前 moziplus v2 的任务调度只有两条路径:确定性路由(Router 四条快速路径)和广播认领(spawn 所有空闲 Agent)。Router 不看 task_type,导致 coding 任务可能走入广播路径且无人认领时无限循环。
设计目标:
1. 支持用户指定任务执行流程(Pipeline)
2. 不指定时走广播认领(自由模式)
3. 广播认领有合理的轮次限制
## §2 核心决策
**D1: Pipeline 模式 vs 自由模式二选一**
- 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离
- 不指定 PipelineNone/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff)
**D2: Pipeline 是强工作流**
- 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束
- Agent 必须走 Pipeline 定义的所有步骤,不能跳过
- 没有"可偏离"的中间地带
**D3: @mention 优先级高于 Pipeline 默认路由**
- Pipeline 定义了默认执行者(如 coding → zhangfei-dev
- 但 description 里的 @mention 优先级更高
- 指定 Pipeline + @zhaoyun-data → 按 coding Pipeline 的流程执行,但执行者是 zhaoyun-data
**D4: task_type 默认值改为 None**
- API 层 `task_type=body.get("task_type", None)` 而非默认 "coding"
- 不指定 = 自由模式,走广播认领
- 指定了 = Pipeline 模式
**D5: 两个入口**
1. **前端入口**:创建任务时下拉菜单选择 Pipeline,默认 none
2. **Chat 入口**:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户
## §3 Pipeline 定义
每个 Pipeline 包含:
- `task_type`:类型标识
- `pipeline_type`:执行策略(single_step / multi_step
- `default_agent`:默认执行者(可被 @mention 覆盖)
- `stages`:步骤列表(有序)
- `status_flow`:允许的状态流转(硬约束)
### 初始 Pipeline 列表
| task_type | pipeline_type | default_agent | stages |
|-----------|---------------|---------------|--------|
| `coding` | `multi_step` | `zhangfei-dev` | working → review → done |
| `review` | `single_step` | `simayi-challenger` | working → review → done |
| `data` | `single_step` | `zhaoyun-data` | working → done |
| `deploy` | `single_step` | `jiangwei-infra` | working → done |
| `risk_check` | `single_step` | `guanyu-dev` | working → done |
| None/不指定 | 自由模式 | 广播认领 | pending → claimed → working → review → done |
### 状态流转硬约束
```python
PIPELINE_STATUS_FLOW = {
"coding": {
"pending": ["claimed"],
"claimed": ["working"],
"working": ["review"],
"review": ["done", "working"], # review 失败回到 working
# 失败/暂停等异常状态不受 Pipeline 约束
},
"review": {
"pending": ["claimed"],
"claimed": ["working"],
"working": ["done"],
},
"data": {
"pending": ["claimed"],
"claimed": ["working"],
"working": ["done"],
},
# ...
}
```
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何状态都可以转入异常状态。
## §4 路由逻辑(更新 Router
Router 新增路径 5:按 task_type 匹配 Pipeline
```
路径1: 本地操作
路径2: retry → 原执行者
Mode B: Agent 声明式交接
路径3: 生命周期流转
路径4: 有 assignee → 直接给
路径5(新增): task_type 有值 → 查 Pipeline → 用 Pipeline 的 default_agent
- 如果 description 有 @mention → @mention 覆盖 default_agent
- mode = "deterministic"
模糊场景: delegate 庞统 → 广播
```
路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。
## §5 广播定义修正
**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY),才算一轮广播完成。
具体机制:
- 每个 pending 广播任务,维护一个 `notified_agents` 集合
- 每次 tick:spawn 空闲且未被通知过的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 加入已反馈集合
- 当所有 Agent 都已反馈且没人认领 → 一轮结束 → retry_count +1
- retry_count >= 3 → 升级庞统
注意:
- Agent 忙(被 counter 阻塞)→ 不算已通知,下次 tick 继续尝试
- Agent 返回 NO_REPLY → 算已反馈
- Agent 认领 → 广播立即结束(不需要等其他 Agent)
实现方案:
- 在 events 表记录每个 Agent 的反馈
- 或在内存中维护 `task_id → {notified: set, responded: set}` 映射
- ticker 启动时从 events 表恢复状态
## §6 API 改动
1. `POST /api/projects/{pid}/tasks`
- `task_type` 默认改为 `None`(不再是 "coding"
- 前端传 task_type 时走 Pipeline,不传时走自由模式
2. 新增 `GET /api/pipelines`
- 返回可用 Pipeline 列表(从配置加载)
- 供前端下拉菜单使用
## §7 前端改动
1. TaskModal 创建任务时:
- 新增 Pipeline 下拉选择(默认"自由模式")
- 选项从 `/api/pipelines` 动态加载
2. Chat 入口:
- 庞统在需求探索时,判断是否需要指定 Pipeline
- 拿不准时通过 checkpoint 询问用户
## §8 实施路线
| Phase | 内容 | 优先级 |
|-------|------|--------|
| Phase 1 | task_type 默认值改 None + 广播计数器修正(bug fix | P0 |
| Phase 2 | Router 新增路径 5task_type → Pipeline default_agent | P0 |
| Phase 3 | Pipeline 硬约束状态机(status_flow 校验) | P1 |
| Phase 4 | 前端 Pipeline 下拉 + `/api/pipelines` 端点 | P1 |
| Phase 5 | Pipeline YAML 声明式配置(动态加载) | P2 |
## §9 待确认
1. Pipeline 列表是否完整?是否需要 `research`/`discussion` 类型?
2. coding 的 multi_step 具体步骤:working → review → done 是否足够?是否需要 plan → implement → verify 更细的粒度?
3. 广播的 `notified_agents` 状态存在内存还是 DB?ticker 重启后如何恢复?
4. Phase 1 和 Phase 2 是否合并实施?
+21 -5
View File
@@ -763,17 +763,27 @@ route(task_info, action_type):
- `set_cooldown(agent_id, seconds=120)`: API 429 后冷却 - `set_cooldown(agent_id, seconds=120)`: API 429 后冷却
- `is_cooling_down(agent_id)`: 检查冷却状态 - `is_cooling_down(agent_id)`: 检查冷却状态
### 8.3 广播认领 ✅(已实现,司马评审确认) ### 8.3 广播认领 ✅(已实现,司马评审确认)— 定义修正 2026-06-04
**设计文档 (v3.0-router-refactor §3.3)**: **定义**:一轮广播 = 所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY),才算一轮完成。
- 确定性路径:已知下一步谁做 → 直接 spawn
- 广播认领路径:不确定 → spawn 所有空闲 Agent → 自主 claim **机制**
- 无人认领 → retry 3 次后升级庞统兜底 - 每个 pending 广播任务,维护 `notified_agents` 追踪已通知和已反馈的 Agent
- 每次 tick:spawn 空闲且尚未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
- 当所有 Agent 都已反馈且没人认领 → 一轮结束 → retry_count +1
- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
- 连续 3 轮广播无人认领 → 升级庞统
**实际代码 (ticker.py L540 `_broadcast_claim()`)**: **实际代码 (ticker.py L540 `_broadcast_claim()`)**:
- ✅ 完整实现,包含:全局并发检查(counter.is_near_limit)、空闲 Agent 列表(_get_idle_agents)、批量广播(攒一批任务一次广播)、spawn 广播、无人认领检测(retry_count >= 3 → escalated)、审计事件记录(events 表 broadcast_claim 类型) - ✅ 完整实现,包含:全局并发检查(counter.is_near_limit)、空闲 Agent 列表(_get_idle_agents)、批量广播(攒一批任务一次广播)、spawn 广播、无人认领检测(retry_count >= 3 → escalated)、审计事件记录(events 表 broadcast_claim 类型)
- ✅ 确定性路径也完整(Router → Dispatcher → Spawner - ✅ 确定性路径也完整(Router → Dispatcher → Spawner
**与旧实现的区别**
- 旧:每次 tick spawn 所有空闲 Agent = 一轮,Agent 忙就跳过,retry_count 不递增
- 新:追踪每个 Agent 的通知/反馈状态,全部反馈才算一轮,防止忙 Agent 被反复跳过导致无限循环
**注**: 庞统初版误判为"未实现",因检查 router.py/spawner.py 时未查 ticker.py。司马懿评审纠正。 **注**: 庞统初版误判为"未实现",因检查 router.py/spawner.py 时未查 ticker.py。司马懿评审纠正。
--- ---
@@ -1447,6 +1457,12 @@ SSEEventType:
**纠正记录**: 庞统初版只检查了 router.py/spawner.py,遗漏了 ticker.py L540。司马懿评审纠正。 **纠正记录**: 庞统初版只检查了 router.py/spawner.py,遗漏了 ticker.py L540。司马懿评审纠正。
**⚠️ 已知问题(2026-06-04 发现)**
- 广播计数器不递增:`_broadcast_claim` 中只检查 `retry_count >= 3` 但不递增 retry_count
- retry_count 只在 `_check_timeouts` 的 claimed 超时路径递增
- 导致广播任务无限循环:每 tick 广播一次,没人认领也不计数
- **修复方案**:见 `docs/design/12-pipeline-design.md` §5 广播定义修正
### 19.3 `_generate_title()` 绕过 Gateway ⚠️ TODO(保留,待替换本地 LLM) ### 19.3 `_generate_title()` 绕过 Gateway ⚠️ TODO(保留,待替换本地 LLM)
**现状**: `blackboard_routes._generate_title()`line 138-175)直接用 OpenAI client 调 zhipu API,不走 Gateway。 **现状**: `blackboard_routes._generate_title()`line 138-175)直接用 OpenAI client 调 zhipu API,不走 Gateway。