From 87881c000cc9351ba837cc7202558d075397c050 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 26 May 2026 23:22:20 +0800 Subject: [PATCH] auto-sync: 2026-05-26 23:22:20 --- .../pipeline-architecture-research.md | 353 ++++++++++++++++++ 1 file changed, 353 insertions(+) create mode 100644 docs/research/pipeline-architecture-research.md diff --git a/docs/research/pipeline-architecture-research.md b/docs/research/pipeline-architecture-research.md new file mode 100644 index 0000000..6413837 --- /dev/null +++ b/docs/research/pipeline-architecture-research.md @@ -0,0 +1,353 @@ +# Pipeline 架构调研:场景、实践与设计模式 + +**版本**: v1.0 +**日期**: 2026-05-26 +**作者**: 庞统 +**目的**: 为 moziplus v2 Pipeline 重构提供场景驱动的设计依据 + +--- + +## 1. 调研背景 + +moziplus v2 当前 Mail 和 Task 共享同一条调度链,靠 41 处 if/else 做差异化。v2.7.2 设计文档已规划 Pipeline 分离(MailPipeline + StandardTaskPipeline)。 + +本次调研目标: +1. 梳理未来所有可能的任务场景(不只 Mail 和 Task) +2. 调研业界针对多场景的编排实践 +3. 识别适合的设计模式 +4. 为 Pipeline 架构提供可扩展的设计依据 + +--- + +## 2. 场景全景 + +### 2.1 当前已存在的场景 + +| # | 场景 | 描述 | 现状 | +|---|------|------|------| +| A1 | **Mail 投递** | Agent 间通信(评审/通知/回复),点对点,简化状态流转 | 44 个 if/else 在跑 | +| A2 | **Task 派发** | 用户提需求 → 规划 → 路由 → 执行 → 审核 → 完成 | 当前唯一 Task pipeline | + +### 2.2 量化业务场景 + +| # | 场景 | 描述 | 和当前的区别 | +|---|------|------|------------| +| B1 | **定时数据采集** | 每天收盘后自动触发赵云下载日线/分钟线数据,不需要人工创建 | 触发方式不同:cron 而非人工 | +| B2 | **早朝简报** | 每天早上自动生成市场/持仓/风控简报推送给用户 | 触发:cron;输出:推送而非看板 | +| B3 | **批量回测** | 用户提交一个策略后,批量跑多组参数的回测 | 一个 task 产生多个子任务并行执行 | +| B4 | **风控巡查** | 定时或事件触发(如涨停/跌停),关羽自动检查持仓风险 | 触发:cron 或事件;需快速响应 | +| B5 | **策略上线审批** | 策略从回测→模拟盘→实盘,每个阶段需要人工审批才能继续 | 多阶段 + 人工 checkpoint | +| B6 | **实盘交易信号** | 策略产生交易信号 → 风控审核 → 执行(或拒绝) | 需要实时性 + 安全门控 + 可回滚 | +| D1 | **隔夜编码** | 张飞半夜自动从 backlog 领任务编码,早上交付 PR + 测试报告 | 来源: awesome-openclaw overnight-coder | +| D2 | **持仓再平衡** | 定期检查持仓偏离度,生成调仓建议(含税务/滑点考量) | 来源: awesome-openclaw portfolio-rebalancer | +| D3 | **交易异常监控** | 实时监控交易/资金流,检测异常模式并告警 | 来源: awesome-openclaw fraud-detector | +| D4 | **数据 ETL 管道** | 下载→清洗→验证→入库,确定性多步管道 | 来源: awesome-openclaw etl-pipeline | +| D5 | **合规审计追踪** | 每笔交易/决策有完整审计日志,可追溯 Agent 推理过程 | 来源: IBM/AWS agent audit trail | +| D6 | **策略实验跟踪** | 回测/参数调优的实验记录,可对比 A/B 结果 | 来源: FinRL-X | + +### 2.3 编排/系统级场景 + +| # | 场景 | 描述 | 来源 | +|---|------|------|------| +| C1 | **Webhook 触发** | 外部系统(如 NAS 数据就绪、交易信号)通过 webhook 触发任务 | OpenClaw webhook | +| C2 | **Saga 链式任务** | "下载→清洗→验证→入库",前一步失败需要清理/回滚 | Temporal Saga Pattern | +| C3 | **并行协作** | 同一个 task 多个 agent 并行执行(如张飞写代码 + 关羽做风控),结果汇总 | Azure parallel pattern | +| C4 | **人机交互** | 执行到关键点暂停等人确认/审批,确认后继续 | OpenClaw Lobster approval | +| C5 | **共享层工具替换** | 当前调 openclaw CLI spawn agent,未来可能用 MCP、HTTP API、或其他编排工具 | MCP/plugin 趋势 | +| E1 | **三省六部式分权协作** | 规划→审议(可封驳)→派发→执行→回报,多轮审议循环 | edict 三省六部 | +| E2 | **事件驱动响应** | 外部事件(涨停/跌停/策略信号/系统告警)触发 agent 执行 | Azure agent patterns | +| E3 | **动态计划调整** | 执行过程中根据中间结果动态增删步骤,不是固定 DAG | Azure: evolving plan | +| E4 | **自愈/故障恢复** | 检测到 agent 执行失败 → 自动诊断 → 尝试修复 → 失败则升级 | AWS incident response | +| E5 | **审批链** | 多级审批(agent→agent→human),每级有通过/驳回 | edict 门下省 + Temporal Saga | +| E6 | **成本/配额控制** | 根据 token 消耗自动降级模型、暂停非紧急任务 | zhipu quota 经验 | +| E7 | **经验蒸馏** | 任务完成后自动提取可复用经验,沉淀到知识库 | MEMORY.md 闭环学习 | + +### 2.4 触发方式场景 + +| # | 场景 | 描述 | +|---|------|------| +| F1 | **条件触发** | "当沪深300跌超3%时,自动触发风控巡查" | +| F2 | **依赖触发** | "赵云下载完数据后,自动触发张飞跑回测" | +| F3 | **人工触发** | 用户在 UI/聊天中手动触发一个预定义的工作流 | + +--- + +## 3. 场景抽象:五个关键维度 + +所有 28 个场景可以在五个维度上归类: + +### 3.1 触发方式(Trigger) + +| 触发方式 | 场景 | 说明 | +|---------|------|------| +| **人工** | A2, B3, B5, B6, C4, F3 | 用户主动发起 | +| **定时** | B1, B2, B4, D1, D2, D6 | cron 定期执行 | +| **事件** | B4, B6, C1, D3, E2, F1 | 外部系统/条件触发 | +| **依赖链** | C2, D4, E5, F2 | 前序任务完成后触发 | + +### 3.2 执行模式(Execution) + +| 执行模式 | 场景 | 说明 | +|---------|------|------| +| **单步** | A1, B1, D3, C1 | 一个 agent 一次执行就完成 | +| **多步串行** | A2, B5, C2, D4, E1, E5 | 多个步骤依次执行 | +| **多步并行** | B3, C3, E1(六部并行) | 多个 agent 同时工作 | +| **循环/迭代** | E1(审议循环), E3, E4 | 某步骤重复直到条件满足 | +| **条件分支** | E3, F1 | 根据中间结果选择不同路径 | + +### 3.3 人机交互(Human-in-the-loop) + +| 交互模式 | 场景 | 说明 | +|---------|------|------| +| **无人工** | A1, B1, B2, B3, B4, D1, D4 | 全自动,不需要人 | +| **可选审批** | B5, C4, E5 | 关键点可暂停等确认 | +| **必须审批** | B6, D2, F3 | 涉及实盘/资金,必须人工确认 | +| **多级审批** | E1, E5 | 逐级审批,可驳回重做 | + +### 3.4 失败处理(Failure Policy) + +| 失败策略 | 场景 | 说明 | +|---------|------|------| +| **重试** | 大部分场景 | 失败后自动重试 N 次 | +| **跳过继续** | B3(某个参数失败不影响其他) | 部分失败不阻塞 | +| **回滚** | C2, B6, D4 | 前序步骤需要撤销 | +| **升级** | E4, E6 | Agent 处理不了,升级给人 | + +### 3.5 执行工具(Executor) + +| 执行工具 | 场景 | 说明 | +|---------|------|------| +| **openclaw CLI** | 所有当前场景 | spawn `openclaw agent --message ...` | +| **Lobster** | D4, C4, B5 | 确定性多步 pipeline + 审批 | +| **MCP** | C5 未来 | Model Context Protocol | +| **HTTP API** | C1, E2, F1 | 外部系统直接调用 | + +--- + +## 4. 业界调研 + +### 4.1 Temporal:Workflow Type + Task Queue + Saga + +**核心机制**: +- 不同 Workflow Type 注册不同处理函数 +- Worker 框架共享(Activity 是原子操作,Workflow 定义编排) +- 内置 Saga Pattern(失败自动补偿) +- 内置重试、超时、持久化状态 + +**对我们有启发的**: +- Workflow Type = Pipeline Type,注册式扩展 +- Activity = 共享执行层的原子操作 +- Saga = 失败补偿机制(量化场景的回滚需求) +- 状态自动持久化,不怕进程重启 + +**来源**: [Temporal Task Routing](https://docs.temporal.io/task-routing), [Temporal Saga](https://docs.temporal.io/evaluate/use-cases-design-patterns) + +### 4.2 LangGraph:Routing Pattern + Sub-Graph + +**核心机制**: +- Router 节点分类输入,分发到不同 sub-graph +- 每个 sub-graph 有独立的状态和执行逻辑 +- 共享图执行框架(节点、边、状态) + +**对我们有启发的**: +- 路由是第一步(按 task_type 分发) +- sub-graph = Pipeline,完全独立 +- 条件边 = 分支逻辑(if/else 变成图的边) + +**来源**: [LangGraph Routing](https://www.scalablepath.com/machine-learning/langgraph) + +### 4.3 Argo Workflows:WorkflowTemplate + DAG + +**核心机制**: +- WorkflowTemplate 是声明式 YAML,定义步骤和依赖 +- steps(串行嵌套并行)和 dag(依赖图)两种编排 +- 新增工作流类型 = 新增 YAML,不改引擎 + +**对我们有启发的**: +- 声明式 YAML 定义 pipeline(新增类型不改代码) +- steps 和 dag 两种粒度(串行 vs DAG) +- 模板可嵌套、可引用、可参数化 + +**来源**: [Argo Concepts](https://argo-workflows.readthedocs.io/en/latest/workflow-concepts/) + +### 4.4 Google ADK:Agent 类型组合 + +**核心机制**: +- SequentialAgent、ParallelAgent、LoopAgent、CustomAgent +- 用不同 Agent 类型组合 pipeline +- Router Agent 做分类分发 + +**对我们有启发的**: +- 组合优于继承:用基本类型组合复杂 pipeline +- Sequential = 串行 multi_step +- Parallel = 并行协作 +- Loop = 审议循环 + +**来源**: [Google ADK Multi-Agent](https://developers.googleblog.com/developers-guide-to-multi-agent-patterns-in-adk/) + +### 4.5 OpenClaw TaskFlow + Lobster + +**核心机制**: +- TaskFlow:持久化多步 flow 管理(跨 gateway 重启) +- Lobster:确定性 pipeline 运行时(步骤编排 + 审批门控 + resume token) +- Cron + Lobster + TaskFlow 组合:定时触发 + pipeline 执行 + 持久化追踪 + +**对我们有启发的**: +- moziplus 可以直接利用 OpenClaw 的 Lobster/TaskFlow +- Lobster 的 approval = 人机交互 checkpoint +- resume token = 中断后继续 +- 声明式 YAML 定义步骤 + +**来源**: `/opt/homebrew/lib/node_modules/openclaw/docs/automation/taskflow.md` + +### 4.6 三省六部 Edict:制度化协作 + +**核心机制**: +- 分权制衡:太子→中书→门下→尚书→六部,不可越级 +- 门下省可封驳(驳回重做,最多3轮) +- 状态机严格递进(9个状态) +- 实时可观测看板 + +**对我们有启发的**: +- 多轮审议循环 = Loop Pipeline +- 封驳机制 = 驳回重做 +- 制度约束 = guardrail 的升级版 + +**来源**: `~/.openclaw/knowledge_base/edict/docs/task-dispatch-architecture.md` + +### 4.7 Azure AI Agent Patterns + +**核心机制**: +- Sequential(串行)、Parallel(并行)、Concurrent(并发独立) +- Dynamic(动态调整计划) +- 审计追踪(完整 reasoning step 记录) +- 自愈(检测失败 → 诊断 → 修复 → 升级) + +**对我们有启发的**: +- Dynamic plan = E3 场景(执行中调整计划) +- 审计追踪 = D5 场景 +- 自愈 = E4 场景 + +**来源**: [Azure Agent Patterns](https://learn.microsoft.com/en-us/azure/architecture/ai-ml/guide/ai-agent-design-patterns) + +### 4.8 AWS Incident Response Agent + +**核心机制**: +- 事件触发 → 自动诊断 → 修复执行 → 验证 → 升级 +- 不可变审计日志(CloudTrail) +- Agent Space 级数据隔离 + +**对我们有启发的**: +- 事件驱动响应 = E2 场景 +- 自愈流程 = E4 场景 + +**来源**: [AWS DevOps Agent](https://aws.amazon.com/blogs/devops/leverage-agentic-ai-for-autonomous-incident-response-with-aws-devops-agent/) + +--- + +## 5. 设计模式候选 + +### 5.1 Strategy Pattern(策略模式) + +**适用**:运行时根据类型选择不同执行策略。 + +``` +PipelineRouter → Strategy A (MailPipeline) + → Strategy B (TaskPipeline) + → Strategy C (CronPipeline) +``` + +**优点**:策略独立,新增类型只需新增类 +**缺点**:策略间共享逻辑需要额外处理 + +### 5.2 Template Method Pattern(模板方法) + +**适用**:pipeline 骨架固定,具体步骤可替换。 + +``` +BasePipeline.execute(): + 1. trigger() — 触发方式 + 2. pre_check() — 前置检查 + 3. route() — 路由 + 4. spawn() — 执行 + 5. post_check() — 后置检查/验证 + 6. complete() — 完成/失败处理 +``` + +**优点**:骨架共享,差异点明确 +**缺点**:骨架变化影响所有子类;继承耦合 + +### 5.3 Plugin / Registry Pattern(插件注册) + +**适用**:新增类型不改引擎,自动发现。 + +``` +PipelineRegistry.register("mail", MailPipeline) +PipelineRegistry.register("task", TaskPipeline) +PipelineRegistry.register("cron", CronPipeline) +# 自动扫描 config/pipelines/ 目录加载 +``` + +**优点**:开放封闭原则,新增类型零改动 +**缺点**:需要约定接口契约 + +### 5.4 Chain of Responsibility(责任链) + +**适用**:请求沿链传递,每个节点决定处理或传递。 + +``` +TriggerChain: CronTrigger → WebhookTrigger → ManualTrigger → ... +RouteChain: GuardrailCheck → CapabilityMatch → LLMRoute → ... +FailureChain: RetryHandler → RollbackHandler → EscalateHandler → ... +``` + +**优点**:每个节点只关心自己的逻辑,可组合 +**缺点**:链条过长时难调试 + +### 5.5 Observer / Event-driven(观察者/事件驱动) + +**适用**:任务状态变更时通知相关方。 + +``` +Task.on("completed") → trigger dependent tasks (F2) +Task.on("failed") → trigger failure handler (E4) +Task.on("checkpoint")→ notify human (C4) +``` + +**优点**:松耦合,支持事件驱动场景(E2, F1, F2) +**缺点**:事件流难追踪,调试复杂 + +### 5.6 Pipeline Pattern(管道模式) + +**适用**:数据流经一系列处理阶段。 + +``` +input → [Stage1] → [Stage2] → [Stage3] → output +``` + +每个 stage 有独立的输入/输出/错误处理。stage 之间通过共享上下文传递数据。 + +**优点**:每个 stage 独立可测试,可组合 +**缺点**:跨 stage 状态管理需要额外设计 + +--- + +## 6. 场景 × 维度 × 模式交叉分析 + +| 维度 | 出现频率 | 最适配的模式 | +|------|---------|------------| +| 触发方式多样化(4种) | 28个场景中占 4 种 | Strategy(触发策略可替换) | +| 执行模式多样化(5种) | 核心差异点 | Strategy + Template Method | +| 人机交互(4级) | 高风险场景必须 | Pipeline Pattern(approval gate) | +| 失败处理(4种) | 所有场景都需要 | Chain of Responsibility | +| 执行工具可替换 | C5 明确提出 | Strategy(执行器抽象) | +| 新增类型要方便 | 所有场景 | Plugin/Registry | + +--- + +## 7. 下一步 + +基于以上调研: +1. 从 28 个场景中筛选近期/远期 +2. 选择组合设计模式 +3. 输出 v2.7.2/v2.8 Pipeline 架构设计方案