Files
sanguo_moziplus_v2/docs/research/pipeline-architecture-research.md
T
2026-05-27 08:56:03 +08:00

20 KiB
Raw Blame History

Pipeline 架构调研:场景、实践与设计模式

版本: v1.0
日期: 2026-05-26
作者: 庞统
目的: 为 moziplus v2 Pipeline 重构提供场景驱动的设计依据


1. 调研背景

moziplus v2 当前 Mail 和 Task 共享同一条调度链,靠 41 处 if/else 做差异化。v2.7.2 设计文档已规划 Pipeline 分离(MailPipeline + StandardTaskPipeline)。

本次调研目标:

  1. 梳理未来所有可能的任务场景(不只 Mail 和 Task)
  2. 调研业界针对多场景的编排实践
  3. 识别适合的设计模式
  4. 为 Pipeline 架构提供可扩展的设计依据

2. 场景全景

2.1 当前已存在的场景

# 场景 描述 现状
A1 Mail 投递 Agent 间通信(评审/通知/回复),点对点,简化状态流转 44 个 if/else 在跑
A2 Task 派发 用户提需求 → 规划 → 路由 → 执行 → 审核 → 完成 当前唯一 Task pipeline

2.2 量化业务场景

# 场景 描述 和当前的区别
B1 定时数据采集 每天收盘后自动触发赵云下载日线/分钟线数据,不需要人工创建 触发方式不同:cron 而非人工
B2 早朝简报 每天早上自动生成市场/持仓/风控简报推送给用户 触发:cron;输出:推送而非看板
B3 批量回测 用户提交一个策略后,批量跑多组参数的回测 一个 task 产生多个子任务并行执行
B4 风控巡查 定时或事件触发(如涨停/跌停),关羽自动检查持仓风险 触发:cron 或事件;需快速响应
B5 策略上线审批 策略从回测→模拟盘→实盘,每个阶段需要人工审批才能继续 多阶段 + 人工 checkpoint
B6 实盘交易信号 策略产生交易信号 → 风控审核 → 执行(或拒绝) 需要实时性 + 安全门控 + 可回滚
D1 隔夜编码 张飞半夜自动从 backlog 领任务编码,早上交付 PR + 测试报告 来源: awesome-openclaw overnight-coder
D2 持仓再平衡 定期检查持仓偏离度,生成调仓建议(含税务/滑点考量) 来源: awesome-openclaw portfolio-rebalancer
D3 交易异常监控 实时监控交易/资金流,检测异常模式并告警 来源: awesome-openclaw fraud-detector
D4 数据 ETL 管道 下载→清洗→验证→入库,确定性多步管道 来源: awesome-openclaw etl-pipeline
D5 合规审计追踪 每笔交易/决策有完整审计日志,可追溯 Agent 推理过程 来源: IBM/AWS agent audit trail
D6 策略实验跟踪 回测/参数调优的实验记录,可对比 A/B 结果 来源: FinRL-X

2.3 编排/系统级场景

# 场景 描述 来源
C1 Webhook 触发 外部系统(如 NAS 数据就绪、交易信号)通过 webhook 触发任务 OpenClaw webhook
C2 Saga 链式任务 "下载→清洗→验证→入库",前一步失败需要清理/回滚 Temporal Saga Pattern
C3 并行协作 同一个 task 多个 agent 并行执行(如张飞写代码 + 关羽做风控),结果汇总 Azure parallel pattern
C4 人机交互 执行到关键点暂停等人确认/审批,确认后继续 OpenClaw Lobster approval
C5 共享层工具替换 当前调 openclaw CLI spawn agent,未来可能用 MCP、HTTP API、或其他编排工具 MCP/plugin 趋势
E1 三省六部式分权协作 规划→审议(可封驳)→派发→执行→回报,多轮审议循环 edict 三省六部
E2 事件驱动响应 外部事件(涨停/跌停/策略信号/系统告警)触发 agent 执行 Azure agent patterns
E3 动态计划调整 执行过程中根据中间结果动态增删步骤,不是固定 DAG Azure: evolving plan
E4 自愈/故障恢复 检测到 agent 执行失败 → 自动诊断 → 尝试修复 → 失败则升级 AWS incident response
E5 审批链 多级审批(agent→agent→human),每级有通过/驳回 edict 门下省 + Temporal Saga
E6 成本/配额控制 根据 token 消耗自动降级模型、暂停非紧急任务 zhipu quota 经验
E7 经验蒸馏 任务完成后自动提取可复用经验,沉淀到知识库 MEMORY.md 闭环学习

