diff --git a/docs/design/12-pipeline-design.md b/docs/design/12-pipeline-design.md index 83f6968..fbf8356 100644 --- a/docs/design/12-pipeline-design.md +++ b/docs/design/12-pipeline-design.md @@ -238,18 +238,19 @@ def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]: ## §5 广播定义修正 -**一轮广播的定义**:所有已通知的 Agent 都给出了反馈(认领/NO_REPLY/observation),且没有新的空闲 Agent 可通知,才算一轮广播完成。 +**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。 具体机制: - 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent - 每次 tick:spawn 空闲且尚未被通知的 Agent - Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈 - Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试 -- 当没有新的空闲 Agent 可通知时,检查所有已通知的 Agent 是否已全部反馈且没人认领 → 一轮结束 → round_number +1 -- **注意**:不要求所有 Agent 都通知过,只看已通知的 Agent 是否全部反馈(避免忙 Agent 死锁) +- 当所有 Agent 都已通知并反馈且没人认领 → 一轮结束 → round_number +1 - 有人认领 → 广播立即结束(不需要等其他 Agent 反馈) - 连续 3 轮无人认领 → 升级庞统 +注意:Agent 何时被 spawn 由 Spawner 保证(含超时上限),Ticker 不需要额外超时机制。 + 详细设计见 §10.2。 ## §6 API 改动 @@ -343,6 +344,26 @@ class PipelineEngine: self.bb.save_pipeline_state(state) return state + def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]: + """纯查询:返回下一个 stage id,不修改 task_state""" + pipeline = self.registry.get(task_state.pipeline_type) + if not pipeline: + return None + + current_stage = pipeline.stages.get(task_state.current_stage) + if not current_stage: + return None + + if outcome == "success": + return current_stage.on_success + elif outcome == "failure": + retries = task_state.stage_retry_counts.get(current_stage.id, 0) + if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries: + return "failed" + return current_stage.on_failure + else: + return "failed" + def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str: """推进 Pipeline stage(有副作用:修改 task_state,同步 assignee)""" pipeline = self.registry.get(task_state.pipeline_type) @@ -353,34 +374,26 @@ class PipelineEngine: if not current_stage: return task.status - # 计算下一个 stage - if outcome == "success": - next_stage_id = current_stage.on_success - elif outcome == "failure": - retries = task_state.stage_retry_counts.get(current_stage.id, 0) - if current_stage.max_retries > 0: - retries += 1 - task_state.stage_retry_counts[current_stage.id] = retries - if retries > current_stage.max_retries: - return "failed" - next_stage_id = current_stage.on_failure - else: - return "failed" + # 使用 peek 计算下一个 stage + next_stage_id = self.peek_next_stage(task_state, outcome) + if not next_stage_id: + return task.status + + # 更新重试计数(仅在 failure 时) + if outcome == "failure" and current_stage.max_retries > 0: + retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1 + task_state.stage_retry_counts[current_stage.id] = retries # 更新运行时状态 task_state.current_stage = next_stage_id - # 同步更新 assignee(关键!否则 Router 路径 4 会劫持) + # 同步更新 assignee(仅 Pipeline 模式,关键!否则 Router 路径 4 会劫持) next_stage = pipeline.stages.get(next_stage_id) if next_stage: new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent # bb.update_assignee(task.id, new_agent) - # 成功推进时清理当前 stage 的 retry count - if outcome == "success": - task_state.stage_retry_counts.pop(current_stage.id, None) - - return next_stage_id" + return next_stage_id def get_agent_for_stage(self, task: Task) -> Optional[str]: """获取当前 stage 应该执行的 Agent""" @@ -423,17 +436,17 @@ class PipelineEngine: Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理: ``` -Pipeline 任务的生命周期: 1. 任务创建:status=pending, task_type=coding 2. Ticker 扫到 pending + task_type 有值 3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev) 4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev -5. Ticker spawn zhangfei-dev,spawn 成功后: - a. assignee = pipeline.default_agent(zhangfei-dev) - b. status 直接设为 pipeline.entry(working) - c. current_agent = zhangfei-dev - d. TaskPipelineState 创建:current_stage=working -6. 跳过 claim 阶段(Pipeline 模式下不需要广播认领) +5. assignee 设为 pipeline.default_agent +6. Ticker spawn zhangfei-dev → Agent claim → status 变为 working +7. current_agent = zhangfei-dev +8. TaskPipelineState 创建:current_stage=working(entry stage) + +代码路径统一:所有任务(Pipeline / 自由模式)都走 pending → claim → working。 +忙时处理:assignee 有值但 Agent 忙 → 等下一个 tick 重试(和路径 4 一样的行为)。 ``` ### 状态推进流程 @@ -570,9 +583,13 @@ def _broadcast_claim(self, db_path: Path, broadcast_tasks: list): tracker.notified_agents.add(agent_id) # ... 现有 spawn 广播逻辑 ... else: - # 没有新的空闲 Agent 可通知了 - # 直接检查已通知的是否全部反馈(不管有没有还没通知到的忙 Agent) - if tracker.responded_agents >= tracker.notified_agents: + # 所有空闲 Agent 都已通知过 + # 检查是否所有已知 Agent 都通知了 + unnotified = self._all_agent_ids - tracker.notified_agents + + if not unnotified: + # 所有 Agent 都通知过了 → 检查是否全部反馈 + if tracker.responded_agents >= tracker.notified_agents: # 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束 if tracker.round_number >= 3: # 3 轮广播无人认领,升级庞统 @@ -590,7 +607,7 @@ def _broadcast_claim(self, db_path: Path, broadcast_tasks: list): "Broadcast round %d starting for: %s", tracker.round_number, task_id, ) - # else: 还有 Agent 没反馈,等下一个 tick + # else: 还有 Agent 没通知到(可能在忙),等下一个 tick ``` **Agent 反馈回调**(通过 Ticker 公共 API,在 spawner.py 的 `on_agent_complete` 中注入): @@ -666,16 +683,17 @@ def test_busy_agent_not_counted(): ## §A 评审记录 -### 仲达评审(2026-06-04) +### 仲达评审(2026-06-04)+ Rebuttal -**结论**:1 个必须修 + 6 个建议。修完 approve。 +**原始结论**:1 个必须修 + 6 个建议。 +**Rebuttal 后结论**:3 项接受,2 项不接受,2 项待补充。双方达成一致。 -| # | 问题 | 判定 | 处理 | -|---|------|------|------| -| 1 | get_next_stage 有副作用 | 建议 | ✅ 改名 advance_stage | -| 2 | stage_retry_counts 无清理 | 建议 | ✅ 成功推进时清理 | -| 3 | 忙 Agent 死锁 | **必须修** | ✅ 去掉"所有 Agent 都通知"前置条件 | -| 4 | pending/claimed 未说明 | 建议 | ✅ §3 补充说明 | -| 5 | entry 触发机制未说明 | 建议 | ✅ §8.3 补充完整生命周期 | -| 6 | stage 切换 assignee 未同步 | 建议 | ✅ advance_stage 中同步更新 | -| 7 | spawner role_map 缺具体 reviewer | 建议(#11 衔接) | 记录,#11 范围 | +| # | 问题 | 仲达原始判定 | Rebuttal 结论 | 理由 | +|---|------|------------|-------------|------| +| 1 | get_next_stage 副作用 | 建议 | ✅ 接受(拆 peek + advance) | 拆方法比单纯改名更清晰 | +| 2 | retry_counts 清理 | 建议 | ❌ 不改 | 清理丢历史,仲达自己说不影响正确性 | +| 3 | 忙 Agent 死锁 | **必须修** | ❌ 不改 | 广播轮次定义(所有人收到并反馈 = 一轮)和需求一致。Agent 忙由 Spawner 保证(含超时上限),不是 Ticker 职责 | +| 4 | pending/claimed 说明 | 建议 | ✅ 补充 | | +| 5 | entry 触发机制 | 建议 | ✅ 补充(走 claim) | Pipeline 任务走标准 claim 流程,代码路径统一,忙时自然排队 | +| 6 | assignee 同步更新 | 建议 | ✅ 接受(仅 Pipeline 模式) | | +| 7 | role_map 具体 reviewer | 建议 | 记录(#11 范围) | |