# v2.8 Pipeline 架构设计 **版本**: v2.0 **日期**: 2026-05-27 **作者**: 庞统 **状态**: 调研完成 → 设计提案 v2.0,待用户确认 > v1.0→v2.0 变更:补充执行模式调研(parallel/loop/saga/interactive/event-driven),补充场景×执行模式映射表,明确业务类型 vs 执行模式的分层关系。 --- ## 1. 设计目标 1. **消灭 46 处 if/_mail 分支**(ticker 14 + dispatcher 23 + spawner 9) 2. **不同 task_type 走不同执行路径**(代码评审不需要 planning,数据下载不需要路由) 3. **新增 task_type 只加配置不改核心代码**(Plugin/Registry) 4. **执行模式是有限的几种,业务类型无限扩展**(用户选业务类型,系统映射到执行模式) --- ## 2. 两层抽象:业务类型 vs 执行模式 ### 2.1 概念 | 层 | 定义 | 谁选 | 数量 | 例子 | |----|------|------|------|------| | **业务类型**(task_type) | 用户视角的"做什么" | 用户创建任务时选 | 无限扩展 | coding, review, data, deploy, research, discuss, data_collect, risk_patrol, ... | | **执行模式**(execution_pattern) | 系统视角的"怎么跑" | YAML Profile 声明 | 有限(~7种) | single_step, multi_step, parallel, loop, saga, interactive, event_trigger | ### 2.2 映射关系 ```yaml # config/pipelines/review.yaml task_type: review # 业务类型(用户视角) execution_pattern: single_step # 执行模式(系统视角) ``` **业务类型无限扩展,执行模式有限**。新增业务类型 = 新增一个 YAML。新增执行模式 = 新增代码。 --- ## 3. 执行模式调研 > 每种模式的结论来自外部来源,不自己发明。 ### 3.1 single_step(单步执行) **描述**:一个 agent 一次 spawn 就完成。 **适用场景**:代码评审、数据下载、Mail 投递、异常监控告警。 **优秀实践**: - Temporal 的 Activity 概念——原子操作,独立执行,失败可重试(来源:Temporal docs) - Hermes Kanban 的 `todo → ready → claimed → running → done` 生命周期——单步但状态完整(来源:知识库 hermes-agent) - OpenClaw Lobster 单步 command——`exec --shell 'cmd --json'`(来源:`/opt/homebrew/lib/node_modules/openclaw/docs/tools/lobster.md`) **关键设计点**: - 完成 ≠ 成功:需要 verification(如检查产出文件是否存在) - 失败可重试,但有上限 ### 3.2 multi_step(多步串行) **描述**:多个 stage 依次执行,前一步的输出是后一步的输入。 **适用场景**:编码(plan→implement→verify)、调研(plan→research→report)、部署(build→test→deploy)。 **优秀实践**: - Lobster 的 `steps` + `stdin: $step.stdout` 管道——声明式步骤链,步骤间 JSON 管道传递(来源:`lobster.md`) - OpenClaw TaskFlow——持久化多步 flow 管理,跨 gateway 重启(来源:`taskflow.md`) - Temporal Workflow——步骤自动持久化,不怕进程重启(来源:Temporal docs) **关键设计点**: - 步骤间数据传递:通过 DB(stages_json)或 prompt 注入前序 stage 的 output - 每个 stage 对应一次 agent spawn - 中间 stage 失败 → 可从当前 stage 重试(不需要从头开始) ### 3.3 parallel(并行执行 / Fan-Out / Scatter-Gather) **描述**:同一个任务拆成多个子任务并行执行,结果汇聚。 **适用场景**:批量回测(N 组参数)、并行协作(张飞写代码 + 关羽做风控)、多角度评审(乐观/悲观/务实三路并行)。 **优秀实践**: | 来源 | 做法 | 关键点 | |------|------|--------| | **AWS Prescriptive Guidance** | scatter-gather:分发→并行执行→汇聚 | "sends tasks to multiple services in parallel, waits for responses, aggregates results"(web_search 摘要) | | **Azure Architecture Center** | concurrent orchestration / fan-out-fan-in / map-reduce | "also known as parallel, fan-out/fan-in, scatter-gather, map-reduce"(web_search 摘要) | | **Temporal** | `asyncio.gather(*futures, return_exceptions=True)` | `return_exceptions=True` 意味着部分失败不阻塞整体,结果里包含异常(来源:dev.to Temporal AI agents) | | **open-multi-agent** | `AgentPool.runParallel()` + fan-out-aggregate pattern | 多 agent 并行,结果由 synthesizer 聚合(来源:知识库 `examples/patterns/fan-out-aggregate.ts`) | | **GSD (get-shit-done)** | Wave Execution——按依赖分组,wave 内并行,wave 间串行 | "Plans are grouped into waves. Within each wave, plans run in parallel. Waves run sequentially."(来源:知识库 `get-shit-done/README.md`) | **关键设计点**: 1. **结果汇聚**:需要一个 aggregator(可以是固定逻辑,也可以是一个 agent) 2. **部分失败处理**: - `fail_fast`:一个失败全部取消(适合 saga 类场景) - `best_effort`:收集所有结果,失败标记,汇总时报告(适合批量回测) 3. **并发控制**:受 counter 全局上限约束(`max_global_agents`) 4. **GSD 的 Wave 启发**:parallel 可以和 multi_step 组合——一个多步 pipeline 的某个 stage 是并行的 ### 3.4 loop(循环 / 审议 / Evaluator-Reflect-Refine) **描述**:执行→评审→不通过→修改→再提交,循环直到通过或达到上限。 **适用场景**:审议循环(门下省封驳重做)、自愈(检测→诊断→修复→验证循环)、质量迭代(写→评审→改→再审)。 **优秀实践**: | 来源 | 做法 | 关键点 | |------|------|--------| | **AWS Prescriptive Guidance** | Evaluator-Reflect-Refine Loop | "The loop repeats until the result meets criteria, is approved, or reaches a retry limit"(web_search 摘要) | | **三省六部 Edict** | 门下省封驳→返回中书省→修改→重新审议,最多 3 轮 | 制度化循环,有硬上限(来源:知识库 `edict/docs/task-dispatch-architecture.md`) | | **Cloudflare Agents** | Human-in-the-loop + schedule escalation | "Implement timeouts — use schedule() to escalate or auto-reject after reasonable periods"(web_search 摘要) | | **Reddit AI_Agents 实战** | "Set a hard rule: if no CRITICAL items, output APPROVED — loop terminates. Max 3 cycles, then escalate to human"(web_search 摘要) | **关键设计点**: 1. **终止条件**:通过 / 达到 max_rounds(硬上限)/ 人工介入 2. **收敛保障**:max_rounds 防止无限循环 3. **上下文传递**:每次循环需要把上一次的评审意见传给执行者 4. **和 multi_step 的关系**:loop 是 multi_step 的某个阶段加上循环条件。可以建模为 `multi_step + loop_guard` ### 3.5 saga(补偿事务) **描述**:多步串行执行,任何一步失败,反向执行之前步骤的补偿动作。 **适用场景**:数据 ETL(下载→清洗→入库,入库失败要清掉已洗的数据)、实盘交易(下单→成交确认→风控检查,失败需要撤单)、策略上线(回测→模拟盘→实盘,失败需要回退)。 **优秀实践**: | 来源 | 做法 | 关键点 | |------|------|--------| | **SagaLLM (arXiv:2503.11951)** | AI Agent 场景的 saga——保留 context 用于 recovery,按 dependency graph 反向执行补偿 | "Upon failure, system invokes compensatory actions using logs and rollback specifications stored in... traverses dependency graph to orchestrate reverse execution paths"(web_search 摘要) | | **SparkCo** | 每步定义 forward + compensate 动作,orchestrator 管理补偿链 | "Each step treated as independent transaction, with compensating actions defined for rollback scenarios"(web_search 摘要) | | **Temporal** | 内置 Saga 支持:`await workflow.execute_activity(compensate_A)` | 每个 Activity 注册对应的补偿函数(来源:Temporal docs) | **关键设计点**: 1. **补偿动作定义**:每个 stage 必须声明 `compensate` 动作(可以是 agent spawn 或固定脚本) 2. **补偿链**:失败后从当前 stage 反向执行之前所有 stage 的 compensate 3. **补偿也可能失败**:需要记录补偿状态,人工介入 4. **和 multi_step 的关系**:saga 是 multi_step + compensate 定义。可以建模为 `multi_step + per-stage-compensate` ### 3.6 interactive(人机交互 / Human-in-the-Loop) **描述**:执行到关键点暂停等人确认,确认后继续。 **适用场景**:策略上线审批、实盘交易确认、多级审批链。 **优秀实践**: | 来源 | 做法 | 关键点 | |------|------|--------| | **OpenClaw Lobster** | `approval: required` + `resumeToken`——暂停等审批,resume 不重跑已完成步骤 | "Halted workflows return a token; approve and resume without re-running everything"(来源:`lobster.md`) | | **Redis AI HITL** | 工作流暂停在 decision point,保存 state,等 human approve/reject/modify 后 resume | "Workflow pauses at decision point, saves state, waits for human to approve/reject/modify before resuming"(web_search 摘要) | | **Cloudflare Agents** | 审批超时自动 escalate 或 auto-reject | "Implement timeouts — use schedule() to escalate or auto-reject"(web_search 摘要) | | **MindStudio** | Iterative Kanban Pattern——agent 产出 → human 评审 → agent 修改 → human 确认 | "Task gets handed to agent, agent produces output, human reviews, sends it back, agent revises, human approves"(web_search 摘要) | **关键设计点**: 1. **resume 机制**:暂停时保存状态 + 产出 token,恢复时从 checkpoint 继续 2. **超时处理**:审批等待超时 → escalate / auto-reject / auto-approve(可配置) 3. **Lobster 启发**:moziplus 可以直接用 Lobster 的 `approval: required` 模式,不需要自己实现暂停/恢复 4. **和 multi_step 的关系**:interactive 是 multi_step 的某个 stage 加了 approval 门控。可以建模为 `multi_step + approval_stage` ### 3.7 event_trigger(事件触发 / 统一触发入口) **描述**:不是一种执行模式,而是一种触发方式。和执行模式正交——任何执行模式都可以被事件触发。 **适用场景**:cron 定时触发、webhook 触发、依赖完成触发、条件触发。 **优秀实践**: | 来源 | 做法 | 关键点 | |------|------|--------| | **Gobii (hermes-agent #491)** | 统一 scheduled 和 event-driven 到一个 durable processing loop——cron / IMAP idle / webhook / agent-to-agent message 全部统一 | "unifies scheduled and event-driven execution into a single durable processing loop"(web_search 摘要) | | **AgentC2** | Agent 同时支持 cron + webhook,hybrid pattern | "Agent can have both cron schedule and event triggers — daily summary at 6 AM + webhook for critical alerts"(web_search 摘要) | | **Edict** | Event Bus + 主题路由(`task.created`, `task.completed` 等) | 所有状态变更发布事件,订阅者按需触发(来源:知识库 `edict/edict_agent_architecture.md`) | | **OpenClaw Cron** | `cron add --cron "0 7 * * 1-5" --session session:xxx --message "..."` | 定时触发 agent session,声明式配置(来源:OpenClaw cron 工具) | **关键设计点**: 1. **触发方式 vs 执行模式正交**:trigger 是"什么时候开始",execution_pattern 是"开始后怎么跑" 2. **四种触发**:manual(人工)/ cron(定时)/ webhook(外部事件)/ dependency(前序完成) 3. **Gobii 启发**:统一到一个 loop,不需要为每种触发建独立系统 4. **OpenClaw Cron 已有能力**:当前 moziplus 可以直接用 OpenClaw cron 做定时触发,不需要自己实现 --- ## 4. 执行模式关系图 ``` ┌─────────────────────────────┐ │ Trigger(正交层) │ │ manual / cron / webhook / │ │ dependency │ └──────────────┬──────────────┘ │ 触发任务 ▼ ┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌──────────┐ │ single │ │ multi_step │ │ parallel │ │ mail │ │ _step │ │ │ │ │ │ │ └──────────┘ └──────┬───────┘ └──────────┘ └──────────┘ │ ┌────────┼────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────┐ ┌──────┐ │interactive│ │ loop │ │ saga │ │ (审批门控)│ │(循环)│ │(补偿)│ └──────────┘ └──────┘ └──────┘ ``` **关系说明**: - `single_step` 和 `multi_step` 是两种基础执行模式 - `parallel` 是独立的执行模式(并行子任务) - `mail` 是特殊的 single_step(独立 pipeline,不走通用路径) - `interactive`、`loop`、`saga` 是 **multi_step 的增强**——它们在 multi_step 骨架上加了额外能力 - `trigger` 和执行模式正交 ### 建模方式 | 增强类型 | 本质 | 建模 | |---------|------|------| | interactive | multi_step + approval stage | `multi_step` + profile 中某些 stage 声明 `approval: required` | | loop | multi_step + 循环 guard | `multi_step` + profile 中声明 `loop: { max_rounds: 3, convergence_check: true }` | | saga | multi_step + compensate | `multi_step` + profile 中每个 stage 声明 `compensate: { agent: "...", message: "..." }` | **不需要 3 个独立的 Pipeline 子类**。只需要 `MultiStepPipeline` 识别 profile 中的增强声明,执行时加入对应逻辑。 --- ## 5. 场景 × 执行模式映射 ### 5.1 已有业务类型 | 业务类型 | 执行模式 | 触发方式 | 说明 | |---------|---------|---------|------| | **coding** | multi_step | manual | plan→implement→verify→review | | **review** | single_step | manual / dependency | 固定 agent,一杆到底 | | **data** | single_step | manual / cron | 固定 agent(赵云),下载+验证 | | **deploy** | multi_step (interactive) | manual | build→test→approve→deploy | | **research** | multi_step | manual | plan→research→report | | **discuss** | single_step | manual | 讨论,一杆到底 | | **mail (request)** | mail | API 创建 | 点对点投递 | | **mail (inform)** | mail | API 创建 | 点对点,自动 done | ### 5.2 调研的 28 个场景映射 | # | 场景 | 业务类型 | 执行模式 | 触发 | |---|------|---------|---------|------| | A1 | Mail 投递 | mail | mail | API | | A2 | Task 派发 | coding/review/... | 各自映射 | manual | | B1 | 定时数据采集 | data_collect | single_step | cron | | B2 | 早朝简报 | morning_brief | multi_step | cron | | B3 | 批量回测 | batch_backtest | parallel | manual | | B4 | 风控巡查 | risk_patrol | single_step | cron / event | | B5 | 策略上线审批 | strategy_launch | multi_step (interactive) | manual | | B6 | 实盘交易信号 | trade_signal | multi_step (saga) | event | | C1 | Webhook 触发 | (任意) | 由 task_type 决定 | webhook | | C2 | Saga 链式任务 | data_etl | multi_step (saga) | manual / dependency | | C3 | 并行协作 | parallel_collab | parallel | manual | | C4 | 人机交互 | (任意 multi_step) | multi_step (interactive) | manual | | C5 | 执行工具替换 | (架构层) | 不映射 | N/A | | D1 | 隔夜编码 | overnight_coding | multi_step | cron | | D2 | 持仓再平衡 | portfolio_rebalance | multi_step (interactive) | cron | | D3 | 交易异常监控 | fraud_detect | single_step | event | | D4 | 数据 ETL 管道 | data_etl | multi_step (saga) | cron / dependency | | D5 | 合规审计追踪 | audit_trail | multi_step | event | | D6 | 策略实验跟踪 | experiment_track | multi_step | manual | | E1 | 三省六部分权 | edict_dispatch | multi_step (loop) | manual | | E2 | 事件驱动响应 | (任意) | 由 task_type 决定 | event | | E3 | 动态计划调整 | dynamic_plan | multi_step | manual | | E4 | 自愈/故障恢复 | self_heal | multi_step (loop) | event | | E5 | 审批链 | approval_chain | multi_step (interactive) | manual | | E6 | 成本/配额控制 | quota_guard | single_step | event | | E7 | 经验蒸馏 | experience_distill | single_step | dependency | | F1 | 条件触发 | (任意) | 由 task_type 决定 | event (condition) | | F2 | 依赖触发 | (任意) | 由 task_type 决定 | dependency | | F3 | 人工触发 | (任意) | 由 task_type 决定 | manual | --- ## 6. 设计方案 ### 6.1 设计模式选择 ``` Registry + Template Method + Strategy ``` - **Registry**:Pipeline 注册中心,按 task_type 查找 - **Template Method**:Pipeline 基类定义 tick() 骨架,子类覆写差异步骤 - **Strategy**:触发方式、路由策略、验证策略作为可注入的策略对象 来源:LangGraph Routing Pattern(来源:web_search "LangGraph Routing")、Argo WorkflowTemplate(来源:Argo docs)、Temporal Worker Framework(来源:Temporal docs)。 ### 6.2 Pipeline 类体系 ``` TaskPipeline (ABC) # Template Method 骨架 ├── MailPipeline # Mail 专属(硬编码,不走 YAML) ├── SingleStepPipeline # 单步执行 ├── MultiStepPipeline # 多步执行(含增强) │ ├── 识别 interactive 声明 → approval gate │ ├── 识别 loop 声明 → 循环 guard │ └── 识别 saga 声明 → compensate chain └── ParallelPipeline # 并行执行(fan-out + aggregate) PipelineRouter # Registry,按 task_type 路由 ``` **为什么只有 4 个子类**:interactive/loop/saga 不是独立子类,而是 MultiStepPipeline 通过 profile 声明启用的增强能力。 ### 6.3 Pipeline 基类 ```python class TaskPipeline(ABC): """Pipeline 基类(Template Method)""" def __init__(self, spawner, counter, router, config, profile=None): self.spawner = spawner self.counter = counter self.router = router self.config = config self.profile = profile or {} async def tick(self, db_path: Path, project_config: dict) -> TickResult: """每个 ticker 周期的入口""" await self.pre_tick(db_path) tasks = self.load_pending(db_path) dispatched = 0 for task in tasks: if not self.can_dispatch(task, db_path): continue result = await self.dispatch_one(task, db_path, project_config) if result: dispatched += 1 return TickResult(dispatched=dispatched, total=len(tasks)) # ── 钩子方法 ── async def pre_tick(self, db_path): self.check_timeouts(db_path) @abstractmethod def load_pending(self, db_path) -> list: ... def can_dispatch(self, task, db_path) -> bool: return True @abstractmethod async def dispatch_one(self, task, db_path, project_config) -> bool: ... def check_timeouts(self, db_path): ... ``` ### 6.4 MultiStepPipeline 增强 ```python class MultiStepPipeline(TaskPipeline): """多步执行(含 interactive/loop/saga 增强)""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 从 profile 读取增强配置 self.loop_config = self.profile.get("loop") # { max_rounds, convergence_check } self.saga_config = self.profile.get("saga") # { stages: [{compensate: ...}] } self.interactive_stages = self._extract_approval_stages() def _extract_approval_stages(self): """从 profile stages 中提取需要 approval 的 stage""" return [s for s in self.profile.get("stages", []) if s.get("approval") == "required"] async def dispatch_one(self, task, db_path, project_config): stage = self._get_current_stage(task) # interactive: approval gate if stage.get("approval") == "required" and not self._is_approved(task, stage): return False # 等审批,不 spawn # spawn agent agent_id = self._resolve_agent(task, stage) message = self._build_stage_prompt(task, stage) return await self._spawn_and_track(agent_id, message, task, stage, db_path) def _on_stage_complete(self, task, stage, outcome, db_path): # saga: 记录 compensate 动作 if self.saga_config and stage.get("compensate"): self._record_compensate(task, stage, db_path) # loop: 检查收敛 if self.loop_config: round_num = self._get_current_round(task) if outcome == "rejected" and round_num < self.loop_config["max_rounds"]: self._advance_to_next_round(task, db_path) return elif round_num >= self.loop_config["max_rounds"]: self._escalate(task, db_path, reason="loop_max_rounds") return # 正常推进到下一个 stage self._advance_stage(task, db_path) ``` ### 6.5 ParallelPipeline ```python class ParallelPipeline(TaskPipeline): """并行执行(fan-out + aggregate)""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.failure_policy = self.profile.get("failure_policy", "best_effort") self.aggregator = self.profile.get("aggregator", {}) async def dispatch_one(self, task, db_path, project_config): # 1. 拆分子任务 sub_tasks = self._fan_out(task) # 2. 并行 spawn(受 counter 约束) futures = [] for sub in sub_tasks: agent_id = self._resolve_agent(sub) if not self.counter.can_acquire(agent_id, sub.id): continue self.counter.acquire(agent_id, sub.id) futures.append(self._spawn_subtask(agent_id, sub, db_path)) # 3. 等待所有完成 results = await asyncio.gather(*futures, return_exceptions=True) # 4. 处理部分失败 if self.failure_policy == "fail_fast": if any(isinstance(r, Exception) for r in results): self._cancel_remaining(futures) self._mark_failed(task, db_path) return True # best_effort: 继续聚合 # 5. 聚合结果 if self.aggregator.get("type") == "agent": await self._aggregate_with_agent(task, results, db_path) else: self._aggregate_simple(task, results, db_path) return True ``` ### 6.6 PipelineRouter ```python class PipelineRouter: def __init__(self): self._pipelines: Dict[str, TaskPipeline] = {} self._default: TaskPipeline = None def register(self, task_type: str, pipeline: TaskPipeline): self._pipelines[task_type] = pipeline def route(self, task_type: str = None, is_mail: bool = False) -> TaskPipeline: if is_mail: return self._pipelines["_mail"] return self._pipelines.get(task_type, self._default) ``` ### 6.7 Pipeline Profile(YAML 声明) ```yaml # config/pipelines/coding.yaml task_type: coding execution_pattern: multi_step stages: - id: plan description: "分析需求,制定方案" agent_selection: route_by_capability capabilities_required: ["coding", "design"] timeout_minutes: 10 - id: implement description: "编码实现" reuse_plan_agent: true timeout_minutes: 30 depends_on: [plan] - id: verify description: "自动验证" specific_agent: simayi-challenger timeout_minutes: 15 depends_on: [implement] completion: status: done require_verification: true ``` ```yaml # config/pipelines/review.yaml task_type: review execution_pattern: single_step agent_selection: specific_agent specific_agent: simayi-challenger timeout_minutes: 20 completion: status: done ``` ```yaml # config/pipelines/strategy_launch.yaml task_type: strategy_launch execution_pattern: multi_step stages: - id: backtest description: "回测验证" specific_agent: zhangfei-dev timeout_minutes: 60 - id: simulate description: "模拟盘运行" specific_agent: zhangfei-dev timeout_minutes: 120 depends_on: [backtest] - id: approve description: "上线审批" approval: required # ← interactive 增强 timeout_hours: 24 # 审批超时 timeout_action: escalate # auto_reject / escalate / auto_approve depends_on: [simulate] - id: live description: "实盘部署" specific_agent: jiangwei-infra timeout_minutes: 30 depends_on: [approve] ``` ```yaml # config/pipelines/data_etl.yaml task_type: data_etl execution_pattern: multi_step stages: - id: download description: "数据下载" specific_agent: zhaoyun-data timeout_minutes: 60 compensate: # ← saga 增强 action: "删除已下载文件" command: "rm -rf /tmp/download/{task_id}" - id: clean description: "数据清洗" specific_agent: zhaoyun-data timeout_minutes: 30 depends_on: [download] compensate: action: "删除已清洗文件" command: "rm -rf /tmp/clean/{task_id}" - id: validate description: "数据验证" specific_agent: zhaoyun-data timeout_minutes: 15 depends_on: [clean] ``` ```yaml # config/pipelines/edict_dispatch.yaml task_type: edict_dispatch execution_pattern: multi_step loop: # ← loop 增强 max_rounds: 3 convergence_check: "review_passed" stages: - id: plan description: "中书省规划" specific_agent: zhongshu-agent timeout_minutes: 15 - id: review description: "门下省审议" specific_agent: menxia-agent timeout_minutes: 10 depends_on: [plan] # review 不通过 → loop 回到 plan(最多3轮) - id: dispatch description: "尚书省派发" depends_on: [review] ``` ```yaml # config/pipelines/batch_backtest.yaml task_type: batch_backtest execution_pattern: parallel failure_policy: best_effort # 部分失败不阻塞 aggregator: type: agent # 用 agent 聚合结果 specific_agent: pangtong-fujunshi fan_out: source: task.params.param_sets # 从任务参数获取参数组列表 per_item: specific_agent: zhangfei-dev timeout_minutes: 30 ``` --- ## 7. Ticker 改造 ```python class Ticker: def __init__(self, pipeline_router: PipelineRouter, ...): self.pipeline_router = pipeline_router async def _tick_project(self, project_id, project_config): is_mail = project_id == "_mail" db_path = self._get_db_path(project_id) if is_mail: pipeline = self.pipeline_router.route(is_mail=True) return await pipeline.tick(db_path, project_config) # 查所有活跃 task_type,每种走对应 pipeline task_types = self._get_active_task_types(db_path) results = [] for tt in task_types: p = self.pipeline_router.route(tt) r = await p.tick(db_path, project_config) results.append(r) return merge_results(results) ``` --- ## 8. 实施范围 ### 一次性全做 | # | 事项 | 预估行数 | |---|------|---------| | 1 | 新建 `pipeline/` 目录:base.py, mail.py, single_step.py, multi_step.py, parallel.py, router.py | ~500 行 | | 2 | Pipeline Profile YAML(coding, review, data, deploy, research, discuss) | ~200 行 | | 3 | Profile 加载器(扫描 config/pipelines/) | ~50 行 | | 4 | Ticker 改造(删除 _mail 分支,改调 pipeline_router) | -150 行(标注废弃) | | 5 | Dispatcher 标注废弃(逻辑迁移到各 Pipeline) | 标注 | | 6 | Spawner 瘦身(mail prompt 搬到 MailPipeline) | 标注 | | 7 | 前端:task_type 选择 + stage 进度显示 | 待评估 | ### MultiStepPipeline 增强实现优先级 | 增强 | 优先级 | 理由 | |------|--------|------| | 基础 multi_step | P0 | 当前主流程 | | interactive (approval gate) | P1 | 有 Lobster 参考,实现简单 | | loop (循环 guard) | P2 | 有 edict 参考,逻辑清晰 | | saga (compensate) | P3 | 场景需求相对远期 | | parallel (fan-out) | P3 | 需要 sub-task 并行调度,改动大 | --- ## 9. 来源引用 | 结论 | 来源 | |------|------| | parallel = fan-out/fan-in/scatter-gather/map-reduce | Azure Architecture Center(web_search);AWS Prescriptive Guidance(web_search);open-multi-agent `fan-out-aggregate.ts`(知识库) | | Wave Execution(并行分组) | get-shit-done README(知识库) | | loop = evaluator-reflect-refine | AWS Prescriptive Guidance(web_search) | | 门下省封驳最多3轮 | edict `task-dispatch-architecture.md`(知识库) | | saga + compensate for AI | SagaLLM arXiv:2503.11951(web_search);SparkCo(web_search) | | interactive = approval + resumeToken | OpenClaw Lobster docs(`lobster.md`) | | interactive 超时处理 | Cloudflare Agents docs(web_search) | | event + cron 统一 loop | Gobii / hermes-agent #491(web_search) | | Event Bus + topic routing | Edict architecture(知识库) | | TaskFlow 持久化多步 | OpenClaw TaskFlow docs(`taskflow.md`) | | AgentPool.runParallel() | open-multi-agent `fan-out-aggregate.ts`(知识库) | --- ## 10. 待确认 1. **4 个 Pipeline 子类是否够**?还是 interactive/loop/saga 需要独立子类? 2. **parallel 是否现在实现**?还是只预留接口? 3. **discuss 业务类型**:走 single_step 还是 multi_step? 4. **Trigger 层是否纳入本次实现**?还是继续用 OpenClaw cron 做触发? 5. **前端 task_type 选择**:创建任务时的 UI 交互怎么设计?