auto-sync: 2026-06-04 20:10:35

This commit is contained in:
cfdaily
2026-06-04 20:10:35 +08:00
parent 3aa64f1f0e
commit f3e8d85b13
+517 -84
View File
@@ -1,9 +1,9 @@
# Pipeline 设计专题
版本: v1.0
日期: 2026-06-04
作者: 庞统(副军师)
状态: **待确认**
> 版本: v2.0
> 日期: 2026-06-04
> 作者: 庞统(副军师)
> 状态: **评审中**
## §1 背景
@@ -13,6 +13,7 @@
1. 支持用户指定任务执行流程(Pipeline)
2. 不指定时走广播认领(自由模式)
3. 广播认领有合理的轮次限制
4. Pipeline 支持灵活流转:顺序、分支、退回、循环重试
## §2 核心决策
@@ -20,10 +21,11 @@
- 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离
- 不指定 PipelineNone/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff)
**D2: Pipeline 是强工作流**
**D2: Pipeline 是强工作流,但支持灵活流转**
- 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束
- Agent 必须走 Pipeline 定义的所有步骤,不能跳过
- 没有"可偏离"的中间地带
- 但 Pipeline 本身可以定义灵活的流转规则:条件分支、退回重做、循环重试
- 没有"可偏离"的中间地带——偏离必须是 Pipeline 预定义的路径
**D3: @mention 在 Pipeline 模式下只做通知,不改变执行者**
- Pipeline 模式(强工作流):执行者由 Pipeline 决定(coding → zhangfei-dev),@mention 只是在黑板 comment 通知该 Agent,不改变执行者
@@ -39,64 +41,164 @@
1. **前端入口**:创建任务时下拉菜单选择 Pipeline,默认 none
2. **Chat 入口**:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户
## §3 Pipeline 定义
## §3 Pipeline 定义(灵活流转)
每个 Pipeline 包含:
- `task_type`:类型标识
- `pipeline_type`:执行策略(single_step / multi_step
- `default_agent`:默认执行者(可被 @mention 覆盖)
- `stages`:步骤列表(有序)
- `status_flow`:允许的状态流转(硬约束)
每个 Pipeline 是一个**有向状态机**,支持顺序、条件分支、退回、循环重试。
### 初始 Pipeline 列表
| task_type | pipeline_type | default_agent | stages |
|-----------|---------------|---------------|--------|
| `coding` | `multi_step` | `zhangfei-dev` | working → review → done |
| `review` | `single_step` | `simayi-challenger` | working → review → done |
| `data` | `single_step` | `zhaoyun-data` | working → done |
| `deploy` | `single_step` | `jiangwei-infra` | working → done |
| `risk_check` | `single_step` | `guanyu-dev` | working → done |
| None/不指定 | 自由模式 | 广播认领 | pending → claimed → working → review → done |
### 状态流转硬约束
### 3.1 数据结构
```python
PIPELINE_STATUS_FLOW = {
"coding": {
"pending": ["claimed"],
"claimed": ["working"],
"working": ["review"],
"review": ["done", "working"], # review 失败回到 working
# 失败/暂停等异常状态不受 Pipeline 约束
},
"review": {
"pending": ["claimed"],
"claimed": ["working"],
"working": ["done"],
},
"data": {
"pending": ["claimed"],
"claimed": ["working"],
"working": ["done"],
},
# ...
}
@dataclass
class PipelineStage:
id: str # 步骤标识(如 "working", "review"
agent: str # 执行者:固定 agent_id 或 "pipeline_default"
on_success: str # 成功后转到哪个 stage(或 "done"/"failed"
on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回)
max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed)
retry_count: int = 0 # 当前已重试次数(运行时状态)
is_terminal: bool = False # 是否终态
@dataclass
class Pipeline:
task_type: str # 类型标识
default_agent: str # 默认执行者
entry: str # 入口 stage id
stages: Dict[str, PipelineStage] # stage id → PipelineStage
```
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何状态都可以转入异常状态。
### 3.2 流转规则
| 流转类型 | 实现方式 | 示例 |
|---------|---------|------|
| **顺序** | `on_success` 指向下一个 stage | working → review → done |
| **条件分支** | `on_success` / `on_failure` 指向不同 stage | review 通过→donereview 拒绝→working |
| **退回** | `on_failure` 指向前序 stage | review 失败退回 working |
| **循环/重试** | `max_retries` + `retry_count` 控制 | review 最多退回 3 次,超过转 failed |
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。
### 3.3 Pipeline 示例
**coding pipelinemulti_step,含退回重试)**
```yaml
task_type: coding
default_agent: zhangfei-dev
entry: working
stages:
working:
agent: pipeline_default
on_success: review
on_failure: failed
review:
agent: simayi-challenger
on_success: done
on_failure: working # review 失败退回重做
max_retries: 3 # 最多退回 3 次,超过转 failed
```
流转图:
```
pending → claimed → working ──→ review ──→ done
↑ │
└────────────┘ (on_failure, 最多3次)
│ >3次
└──→ failed
```
**data pipelinesingle_step,简单顺序)**
```yaml
task_type: data
default_agent: zhaoyun-data
entry: working
stages:
working:
agent: pipeline_default
on_success: done
on_failure: failed
```
**review pipeline(单步,固定 agent**
```yaml
task_type: review
default_agent: simayi-challenger
entry: working
stages:
working:
agent: simayi-challenger
on_success: done
on_failure: failed
```
**deploy pipeline(含验证循环)**
```yaml
task_type: deploy
default_agent: jiangwei-infra
entry: working
stages:
working:
agent: pipeline_default
on_success: verify
on_failure: failed
verify:
agent: pipeline_default # 部署者自验
on_success: done
on_failure: working # 验证失败退回重新部署
max_retries: 2 # 最多重试 2 次
```
**risk_check pipeline(单步)**
```yaml
task_type: risk_check
default_agent: guanyu-dev
entry: working
stages:
working:
agent: pipeline_default
on_success: done
on_failure: failed
```
**自由模式(None/不指定)**
| 项目 | 值 |
|------|-----|
| default_agent | 广播认领 |
| stages | pending → claimed → working → review → done(基本状态机,无 Pipeline 约束) |
### 3.4 初始 Pipeline 注册表
| task_type | 类型 | default_agent | stages | 退回 |
|-----------|------|---------------|--------|------|
| `coding` | multi_step | `zhangfei-dev` | working → review → done | review→working (max 3) |
| `review` | single_step | `simayi-challenger` | working → done | 无 |
| `data` | single_step | `zhaoyun-data` | working → done | 无 |
| `deploy` | multi_step | `jiangwei-infra` | working → verify → done | verify→working (max 2) |
| `risk_check` | single_step | `guanyu-dev` | working → done | 无 |
| None | 自由模式 | 广播认领 | 基本状态机 | 无 |
## §4 路由逻辑(更新 Router
Router 新增路径 5:按 task_type 匹配 Pipeline
```
路径1: 本地操作
路径2: retry → 原执行者
Mode B: Agent 声明式交接
路径3: 生命周期流转
路径1: 本地操作 (action_type ∈ LOCAL_ACTIONS)
路径2: retry → 原执行者 (action_type == "retry")
Mode B: Agent 声明式交接 (next_capability 有值)
路径3: 生命周期流转 (status ∈ LIFECYCLE_CAPABILITY)
路径4: 有 assignee → 直接给
路径5(新增): task_type 有值 → 查 Pipeline → 用 Pipeline 的 default_agent
路径5(新增): task_type 有值 → 查 Pipeline 注册表 → 用 Pipeline 的 default_agent
- @mention 不覆盖执行者(Pipeline 模式下 @mention 只做通知)
- mode = "deterministic"
模糊场景: delegate 庞统 → 广播
@@ -104,60 +206,391 @@ Mode B: Agent 声明式交接
路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。
路径 5 的具体逻辑:
```python
# router.py 新增
def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
task_type = task_info.get("task_type")
if not task_type:
return None
pipeline = self.pipeline_registry.get(task_type)
if not pipeline:
return None
return RouteDecision(
agent_id=pipeline.default_agent,
mode="deterministic",
confidence=0.85,
reason=f"Pipeline match: {task_type}{pipeline.default_agent}",
)
```
## §5 广播定义修正
**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY),才算一轮广播完成。
**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。
具体机制:
- 每个 pending 广播任务,维护一个 `notified_agents` 集合
- 每次 tick:spawn 空闲且未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 加入已反馈集合
- 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent
- 每次 tickspawn 空闲且未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
- 当所有 Agent 都已反馈且没人认领 → 一轮结束 → retry_count +1
- retry_count >= 3 → 升级庞统
- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
- 连续 3 轮广播无人认领 → 升级庞统
注意:
- Agent 忙(被 counter 阻塞)→ 不算已通知,下次 tick 继续尝试
- Agent 返回 NO_REPLY → 算已反馈
- Agent 认领 → 广播立即结束(不需要等其他 Agent)
实现方案:
- 在 events 表记录每个 Agent 的反馈
- 或在内存中维护 `task_id → {notified: set, responded: set}` 映射
- ticker 启动时从 events 表恢复状态
详细设计见 §10.2。
## §6 API 改动
1. `POST /api/projects/{pid}/tasks`
- `task_type` 默认改为 `None`(不再是 "coding"
- 前端传 task_type 时走 Pipeline,不传时走自由模式
### §6.1 修改现有端点
2. 新增 `GET /api/pipelines`
- 返回可用 Pipeline 列表(从配置加载
- 前端下拉菜单使用
`POST /api/projects/{pid}/tasks`
- `task_type` 默认改为 `None`(不再是 "coding"
- 前端传 task_type 时走 Pipeline,不传时走自由模式
### §6.2 新增端点
`GET /api/pipelines`
- 返回可用 Pipeline 列表(从注册表加载)
- 供前端下拉菜单使用
- 响应格式:
```json
{
"pipelines": [
{"task_type": "coding", "label": "编码开发", "description": "多步:开发→审查→完成"},
{"task_type": "review", "label": "代码审查", "description": "单步:审查→完成"},
{"task_type": "data", "label": "数据获取", "description": "单步:执行→完成"},
{"task_type": "deploy", "label": "部署", "description": "多步:部署→验证→完成"},
{"task_type": "risk_check", "label": "风控检查", "description": "单步:检查→完成"}
]
}
```
## §7 前端改动
1. TaskModal 创建任务时:
- 新增 Pipeline 下拉选择(默认"自由模式")
- 选项从 `/api/pipelines` 动态加载
- 选中 Pipeline 后显示简要流程说明
2. Chat 入口:
- 庞统在需求探索时,判断是否需要指定 Pipeline
- 拿不准时通过 checkpoint 询问用户
## §8 实施路线
## §8 Pipeline 引擎
| Phase | 内容 | 优先级 |
|-------|------|--------|
| Phase 1 | task_type 默认值改 None + 广播计数器修正(bug fix | P0 |
| Phase 2 | Router 新增路径 5task_type → Pipeline default_agent | P0 |
| Phase 3 | Pipeline 硬约束状态机(status_flow 校验) | P1 |
| Phase 4 | 前端 Pipeline 下拉 + `/api/pipelines` 端点 | P1 |
| Phase 5 | Pipeline YAML 声明式配置(动态加载) | P2 |
### §8.1 注册表
## §9 待确认
```python
class PipelineRegistry:
"""Pipeline 注册表"""
def __init__(self):
self._pipelines: Dict[str, Pipeline] = {}
self._load_defaults()
def _load_defaults(self):
"""加载内置 Pipeline"""
for cfg in DEFAULT_PIPELINES:
p = self._parse_pipeline(cfg)
self._pipelines[p.task_type] = p
def get(self, task_type: str) -> Optional[Pipeline]:
return self._pipelines.get(task_type)
def list_all(self) -> List[Pipeline]:
return list(self._pipelines.values())
```
1. Pipeline 列表是否完整?是否需要 `research`/`discussion` 类型?
2. coding 的 multi_step 具体步骤:working → review → done 是否足够?是否需要 plan → implement → verify 更细的粒度?
3. 广播的 `notified_agents` 状态存在内存还是 DB?ticker 重启后如何恢复?
4. Phase 1 和 Phase 2 是否合并实施?
### §8.2 状态流转校验
```python
class PipelineEngine:
"""Pipeline 执行引擎"""
def __init__(self, registry: PipelineRegistry, bb: Blackboard):
self.registry = registry
self.bb = bb
def get_next_stage(self, task: Task, outcome: str) -> Optional[str]:
"""根据当前 stage 和执行结果,计算下一个 stage"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return None # 自由模式,不干预
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return None
if outcome == "success":
return current_stage.on_success
elif outcome == "failure":
# 检查 max_retries
if current_stage.max_retries > 0:
current_stage.retry_count += 1
if current_stage.retry_count > current_stage.max_retries:
return "failed"
return current_stage.on_failure
else:
return "failed"
def get_agent_for_stage(self, task: Task) -> Optional[str]:
"""获取当前 stage 应该执行的 Agent"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return None
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return None
if current_stage.agent == "pipeline_default":
return pipeline.default_agent
return current_stage.agent
def validate_transition(self, task: Task, new_status: str) -> bool:
"""校验状态流转是否合法(硬约束)"""
pipeline = self.registry.get(task.task_type)
if not pipeline:
return True # 自由模式不限制
# 异常状态始终允许
if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"):
return True
current_stage = pipeline.stages.get(task.status)
if not current_stage:
return True
allowed = {current_stage.on_success, current_stage.on_failure}
return new_status in allowed
```
### §8.3 Ticker 集成
Ticker 在任务状态流转时调用 PipelineEngine
1. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure
2. `PipelineEngine.get_next_stage()` 计算下一个 stage
3. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn
4. 如果是终态 → 标记 done/failed
## §9 实施路线
### Phase 1Bug fix(独立实施)
| Step | 内容 | 详细设计 |
|------|------|---------|
| 1.1 | task_type 默认值改 None | §10.1 |
| 1.2 | 广播计数器修正 | §10.2 |
### Phase 2Pipeline 全量实施(Phase 1 完成后一口气做)
| Step | 内容 |
|------|------|
| 2.1 | Pipeline 注册表 + PipelineEngine(§8.1 + §8.2 |
| 2.2 | Router 路径 5(§4 |
| 2.3 | Ticker 集成 PipelineEngine(§8.3 |
| 2.4 | 广播 tracker 反馈回调注入 |
| 2.5 | API 端点(§6 |
| 2.6 | 前端 Pipeline 下拉(§7 |
| 2.7 | E2E 测试覆盖 |
## §10 Phase 1 详细设计
### §10.1 task_type 默认值改 None
**文件**`src/api/blackboard_routes.py`
**改动**1 行
```python
# 当前(line 133):
task_type=body.get("task_type", "coding"),
# 改为:
task_type=body.get("task_type", None),
```
**影响分析**
| 场景 | 改前 | 改后 |
|------|------|------|
| 前端不传 task_type | task_type="coding" → Router 不看 → 走广播 | task_type=None → Router 不看 → 走广播 |
| 前端传 task_type="coding" | coding | coding(不变) |
| E2E 测试不传 task_type | coding | None |
| 现有 DB 数据 | 不受影响(已写入) | 不受影响 |
**风险**:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。
**测试**:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。
### §10.2 广播计数器修正
**文件**`src/daemon/ticker.py`
#### 当前问题
- `_broadcast_claim` 在 line ~1020 执行广播
- line ~1046 检查 `retry_count >= 3` 但从不递增 retry_count
- retry_count 只在 `_check_timeouts` 的 claimed 超时路径递增(line ~1317
- 结果:没人认领的广播任务每 tick 广播一次,retry_count 永远为 0,无限循环
#### 修复方案
**新增数据结构**ticker.py 顶部):
```python
from dataclasses import dataclass, field
@dataclass
class BroadcastRound:
"""追踪单个任务的广播状态"""
task_id: str
notified_agents: set = field(default_factory=set) # 已 spawn 过的 Agent
responded_agents: set = field(default_factory=set) # 已返回反馈的 Agent
round_number: int = 1 # 当前第几轮
```
**Ticker 类新增属性**
```python
class Ticker:
def __init__(self, ...):
...
self._broadcast_tracker: Dict[str, BroadcastRound] = {}
self._all_agent_ids: Set[str] = set() # 从 config 加载
```
**`_broadcast_claim` 改造**(替换 line ~1020 区域):
```python
def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
"""广播认领:按轮次追踪,所有 Agent 反馈才算一轮"""
for task_info in broadcast_tasks:
task_id = task_info["id"]
# 获取或创建 tracker
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
tracker = BroadcastRound(task_id=task_id)
self._broadcast_tracker[task_id] = tracker
# 检查是否已认领(可能上一个 tick 被认领了)
bb = Blackboard(db_path)
task = bb.get_task(task_id)
if task.status != "pending":
# 已被认领或状态已变,清理 tracker
self._broadcast_tracker.pop(task_id, None)
continue
# 获取空闲 Agent
idle_agents = self._get_idle_agents()
# 过滤已通知过的,只 spawn 尚未通知的
pending_agents = [a for a in idle_agents if a not in tracker.notified_agents]
if pending_agents:
# spawn 尚未通知的 Agent
for agent_id in pending_agents:
tracker.notified_agents.add(agent_id)
# ... 现有 spawn 广播逻辑 ...
else:
# 本轮所有空闲 Agent 都已通知过
# 检查是否所有已知 Agent 都通知了
unnotified = self._all_agent_ids - tracker.notified_agents
if not unnotified:
# 所有 Agent 都通知过了 → 检查是否全部反馈
if tracker.responded_agents >= tracker.notified_agents:
# 全部反馈且没人认领 → 一轮结束
if tracker.round_number >= 3:
# 3 轮广播无人认领,升级庞统
self._escalate_task(db_path, task_id)
del self._broadcast_tracker[task_id]
logger.warning(
"Broadcast 3 rounds exhausted, escalating: %s", task_id
)
else:
# 开始新一轮:重置通知/反馈,轮次+1
tracker.round_number += 1
tracker.notified_agents.clear()
tracker.responded_agents.clear()
logger.info(
"Broadcast round %d starting for: %s",
tracker.round_number, task_id,
)
# else: 还有 Agent 没通知到(可能在忙),等下一个 tick
```
**Agent 反馈回调**(在 spawner.py 的 `on_agent_complete` 中注入):
```python
# spawner.py 中 classify_outcome 之后
def _on_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
"""Agent 对广播任务的反馈"""
tracker = self._ticker._broadcast_tracker.get(task_id)
if not tracker:
return
if outcome == "claimed":
# Agent 认领了,清理 tracker
self._ticker._broadcast_tracker.pop(task_id, None)
else:
# NO_REPLY / observation / 其他 → 算已反馈
tracker.responded_agents.add(agent_id)
```
注入位置:spawner.py 的 `classify_outcome()` 返回后、`update_status()` 调用前。
**Ticker 重启恢复**
简单方案:重启时清空 `_broadcast_tracker`,所有广播中的任务从第一轮重新开始。最坏情况多广播一轮,可接受。
```python
# ticker.py 启动时
self._broadcast_tracker.clear()
logger.info("Broadcast tracker cleared (ticker restart)")
```
**测试**
```python
# tests/test_broadcast_tracker.py
def test_round_completes_when_all_respond():
"""所有 Agent 反馈后,一轮结束"""
...
def test_round_resets_for_next_round():
"""一轮结束后,开始新一轮"""
...
def test_escalate_after_3_rounds():
"""3 轮无人认领,升级庞统"""
...
def test_claim_clears_tracker():
"""Agent 认领后,tracker 清理"""
...
def test_busy_agent_not_counted():
"""忙的 Agent 不算已通知,下 tick 继续尝试"""
...
```
#### 改动文件清单
| 文件 | 改动 | 行数估计 |
|------|------|---------|
| `src/daemon/ticker.py` | BroadcastRound 数据类 + tracker 属性 + _broadcast_claim 改造 + _escalate_task | ~60 行新增/修改 |
| `src/daemon/spawner.py` | _on_broadcast_response 回调注入 | ~15 行 |
| `tests/test_broadcast_tracker.py` | 新测试文件 | ~80 行 |
## §11 待确认
1. Pipeline 注册表存储方式:内存 dict(当前方案)vs DB table(持久化)vs YAML file(声明式配置)?
2. 灵活 Pipeline 的 stage 定义格式:用 YAML 声明式还是 Python dict 硬编码?前端需要渲染流程图吗?
3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done
4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码?
5. 广播 tracker 的反馈回调:在 spawner 的 `classify_outcome` 之后注入,还是用独立的事件回调?