auto-sync: 2026-06-04 20:42:43

This commit is contained in:
cfdaily
2026-06-04 20:42:43 +08:00
parent 3408c5bc26
commit 8c11d77484
+100 -51
View File
@@ -1,6 +1,6 @@
# Pipeline 设计专题
> 版本: v2.0
> 版本: v2.1
> 日期: 2026-06-04
> 作者: 庞统(副军师)
> 状态: **评审中**
@@ -85,6 +85,8 @@ class Pipeline:
异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。
**认领阶段不受 Pipeline 约束**:Pipeline 约束的是工作阶段(working/review/verify 等)。认领阶段(pending → claimed → entry stage)不受 Pipeline stages 约束,由 Router 和 Ticker 的标准流程处理。
### 3.3 Pipeline 示例
**coding pipelinemulti_step,含退回重试)**
@@ -236,16 +238,17 @@ def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]:
## §5 广播定义修正
**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。
**一轮广播的定义**:所有已通知的 Agent 都给出了反馈(认领/NO_REPLY/observation),且没有新的空闲 Agent 可通知,才算一轮广播完成。
具体机制:
- 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent
- 每次 tick:spawn 空闲且尚未被通知的 Agent
- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈
- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试
-所有 Agent 都已反馈且没人认领 → 一轮结束 → retry_count +1
-没有新的空闲 Agent 可通知时,检查所有已通知的 Agent 是否已全部反馈且没人认领 → 一轮结束 → round_number +1
- **注意**:不要求所有 Agent 都通知过,只看已通知的 Agent 是否全部反馈(避免忙 Agent 死锁)
- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈)
- 连续 3 轮广播无人认领 → 升级庞统
- 连续 3 轮无人认领 → 升级庞统
详细设计见 §10.2。
@@ -340,32 +343,44 @@ class PipelineEngine:
self.bb.save_pipeline_state(state)
return state
def get_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]:
"""根据当前 stage 和执行结果,计算下一个 stage(纯函数,不修改 PipelineStage 模板"""
def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str:
"""推进 Pipeline stage(有副作用:修改 task_state,同步 assignee"""
pipeline = self.registry.get(task_state.pipeline_type)
if not pipeline:
return None # 自由模式,不干预
return task.status
current_stage = pipeline.stages.get(task_state.current_stage)
if not current_stage:
return None
return task.status
# 计算下一个 stage
if outcome == "success":
task_state.current_stage = current_stage.on_success
return current_stage.on_success
next_stage_id = current_stage.on_success
elif outcome == "failure":
# 检查 max_retries
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
if current_stage.max_retries > 0:
retries = task_state.stage_retry_counts.get(current_stage.id, 0)
retries += 1
task_state.stage_retry_counts[current_stage.id] = retries
if retries > current_stage.max_retries:
task_state.current_stage = "failed"
return "failed"
return current_stage.on_failure
next_stage_id = current_stage.on_failure
else:
task_state.current_stage = "failed"
return "failed"
# 更新运行时状态
task_state.current_stage = next_stage_id
# 同步更新 assignee(关键!否则 Router 路径 4 会劫持)
next_stage = pipeline.stages.get(next_stage_id)
if next_stage:
new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent
# bb.update_assignee(task.id, new_agent)
# 成功推进时清理当前 stage 的 retry count
if outcome == "success":
task_state.stage_retry_counts.pop(current_stage.id, None)
return next_stage_id"
def get_agent_for_stage(self, task: Task) -> Optional[str]:
"""获取当前 stage 应该执行的 Agent"""
@@ -403,12 +418,32 @@ class PipelineEngine:
**TaskPipelineState 存储方案**:建议在 tasks 表新增 `pipeline_state_json` 列(TEXT),序列化存储 `TaskPipelineState`。Blackboard 的 `get_pipeline_state()` / `save_pipeline_state()` 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。
### Entry 触发机制
Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理:
```
Pipeline 任务的生命周期:
1. 任务创建:status=pending, task_type=coding
2. Ticker 扫到 pending + task_type 有值
3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev
4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev
5. Ticker spawn zhangfei-devspawn 成功后:
a. assignee = pipeline.default_agentzhangfei-dev
b. status 直接设为 pipeline.entryworking
c. current_agent = zhangfei-dev
d. TaskPipelineState 创建:current_stage=working
6. 跳过 claim 阶段(Pipeline 模式下不需要广播认领)
```
### 状态推进流程
Ticker 在任务状态流转时调用 PipelineEngine
1. `PipelineEngine.get_or_create_state(task)` 获取运行时状态
2. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure
3. `PipelineEngine.get_next_stage(task_state, outcome)` 计算下一个 stage
3. `PipelineEngine.advance_stage(task, task_state, outcome)` 推进到下一个 stage
4. 持久化更新 `TaskPipelineState``bb.save_pipeline_state(task_state)`
5. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn
5. 如果下一个 stage 不是终态 → 更新任务状态 + assignee 已由 advance_stage 同步 → spawn
6. 如果是终态 → 标记 done/failed
## §9 实施路线
@@ -459,6 +494,8 @@ task_type=body.get("task_type", None),
**风险**:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。
**E2E 测试影响**:待验证 `grep` 现有测试中 `task_type` 相关的硬断言,确认改为 `None` 不会破坏测试。如果有断言 `task_type == "coding"` 的测试,需改为 `task_type == None` 或删除断言。
**测试**:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。
### §10.2 广播计数器修正
@@ -533,49 +570,45 @@ def _broadcast_claim(self, db_path: Path, broadcast_tasks: list):
tracker.notified_agents.add(agent_id)
# ... 现有 spawn 广播逻辑 ...
else:
# 本轮所有空闲 Agent 都已通知
# 检查是否所有已知 Agent 都通知了
unnotified = self._all_agent_ids - tracker.notified_agents
if not unnotified:
# 所有 Agent 都通知过了 → 检查是否全部反馈
if tracker.responded_agents >= tracker.notified_agents:
# 全部反馈且没人认领 → 一轮结束
if tracker.round_number >= 3:
# 3 轮广播无人认领,升级庞统
self._escalate_task(db_path, task_id)
del self._broadcast_tracker[task_id]
logger.warning(
"Broadcast 3 rounds exhausted, escalating: %s", task_id
)
else:
# 开始新一轮:重置通知/反馈,轮次+1
tracker.round_number += 1
tracker.notified_agents.clear()
tracker.responded_agents.clear()
logger.info(
"Broadcast round %d starting for: %s",
tracker.round_number, task_id,
)
# else: 还有 Agent 没通知到(可能在忙),等下一个 tick
# 没有新的空闲 Agent 通知
# 直接检查已通知的是否全部反馈(不管有没有还没通知到的忙 Agent
if tracker.responded_agents >= tracker.notified_agents:
# 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束
if tracker.round_number >= 3:
# 3 轮广播无人认领,升级庞统
self._escalate_task(db_path, task_id)
del self._broadcast_tracker[task_id]
logger.warning(
"Broadcast 3 rounds exhausted, escalating: %s", task_id
)
else:
# 开始新一轮:重置通知/反馈,轮次+1
tracker.round_number += 1
tracker.notified_agents.clear()
tracker.responded_agents.clear()
logger.info(
"Broadcast round %d starting for: %s",
tracker.round_number, task_id,
)
# else: 还有 Agent 没反馈,等下一个 tick
```
**Agent 反馈回调**(在 spawner.py 的 `on_agent_complete` 中注入):
**Agent 反馈回调**通过 Ticker 公共 API在 spawner.py 的 `on_agent_complete` 中注入):
```python
# spawner.py 中 classify_outcome 之后
def _on_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
"""Agent 对广播任务的反馈"""
tracker = self._ticker._broadcast_tracker.get(task_id)
# ticker.py
def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str):
"""记录 Agent 对广播任务的反馈(公共 API"""
tracker = self._broadcast_tracker.get(task_id)
if not tracker:
return
if outcome == "claimed":
# Agent 认领了,清理 tracker
self._ticker._broadcast_tracker.pop(task_id, None)
self._broadcast_tracker.pop(task_id, None)
else:
# NO_REPLY / observation / 其他 → 算已反馈
tracker.responded_agents.add(agent_id)
# spawner.py 中 classify_outcome 之后
# 调用方式:self._ticker.record_broadcast_response(task_id, agent_id, outcome)
```
注入位置:spawner.py 的 `classify_outcome()` 返回后、`update_status()` 调用前。
@@ -630,3 +663,19 @@ def test_busy_agent_not_counted():
3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done
4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码?
5. 广播 tracker 的反馈回调:在 spawner 的 `classify_outcome` 之后注入,还是用独立的事件回调?
## §A 评审记录
### 仲达评审(2026-06-04
**结论**:1 个必须修 + 6 个建议。修完 approve。
| # | 问题 | 判定 | 处理 |
|---|------|------|------|
| 1 | get_next_stage 有副作用 | 建议 | ✅ 改名 advance_stage |
| 2 | stage_retry_counts 无清理 | 建议 | ✅ 成功推进时清理 |
| 3 | 忙 Agent 死锁 | **必须修** | ✅ 去掉"所有 Agent 都通知"前置条件 |
| 4 | pending/claimed 未说明 | 建议 | ✅ §3 补充说明 |
| 5 | entry 触发机制未说明 | 建议 | ✅ §8.3 补充完整生命周期 |
| 6 | stage 切换 assignee 未同步 | 建议 | ✅ advance_stage 中同步更新 |
| 7 | spawner role_map 缺具体 reviewer | 建议(#11 衔接) | 记录,#11 范围 |