diff --git a/docs/design/12-pipeline-design.md b/docs/design/12-pipeline-design.md index 7791a45..68be849 100644 --- a/docs/design/12-pipeline-design.md +++ b/docs/design/12-pipeline-design.md @@ -1,9 +1,9 @@ # Pipeline 设计专题 -版本: v1.0 -日期: 2026-06-04 -作者: 庞统(副军师) -状态: **待确认** +> 版本: v2.0 +> 日期: 2026-06-04 +> 作者: 庞统(副军师) +> 状态: **评审中** ## §1 背景 @@ -13,6 +13,7 @@ 1. 支持用户指定任务执行流程(Pipeline) 2. 不指定时走广播认领(自由模式) 3. 广播认领有合理的轮次限制 +4. Pipeline 支持灵活流转:顺序、分支、退回、循环重试 ## §2 核心决策 @@ -20,10 +21,11 @@ - 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离 - 不指定 Pipeline(None/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff) -**D2: Pipeline 是强工作流** +**D2: Pipeline 是强工作流,但支持灵活流转** - 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束 - Agent 必须走 Pipeline 定义的所有步骤,不能跳过 -- 没有"可偏离"的中间地带 +- 但 Pipeline 本身可以定义灵活的流转规则:条件分支、退回重做、循环重试 +- 没有"可偏离"的中间地带——偏离必须是 Pipeline 预定义的路径 **D3: @mention 在 Pipeline 模式下只做通知,不改变执行者** - Pipeline 模式(强工作流):执行者由 Pipeline 决定(coding → zhangfei-dev),@mention 只是在黑板 comment 通知该 Agent,不改变执行者 @@ -39,64 +41,164 @@ 1. **前端入口**:创建任务时下拉菜单选择 Pipeline,默认 none 2. **Chat 入口**:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户 -## §3 Pipeline 定义 +## §3 Pipeline 定义(灵活流转) -每个 Pipeline 包含: -- `task_type`:类型标识 -- `pipeline_type`:执行策略(single_step / multi_step) -- `default_agent`:默认执行者(可被 @mention 覆盖) -- `stages`:步骤列表(有序) -- `status_flow`:允许的状态流转(硬约束) +每个 Pipeline 是一个**有向状态机**,支持顺序、条件分支、退回、循环重试。 -### 初始 Pipeline 列表 - -| task_type | pipeline_type | default_agent | stages | -|-----------|---------------|---------------|--------| -| `coding` | `multi_step` | `zhangfei-dev` | working → review → done | -| `review` | `single_step` | `simayi-challenger` | working → review → done | -| `data` | `single_step` | `zhaoyun-data` | working → done | -| `deploy` | `single_step` | `jiangwei-infra` | working → done | -| `risk_check` | `single_step` | `guanyu-dev` | working → done | -| None/不指定 | 自由模式 | 广播认领 | pending → claimed → working → review → done | - -### 状态流转硬约束 +### 3.1 数据结构 ```python -PIPELINE_STATUS_FLOW = { - "coding": { - "pending": ["claimed"], - "claimed": ["working"], - "working": ["review"], - "review": ["done", "working"], # review 失败回到 working - # 失败/暂停等异常状态不受 Pipeline 约束 - }, - "review": { - "pending": ["claimed"], - "claimed": ["working"], - "working": ["done"], - }, - "data": { - "pending": ["claimed"], - "claimed": ["working"], - "working": ["done"], - }, - # ... -} +@dataclass +class PipelineStage: + id: str # 步骤标识(如 "working", "review") + agent: str # 执行者:固定 agent_id 或 "pipeline_default" + on_success: str # 成功后转到哪个 stage(或 "done"/"failed") + on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回) + max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed) + retry_count: int = 0 # 当前已重试次数(运行时状态) + is_terminal: bool = False # 是否终态 + +@dataclass +class Pipeline: + task_type: str # 类型标识 + default_agent: str # 默认执行者 + entry: str # 入口 stage id + stages: Dict[str, PipelineStage] # stage id → PipelineStage ``` -异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何状态都可以转入异常状态。 +### 3.2 流转规则 + +| 流转类型 | 实现方式 | 示例 | +|---------|---------|------| +| **顺序** | `on_success` 指向下一个 stage | working → review → done | +| **条件分支** | `on_success` / `on_failure` 指向不同 stage | review 通过→done,review 拒绝→working | +| **退回** | `on_failure` 指向前序 stage | review 失败退回 working | +| **循环/重试** | `max_retries` + `retry_count` 控制 | review 最多退回 3 次,超过转 failed | + +异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。 + +### 3.3 Pipeline 示例 + +**coding pipeline(multi_step,含退回重试)**: + +```yaml +task_type: coding +default_agent: zhangfei-dev +entry: working + +stages: + working: + agent: pipeline_default + on_success: review + on_failure: failed + + review: + agent: simayi-challenger + on_success: done + on_failure: working # review 失败退回重做 + max_retries: 3 # 最多退回 3 次,超过转 failed +``` + +流转图: +``` +pending → claimed → working ──→ review ──→ done + ↑ │ + └────────────┘ (on_failure, 最多3次) + │ >3次 + └──→ failed +``` + +**data pipeline(single_step,简单顺序)**: + +```yaml +task_type: data +default_agent: zhaoyun-data +entry: working + +stages: + working: + agent: pipeline_default + on_success: done + on_failure: failed +``` + +**review pipeline(单步,固定 agent)**: + +```yaml +task_type: review +default_agent: simayi-challenger +entry: working + +stages: + working: + agent: simayi-challenger + on_success: done + on_failure: failed +``` + +**deploy pipeline(含验证循环)**: + +```yaml +task_type: deploy +default_agent: jiangwei-infra +entry: working + +stages: + working: + agent: pipeline_default + on_success: verify + on_failure: failed + + verify: + agent: pipeline_default # 部署者自验 + on_success: done + on_failure: working # 验证失败退回重新部署 + max_retries: 2 # 最多重试 2 次 +``` + +**risk_check pipeline(单步)**: + +```yaml +task_type: risk_check +default_agent: guanyu-dev +entry: working + +stages: + working: + agent: pipeline_default + on_success: done + on_failure: failed +``` + +**自由模式(None/不指定)**: + +| 项目 | 值 | +|------|-----| +| default_agent | 广播认领 | +| stages | pending → claimed → working → review → done(基本状态机,无 Pipeline 约束) | + +### 3.4 初始 Pipeline 注册表 + +| task_type | 类型 | default_agent | stages | 退回 | +|-----------|------|---------------|--------|------| +| `coding` | multi_step | `zhangfei-dev` | working → review → done | review→working (max 3) | +| `review` | single_step | `simayi-challenger` | working → done | 无 | +| `data` | single_step | `zhaoyun-data` | working → done | 无 | +| `deploy` | multi_step | `jiangwei-infra` | working → verify → done | verify→working (max 2) | +| `risk_check` | single_step | `guanyu-dev` | working → done | 无 | +| None | 自由模式 | 广播认领 | 基本状态机 | 无 | ## §4 路由逻辑(更新 Router) Router 新增路径 5:按 task_type 匹配 Pipeline ``` -路径1: 本地操作 -路径2: retry → 原执行者 -Mode B: Agent 声明式交接 -路径3: 生命周期流转 +路径1: 本地操作 (action_type ∈ LOCAL_ACTIONS) +路径2: retry → 原执行者 (action_type == "retry") +Mode B: Agent 声明式交接 (next_capability 有值) +路径3: 生命周期流转 (status ∈ LIFECYCLE_CAPABILITY) 路径4: 有 assignee → 直接给 -路径5(新增): task_type 有值 → 查 Pipeline → 用 Pipeline 的 default_agent +路径5(新增): task_type 有值 → 查 Pipeline 注册表 → 用 Pipeline 的 default_agent - @mention 不覆盖执行者(Pipeline 模式下 @mention 只做通知) - mode = "deterministic" 模糊场景: delegate 庞统 → 广播 @@ -104,60 +206,391 @@ Mode B: Agent 声明式交接 路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。 +路径 5 的具体逻辑: +```python +# router.py 新增 +def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]: + task_type = task_info.get("task_type") + if not task_type: + return None + + pipeline = self.pipeline_registry.get(task_type) + if not pipeline: + return None + + return RouteDecision( + agent_id=pipeline.default_agent, + mode="deterministic", + confidence=0.85, + reason=f"Pipeline match: {task_type} → {pipeline.default_agent}", + ) +``` + ## §5 广播定义修正 -**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY),才算一轮广播完成。 +**一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。 具体机制: -- 每个 pending 广播任务,维护一个 `notified_agents` 集合 -- 每次 tick:spawn 空闲且未被通知过的 Agent -- Agent 返回(认领/NO_REPLY/observation)→ 加入已反馈集合 +- 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent +- 每次 tick:spawn 空闲且尚未被通知的 Agent +- Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈 +- Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试 - 当所有 Agent 都已反馈且没人认领 → 一轮结束 → retry_count +1 -- retry_count >= 3 → 升级庞统 +- 有人认领 → 广播立即结束(不需要等其他 Agent 反馈) +- 连续 3 轮广播无人认领 → 升级庞统 -注意: -- Agent 忙(被 counter 阻塞)→ 不算已通知,下次 tick 继续尝试 -- Agent 返回 NO_REPLY → 算已反馈 -- Agent 认领 → 广播立即结束(不需要等其他 Agent) - -实现方案: -- 在 events 表记录每个 Agent 的反馈 -- 或在内存中维护 `task_id → {notified: set, responded: set}` 映射 -- ticker 启动时从 events 表恢复状态 +详细设计见 §10.2。 ## §6 API 改动 -1. `POST /api/projects/{pid}/tasks`: - - `task_type` 默认改为 `None`(不再是 "coding") - - 前端传 task_type 时走 Pipeline,不传时走自由模式 +### §6.1 修改现有端点 -2. 新增 `GET /api/pipelines`: - - 返回可用 Pipeline 列表(从配置加载) - - 供前端下拉菜单使用 +`POST /api/projects/{pid}/tasks`: +- `task_type` 默认改为 `None`(不再是 "coding") +- 前端传 task_type 时走 Pipeline,不传时走自由模式 + +### §6.2 新增端点 + +`GET /api/pipelines`: +- 返回可用 Pipeline 列表(从注册表加载) +- 供前端下拉菜单使用 +- 响应格式: +```json +{ + "pipelines": [ + {"task_type": "coding", "label": "编码开发", "description": "多步:开发→审查→完成"}, + {"task_type": "review", "label": "代码审查", "description": "单步:审查→完成"}, + {"task_type": "data", "label": "数据获取", "description": "单步:执行→完成"}, + {"task_type": "deploy", "label": "部署", "description": "多步:部署→验证→完成"}, + {"task_type": "risk_check", "label": "风控检查", "description": "单步:检查→完成"} + ] +} +``` ## §7 前端改动 1. TaskModal 创建任务时: - 新增 Pipeline 下拉选择(默认"自由模式") - 选项从 `/api/pipelines` 动态加载 + - 选中 Pipeline 后显示简要流程说明 2. Chat 入口: - 庞统在需求探索时,判断是否需要指定 Pipeline - 拿不准时通过 checkpoint 询问用户 -## §8 实施路线 +## §8 Pipeline 引擎 -| Phase | 内容 | 优先级 | -|-------|------|--------| -| Phase 1 | task_type 默认值改 None + 广播计数器修正(bug fix) | P0 | -| Phase 2 | Router 新增路径 5(task_type → Pipeline default_agent) | P0 | -| Phase 3 | Pipeline 硬约束状态机(status_flow 校验) | P1 | -| Phase 4 | 前端 Pipeline 下拉 + `/api/pipelines` 端点 | P1 | -| Phase 5 | Pipeline YAML 声明式配置(动态加载) | P2 | +### §8.1 注册表 -## §9 待确认 +```python +class PipelineRegistry: + """Pipeline 注册表""" + + def __init__(self): + self._pipelines: Dict[str, Pipeline] = {} + self._load_defaults() + + def _load_defaults(self): + """加载内置 Pipeline""" + for cfg in DEFAULT_PIPELINES: + p = self._parse_pipeline(cfg) + self._pipelines[p.task_type] = p + + def get(self, task_type: str) -> Optional[Pipeline]: + return self._pipelines.get(task_type) + + def list_all(self) -> List[Pipeline]: + return list(self._pipelines.values()) +``` -1. Pipeline 列表是否完整?是否需要 `research`/`discussion` 类型? -2. coding 的 multi_step 具体步骤:working → review → done 是否足够?是否需要 plan → implement → verify 更细的粒度? -3. 广播的 `notified_agents` 状态存在内存还是 DB?ticker 重启后如何恢复? -4. Phase 1 和 Phase 2 是否合并实施? +### §8.2 状态流转校验 + +```python +class PipelineEngine: + """Pipeline 执行引擎""" + + def __init__(self, registry: PipelineRegistry, bb: Blackboard): + self.registry = registry + self.bb = bb + + def get_next_stage(self, task: Task, outcome: str) -> Optional[str]: + """根据当前 stage 和执行结果,计算下一个 stage""" + pipeline = self.registry.get(task.task_type) + if not pipeline: + return None # 自由模式,不干预 + + current_stage = pipeline.stages.get(task.status) + if not current_stage: + return None + + if outcome == "success": + return current_stage.on_success + elif outcome == "failure": + # 检查 max_retries + if current_stage.max_retries > 0: + current_stage.retry_count += 1 + if current_stage.retry_count > current_stage.max_retries: + return "failed" + return current_stage.on_failure + else: + return "failed" + + def get_agent_for_stage(self, task: Task) -> Optional[str]: + """获取当前 stage 应该执行的 Agent""" + pipeline = self.registry.get(task.task_type) + if not pipeline: + return None + + current_stage = pipeline.stages.get(task.status) + if not current_stage: + return None + + if current_stage.agent == "pipeline_default": + return pipeline.default_agent + return current_stage.agent + + def validate_transition(self, task: Task, new_status: str) -> bool: + """校验状态流转是否合法(硬约束)""" + pipeline = self.registry.get(task.task_type) + if not pipeline: + return True # 自由模式不限制 + + # 异常状态始终允许 + if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"): + return True + + current_stage = pipeline.stages.get(task.status) + if not current_stage: + return True + + allowed = {current_stage.on_success, current_stage.on_failure} + return new_status in allowed +``` + +### §8.3 Ticker 集成 + +Ticker 在任务状态流转时调用 PipelineEngine: +1. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure +2. `PipelineEngine.get_next_stage()` 计算下一个 stage +3. 如果下一个 stage 不是终态 → 更新任务状态 + `PipelineEngine.get_agent_for_stage()` 获取下一个 Agent → spawn +4. 如果是终态 → 标记 done/failed + +## §9 实施路线 + +### Phase 1:Bug fix(独立实施) + +| Step | 内容 | 详细设计 | +|------|------|---------| +| 1.1 | task_type 默认值改 None | §10.1 | +| 1.2 | 广播计数器修正 | §10.2 | + +### Phase 2:Pipeline 全量实施(Phase 1 完成后一口气做) + +| Step | 内容 | +|------|------| +| 2.1 | Pipeline 注册表 + PipelineEngine(§8.1 + §8.2) | +| 2.2 | Router 路径 5(§4) | +| 2.3 | Ticker 集成 PipelineEngine(§8.3) | +| 2.4 | 广播 tracker 反馈回调注入 | +| 2.5 | API 端点(§6) | +| 2.6 | 前端 Pipeline 下拉(§7) | +| 2.7 | E2E 测试覆盖 | + +## §10 Phase 1 详细设计 + +### §10.1 task_type 默认值改 None + +**文件**:`src/api/blackboard_routes.py` + +**改动**:1 行 + +```python +# 当前(line 133): +task_type=body.get("task_type", "coding"), + +# 改为: +task_type=body.get("task_type", None), +``` + +**影响分析**: + +| 场景 | 改前 | 改后 | +|------|------|------| +| 前端不传 task_type | task_type="coding" → Router 不看 → 走广播 | task_type=None → Router 不看 → 走广播 | +| 前端传 task_type="coding" | coding | coding(不变) | +| E2E 测试不传 task_type | coding | None | +| 现有 DB 数据 | 不受影响(已写入) | 不受影响 | + +**风险**:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。 + +**测试**:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。 + +### §10.2 广播计数器修正 + +**文件**:`src/daemon/ticker.py` + +#### 当前问题 + +- `_broadcast_claim` 在 line ~1020 执行广播 +- line ~1046 检查 `retry_count >= 3` 但从不递增 retry_count +- retry_count 只在 `_check_timeouts` 的 claimed 超时路径递增(line ~1317) +- 结果:没人认领的广播任务每 tick 广播一次,retry_count 永远为 0,无限循环 + +#### 修复方案 + +**新增数据结构**(ticker.py 顶部): + +```python +from dataclasses import dataclass, field + +@dataclass +class BroadcastRound: + """追踪单个任务的广播状态""" + task_id: str + notified_agents: set = field(default_factory=set) # 已 spawn 过的 Agent + responded_agents: set = field(default_factory=set) # 已返回反馈的 Agent + round_number: int = 1 # 当前第几轮 +``` + +**Ticker 类新增属性**: + +```python +class Ticker: + def __init__(self, ...): + ... + self._broadcast_tracker: Dict[str, BroadcastRound] = {} + self._all_agent_ids: Set[str] = set() # 从 config 加载 +``` + +**`_broadcast_claim` 改造**(替换 line ~1020 区域): + +```python +def _broadcast_claim(self, db_path: Path, broadcast_tasks: list): + """广播认领:按轮次追踪,所有 Agent 反馈才算一轮""" + + for task_info in broadcast_tasks: + task_id = task_info["id"] + + # 获取或创建 tracker + tracker = self._broadcast_tracker.get(task_id) + if not tracker: + tracker = BroadcastRound(task_id=task_id) + self._broadcast_tracker[task_id] = tracker + + # 检查是否已认领(可能上一个 tick 被认领了) + bb = Blackboard(db_path) + task = bb.get_task(task_id) + if task.status != "pending": + # 已被认领或状态已变,清理 tracker + self._broadcast_tracker.pop(task_id, None) + continue + + # 获取空闲 Agent + idle_agents = self._get_idle_agents() + + # 过滤已通知过的,只 spawn 尚未通知的 + pending_agents = [a for a in idle_agents if a not in tracker.notified_agents] + + if pending_agents: + # spawn 尚未通知的 Agent + for agent_id in pending_agents: + tracker.notified_agents.add(agent_id) + # ... 现有 spawn 广播逻辑 ... + else: + # 本轮所有空闲 Agent 都已通知过 + # 检查是否所有已知 Agent 都通知了 + unnotified = self._all_agent_ids - tracker.notified_agents + + if not unnotified: + # 所有 Agent 都通知过了 → 检查是否全部反馈 + if tracker.responded_agents >= tracker.notified_agents: + # 全部反馈且没人认领 → 一轮结束 + if tracker.round_number >= 3: + # 3 轮广播无人认领,升级庞统 + self._escalate_task(db_path, task_id) + del self._broadcast_tracker[task_id] + logger.warning( + "Broadcast 3 rounds exhausted, escalating: %s", task_id + ) + else: + # 开始新一轮:重置通知/反馈,轮次+1 + tracker.round_number += 1 + tracker.notified_agents.clear() + tracker.responded_agents.clear() + logger.info( + "Broadcast round %d starting for: %s", + tracker.round_number, task_id, + ) + # else: 还有 Agent 没通知到(可能在忙),等下一个 tick +``` + +**Agent 反馈回调**(在 spawner.py 的 `on_agent_complete` 中注入): + +```python +# spawner.py 中 classify_outcome 之后 +def _on_broadcast_response(self, task_id: str, agent_id: str, outcome: str): + """Agent 对广播任务的反馈""" + tracker = self._ticker._broadcast_tracker.get(task_id) + if not tracker: + return + + if outcome == "claimed": + # Agent 认领了,清理 tracker + self._ticker._broadcast_tracker.pop(task_id, None) + else: + # NO_REPLY / observation / 其他 → 算已反馈 + tracker.responded_agents.add(agent_id) +``` + +注入位置:spawner.py 的 `classify_outcome()` 返回后、`update_status()` 调用前。 + +**Ticker 重启恢复**: + +简单方案:重启时清空 `_broadcast_tracker`,所有广播中的任务从第一轮重新开始。最坏情况多广播一轮,可接受。 + +```python +# ticker.py 启动时 +self._broadcast_tracker.clear() +logger.info("Broadcast tracker cleared (ticker restart)") +``` + +**测试**: + +```python +# tests/test_broadcast_tracker.py +def test_round_completes_when_all_respond(): + """所有 Agent 反馈后,一轮结束""" + ... + +def test_round_resets_for_next_round(): + """一轮结束后,开始新一轮""" + ... + +def test_escalate_after_3_rounds(): + """3 轮无人认领,升级庞统""" + ... + +def test_claim_clears_tracker(): + """Agent 认领后,tracker 清理""" + ... + +def test_busy_agent_not_counted(): + """忙的 Agent 不算已通知,下 tick 继续尝试""" + ... +``` + +#### 改动文件清单 + +| 文件 | 改动 | 行数估计 | +|------|------|---------| +| `src/daemon/ticker.py` | BroadcastRound 数据类 + tracker 属性 + _broadcast_claim 改造 + _escalate_task | ~60 行新增/修改 | +| `src/daemon/spawner.py` | _on_broadcast_response 回调注入 | ~15 行 | +| `tests/test_broadcast_tracker.py` | 新测试文件 | ~80 行 | + +## §11 待确认 + +1. Pipeline 注册表存储方式:内存 dict(当前方案)vs DB table(持久化)vs YAML file(声明式配置)? +2. 灵活 Pipeline 的 stage 定义格式:用 YAML 声明式还是 Python dict 硬编码?前端需要渲染流程图吗? +3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done? +4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码? +5. 广播 tracker 的反馈回调:在 spawner 的 `classify_outcome` 之后注入,还是用独立的事件回调?