# Pipeline 设计专题 > 版本: v2.1 > 日期: 2026-06-04 > 作者: 庞统(副军师) > 状态: **评审中** ## §1 背景 当前 moziplus v2 的任务调度只有两条路径:确定性路由(Router 四条快速路径)和广播认领(spawn 所有空闲 Agent)。Router 不看 task_type,导致 coding 任务可能走入广播路径且无人认领时无限循环。 设计目标: 1. 支持用户指定任务执行流程(Pipeline) 2. 不指定时走广播认领(自由模式) 3. 广播认领有合理的轮次限制 4. Pipeline 支持灵活流转:顺序、分支、退回、循环重试 ## §2 核心决策 **D1: Pipeline 模式 vs 自由模式二选一** - 指定了 Pipeline → 硬约束模式,Agent 必须按 Pipeline 定义的流程执行,不可偏离 - 不指定 Pipeline(None/general)→ 自由模式,广播认领,Agent 自主决定怎么做,只有基本完成标准(产出物 + handoff) **D2: Pipeline 是强工作流,但支持灵活流转** - 指定了 Pipeline 的任务,状态机是硬约束,执行步骤也是硬约束 - Agent 必须走 Pipeline 定义的所有步骤,不能跳过 - 但 Pipeline 本身可以定义灵活的流转规则:条件分支、退回重做、循环重试 - 没有"可偏离"的中间地带——偏离必须是 Pipeline 预定义的路径 **D3: @mention 在 Pipeline 模式下只做通知,不改变执行者** - Pipeline 模式(强工作流):执行者由 Pipeline 决定(coding → zhangfei-dev),@mention 只是在黑板 comment 通知该 Agent,不改变执行者 - 自由模式:@mention 决定执行者(和现有逻辑一致) - 理由:强工作流意味着流程和角色都是确定的,让数据专家走 coding 流程不合理 **D4: task_type 默认值改为 None** - API 层 `task_type=body.get("task_type", None)` 而非默认 "coding" - 不指定 = 自由模式,走广播认领 - 指定了 = Pipeline 模式 **D5: 两个入口** 1. **前端入口**:创建任务时下拉菜单选择 Pipeline,默认 none 2. **Chat 入口**:用户通过自然语言提需求,庞统判断是否需要指定 Pipeline。拿不准时询问用户 ## §3 Pipeline 定义(灵活流转) 每个 Pipeline 是一个**有向状态机**,支持顺序、条件分支、退回、循环重试。 ### 3.1 数据结构 ```python @dataclass class PipelineStage: id: str # 步骤标识(如 "working", "review") agent: str # 执行者:固定 agent_id 或 "pipeline_default" on_success: str # 成功后转到哪个 stage(或 "done"/"failed") on_failure: str # 失败后转到哪个 stage(可指向前序 stage = 退回) max_retries: int = 0 # 最大退回重试次数(0=不限,超限转 failed) is_terminal: bool = False # 是否终态 # 注意:retry_count 不在此处——PipelineStage 是注册表模板,所有同类型任务共享 @dataclass class TaskPipelineState: """每个任务的 Pipeline 运行时状态(存在 DB 或内存)""" task_id: str pipeline_type: str current_stage: str stage_retry_counts: Dict[str, int] = field(default_factory=dict) # stage_id → 已重试次数 @dataclass class Pipeline: task_type: str # 类型标识 default_agent: str # 默认执行者 entry: str # 入口 stage id stages: Dict[str, PipelineStage] # stage id → PipelineStage ``` ### 3.2 流转规则 | 流转类型 | 实现方式 | 示例 | |---------|---------|------| | **顺序** | `on_success` 指向下一个 stage | working → review → done | | **条件分支** | `on_success` / `on_failure` 指向不同 stage | review 通过→done,review 拒绝→working | | **退回** | `on_failure` 指向前序 stage | review 失败退回 working | | **循环/重试** | `max_retries` + `TaskPipelineState.stage_retry_counts` 控制 | review 最多退回 3 次,超过转 failed | 异常状态(failed/paused/escalated/blocked/cancelled)不受 Pipeline 约束,任何 stage 都可以转入异常状态。 **认领阶段不受 Pipeline 约束**:Pipeline 约束的是工作阶段(working/review/verify 等)。认领阶段(pending → claimed → entry stage)不受 Pipeline stages 约束,由 Router 和 Ticker 的标准流程处理。 ### 3.3 Pipeline 示例 **coding pipeline(multi_step,含退回重试)**: ```yaml task_type: coding default_agent: zhangfei-dev entry: working stages: working: agent: pipeline_default on_success: review on_failure: failed review: agent: simayi-challenger on_success: done on_failure: working # review 失败退回重做 max_retries: 3 # 最多退回 3 次,超过转 failed ``` 流转图: ``` pending → claimed → working ──→ review ──→ done ↑ │ └────────────┘ (on_failure, 最多3次) │ >3次 └──→ failed ``` **data pipeline(single_step,简单顺序)**: ```yaml task_type: data default_agent: zhaoyun-data entry: working stages: working: agent: pipeline_default on_success: done on_failure: failed ``` **review pipeline(单步,固定 agent)**: ```yaml task_type: review default_agent: simayi-challenger entry: working stages: working: agent: simayi-challenger on_success: done on_failure: failed ``` **deploy pipeline(含验证循环)**: ```yaml task_type: deploy default_agent: jiangwei-infra entry: working stages: working: agent: pipeline_default on_success: verify on_failure: failed verify: agent: pipeline_default # 部署者自验 on_success: done on_failure: working # 验证失败退回重新部署 max_retries: 2 # 最多重试 2 次 ``` **risk_check pipeline(单步)**: ```yaml task_type: risk_check default_agent: guanyu-dev entry: working stages: working: agent: pipeline_default on_success: done on_failure: failed ``` **自由模式(None/不指定)**: | 项目 | 值 | |------|-----| | default_agent | 广播认领 | | stages | pending → claimed → working → review → done(基本状态机,无 Pipeline 约束) | ### 3.4 初始 Pipeline 注册表 | task_type | 类型 | default_agent | stages | 退回 | |-----------|------|---------------|--------|------| | `coding` | multi_step | `zhangfei-dev` | working → review → done | review→working (max 3) | | `review` | single_step | `simayi-challenger` | working → done | 无 | | `data` | single_step | `zhaoyun-data` | working → done | 无 | | `deploy` | multi_step | `jiangwei-infra` | working → verify → done | verify→working (max 2) | | `risk_check` | single_step | `guanyu-dev` | working → done | 无 | | None | 自由模式 | 广播认领 | 基本状态机 | 无 | ## §4 路由逻辑(更新 Router) Router 新增路径 5:按 task_type 匹配 Pipeline ``` 路径1: 本地操作 (action_type ∈ LOCAL_ACTIONS) 路径2: retry → 原执行者 (action_type == "retry") Mode B: Agent 声明式交接 (next_capability 有值) 路径3: 生命周期流转 (status ∈ LIFECYCLE_CAPABILITY) 路径4: 有 assignee → 直接给 路径5(新增): task_type 有值 → 查 Pipeline 注册表 → 用 Pipeline 的 default_agent - @mention 不覆盖执行者(Pipeline 模式下 @mention 只做通知) - mode = "deterministic" 模糊场景: delegate 庞统 → 广播 ``` 路径 5 的优先级低于路径 4(assignee)和 Mode B(声明式交接),高于模糊场景。 路径 5 的具体逻辑: ```python # router.py 新增 def _route_by_pipeline(self, task_info: dict) -> Optional[RouteDecision]: task_type = task_info.get("task_type") if not task_type: return None pipeline = self.pipeline_registry.get(task_type) if not pipeline: return None return RouteDecision( agent_id=pipeline.default_agent, mode="deterministic", confidence=0.85, reason=f"Pipeline match: {task_type} → {pipeline.default_agent}", ) ``` ## §5 广播定义修正 **一轮广播的定义**:所有 Agent 都收到了任务并给出了反馈(认领/NO_REPLY/observation),才算一轮广播完成。 具体机制: - 每个 pending 广播任务,维护一个 `notified_agents` 追踪已通知和已反馈的 Agent - 每次 tick:spawn 空闲且尚未被通知的 Agent - Agent 返回(认领/NO_REPLY/observation)→ 记录为已反馈 - Agent 忙(counter 阻塞)→ 不算已通知,下次 tick 继续尝试 - 当所有 Agent 都已通知并反馈且没人认领 → 一轮结束 → round_number +1 - 有人认领 → 广播立即结束(不需要等其他 Agent 反馈) - 连续 3 轮无人认领 → 升级庞统 注意:Agent 何时被 spawn 由 Spawner 保证(含超时上限),Ticker 不需要额外超时机制。 详细设计见 §10.2。 ## §6 API 改动 ### §6.1 修改现有端点 `POST /api/projects/{pid}/tasks`: - `task_type` 默认改为 `None`(不再是 "coding") - 前端传 task_type 时走 Pipeline,不传时走自由模式 ### §6.2 新增端点 `GET /api/pipelines`: - 返回可用 Pipeline 列表(从注册表加载) - 供前端下拉菜单使用 - 响应格式: ```json { "pipelines": [ {"task_type": "coding", "label": "编码开发", "description": "多步:开发→审查→完成"}, {"task_type": "review", "label": "代码审查", "description": "单步:审查→完成"}, {"task_type": "data", "label": "数据获取", "description": "单步:执行→完成"}, {"task_type": "deploy", "label": "部署", "description": "多步:部署→验证→完成"}, {"task_type": "risk_check", "label": "风控检查", "description": "单步:检查→完成"} ] } ``` ## §7 前端改动 1. TaskModal 创建任务时: - 新增 Pipeline 下拉选择(默认"自由模式") - 选项从 `/api/pipelines` 动态加载 - 选中 Pipeline 后显示简要流程说明 2. Chat 入口: - 庞统在需求探索时,判断是否需要指定 Pipeline - 拿不准时通过 checkpoint 询问用户 ## §8 Pipeline 引擎 ### §8.1 注册表 ```python class PipelineRegistry: """Pipeline 注册表""" def __init__(self): self._pipelines: Dict[str, Pipeline] = {} self._load_defaults() def _load_defaults(self): """加载内置 Pipeline""" for cfg in DEFAULT_PIPELINES: p = self._parse_pipeline(cfg) self._pipelines[p.task_type] = p def get(self, task_type: str) -> Optional[Pipeline]: return self._pipelines.get(task_type) def list_all(self) -> List[Pipeline]: return list(self._pipelines.values()) ``` ### §8.2 状态流转校验 ```python class PipelineEngine: """Pipeline 执行引擎""" def __init__(self, registry: PipelineRegistry, bb: Blackboard): self.registry = registry self.bb = bb def get_or_create_state(self, task: Task) -> Optional[TaskPipelineState]: """获取或创建任务的 Pipeline 运行时状态""" pipeline_type = task.task_type if not pipeline_type: return None # 自由模式 pipeline = self.registry.get(pipeline_type) if not pipeline: return None # 从 DB 或内存加载;首次则初始化 state = self.bb.get_pipeline_state(task.id) if not state: state = TaskPipelineState( task_id=task.id, pipeline_type=pipeline_type, current_stage=pipeline.entry, ) self.bb.save_pipeline_state(state) return state def peek_next_stage(self, task_state: TaskPipelineState, outcome: str) -> Optional[str]: """纯查询:返回下一个 stage id,不修改 task_state""" pipeline = self.registry.get(task_state.pipeline_type) if not pipeline: return None current_stage = pipeline.stages.get(task_state.current_stage) if not current_stage: return None if outcome == "success": return current_stage.on_success elif outcome == "failure": retries = task_state.stage_retry_counts.get(current_stage.id, 0) if current_stage.max_retries > 0 and retries + 1 > current_stage.max_retries: return "failed" return current_stage.on_failure else: return "failed" def advance_stage(self, task: Task, task_state: TaskPipelineState, outcome: str) -> str: """推进 Pipeline stage(有副作用:修改 task_state,同步 assignee)""" pipeline = self.registry.get(task_state.pipeline_type) if not pipeline: return task.status current_stage = pipeline.stages.get(task_state.current_stage) if not current_stage: return task.status # 使用 peek 计算下一个 stage next_stage_id = self.peek_next_stage(task_state, outcome) if not next_stage_id: return task.status # 更新重试计数(仅在 failure 时) if outcome == "failure" and current_stage.max_retries > 0: retries = task_state.stage_retry_counts.get(current_stage.id, 0) + 1 task_state.stage_retry_counts[current_stage.id] = retries # 更新运行时状态 task_state.current_stage = next_stage_id # 同步更新 assignee(仅 Pipeline 模式,关键!否则 Router 路径 4 会劫持) next_stage = pipeline.stages.get(next_stage_id) if next_stage: new_agent = pipeline.default_agent if next_stage.agent == "pipeline_default" else next_stage.agent # bb.update_assignee(task.id, new_agent) return next_stage_id def get_agent_for_stage(self, task: Task) -> Optional[str]: """获取当前 stage 应该执行的 Agent""" pipeline = self.registry.get(task.task_type) if not pipeline: return None current_stage = pipeline.stages.get(task.status) if not current_stage: return None if current_stage.agent == "pipeline_default": return pipeline.default_agent return current_stage.agent def validate_transition(self, task: Task, new_status: str) -> bool: """校验状态流转是否合法(硬约束)""" pipeline = self.registry.get(task.task_type) if not pipeline: return True # 自由模式不限制 # 异常状态始终允许 if new_status in ("failed", "paused", "escalated", "blocked", "cancelled"): return True current_stage = pipeline.stages.get(task.status) if not current_stage: return True allowed = {current_stage.on_success, current_stage.on_failure} return new_status in allowed ``` ### §8.3 Ticker 集成 **TaskPipelineState 存储方案**:建议在 tasks 表新增 `pipeline_state_json` 列(TEXT),序列化存储 `TaskPipelineState`。Blackboard 的 `get_pipeline_state()` / `save_pipeline_state()` 负责序列化/反序列化。独立表方案亦可,但鉴于 state 结构简单且与任务 1:1,放 tasks 列更方便。 ### Entry 触发机制 Pipeline 任务的入口阶段不受 Pipeline stages 约束,由 Router 和 Ticker 标准流程处理: ``` 1. 任务创建:status=pending, task_type=coding 2. Ticker 扫到 pending + task_type 有值 3. Router 路径 5 匹配 → pipeline.default_agent(如 zhangfei-dev) 4. Dispatcher 返回 mode=deterministic, agent_id=zhangfei-dev 5. assignee 设为 pipeline.default_agent 6. Ticker spawn zhangfei-dev → Agent claim → status 变为 working 7. current_agent = zhangfei-dev 8. TaskPipelineState 创建:current_stage=working(entry stage) 代码路径统一:所有任务(Pipeline / 自由模式)都走 pending → claim → working。 忙时处理:assignee 有值但 Agent 忙 → 等下一个 tick 重试(和路径 4 一样的行为)。 ``` ### 状态推进流程 Ticker 在任务状态流转时调用 PipelineEngine: 1. `PipelineEngine.get_or_create_state(task)` 获取运行时状态 2. 任务完成(spawn 返回)→ `classify_outcome()` 判定 success/failure 3. `PipelineEngine.advance_stage(task, task_state, outcome)` 推进到下一个 stage 4. 持久化更新 `TaskPipelineState`(`bb.save_pipeline_state(task_state)`) 5. 如果下一个 stage 不是终态 → 更新任务状态 + assignee 已由 advance_stage 同步 → spawn 6. 如果是终态 → 标记 done/failed ## §9 实施路线 ### Phase 1:Bug fix(独立实施) | Step | 内容 | 详细设计 | |------|------|---------| | 1.1 | task_type 默认值改 None | §10.1 | | 1.2 | 广播计数器修正 | §10.2 | ### Phase 2:Pipeline 全量实施(Phase 1 完成后一口气做) | Step | 内容 | |------|------| | 2.1 | Pipeline 注册表 + PipelineEngine(§8.1 + §8.2) | | 2.2 | Router 路径 5(§4) | | 2.3 | Ticker 集成 PipelineEngine(§8.3) | | 2.4 | 广播 tracker 反馈回调注入 | | 2.5 | API 端点(§6) | | 2.6 | 前端 Pipeline 下拉(§7) | | 2.7 | E2E 测试覆盖 | ## §10 Phase 1 详细设计 ### §10.1 task_type 默认值改 None **文件**:`src/api/blackboard_routes.py` **改动**:1 行 ```python # 当前(line 133): task_type=body.get("task_type", "coding"), # 改为: task_type=body.get("task_type", None), ``` **影响分析**: | 场景 | 改前 | 改后 | |------|------|------| | 前端不传 task_type | task_type="coding" → Router 不看 → 走广播 | task_type=None → Router 不看 → 走广播 | | 前端传 task_type="coding" | coding | coding(不变) | | E2E 测试不传 task_type | coding | None | | 现有 DB 数据 | 不受影响(已写入) | 不受影响 | **风险**:低。改前改后对于当前 Router 行为一致(Router 不看 task_type,都是走广播)。区别在于 Phase 2 实施后,task_type=None 的任务会继续走广播,而显式传了 task_type 的会走 Pipeline。 **E2E 测试影响**:待验证 `grep` 现有测试中 `task_type` 相关的硬断言,确认改为 `None` 不会破坏测试。如果有断言 `task_type == "coding"` 的测试,需改为 `task_type == None` 或删除断言。 **测试**:现有 E2E 测试中不传 task_type 的用例行为不变(都走广播),无需改测试。 ### §10.2 广播计数器修正 **文件**:`src/daemon/ticker.py` #### 当前问题 - `_broadcast_claim` 在 line ~1020 执行广播 - line ~1046 检查 `retry_count >= 3` 但从不递增 retry_count - retry_count 只在 `_check_timeouts` 的 claimed 超时路径递增(line ~1317) - 结果:没人认领的广播任务每 tick 广播一次,retry_count 永远为 0,无限循环 #### 修复方案 **新增数据结构**(ticker.py 顶部): ```python from dataclasses import dataclass, field @dataclass class BroadcastRound: """追踪单个任务的广播状态""" task_id: str notified_agents: set = field(default_factory=set) # 已 spawn 过的 Agent responded_agents: set = field(default_factory=set) # 已返回反馈的 Agent round_number: int = 1 # 当前第几轮 ``` **Ticker 类新增属性**: ```python class Ticker: def __init__(self, ...): ... self._broadcast_tracker: Dict[str, BroadcastRound] = {} self._all_agent_ids: Set[str] = set() # 从 config 加载 ``` **`_broadcast_claim` 改造**(替换 line ~1020 区域): ```python def _broadcast_claim(self, db_path: Path, broadcast_tasks: list): """广播认领:按轮次追踪,所有 Agent 反馈才算一轮""" for task_info in broadcast_tasks: task_id = task_info["id"] # 获取或创建 tracker tracker = self._broadcast_tracker.get(task_id) if not tracker: tracker = BroadcastRound(task_id=task_id) self._broadcast_tracker[task_id] = tracker # 检查是否已认领(可能上一个 tick 被认领了) bb = Blackboard(db_path) task = bb.get_task(task_id) if task.status != "pending": # 已被认领或状态已变,清理 tracker self._broadcast_tracker.pop(task_id, None) continue # 获取空闲 Agent idle_agents = self._get_idle_agents() # 过滤已通知过的,只 spawn 尚未通知的 pending_agents = [a for a in idle_agents if a not in tracker.notified_agents] if pending_agents: # spawn 尚未通知的 Agent for agent_id in pending_agents: tracker.notified_agents.add(agent_id) # ... 现有 spawn 广播逻辑 ... else: # 所有空闲 Agent 都已通知过 # 检查是否所有已知 Agent 都通知了 unnotified = self._all_agent_ids - tracker.notified_agents if not unnotified: # 所有 Agent 都通知过了 → 检查是否全部反馈 if tracker.responded_agents >= tracker.notified_agents: # 全部已通知的 Agent 都反馈了且没人认领 → 一轮结束 if tracker.round_number >= 3: # 3 轮广播无人认领,升级庞统 self._escalate_task(db_path, task_id) del self._broadcast_tracker[task_id] logger.warning( "Broadcast 3 rounds exhausted, escalating: %s", task_id ) else: # 开始新一轮:重置通知/反馈,轮次+1 tracker.round_number += 1 tracker.notified_agents.clear() tracker.responded_agents.clear() logger.info( "Broadcast round %d starting for: %s", tracker.round_number, task_id, ) # else: 还有 Agent 没通知到(可能在忙),等下一个 tick ``` **Agent 反馈回调**(通过 Ticker 公共 API,在 spawner.py 的 `on_agent_complete` 中注入): ```python # ticker.py def record_broadcast_response(self, task_id: str, agent_id: str, outcome: str): """记录 Agent 对广播任务的反馈(公共 API)""" tracker = self._broadcast_tracker.get(task_id) if not tracker: return if outcome == "claimed": self._broadcast_tracker.pop(task_id, None) else: tracker.responded_agents.add(agent_id) # spawner.py 中 classify_outcome 之后 # 调用方式:self._ticker.record_broadcast_response(task_id, agent_id, outcome) ``` 注入位置:spawner.py 的 `classify_outcome()` 返回后、`update_status()` 调用前。 **Ticker 重启恢复**: 简单方案:重启时清空 `_broadcast_tracker`,所有广播中的任务从第一轮重新开始。最坏情况多广播一轮,可接受。 ```python # ticker.py 启动时 self._broadcast_tracker.clear() logger.info("Broadcast tracker cleared (ticker restart)") ``` **测试**: ```python # tests/test_broadcast_tracker.py def test_round_completes_when_all_respond(): """所有 Agent 反馈后,一轮结束""" ... def test_round_resets_for_next_round(): """一轮结束后,开始新一轮""" ... def test_escalate_after_3_rounds(): """3 轮无人认领,升级庞统""" ... def test_claim_clears_tracker(): """Agent 认领后,tracker 清理""" ... def test_busy_agent_not_counted(): """忙的 Agent 不算已通知,下 tick 继续尝试""" ... ``` #### 改动文件清单 | 文件 | 改动 | 行数估计 | |------|------|---------| | `src/daemon/ticker.py` | BroadcastRound 数据类 + tracker 属性 + _broadcast_claim 改造 + _escalate_task | ~60 行新增/修改 | | `src/daemon/spawner.py` | _on_broadcast_response 回调注入 | ~15 行 | | `tests/test_broadcast_tracker.py` | 新测试文件 | ~80 行 | ## §11 待确认 1. Pipeline 注册表存储方式:内存 dict(当前方案)vs DB table(持久化)vs YAML file(声明式配置)? 2. 灵活 Pipeline 的 stage 定义格式:用 YAML 声明式还是 Python dict 硬编码?前端需要渲染流程图吗? 3. coding pipeline 的 stages 是否需要更细粒度?如 plan → implement → review → done? 4. Phase 2 实施时,是否一次性把 YAML 声明式配置也做了,还是先硬编码? 5. 广播 tracker 的反馈回调:在 spawner 的 `classify_outcome` 之后注入,还是用独立的事件回调? ## §A 评审记录 ### 仲达评审(2026-06-04)+ Rebuttal **原始结论**:1 个必须修 + 6 个建议。 **Rebuttal 后结论**:3 项接受,2 项不接受,2 项待补充。双方达成一致。 | # | 问题 | 仲达原始判定 | Rebuttal 结论 | 理由 | |---|------|------------|-------------|------| | 1 | get_next_stage 副作用 | 建议 | ✅ 接受(拆 peek + advance) | 拆方法比单纯改名更清晰 | | 2 | retry_counts 清理 | 建议 | ❌ 不改 | 清理丢历史,仲达自己说不影响正确性 | | 3 | 忙 Agent 死锁 | **必须修** | ❌ 不改 | 广播轮次定义(所有人收到并反馈 = 一轮)和需求一致。Agent 忙由 Spawner 保证(含超时上限),不是 Ticker 职责 | | 4 | pending/claimed 说明 | 建议 | ✅ 补充 | | | 5 | entry 触发机制 | 建议 | ✅ 补充(走 claim) | Pipeline 任务走标准 claim 流程,代码路径统一,忙时自然排队 | | 6 | assignee 同步更新 | 建议 | ✅ 接受(仅 Pipeline 模式) | | | 7 | role_map 具体 reviewer | 建议 | 记录(#11 范围) | | ## §B 后记:强工作流与 AI native 的关系(2026-06-04) ### 设计意图记录 Pipeline 强工作流模式**本质上不是 AI native**: - **自由模式(AI native)**:Agent 自主判断、自主认领、自主决定执行方式。系统只提供任务和约束,Agent 是决策主体 - **Pipeline 强工作流**:系统控制流程、指定执行者、规定步骤顺序。Agent 是执行主体,不是决策主体 这两个模式的差异不仅是"要不要 claim"的问题,而是影响到: - 引擎提示词(强工作流下提示词要严格约束 Agent 行为边界) - Agent 的自主性(强工作流下 Agent 不能自主决定跳过/偏离) - 错误处理(强工作流下 Agent 失败是流程回退,不是自主重试) ### 实施策略 **不着急实现 Pipeline 强工作流**。先把设计方案考虑清楚,特别是: 1. 强工作流下提示词模板如何约束 Agent 不产生不可预料分支 2. 自由模式和强工作流模式如何共存(同一系统、不同任务) 3. 两种模式对 Spawner/Ticker/Router 的差异化需求 当前优先级: - **Phase 1(bug fix)**:task_type 默认值改 None + 广播计数器修正 - **Pipeline 强工作流**:设计继续深化,等方案成熟后再实施