2.4 触发方式场景

# 场景 描述
F1 条件触发 "当沪深300跌超3%时,自动触发风控巡查"
F2 依赖触发 "赵云下载完数据后,自动触发张飞跑回测"
F3 人工触发 用户在 UI/聊天中手动触发一个预定义的工作流

3. 场景抽象:五个关键维度

所有 28 个场景可以在五个维度上归类:

3.1 触发方式(Trigger

触发方式 场景 说明
人工 A2, B3, B5, B6, C4, F3 用户主动发起
定时 B1, B2, B4, D1, D2, D6 cron 定期执行
事件 B4, B6, C1, D3, E2, F1 外部系统/条件触发
依赖链 C2, D4, E5, F2 前序任务完成后触发

3.2 执行模式(Execution

执行模式 场景 说明
单步 A1, B1, D3, C1 一个 agent 一次执行就完成
多步串行 A2, B5, C2, D4, E1, E5 多个步骤依次执行
多步并行 B3, C3, E1(六部并行) 多个 agent 同时工作
循环/迭代 E1(审议循环), E3, E4 某步骤重复直到条件满足
条件分支 E3, F1 根据中间结果选择不同路径

3.3 人机交互(Human-in-the-loop

交互模式 场景 说明
无人工 A1, B1, B2, B3, B4, D1, D4 全自动,不需要人
可选审批 B5, C4, E5 关键点可暂停等确认
必须审批 B6, D2, F3 涉及实盘/资金,必须人工确认
多级审批 E1, E5 逐级审批,可驳回重做

3.4 失败处理(Failure Policy

失败策略 场景 说明
重试 大部分场景 失败后自动重试 N 次
跳过继续 B3(某个参数失败不影响其他) 部分失败不阻塞
回滚 C2, B6, D4 前序步骤需要撤销
升级 E4, E6 Agent 处理不了,升级给人

3.5 执行工具(Executor

执行工具 场景 说明
openclaw CLI 所有当前场景 spawn openclaw agent --message ...
Lobster D4, C4, B5 确定性多步 pipeline + 审批
MCP C5 未来 Model Context Protocol
HTTP API C1, E2, F1 外部系统直接调用

4. 业界调研

4.1 TemporalWorkflow Type + Task Queue + Saga

核心机制

  • 不同 Workflow Type 注册不同处理函数
  • Worker 框架共享(Activity 是原子操作,Workflow 定义编排)
  • 内置 Saga Pattern(失败自动补偿)
  • 内置重试、超时、持久化状态

对我们有启发的

  • Workflow Type = Pipeline Type,注册式扩展
  • Activity = 共享执行层的原子操作
  • Saga = 失败补偿机制(量化场景的回滚需求)
  • 状态自动持久化,不怕进程重启

来源: Temporal Task Routing, Temporal Saga

4.2 LangGraphRouting Pattern + Sub-Graph

核心机制

  • Router 节点分类输入,分发到不同 sub-graph
  • 每个 sub-graph 有独立的状态和执行逻辑
  • 共享图执行框架(节点、边、状态)

对我们有启发的

  • 路由是第一步(按 task_type 分发)
  • sub-graph = Pipeline,完全独立
  • 条件边 = 分支逻辑(if/else 变成图的边)

来源: LangGraph Routing

4.3 Argo WorkflowsWorkflowTemplate + DAG

核心机制

  • WorkflowTemplate 是声明式 YAML,定义步骤和依赖
  • steps(串行嵌套并行)和 dag(依赖图)两种编排
  • 新增工作流类型 = 新增 YAML,不改引擎

对我们有启发的

  • 声明式 YAML 定义 pipeline(新增类型不改代码)
  • steps 和 dag 两种粒度(串行 vs DAG)
  • 模板可嵌套、可引用、可参数化

来源: Argo Concepts

4.4 Google ADKAgent 类型组合

核心机制

  • SequentialAgent、ParallelAgent、LoopAgent、CustomAgent
  • 用不同 Agent 类型组合 pipeline
  • Router Agent 做分类分发

对我们有启发的

  • 组合优于继承:用基本类型组合复杂 pipeline
  • Sequential = 串行 multi_step
  • Parallel = 并行协作
  • Loop = 审议循环

来源: Google ADK Multi-Agent

4.5 OpenClaw TaskFlow + Lobster

核心机制

  • TaskFlow:持久化多步 flow 管理(跨 gateway 重启)
  • Lobster:确定性 pipeline 运行时(步骤编排 + 审批门控 + resume token
  • Cron + Lobster + TaskFlow 组合:定时触发 + pipeline 执行 + 持久化追踪

对我们有启发的

  • moziplus 可以直接利用 OpenClaw 的 Lobster/TaskFlow
  • Lobster 的 approval = 人机交互 checkpoint
  • resume token = 中断后继续
  • 声明式 YAML 定义步骤

来源: /opt/homebrew/lib/node_modules/openclaw/docs/automation/taskflow.md

4.6 三省六部 Edict:制度化协作

核心机制

  • 分权制衡:太子→中书→门下→尚书→六部,不可越级
  • 门下省可封驳(驳回重做,最多3轮)
  • 状态机严格递进(9个状态)
  • 实时可观测看板

对我们有启发的

  • 多轮审议循环 = Loop Pipeline
  • 封驳机制 = 驳回重做
  • 制度约束 = guardrail 的升级版

来源: ~/.openclaw/knowledge_base/edict/docs/task-dispatch-architecture.md

4.7 Azure AI Agent Patterns

核心机制

  • Sequential(串行)、Parallel(并行)、Concurrent(并发独立)
  • Dynamic(动态调整计划)
  • 审计追踪(完整 reasoning step 记录)
  • 自愈(检测失败 → 诊断 → 修复 → 升级)

对我们有启发的

  • Dynamic plan = E3 场景(执行中调整计划)
  • 审计追踪 = D5 场景
  • 自愈 = E4 场景

来源: Azure Agent Patterns

4.8 AWS Incident Response Agent

核心机制

  • 事件触发 → 自动诊断 → 修复执行 → 验证 → 升级
  • 不可变审计日志(CloudTrail
  • Agent Space 级数据隔离

对我们有启发的

  • 事件驱动响应 = E2 场景
  • 自愈流程 = E4 场景

来源: AWS DevOps Agent


5. 设计模式候选

5.1 Strategy Pattern(策略模式)

适用:运行时根据类型选择不同执行策略。

PipelineRouter → Strategy A (MailPipeline)
               → Strategy B (TaskPipeline) 
               → Strategy C (CronPipeline)

优点:策略独立,新增类型只需新增类
缺点:策略间共享逻辑需要额外处理

5.2 Template Method Pattern(模板方法)

适用:pipeline 骨架固定,具体步骤可替换。

BasePipeline.execute():
  1. trigger()     — 触发方式
  2. pre_check()   — 前置检查
  3. route()       — 路由
  4. spawn()       — 执行
  5. post_check()  — 后置检查/验证
  6. complete()    — 完成/失败处理

优点:骨架共享,差异点明确
缺点:骨架变化影响所有子类;继承耦合

5.3 Plugin / Registry Pattern(插件注册)

适用:新增类型不改引擎,自动发现。

PipelineRegistry.register("mail", MailPipeline)
PipelineRegistry.register("task", TaskPipeline)
PipelineRegistry.register("cron", CronPipeline)
# 自动扫描 config/pipelines/ 目录加载

优点:开放封闭原则,新增类型零改动
缺点:需要约定接口契约

5.4 Chain of Responsibility(责任链)

适用:请求沿链传递,每个节点决定处理或传递。

TriggerChain: CronTrigger → WebhookTrigger → ManualTrigger → ...
RouteChain:   GuardrailCheck → CapabilityMatch → LLMRoute → ...
FailureChain: RetryHandler → RollbackHandler → EscalateHandler → ...

优点:每个节点只关心自己的逻辑,可组合
缺点:链条过长时难调试

5.5 Observer / Event-driven(观察者/事件驱动)

适用:任务状态变更时通知相关方。

Task.on("completed") → trigger dependent tasks (F2)
Task.on("failed")    → trigger failure handler (E4)
Task.on("checkpoint")→ notify human (C4)

优点:松耦合,支持事件驱动场景(E2, F1, F2)
缺点:事件流难追踪,调试复杂

5.6 Pipeline Pattern(管道模式)

适用:数据流经一系列处理阶段。

input → [Stage1] → [Stage2] → [Stage3] → output

每个 stage 有独立的输入/输出/错误处理。stage 之间通过共享上下文传递数据。

优点:每个 stage 独立可测试,可组合
缺点:跨 stage 状态管理需要额外设计


6. 场景 × 维度 × 模式交叉分析

维度 出现频率 最适配的模式
触发方式多样化(4种) 28个场景中占 4 种 Strategy(触发策略可替换)
执行模式多样化(5种) 核心差异点 Strategy + Template Method
人机交互(4级) 高风险场景必须 Pipeline Patternapproval gate
失败处理(4种) 所有场景都需要 Chain of Responsibility
执行工具可替换 C5 明确提出 Strategy(执行器抽象)
新增类型要方便 所有场景 Plugin/Registry

7. 调研结论与方向决策(2026-05-27 第二轮)

7.1 核心结论:不做 Pipeline 框架

经过第 1~6 节的调研和讨论,得出关键结论:

不需要代码层面的 Pipeline 框架。 各种执行模式(parallel/loop/saga/interactive)是执行路径的选择,不是代码层面的 Pipeline 类。Agent 自己根据黑板信息决定执行策略。

理由

  • 28 个场景的业务类型无限扩展,但执行模式不需要预定义
  • 两层抽象:业务类型(task_type,无限)+ 执行模式(Agent 自己选,不需要预定义)
  • 代码层硬编码执行模式 = 传统做法,不是 AI native

7.2 新方向:Daemon 退化 + Agent 进化

维度 当前(v2.7 未来(AI Native
Daemon 角色 调度器 + 路由器 + 决策者 投递员 + 看护人
Agent 角色 被动执行者(固定步骤 prompt 自主决策者(读黑板→想→干→写回)
谁决定执行路径 Daemonif/else + YAML Agent(根据黑板信息自主判断)
Agent 间通信 无(Daemon 中央调度) 黑板 handoff comment + observation

业界印证

  • Claude Code Agent TeamsAgent 自己 flock() claim 任务
  • Hermes KanbanAgent 有 kanban_* 工具直接操作黑板
  • PRD v3.0 B3:“黑板是唯一真相源,所有 Agent 读它、想、行动、写回结果”

7.3 Agent Prompt 结构对比调研

系统 “能做什么”怎么表达 “全局视角”怎么给 自主程度
Claude Code 工具列表(自动可用) 文件系统自己读
Hermes Kanban kanban_* 工具集(环境变量激活) kanban_show() 自己读
我们当前 prompt 里写 curl 命令模板 不给全局
v2.9 目标 API 列表 + 约束 黑板 API 自己读

7.4 工具评估

在第二轮讨论中评估了以下工具:

CodeGraph

  • 来源: knowledge_base/codegraph/
  • 功能: MCP 服务器,索引代码 AST(符号关系、调用图、影响分析)
  • 评估结论: 对 L3 知识注入无用
  • 理由: 索引的是代码文件的结构(函数/类/调用链),而 L3 知识注入检索的是 Markdown 蒸馏知识文档(practices/concepts/skills)。两个完全不同的数据类型和查询需求。
  • 潜在用途: 将来对编码 Agent 的工具增强(让张飞快速定位代码)有用,但不是知识注入。

Understand Anything

  • 来源: knowledge_base/understand-anything/
  • 功能: Claude Code 插件,支持 /understand-knowledge 命令解析 LLM Wiki 的 index.md + wikilinks,构建知识图谱可视化
  • 评估结论: ⚠️ 间接有用
  • 理由: /understand-knowledge 可以可视化 LLM Wiki 知识图谱(社区聚类 + 隐含关系发现),适合人工维护 wiki 时探索。但它是 Claude Code 插件,产出是 Dashboard 不是 API,不适合运行时 Daemon 调用。
  • 适合场景: 知识库页面 > 500 页时人工探索用,不是运行时组件。

MattPocock Handoff Skill

  • 来源: knowledge_base/mattpocock-skills/skills/in-progress/handoff/
  • 功能: 把当前对话压缩成一份交接文档(handoff document),让下一个 fresh agent 能接手
  • 评估结论: 直接有用,已采纳
  • 理由: 解决的是 Agent 间上下文传递的核心问题。当前 Daemon 中央摘要式传话信息逐层丢失(思考过程→设计权衡→遇到的问题)。Handoff 思路是让产出 Agent 主动写交接文档到黑板,下一个 Agent 自主读取。
  • 采纳方式: 不是装这个 skill(那是 Claude Code 用的),而是借鉴其设计理念改造 prompt。

7.5 知识注入调研:LLM Wiki 复用

LLM Wiki 已有完整基础设施,不需要另建 L3 知识层:

内容 数量
wiki-vault 总页面 ~273 个 md 文件
practices(蒸馏实践) 118 个
concepts 8 个
skills 5 个
projects 40+ 个

LLM Wiki 的检索分级原语直接复用为知识注入方案:

检索级别 原语 成本
第 1 级 grep index.md 的 summary 行 ~50 token/关键词
第 2 级 读页面 summary frontmatter 字段 ~100 token/页面
第 3 级 grep 页面内特定段落 ~200 token/段
第 4 级 读整个页面 ~500+ token/页

spawn 时用第 1 级即可:3 个关键词 × 2 条匹配 = 6 条 summary~300 token。便宜到可以每次都跑。

7.6 知识管理体系与本次设计的关系

知识管理体系 和 v2.8 设计关系 说明
LLM Wiki 三层架构 直接用 就是 L3 知识层
wiki-query skill 直接用 检索分级原语复用
wiki-ingest skill 后续用 新调研结果可 ingest 进 wiki
记忆分区(memory/ 四区) ⚠️ 正交 Agent 自身记忆管理,和 prompt 进化独立
Skill 三级约束 ⚠️ 正交 产出格式约束,和执行自主度独立
Skill 集群模板 不做 和 Agent 自主决策方向矛盾
四层加载机制 对齐 ①固化=SOUL.md ②注册=SKILL.md ③注入=BootstrapBuilder ④检索=wiki-query

7.7 最终决策

决策项 结论
是否新建 Pipeline 框架 不做。Agent 自主决定执行路径
是否新建 L3 知识库 不做。复用 LLM Wiki273 页)
是否新建黑板摘要注入 不做。Agent 自己读黑板
是否新建 blackboard_* 工具 优先级低。curl 够用
是否采纳 Handoff 理念 是。Agent 主动写交接文档到黑板
是否采纳 LLM Wiki 检索原语 是。spawn 前 grep index.md 注入 summary

8. 调研产出

文件 说明
docs/research/pipeline-architecture-research.md 本文档:28场景+8实践+6模式+方向决策
docs/design/v2.8-direction-notes.md 设计方向备忘:核心结论+改动清单+工具评估
docs/design/v2.8-pipeline-architecture.md Pipeline 架构设计 v1.0~v2.0已废弃
docs/design/v2.8-task-type-pipeline.md Task Type Pipeline 设计(已废弃
docs/design/v2.7.2-pipeline-refactor.md v2.7.2 Pipeline 分离设计(部分已完成)

9. 下一步

基于调研结论,具体实现计划见 docs/design/v2.8-direction-notes.md 第五节。