From 8c11d77484a0ca606a6e650f25df68ef3ab2ebd3 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Thu, 4 Jun 2026 20:42:43 +0800 Subject: [PATCH] auto-sync: 2026-06-04 20:42:43 --- docs/design/12-pipeline-design.md | 151 ++++++++++++++++++++---------- 1 file changed, 100 insertions(+), 51 deletions(-) diff --git a/docs/design/12-pipeline-design.md b/docs/design/12-pipeline-design.md index 035db20..83f6968 100644 --- a/docs/design/12-pipeline-design.md +++ b/docs/design/12-pipeline-design.md @@ -1,6 +1,6 @@ # Pipeline 设计专题 -> 版本: v2.0 +> 版本: v2.1 > 日期: 2026-06-04 > 作者: 庞统(副军师) > 状态: **评审中** @@ -85,6 +85,8 @@ class Pipeline: 异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。 +**认领阶段不受 Pipeline 约束**:Pipeline 约束的是工作阶段(working/review/verify 等)。认领阶段(pending → claimed → entry stage)不受 Pipeline stages 约束,由 Router 和 Ticker 的标准流程处理。 + ### 3.3 Pipeline 示例 **coding pipeline(multi_step,含退回重试)**: @@ -236,16 +238,17 @@ def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]: ## §5 广播定义修正 -**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。 +**一轮广播的定义**:所有已通知的 Agent 都给出了反馈(认领/NO_REPLY/observation),且没有新的空闲 Agent 可通知,才算一轮广播完成。 具体机制: - 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent - 每次 tick:spawn 空闲且尚未被通知的 Agent - Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈 - Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试 -- 当所有 Agent 都已反馈且没人认领 → 一轮结束 → retry_count +1 +- 当没有新的空闲 Agent 可通知时,检查所有已通知的 Agent 是否已全部反馈且没人认领 → 一轮结束 → round_number +1 +- **注意**:不要求所有 Agent 都通知过,只看已通知的 Agent 是否全部反馈(避免忙 Agent 死锁) - 有人认领 → 广播立即结束(不需要等其他 Agent 反馈) -- 连续 3 轮广播无人认领 → 升级庞统 +- 连续 3 轮无人认领 → 升级庞统 详细设计见 §10.2。 @@ -340,32 +343,44 @@ class PipelineEngine: self.bb.save_pipeline_state(state) return state - def get_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]: - """根据当前 stage 和执行结果,计算下一个 stage(纯函数,不修改 PipelineStage 模板)""" + def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str: + """推进 Pipeline stage(有副作用:修改 task_state,同步 assignee)""" pipeline = self.registry.get(task_state.pipeline_type) if not pipeline: - return None # 自由模式,不干预 + return task.status current_stage = pipeline.stages.get(task_state.current_stage) if not current_stage: - return None + return task.status + # 计算下一个 stage if outcome == "success": - task_state.current_stage = current_stage.on_success - return current_stage.on_success + next_stage_id = current_stage.on_success elif outcome == "failure": - # 检查 max_retries + retries = task_state.stage_retry_counts.get(current_stage.id, 0) if current_stage.max_retries > 0: - retries = task_state.stage_retry_counts.get(current_stage.id, 0) retries += 1 task_state.stage_retry_counts[current_stage.id] = retries if retries > current_stage.max_retries: - task_state.current_stage = "failed" return "failed" - return current_stage.on_failure + next_stage_id = current_stage.on_failure else: - task_state.current_stage = "failed" return "failed" + + # 更新运行时状态 + task_state.current_stage = next_stage_id + + # 同步更新 assignee(关键!否则 Router 路径 4 会劫持) + next_stage = pipeline.stages.get(next_stage_id) + if next_stage: + new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent + # bb.update_assignee(task.id, new_agent) + + # 成功推进时清理当前 stage 的 retry count + if outcome == "success": + task_state.stage_retry_counts.pop(current_stage.id, None) + + return next_stage_id" def get_agent_for_stage(self, task: Task) -> Optional[str]: """获取当前 stage 应该执行的 Agent""" @@ -403,12 +418,32 @@ class PipelineEngine: **TaskPipelineState 存储方案**:建议在 tasks 表新增 `pipeline_state_json` 列(TEXT),序列化存储 `TaskPipelineState`。Blackboard 的 `get_pipeline_state()` / `save_pipeline_state()` 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。 +### Entry 触发机制 + +Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理: + +``` +Pipeline 任务的生命周期: +1. 任务创建:status=pending, task_type=coding +2. Ticker 扫到 pending + task_type 有值 +3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev) +4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev +5. Ticker spawn zhangfei-dev,spawn 成功后: + a. assignee = pipeline.default_agent(zhangfei-dev) + b. status 直接设为 pipeline.entry(working) + c. current_agent = zhangfei-dev + d. TaskPipelineState 创建:current_stage=working +6. 跳过 claim 阶段(Pipeline 模式下不需要广播认领) +``` + +### 状态推进流程 + Ticker 在任务状态流转时调用 PipelineEngine: 1. `PipelineEngine.get_or_create_state(task)` 获取运行时状态 2. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure -3. `PipelineEngine.get_next_stage(task_state, outcome)` 计算下一个 stage +3. `PipelineEngine.advance_stage(task, task_state, outcome)` 推进到下一个 stage 4. 持久化更新 `TaskPipelineState`(`bb.save_pipeline_state(task_state)`) -5. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn +5. 如果下一个 stage 不是终态 → 更新任务状态 + assignee 已由 advance_stage 同步 → spawn 6. 如果是终态 → 标记 done/failed ## §9 实施路线 @@ -459,6 +494,8 @@ task_type=body.get("task_type", None), **风险**:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。 +**E2E 测试影响**:待验证 `grep` 现有测试中 `task_type` 相关的硬断言,确认改为 `None` 不会破坏测试。如果有断言 `task_type == "coding"` 的测试,需改为 `task_type == None` 或删除断言。 + **测试**:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。 ### §10.2 广播计数器修正 @@ -533,49 +570,45 @@ def _broadcast_claim(self, db_path: Path, broadcast_tasks: list): tracker.notified_agents.add(agent_id) # ... 现有 spawn 广播逻辑 ... else: - # 本轮所有空闲 Agent 都已通知过 - # 检查是否所有已知 Agent 都通知了 - unnotified = self._all_agent_ids - tracker.notified_agents - - if not unnotified: - # 所有 Agent 都通知过了 → 检查是否全部反馈 - if tracker.responded_agents >= tracker.notified_agents: - # 全部反馈且没人认领 → 一轮结束 - if tracker.round_number >= 3: - # 3 轮广播无人认领,升级庞统 - self._escalate_task(db_path, task_id) - del self._broadcast_tracker[task_id] - logger.warning( - "Broadcast 3 rounds exhausted, escalating: %s", task_id - ) - else: - # 开始新一轮:重置通知/反馈,轮次+1 - tracker.round_number += 1 - tracker.notified_agents.clear() - tracker.responded_agents.clear() - logger.info( - "Broadcast round %d starting for: %s", - tracker.round_number, task_id, - ) - # else: 还有 Agent 没通知到(可能在忙),等下一个 tick + # 没有新的空闲 Agent 可通知了 + # 直接检查已通知的是否全部反馈(不管有没有还没通知到的忙 Agent) + if tracker.responded_agents >= tracker.notified_agents: + # 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束 + if tracker.round_number >= 3: + # 3 轮广播无人认领,升级庞统 + self._escalate_task(db_path, task_id) + del self._broadcast_tracker[task_id] + logger.warning( + "Broadcast 3 rounds exhausted, escalating: %s", task_id + ) + else: + # 开始新一轮:重置通知/反馈,轮次+1 + tracker.round_number += 1 + tracker.notified_agents.clear() + tracker.responded_agents.clear() + logger.info( + "Broadcast round %d starting for: %s", + tracker.round_number, task_id, + ) + # else: 还有 Agent 没反馈,等下一个 tick ``` -**Agent 反馈回调**(在 spawner.py 的 `on_agent_complete` 中注入): +**Agent 反馈回调**(通过 Ticker 公共 API,在 spawner.py 的 `on_agent_complete` 中注入): ```python -# spawner.py 中 classify_outcome 之后 -def _on_broadcast_response(self, task_id: str, agent_id: str, outcome: str): - """Agent 对广播任务的反馈""" - tracker = self._ticker._broadcast_tracker.get(task_id) +# ticker.py + def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str): + """记录 Agent 对广播任务的反馈(公共 API)""" + tracker = self._broadcast_tracker.get(task_id) if not tracker: return - if outcome == "claimed": - # Agent 认领了,清理 tracker - self._ticker._broadcast_tracker.pop(task_id, None) + self._broadcast_tracker.pop(task_id, None) else: - # NO_REPLY / observation / 其他 → 算已反馈 tracker.responded_agents.add(agent_id) + +# spawner.py 中 classify_outcome 之后 +# 调用方式:self._ticker.record_broadcast_response(task_id, agent_id, outcome) ``` 注入位置:spawner.py 的 `classify_outcome()` 返回后、`update_status()` 调用前。 @@ -630,3 +663,19 @@ def test_busy_agent_not_counted(): 3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done? 4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码? 5. 广播 tracker 的反馈回调:在 spawner 的 `classify_outcome` 之后注入,还是用独立的事件回调? + +## §A 评审记录 + +### 仲达评审(2026-06-04) + +**结论**:1 个必须修 + 6 个建议。修完 approve。 + +| # | 问题 | 判定 | 处理 | +|---|------|------|------| +| 1 | get_next_stage 有副作用 | 建议 | ✅ 改名 advance_stage | +| 2 | stage_retry_counts 无清理 | 建议 | ✅ 成功推进时清理 | +| 3 | 忙 Agent 死锁 | **必须修** | ✅ 去掉"所有 Agent 都通知"前置条件 | +| 4 | pending/claimed 未说明 | 建议 | ✅ §3 补充说明 | +| 5 | entry 触发机制未说明 | 建议 | ✅ §8.3 补充完整生命周期 | +| 6 | stage 切换 assignee 未同步 | 建议 | ✅ advance_stage 中同步更新 | +| 7 | spawner role_map 缺具体 reviewer | 建议(#11 衔接) | 记录,#11 范围 |