From 0c6608aa098db1a0bba68de38ff2c43c38f29270 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Wed, 27 May 2026 00:04:42 +0800 Subject: [PATCH] auto-sync: 2026-05-27 00:04:42 --- docs/design/v2.8-pipeline-architecture.md | 937 +++++++++++++--------- docs/research/distill-reorg-v2.md | 174 ++++ 2 files changed, 716 insertions(+), 395 deletions(-) create mode 100644 docs/research/distill-reorg-v2.md diff --git a/docs/design/v2.8-pipeline-architecture.md b/docs/design/v2.8-pipeline-architecture.md index d14b376..0f6531b 100644 --- a/docs/design/v2.8-pipeline-architecture.md +++ b/docs/design/v2.8-pipeline-architecture.md @@ -1,9 +1,11 @@ # v2.8 Pipeline 架构设计 -**版本**: v1.0 -**日期**: 2026-05-26 +**版本**: v2.0 +**日期**: 2026-05-27 **作者**: 庞统 -**状态**: 调研完成 → 设计提案,待用户确认 +**状态**: 调研完成 → 设计提案 v2.0,待用户确认 + +> v1.0→v2.0 变更:补充执行模式调研(parallel/loop/saga/interactive/event-driven),补充场景×执行模式映射表,明确业务类型 vs 执行模式的分层关系。 --- @@ -12,103 +14,316 @@ 1. **消灭 46 处 if/_mail 分支**(ticker 14 + dispatcher 23 + spawner 9) 2. **不同 task_type 走不同执行路径**(代码评审不需要 planning,数据下载不需要路由) 3. **新增 task_type 只加配置不改核心代码**(Plugin/Registry) -4. **和 spawner-monitor 设计自然融合**(执行层共享,调度层分化) +4. **执行模式是有限的几种,业务类型无限扩展**(用户选业务类型,系统映射到执行模式) -## 2. 设计模式选择 +--- -### 2.1 调研结论回顾 +## 2. 两层抽象:业务类型 vs 执行模式 -| 维度 | 需求 | 最适配模式 | -|------|------|-----------| -| 按类型分发 | 入口统一,分发独立 | **Routing + Registry**(LangGraph) | -| 骨架共享 + 步骤可替换 | tick → load → route → spawn → complete | **Template Method** | -| 新增类型零改动 | 声明式配置 | **Plugin/Registry**(Argo) | -| 触发方式可替换 | cron / webhook / manual / event | **Strategy** | -| 失败处理链式 | retry → rollback → escalate | **Chain of Responsibility** | -| 事件通知 | 完成后触发依赖 | **Observer**(Phase 2) | +### 2.1 概念 -### 2.2 选定组合 +| 层 | 定义 | 谁选 | 数量 | 例子 | +|----|------|------|------|------| +| **业务类型**(task_type) | 用户视角的"做什么" | 用户创建任务时选 | 无限扩展 | coding, review, data, deploy, research, discuss, data_collect, risk_patrol, ... | +| **执行模式**(execution_pattern) | 系统视角的"怎么跑" | YAML Profile 声明 | 有限(~7种) | single_step, multi_step, parallel, loop, saga, interactive, event_trigger | + +### 2.2 映射关系 + +```yaml +# config/pipelines/review.yaml +task_type: review # 业务类型(用户视角) +execution_pattern: single_step # 执行模式(系统视角) +``` + +**业务类型无限扩展,执行模式有限**。新增业务类型 = 新增一个 YAML。新增执行模式 = 新增代码。 + +--- + +## 3. 执行模式调研 + +> 每种模式的结论来自外部来源,不自己发明。 + +### 3.1 single_step(单步执行) + +**描述**:一个 agent 一次 spawn 就完成。 + +**适用场景**:代码评审、数据下载、Mail 投递、异常监控告警。 + +**优秀实践**: +- Temporal 的 Activity 概念——原子操作,独立执行,失败可重试(来源:Temporal docs) +- Hermes Kanban 的 `todo → ready → claimed → running → done` 生命周期——单步但状态完整(来源:知识库 hermes-agent) +- OpenClaw Lobster 单步 command——`exec --shell 'cmd --json'`(来源:`/opt/homebrew/lib/node_modules/openclaw/docs/tools/lobster.md`) + +**关键设计点**: +- 完成 ≠ 成功:需要 verification(如检查产出文件是否存在) +- 失败可重试,但有上限 + +### 3.2 multi_step(多步串行) + +**描述**:多个 stage 依次执行,前一步的输出是后一步的输入。 + +**适用场景**:编码(plan→implement→verify)、调研(plan→research→report)、部署(build→test→deploy)。 + +**优秀实践**: +- Lobster 的 `steps` + `stdin: $step.stdout` 管道——声明式步骤链,步骤间 JSON 管道传递(来源:`lobster.md`) +- OpenClaw TaskFlow——持久化多步 flow 管理,跨 gateway 重启(来源:`taskflow.md`) +- Temporal Workflow——步骤自动持久化,不怕进程重启(来源:Temporal docs) + +**关键设计点**: +- 步骤间数据传递:通过 DB(stages_json)或 prompt 注入前序 stage 的 output +- 每个 stage 对应一次 agent spawn +- 中间 stage 失败 → 可从当前 stage 重试(不需要从头开始) + +### 3.3 parallel(并行执行 / Fan-Out / Scatter-Gather) + +**描述**:同一个任务拆成多个子任务并行执行,结果汇聚。 + +**适用场景**:批量回测(N 组参数)、并行协作(张飞写代码 + 关羽做风控)、多角度评审(乐观/悲观/务实三路并行)。 + +**优秀实践**: + +| 来源 | 做法 | 关键点 | +|------|------|--------| +| **AWS Prescriptive Guidance** | scatter-gather:分发→并行执行→汇聚 | "sends tasks to multiple services in parallel, waits for responses, aggregates results"(web_search 摘要) | +| **Azure Architecture Center** | concurrent orchestration / fan-out-fan-in / map-reduce | "also known as parallel, fan-out/fan-in, scatter-gather, map-reduce"(web_search 摘要) | +| **Temporal** | `asyncio.gather(*futures, return_exceptions=True)` | `return_exceptions=True` 意味着部分失败不阻塞整体,结果里包含异常(来源:dev.to Temporal AI agents) | +| **open-multi-agent** | `AgentPool.runParallel()` + fan-out-aggregate pattern | 多 agent 并行,结果由 synthesizer 聚合(来源:知识库 `examples/patterns/fan-out-aggregate.ts`) | +| **GSD (get-shit-done)** | Wave Execution——按依赖分组,wave 内并行,wave 间串行 | "Plans are grouped into waves. Within each wave, plans run in parallel. Waves run sequentially."(来源:知识库 `get-shit-done/README.md`) | + +**关键设计点**: +1. **结果汇聚**:需要一个 aggregator(可以是固定逻辑,也可以是一个 agent) +2. **部分失败处理**: + - `fail_fast`:一个失败全部取消(适合 saga 类场景) + - `best_effort`:收集所有结果,失败标记,汇总时报告(适合批量回测) +3. **并发控制**:受 counter 全局上限约束(`max_global_agents`) +4. **GSD 的 Wave 启发**:parallel 可以和 multi_step 组合——一个多步 pipeline 的某个 stage 是并行的 + +### 3.4 loop(循环 / 审议 / Evaluator-Reflect-Refine) + +**描述**:执行→评审→不通过→修改→再提交,循环直到通过或达到上限。 + +**适用场景**:审议循环(门下省封驳重做)、自愈(检测→诊断→修复→验证循环)、质量迭代(写→评审→改→再审)。 + +**优秀实践**: + +| 来源 | 做法 | 关键点 | +|------|------|--------| +| **AWS Prescriptive Guidance** | Evaluator-Reflect-Refine Loop | "The loop repeats until the result meets criteria, is approved, or reaches a retry limit"(web_search 摘要) | +| **三省六部 Edict** | 门下省封驳→返回中书省→修改→重新审议,最多 3 轮 | 制度化循环,有硬上限(来源:知识库 `edict/docs/task-dispatch-architecture.md`) | +| **Cloudflare Agents** | Human-in-the-loop + schedule escalation | "Implement timeouts — use schedule() to escalate or auto-reject after reasonable periods"(web_search 摘要) | +| **Reddit AI_Agents 实战** | "Set a hard rule: if no CRITICAL items, output APPROVED — loop terminates. Max 3 cycles, then escalate to human"(web_search 摘要) | + +**关键设计点**: +1. **终止条件**:通过 / 达到 max_rounds(硬上限)/ 人工介入 +2. **收敛保障**:max_rounds 防止无限循环 +3. **上下文传递**:每次循环需要把上一次的评审意见传给执行者 +4. **和 multi_step 的关系**:loop 是 multi_step 的某个阶段加上循环条件。可以建模为 `multi_step + loop_guard` + +### 3.5 saga(补偿事务) + +**描述**:多步串行执行,任何一步失败,反向执行之前步骤的补偿动作。 + +**适用场景**:数据 ETL(下载→清洗→入库,入库失败要清掉已洗的数据)、实盘交易(下单→成交确认→风控检查,失败需要撤单)、策略上线(回测→模拟盘→实盘,失败需要回退)。 + +**优秀实践**: + +| 来源 | 做法 | 关键点 | +|------|------|--------| +| **SagaLLM (arXiv:2503.11951)** | AI Agent 场景的 saga——保留 context 用于 recovery,按 dependency graph 反向执行补偿 | "Upon failure, system invokes compensatory actions using logs and rollback specifications stored in... traverses dependency graph to orchestrate reverse execution paths"(web_search 摘要) | +| **SparkCo** | 每步定义 forward + compensate 动作,orchestrator 管理补偿链 | "Each step treated as independent transaction, with compensating actions defined for rollback scenarios"(web_search 摘要) | +| **Temporal** | 内置 Saga 支持:`await workflow.execute_activity(compensate_A)` | 每个 Activity 注册对应的补偿函数(来源:Temporal docs) | + +**关键设计点**: +1. **补偿动作定义**:每个 stage 必须声明 `compensate` 动作(可以是 agent spawn 或固定脚本) +2. **补偿链**:失败后从当前 stage 反向执行之前所有 stage 的 compensate +3. **补偿也可能失败**:需要记录补偿状态,人工介入 +4. **和 multi_step 的关系**:saga 是 multi_step + compensate 定义。可以建模为 `multi_step + per-stage-compensate` + +### 3.6 interactive(人机交互 / Human-in-the-Loop) + +**描述**:执行到关键点暂停等人确认,确认后继续。 + +**适用场景**:策略上线审批、实盘交易确认、多级审批链。 + +**优秀实践**: + +| 来源 | 做法 | 关键点 | +|------|------|--------| +| **OpenClaw Lobster** | `approval: required` + `resumeToken`——暂停等审批,resume 不重跑已完成步骤 | "Halted workflows return a token; approve and resume without re-running everything"(来源:`lobster.md`) | +| **Redis AI HITL** | 工作流暂停在 decision point,保存 state,等 human approve/reject/modify 后 resume | "Workflow pauses at decision point, saves state, waits for human to approve/reject/modify before resuming"(web_search 摘要) | +| **Cloudflare Agents** | 审批超时自动 escalate 或 auto-reject | "Implement timeouts — use schedule() to escalate or auto-reject"(web_search 摘要) | +| **MindStudio** | Iterative Kanban Pattern——agent 产出 → human 评审 → agent 修改 → human 确认 | "Task gets handed to agent, agent produces output, human reviews, sends it back, agent revises, human approves"(web_search 摘要) | + +**关键设计点**: +1. **resume 机制**:暂停时保存状态 + 产出 token,恢复时从 checkpoint 继续 +2. **超时处理**:审批等待超时 → escalate / auto-reject / auto-approve(可配置) +3. **Lobster 启发**:moziplus 可以直接用 Lobster 的 `approval: required` 模式,不需要自己实现暂停/恢复 +4. **和 multi_step 的关系**:interactive 是 multi_step 的某个 stage 加了 approval 门控。可以建模为 `multi_step + approval_stage` + +### 3.7 event_trigger(事件触发 / 统一触发入口) + +**描述**:不是一种执行模式,而是一种触发方式。和执行模式正交——任何执行模式都可以被事件触发。 + +**适用场景**:cron 定时触发、webhook 触发、依赖完成触发、条件触发。 + +**优秀实践**: + +| 来源 | 做法 | 关键点 | +|------|------|--------| +| **Gobii (hermes-agent #491)** | 统一 scheduled 和 event-driven 到一个 durable processing loop——cron / IMAP idle / webhook / agent-to-agent message 全部统一 | "unifies scheduled and event-driven execution into a single durable processing loop"(web_search 摘要) | +| **AgentC2** | Agent 同时支持 cron + webhook,hybrid pattern | "Agent can have both cron schedule and event triggers — daily summary at 6 AM + webhook for critical alerts"(web_search 摘要) | +| **Edict** | Event Bus + 主题路由(`task.created`, `task.completed` 等) | 所有状态变更发布事件,订阅者按需触发(来源:知识库 `edict/edict_agent_architecture.md`) | +| **OpenClaw Cron** | `cron add --cron "0 7 * * 1-5" --session session:xxx --message "..."` | 定时触发 agent session,声明式配置(来源:OpenClaw cron 工具) | + +**关键设计点**: +1. **触发方式 vs 执行模式正交**:trigger 是"什么时候开始",execution_pattern 是"开始后怎么跑" +2. **四种触发**:manual(人工)/ cron(定时)/ webhook(外部事件)/ dependency(前序完成) +3. **Gobii 启发**:统一到一个 loop,不需要为每种触发建独立系统 +4. **OpenClaw Cron 已有能力**:当前 moziplus 可以直接用 OpenClaw cron 做定时触发,不需要自己实现 + +--- + +## 4. 执行模式关系图 + +``` + ┌─────────────────────────────┐ + │ Trigger(正交层) │ + │ manual / cron / webhook / │ + │ dependency │ + └──────────────┬──────────────┘ + │ 触发任务 + ▼ +┌──────────┐ ┌──────────────┐ ┌──────────┐ ┌──────────┐ +│ single │ │ multi_step │ │ parallel │ │ mail │ +│ _step │ │ │ │ │ │ │ +└──────────┘ └──────┬───────┘ └──────────┘ └──────────┘ + │ + ┌────────┼────────┐ + ▼ ▼ ▼ + ┌──────────┐ ┌──────┐ ┌──────┐ + │interactive│ │ loop │ │ saga │ + │ (审批门控)│ │(循环)│ │(补偿)│ + └──────────┘ └──────┘ └──────┘ +``` + +**关系说明**: +- `single_step` 和 `multi_step` 是两种基础执行模式 +- `parallel` 是独立的执行模式(并行子任务) +- `mail` 是特殊的 single_step(独立 pipeline,不走通用路径) +- `interactive`、`loop`、`saga` 是 **multi_step 的增强**——它们在 multi_step 骨架上加了额外能力 +- `trigger` 和执行模式正交 + +### 建模方式 + +| 增强类型 | 本质 | 建模 | +|---------|------|------| +| interactive | multi_step + approval stage | `multi_step` + profile 中某些 stage 声明 `approval: required` | +| loop | multi_step + 循环 guard | `multi_step` + profile 中声明 `loop: { max_rounds: 3, convergence_check: true }` | +| saga | multi_step + compensate | `multi_step` + profile 中每个 stage 声明 `compensate: { agent: "...", message: "..." }` | + +**不需要 3 个独立的 Pipeline 子类**。只需要 `MultiStepPipeline` 识别 profile 中的增强声明,执行时加入对应逻辑。 + +--- + +## 5. 场景 × 执行模式映射 + +### 5.1 已有业务类型 + +| 业务类型 | 执行模式 | 触发方式 | 说明 | +|---------|---------|---------|------| +| **coding** | multi_step | manual | plan→implement→verify→review | +| **review** | single_step | manual / dependency | 固定 agent,一杆到底 | +| **data** | single_step | manual / cron | 固定 agent(赵云),下载+验证 | +| **deploy** | multi_step (interactive) | manual | build→test→approve→deploy | +| **research** | multi_step | manual | plan→research→report | +| **discuss** | single_step | manual | 讨论,一杆到底 | +| **mail (request)** | mail | API 创建 | 点对点投递 | +| **mail (inform)** | mail | API 创建 | 点对点,自动 done | + +### 5.2 调研的 28 个场景映射 + +| # | 场景 | 业务类型 | 执行模式 | 触发 | +|---|------|---------|---------|------| +| A1 | Mail 投递 | mail | mail | API | +| A2 | Task 派发 | coding/review/... | 各自映射 | manual | +| B1 | 定时数据采集 | data_collect | single_step | cron | +| B2 | 早朝简报 | morning_brief | multi_step | cron | +| B3 | 批量回测 | batch_backtest | parallel | manual | +| B4 | 风控巡查 | risk_patrol | single_step | cron / event | +| B5 | 策略上线审批 | strategy_launch | multi_step (interactive) | manual | +| B6 | 实盘交易信号 | trade_signal | multi_step (saga) | event | +| C1 | Webhook 触发 | (任意) | 由 task_type 决定 | webhook | +| C2 | Saga 链式任务 | data_etl | multi_step (saga) | manual / dependency | +| C3 | 并行协作 | parallel_collab | parallel | manual | +| C4 | 人机交互 | (任意 multi_step) | multi_step (interactive) | manual | +| C5 | 执行工具替换 | (架构层) | 不映射 | N/A | +| D1 | 隔夜编码 | overnight_coding | multi_step | cron | +| D2 | 持仓再平衡 | portfolio_rebalance | multi_step (interactive) | cron | +| D3 | 交易异常监控 | fraud_detect | single_step | event | +| D4 | 数据 ETL 管道 | data_etl | multi_step (saga) | cron / dependency | +| D5 | 合规审计追踪 | audit_trail | multi_step | event | +| D6 | 策略实验跟踪 | experiment_track | multi_step | manual | +| E1 | 三省六部分权 | edict_dispatch | multi_step (loop) | manual | +| E2 | 事件驱动响应 | (任意) | 由 task_type 决定 | event | +| E3 | 动态计划调整 | dynamic_plan | multi_step | manual | +| E4 | 自愈/故障恢复 | self_heal | multi_step (loop) | event | +| E5 | 审批链 | approval_chain | multi_step (interactive) | manual | +| E6 | 成本/配额控制 | quota_guard | single_step | event | +| E7 | 经验蒸馏 | experience_distill | single_step | dependency | +| F1 | 条件触发 | (任意) | 由 task_type 决定 | event (condition) | +| F2 | 依赖触发 | (任意) | 由 task_type 决定 | dependency | +| F3 | 人工触发 | (任意) | 由 task_type 决定 | manual | + +--- + +## 6. 设计方案 + +### 6.1 设计模式选择 ``` Registry + Template Method + Strategy ``` - **Registry**:Pipeline 注册中心,按 task_type 查找 -- **Template Method**:Pipeline 基类定义 `tick()` 骨架,子类覆写差异步骤 +- **Template Method**:Pipeline 基类定义 tick() 骨架,子类覆写差异步骤 - **Strategy**:触发方式、路由策略、验证策略作为可注入的策略对象 -不选: -- **Chain of Responsibility**:当前失败处理只有 3 种(retry/escalate/abort),不值得引入链式抽象 -- **Observer**:依赖触发(F2)放 Phase 2,当前不实现 -- **纯 Strategy**:Pipeline 间共享逻辑太多(spawn/monitor/counter),纯策略会重复 +来源:LangGraph Routing Pattern(来源:web_search "LangGraph Routing")、Argo WorkflowTemplate(来源:Argo docs)、Temporal Worker Framework(来源:Temporal docs)。 -### 2.3 业界印证 - -| 系统 | 做法 | 我们的对应 | -|------|------|-----------| -| Temporal | Workflow Type 注册不同处理函数 | Pipeline Registry | -| LangGraph | Router → Sub-graph | PipelineRouter → Pipeline | -| Argo | YAML WorkflowTemplate | Pipeline Profile YAML | -| Google ADK | SequentialAgent/ParallelAgent/LoopAgent | SingleStepPipeline/MultiStepPipeline(Phase 2 加并行) | - ---- - -## 3. 架构设计 - -### 3.1 整体结构 +### 6.2 Pipeline 类体系 ``` -┌─────────────────────────────────────────────┐ -│ Ticker │ -│ tick() → 遍历 project → _tick_project() │ -└───────────┬─────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────┐ -│ PipelineRouter │ -│ route(task_type) → Pipeline │ -│ route("_mail") → MailPipeline │ -│ route("review") → SingleStepPipeline │ -│ route(None) → DefaultPipeline │ -└───────────┬─────────────────────────────────┘ - │ - ┌──────┼──────┬──────────────┐ - ▼ ▼ ▼ ▼ -┌────────┐┌────────┐┌──────────┐┌────────┐ -│ Mail ││ Single ││ Default ││ Cron │ -│Pipeline││ Step ││ (Multi) ││(Phase2)│ -└────────┘└────────┘└──────────┘└────────┘ - │ │ │ - └────┬────┴────┬────┘ - ▼ ▼ - ┌──────────┐ ┌──────────┐ - │ Spawner │ │ Counter │ - │ (共享) │ │ (共享) │ - └──────────┘ └──────────┘ +TaskPipeline (ABC) # Template Method 骨架 +├── MailPipeline # Mail 专属(硬编码,不走 YAML) +├── SingleStepPipeline # 单步执行 +├── MultiStepPipeline # 多步执行(含增强) +│ ├── 识别 interactive 声明 → approval gate +│ ├── 识别 loop 声明 → 循环 guard +│ └── 识别 saga 声明 → compensate chain +└── ParallelPipeline # 并行执行(fan-out + aggregate) + +PipelineRouter # Registry,按 task_type 路由 ``` -### 3.2 Pipeline 基类(Template Method) +**为什么只有 4 个子类**:interactive/loop/saga 不是独立子类,而是 MultiStepPipeline 通过 profile 声明启用的增强能力。 + +### 6.3 Pipeline 基类 ```python class TaskPipeline(ABC): - """Pipeline 基类,定义 tick() 骨架""" + """Pipeline 基类(Template Method)""" - def __init__(self, spawner: Spawner, counter: ActiveAgentCounter, - router: Router, config: dict): + def __init__(self, spawner, counter, router, config, profile=None): self.spawner = spawner self.counter = counter self.router = router self.config = config + self.profile = profile or {} async def tick(self, db_path: Path, project_config: dict) -> TickResult: - """每个 ticker 周期的入口(Template Method)""" - # 1. 前置处理(超时检测、依赖推进等) + """每个 ticker 周期的入口""" await self.pre_tick(db_path) - - # 2. 加载待处理任务 tasks = self.load_pending(db_path) - - # 3. 逐个处理 dispatched = 0 for task in tasks: if not self.can_dispatch(task, db_path): @@ -116,192 +331,121 @@ class TaskPipeline(ABC): result = await self.dispatch_one(task, db_path, project_config) if result: dispatched += 1 - return TickResult(dispatched=dispatched, total=len(tasks)) - # ── 钩子方法(子类覆写)── - - async def pre_tick(self, db_path: Path): - """前置处理(超时检测、依赖推进)。默认只做超时检测""" + # ── 钩子方法 ── + async def pre_tick(self, db_path): self.check_timeouts(db_path) @abstractmethod - def load_pending(self, db_path: Path) -> list: - """加载待处理任务""" - ... + def load_pending(self, db_path) -> list: ... - def can_dispatch(self, task, db_path: Path) -> bool: - """是否可以 dispatch(guardrail、counter 等)""" + def can_dispatch(self, task, db_path) -> bool: return True @abstractmethod - async def dispatch_one(self, task, db_path: Path, project_config: dict) -> bool: - """处理单个任务""" - ... + async def dispatch_one(self, task, db_path, project_config) -> bool: ... - def check_timeouts(self, db_path: Path): - """超时检测(共享逻辑)""" - ... + def check_timeouts(self, db_path): ... ``` -### 3.3 Pipeline 实现 - -#### MailPipeline +### 6.4 MultiStepPipeline 增强 ```python -class MailPipeline(TaskPipeline): - """Mail 投递管道""" +class MultiStepPipeline(TaskPipeline): + """多步执行(含 interactive/loop/saga 增强)""" - async def pre_tick(self, db_path): - # Mail 也需要超时检测 - self.check_timeouts(db_path) - # 不需要依赖推进 + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # 从 profile 读取增强配置 + self.loop_config = self.profile.get("loop") # { max_rounds, convergence_check } + self.saga_config = self.profile.get("saga") # { stages: [{compensate: ...}] } + self.interactive_stages = self._extract_approval_stages() - def load_pending(self, db_path): - # 只加载 pending 状态 - return load_tasks(db_path, statuses=["pending"]) - - def can_dispatch(self, task, db_path): - # Mail 不走 guardrail - # 但需要检查 counter - return self.counter.can_acquire(task.assignee, "main") + def _extract_approval_stages(self): + """从 profile stages 中提取需要 approval 的 stage""" + return [s for s in self.profile.get("stages", []) + if s.get("approval") == "required"] async def dispatch_one(self, task, db_path, project_config): - agent_id = task.assignee # 直取,不路由 + stage = self._get_current_stage(task) - # 标记 working - mark_working(db_path, task.id, agent_id) + # interactive: approval gate + if stage.get("approval") == "required" and not self._is_approved(task, stage): + return False # 等审批,不 spawn - # 构造 prompt - message = self.spawner.build_mail_prompt(task) - - # spawn - def on_complete(aid, outcome, session_id): - self._on_mail_complete(task.id, aid, session_id, db_path, task.must_haves) - - return await self.spawner.spawn_full_agent( - agent_id=agent_id, - message=message, - task_id=task.id, - use_main_session=True, - on_complete=on_complete, - project_id="_mail", - db_path=db_path, - ) + # spawn agent + agent_id = self._resolve_agent(task, stage) + message = self._build_stage_prompt(task, stage) + return await self._spawn_and_track(agent_id, message, task, stage, db_path) - def _on_mail_complete(self, task_id, agent_id, session_id, db_path, must_haves): - # 幻觉门控 + 自动标 done + inform 自动完成 - ... + def _on_stage_complete(self, task, stage, outcome, db_path): + # saga: 记录 compensate 动作 + if self.saga_config and stage.get("compensate"): + self._record_compensate(task, stage, db_path) + + # loop: 检查收敛 + if self.loop_config: + round_num = self._get_current_round(task) + if outcome == "rejected" and round_num < self.loop_config["max_rounds"]: + self._advance_to_next_round(task, db_path) + return + elif round_num >= self.loop_config["max_rounds"]: + self._escalate(task, db_path, reason="loop_max_rounds") + return + + # 正常推进到下一个 stage + self._advance_stage(task, db_path) ``` -#### SingleStepPipeline(代码评审、数据下载等) +### 6.5 ParallelPipeline ```python -class SingleStepPipeline(TaskPipeline): - """单步执行管道:route → spawn → verify → done""" +class ParallelPipeline(TaskPipeline): + """并行执行(fan-out + aggregate)""" - def __init__(self, spawner, counter, router, config, profile): - super().__init__(spawner, counter, router, config) - self.profile = profile # 从 YAML 加载 - - async def pre_tick(self, db_path): - # 不做依赖推进 - self.check_timeouts(db_path) - - def load_pending(self, db_path): - return load_tasks(db_path, statuses=["pending"], - task_type=self.profile.get("task_type")) - - def can_dispatch(self, task, db_path): - # 检查 guardrail - if self.config.get("guardrails"): - violations = self.guardrails.check_task(task) - if violations: - mark_blocked(db_path, task.id, reason=violations) - return False - return True + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.failure_policy = self.profile.get("failure_policy", "best_effort") + self.aggregator = self.profile.get("aggregator", {}) async def dispatch_one(self, task, db_path, project_config): - # 路由(或固定 agent) - agent_id = self._resolve_agent(task) + # 1. 拆分子任务 + sub_tasks = self._fan_out(task) - # counter 检查 - if not self.counter.can_acquire(agent_id, task.id): - return False + # 2. 并行 spawn(受 counter 约束) + futures = [] + for sub in sub_tasks: + agent_id = self._resolve_agent(sub) + if not self.counter.can_acquire(agent_id, sub.id): + continue + self.counter.acquire(agent_id, sub.id) + futures.append(self._spawn_subtask(agent_id, sub, db_path)) - # 标记 working - mark_working(db_path, task.id, agent_id) + # 3. 等待所有完成 + results = await asyncio.gather(*futures, return_exceptions=True) - # prompt - message = self.spawner.build_task_prompt(task, self.profile) + # 4. 处理部分失败 + if self.failure_policy == "fail_fast": + if any(isinstance(r, Exception) for r in results): + self._cancel_remaining(futures) + self._mark_failed(task, db_path) + return True + # best_effort: 继续聚合 - # spawn - def on_complete(aid, outcome, session_id): - self.counter.release(aid, session_id) - self._handle_completion(task.id, aid, outcome, db_path) - - return await self.spawner.spawn_full_agent( - agent_id=agent_id, - message=message, - task_id=task.id, - use_main_session=False, - on_complete=on_complete, - project_id=project_config["id"], - db_path=db_path, - ) - - def _resolve_agent(self, task): - """根据 profile 配置解析 agent""" - sel = self.profile.get("agent_selection", "route") - if sel == "specific_agent": - return self.profile["specific_agent"] - elif sel == "route_by_capability": - return self.router.decide(task) + # 5. 聚合结果 + if self.aggregator.get("type") == "agent": + await self._aggregate_with_agent(task, results, db_path) else: - return self.router.decide(task) - - def _handle_completion(self, task_id, agent_id, outcome, db_path): - if outcome == "completed": - mark_done(db_path, task_id) - elif outcome in ("timeout", "api_error"): - # 等 ticker 重试 - mark_pending(db_path, task_id) - else: - mark_failed(db_path, task_id, reason=outcome) -``` - -#### DefaultPipeline(多步 Task,当前主流程) - -```python -class DefaultPipeline(TaskPipeline): - """默认多步管道:planning → executing → review → done""" - - async def pre_tick(self, db_path): - # 依赖推进 - self.advance_dependencies(db_path) - # 超时检测 - self.check_timeouts(db_path) - - def load_pending(self, db_path): - # 加载 pending + assigned + executing(续杯场景) - return load_tasks(db_path, statuses=["pending", "assigned", "executing"]) - - def can_dispatch(self, task, db_path): - # guardrail 检查 - ... + self._aggregate_simple(task, results, db_path) + return True - - async def dispatch_one(self, task, db_path, project_config): - # 当前完整流程:planning → route → spawn → review - ... ``` -### 3.4 PipelineRouter(Registry) +### 6.6 PipelineRouter ```python class PipelineRouter: - """按 task_type 路由到对应 Pipeline""" - def __init__(self): self._pipelines: Dict[str, TaskPipeline] = {} self._default: TaskPipeline = None @@ -315,7 +459,139 @@ class PipelineRouter: return self._pipelines.get(task_type, self._default) ``` -### 3.5 Ticker 改造 +### 6.7 Pipeline Profile(YAML 声明) + +```yaml +# config/pipelines/coding.yaml +task_type: coding +execution_pattern: multi_step +stages: + - id: plan + description: "分析需求,制定方案" + agent_selection: route_by_capability + capabilities_required: ["coding", "design"] + timeout_minutes: 10 + - id: implement + description: "编码实现" + reuse_plan_agent: true + timeout_minutes: 30 + depends_on: [plan] + - id: verify + description: "自动验证" + specific_agent: simayi-challenger + timeout_minutes: 15 + depends_on: [implement] +completion: + status: done + require_verification: true +``` + +```yaml +# config/pipelines/review.yaml +task_type: review +execution_pattern: single_step +agent_selection: specific_agent +specific_agent: simayi-challenger +timeout_minutes: 20 +completion: + status: done +``` + +```yaml +# config/pipelines/strategy_launch.yaml +task_type: strategy_launch +execution_pattern: multi_step +stages: + - id: backtest + description: "回测验证" + specific_agent: zhangfei-dev + timeout_minutes: 60 + - id: simulate + description: "模拟盘运行" + specific_agent: zhangfei-dev + timeout_minutes: 120 + depends_on: [backtest] + - id: approve + description: "上线审批" + approval: required # ← interactive 增强 + timeout_hours: 24 # 审批超时 + timeout_action: escalate # auto_reject / escalate / auto_approve + depends_on: [simulate] + - id: live + description: "实盘部署" + specific_agent: jiangwei-infra + timeout_minutes: 30 + depends_on: [approve] +``` + +```yaml +# config/pipelines/data_etl.yaml +task_type: data_etl +execution_pattern: multi_step +stages: + - id: download + description: "数据下载" + specific_agent: zhaoyun-data + timeout_minutes: 60 + compensate: # ← saga 增强 + action: "删除已下载文件" + command: "rm -rf /tmp/download/{task_id}" + - id: clean + description: "数据清洗" + specific_agent: zhaoyun-data + timeout_minutes: 30 + depends_on: [download] + compensate: + action: "删除已清洗文件" + command: "rm -rf /tmp/clean/{task_id}" + - id: validate + description: "数据验证" + specific_agent: zhaoyun-data + timeout_minutes: 15 + depends_on: [clean] +``` + +```yaml +# config/pipelines/edict_dispatch.yaml +task_type: edict_dispatch +execution_pattern: multi_step +loop: # ← loop 增强 + max_rounds: 3 + convergence_check: "review_passed" +stages: + - id: plan + description: "中书省规划" + specific_agent: zhongshu-agent + timeout_minutes: 15 + - id: review + description: "门下省审议" + specific_agent: menxia-agent + timeout_minutes: 10 + depends_on: [plan] + # review 不通过 → loop 回到 plan(最多3轮) + - id: dispatch + description: "尚书省派发" + depends_on: [review] +``` + +```yaml +# config/pipelines/batch_backtest.yaml +task_type: batch_backtest +execution_pattern: parallel +failure_policy: best_effort # 部分失败不阻塞 +aggregator: + type: agent # 用 agent 聚合结果 + specific_agent: pangtong-fujunshi +fan_out: + source: task.params.param_sets # 从任务参数获取参数组列表 + per_item: + specific_agent: zhangfei-dev + timeout_minutes: 30 +``` + +--- + +## 7. Ticker 改造 ```python class Ticker: @@ -328,197 +604,68 @@ class Ticker: if is_mail: pipeline = self.pipeline_router.route(is_mail=True) - else: - # 读取该 project 所有活跃的 task_type - task_types = self._get_active_task_types(db_path) - # 当所有 task_type 走同一个 pipeline 时,直接用默认 - if len(task_types) <= 1: - pipeline = self.pipeline_router.route() - else: - # 多种 task_type 需要遍历 - results = [] - for tt in task_types: - p = self.pipeline_router.route(tt) - r = await p.tick(db_path, project_config) - results.append(r) - return merge_results(results) + return await pipeline.tick(db_path, project_config) - return await pipeline.tick(db_path, project_config) + # 查所有活跃 task_type,每种走对应 pipeline + task_types = self._get_active_task_types(db_path) + results = [] + for tt in task_types: + p = self.pipeline_router.route(tt) + r = await p.tick(db_path, project_config) + results.append(r) + return merge_results(results) ``` --- -## 4. 从现有代码提取 +## 8. 实施范围 -### 4.1 完全共享(留在 Spawner,不动) +### 一次性全做 -| 方法 | 当前位置 | 说明 | -|------|---------|------| -| `spawn_full_agent()` | spawner.py | spawn 进程机制 | -| `_monitor_process()` | spawner.py | 监控 + 情况 A/B 分类 | -| `_classify_outcome()` | spawner.py | exit 分类 | -| `_do_retry()` | spawner.py | 续杯机制 | -| `build_task_prompt()` | spawner.py | Task prompt(可被 Pipeline 覆写) | -| `build_mail_prompt()` | spawner.py | Mail prompt(搬到 MailPipeline) | +| # | 事项 | 预估行数 | +|---|------|---------| +| 1 | 新建 `pipeline/` 目录:base.py, mail.py, single_step.py, multi_step.py, parallel.py, router.py | ~500 行 | +| 2 | Pipeline Profile YAML(coding, review, data, deploy, research, discuss) | ~200 行 | +| 3 | Profile 加载器(扫描 config/pipelines/) | ~50 行 | +| 4 | Ticker 改造(删除 _mail 分支,改调 pipeline_router) | -150 行(标注废弃) | +| 5 | Dispatcher 标注废弃(逻辑迁移到各 Pipeline) | 标注 | +| 6 | Spawner 瘦身(mail prompt 搬到 MailPipeline) | 标注 | +| 7 | 前端:task_type 选择 + stage 进度显示 | 待评估 | -### 4.2 搬到 Pipeline 基类 +### MultiStepPipeline 增强实现优先级 -| 当前位置 | 方法 | 说明 | -|---------|------|------| -| ticker.py | `check_timeouts()` | 超时检测 | -| ticker.py | `_advance_dependencies()` | 依赖推进(只 DefaultPipeline) | -| ticker.py | `_transition_status()` | 状态转换(只 DefaultPipeline) | -| dispatcher.py | `dispatch()` 整体 | 拆到各 Pipeline 的 `dispatch_one()` | -| dispatcher.py | `decide()` | 路由逻辑 → Pipeline 内调用 | - -### 4.3 搬到 MailPipeline 专属 - -| 当前位置 | 方法 | 说明 | -|---------|------|------| -| dispatcher.py | `_mail_auto_working()` | Mail 标 working | -| dispatcher.py | `_mail_auto_complete()` | Mail 自动完成 | -| dispatcher.py | `_mail_check_reply()` | Mail 检查回复 | -| spawner.py | `_build_mail_prompt()` | Mail prompt 构建 | -| ticker.py | 14 处 `_mail` 条件分支 | 全部消除 | +| 增强 | 优先级 | 理由 | +|------|--------|------| +| 基础 multi_step | P0 | 当前主流程 | +| interactive (approval gate) | P1 | 有 Lobster 参考,实现简单 | +| loop (循环 guard) | P2 | 有 edict 参考,逻辑清晰 | +| saga (compensate) | P3 | 场景需求相对远期 | +| parallel (fan-out) | P3 | 需要 sub-task 并行调度,改动大 | --- -## 5. Pipeline Profile(YAML 声明) +## 9. 来源引用 -### 5.1 设计原则 - -- **声明式**:YAML 声明 pipeline 的 stages、参数、策略 -- **自动加载**:启动时扫描 `config/pipelines/` 目录 -- **运行时可扩展**:新增 YAML = 新增 pipeline 类型 - -### 5.2 Profile 格式 - -```yaml -# config/pipelines/review.yaml -name: code-review -task_type: review -pipeline_class: single_step # single_step | default(multi_step) - -agent_selection: specific_agent # specific_agent | route_by_capability | route -specific_agent: simayi-challenger - -timeout_minutes: 20 -completion_status: done # 完成后标什么状态 - -# 不声明 = 跳过 -skip_guardrail: false -skip_planning: true # single_step 天然跳过 -``` - -```yaml -# config/pipelines/data-download.yaml -name: data-download -task_type: data -pipeline_class: single_step - -agent_selection: specific_agent -specific_agent: zhaoyun-data - -timeout_minutes: 60 -completion_status: done -verification: check_output_exists # 内置验证函数名 -``` - -### 5.3 注册逻辑 - -```python -# main.py 或 bootstrap.py -def load_pipelines(pipelines_dir: Path, spawner, counter, router, config): - router = PipelineRouter() - - # Mail pipeline(硬编码,不从 YAML 加载) - mail_pipeline = MailPipeline(spawner, counter, router, config) - router.register("_mail", mail_pipeline) - - # 默认 pipeline(多步 Task) - default_pipeline = DefaultPipeline(spawner, counter, router, config) - router.set_default(default_pipeline) - - # 从 YAML 加载 - for f in sorted(pipelines_dir.glob("*.yaml")): - profile = yaml.safe_load(f.read_text()) - cls_name = profile.get("pipeline_class", "single_step") - - if cls_name == "single_step": - pipeline = SingleStepPipeline(spawner, counter, router, config, profile) - else: - # 多步走 default,但可以用 profile 定制参数 - pipeline = DefaultPipeline(spawner, counter, router, config, profile) - - router.register(profile["task_type"], pipeline) - - return router -``` - ---- - -## 6. 实施路线 - -### Phase 1:Pipeline 分离(v2.7.2 + v2.8 合并) - -消灭 46 处 `_mail` 分支,建立 Pipeline 框架。 - -| 步骤 | 改动 | 预估行数 | -|------|------|---------| -| 1. 新建 `pipeline/` 目录 | base.py, mail.py, default.py, router.py | ~300 行 | -| 2. MailPipeline 提取 | 从 dispatcher/ticker/spawner 提取 Mail 逻辑 | ~150 行 | -| 3. DefaultPipeline 提取 | 从 dispatcher/ticker 提取 Task 逻辑 | ~200 行 | -| 4. PipelineRouter | 路由注册中心 | ~50 行 | -| 5. Ticker 简化 | 删除所有 `_mail` 分支,改调 pipeline_router | -150 行 | -| 6. Dispatcher 瘦身 | 删除所有 `_mail` 分支和 Mail 方法 | -200 行 | -| 7. Spawner 瘦身 | 删除 `_build_mail_prompt` 等 Mail 方法 | -80 行 | -| **净增** | | **~270 行** | - -### Phase 2:Task Type Pipeline - -新增 SingleStepPipeline + YAML Profile。 - -| 步骤 | 改动 | +| 结论 | 来源 | |------|------| -| 1. SingleStepPipeline | ~100 行 | -| 2. YAML Profile 加载 | ~50 行 | -| 3. review/data 两个 profile YAML | ~40 行 | -| 4. 创建任务时选 task_type | 前端 + API | - -### Phase 3:高级特性 - -- InteractivePipeline(Checkpoint 门控) -- 并行 stage 支持 -- 事件驱动触发(Observer 模式) -- 动态 pipeline 选择(AI 自动推断 task_type) +| parallel = fan-out/fan-in/scatter-gather/map-reduce | Azure Architecture Center(web_search);AWS Prescriptive Guidance(web_search);open-multi-agent `fan-out-aggregate.ts`(知识库) | +| Wave Execution(并行分组) | get-shit-done README(知识库) | +| loop = evaluator-reflect-refine | AWS Prescriptive Guidance(web_search) | +| 门下省封驳最多3轮 | edict `task-dispatch-architecture.md`(知识库) | +| saga + compensate for AI | SagaLLM arXiv:2503.11951(web_search);SparkCo(web_search) | +| interactive = approval + resumeToken | OpenClaw Lobster docs(`lobster.md`) | +| interactive 超时处理 | Cloudflare Agents docs(web_search) | +| event + cron 统一 loop | Gobii / hermes-agent #491(web_search) | +| Event Bus + topic routing | Edict architecture(知识库) | +| TaskFlow 持久化多步 | OpenClaw TaskFlow docs(`taskflow.md`) | +| AgentPool.runParallel() | open-multi-agent `fan-out-aggregate.ts`(知识库) | --- -## 7. 与其他设计的关系 +## 10. 待确认 -| 设计文档 | 关系 | -|---------|------| -| v2.7.2 Pipeline Refactor | **本方案是它的超集**。v2.7.2 的 MailPipeline 在本方案中直接实现 | -| v2.8 Task Type Pipeline(旧) | **本方案替代它**。旧方案偏重 YAML Profile 细节,本方案补上了架构设计 | -| Spawner Monitor Design | **执行层共享**。Spawner 的 `_monitor_process`、`_classify_outcome` 不受影响 | -| counter v2.1 | **共享层**。counter 在 Pipeline 中注入使用,逻辑不变 | - ---- - -## 8. 风险评估 - -| 风险 | 概率 | 影响 | 缓解 | -|------|------|------|------| -| 提取方法时引入回归 | 中 | 高 | 逐方法迁移 + 每步 E2E 测试 | -| PipelineRouter 路由逻辑复杂化 | 低 | 中 | Mail 走特殊 key `_mail`,其他走 task_type | -| 和 Spawner Monitor 设计冲突 | 低 | 高 | 两者改不同文件,共享接口不变 | -| Phase 1 改动量大 | 中 | 中 | 可拆:先提取 MailPipeline,验证后再提取 Default | - ---- - -## 9. 待确认 - -1. **Phase 1 和 Phase 2 是否一起做**?建议先做 Phase 1(Pipeline 分离),Phase 2(Task Type)等 Phase 1 稳定后 -2. **dispatcher.py 是否完全删除**?它的逻辑全部分散到各 Pipeline 后,dispatcher 本身可以删掉 -3. **Mail pipeline 是否也走 YAML Profile**?建议不走——Mail 逻辑硬编码更安全 -4. **当前 task_type 列表是否需要调整**?DB 已有 `coding/review/data/deploy/research/discuss` +1. **4 个 Pipeline 子类是否够**?还是 interactive/loop/saga 需要独立子类? +2. **parallel 是否现在实现**?还是只预留接口? +3. **discuss 业务类型**:走 single_step 还是 multi_step? +4. **Trigger 层是否纳入本次实现**?还是继续用 OpenClaw cron 做触发? +5. **前端 task_type 选择**:创建任务时的 UI 交互怎么设计? diff --git a/docs/research/distill-reorg-v2.md b/docs/research/distill-reorg-v2.md new file mode 100644 index 0000000..9fd90a9 --- /dev/null +++ b/docs/research/distill-reorg-v2.md @@ -0,0 +1,174 @@ +# 蒸馏 Skill 重新组织方案(v2) + +> 日期:2026-05-27 +> 状态:待主公确认后执行 Step 3-4 重跑 + +--- + +## 核心洞察 + +之前的组织**按"能力领域"分**(执行纪律/评审质量/系统设计……),导致: +1. 扫描模式与 Skill 对不上(Agent 不知道"我被纠正了"该加载哪个 Skill) +2. 经验层次不清晰(高频铁律和低频记忆混在一起) +3. 无法复用 moziplus v2.0 的**四层金字塔 + 三级载体**架构 + +## 新组织方式:6 种扫描模式 × 4 层金字塔 + +| 扫描模式 | 数据量 | → L0 铁律 | → L2 引擎注入 | → L3 Skill | → 黑板 Memory | +|---------|--------|---------|-------------|-----------|-------------| +| ① 纠正(591) | 高频明确 | ✅ 高频纠错(GATE流程) | ✅ 典型错误模式 | ❌ | | +| ② 试错(226) | 中频可复用 | ❌ | ✅ 绕坑指南 | ✅ 试错模式 | | +| ③ 成功(200) | 未知(未处理) | ❌ | ❌ | ✅ 最佳实践 | | +| ④ 协作(873) | 低频非通用 | ❌ | ❌ | ❌ | ✅ 协作经验(待蒸馏) | +| ⑤ 决策分歧(1241) | 高频模糊 | ❌ | ✅ 决策门控 | ❌ | | +| ⑥ 经验声明(21) | 低频已提炼 | ❌ | ❌ | ✅ 自我总结 | | + +### L0 铁律(guardrails.yaml / prompt_templates/) + +**筛选标准**: +- 高频(出现 ≥ 20 次) +- 结论明确("必须/禁止") +- 违反后果严重(P0 级别 bug / 用户极度沮丧) + +**候选**: +- GATE 流程门控(33 次) +- 不绕圈子(17+3=20 次,用户极度沮丧) + +### L2 引擎注入(prompt_templates/ + 相关 Memory) + +**筛选标准**: +- 中频(5-19 次) +- 有明确触发场景 +- 可模板化为"当 X 时,先 Y" + +**候选**: +- 先确认当前设计再改(6 次) +- 角色匹配检查(20+ 次,协作模式中) +- inform 邮件轻量处理 + +### L3 Skill(skills/ 目录) + +**筛选标准**: +- 可复用流程(有步骤) +- 有边界条件(适用/不适用) +- 需要按需加载 + +**候选**: +- 试错模式(counter 生命周期、续杯 retry、进程管理) +- 成功模式(待提炼) +- 经验声明(自我纠正、诚实边界) + +### 黑板 experiences(experiences 表) + +**筛选标准**: +- 低频(< 5 次) +- 非通用(特定协作场景) +- 待观察(先存起来,看是否积累到可蒸馏阈值) + +**候选**: +- 大部分协作模式细节 +- 决策分歧中的个别案例 + +--- + +## 具体产出预判 + +### L0 铁律:2 条 + +| # | 铁律 | 来源模式 | 内容 | +|---|------|---------|------| +| 1 | GATE 流程门控 | ① 纠正(33 次) | 需求不清不动手 / 根因不明不修复 / 方案未定不实现 / 评估影响范围才动手 | +| 2 | 不绕圈子 | ① 纠正(17+3=20 次) | 接受用户前提假设,直接给方案;不要重复讨论已确认过的问题 | + +### L2 引擎注入:3 条 + +| # | 模板 | 来源模式 | 内容 | +|---|------|---------|------| +| 1 | 设计确认模板 | ① 纠正(6 次) + ⑤ 决策分歧 | "实现前先查阅已有设计文档,对已确定决策保持尊重;不确定时问用户确认" | +| 2 | 角色匹配检查 | ④ 协作(20+ 次) | "认领任务前检查角色匹配:评审/审查类角色不应认领编码任务" | +| 3 | inform 轻量处理 | ④ 协作(2 次) | "inform 类型邮件让 Agent 感知但不做完整执行" | + +### L3 Skill:4 个 + +| Skill | 来源模式 | 说明 | +|-------|---------|------| +| trial-and-error-patterns | ② 试错(226) | counter/锁生命周期、续杯 retry、进程退出≠资源释放、广播路径一致、JSON 解析验证 | +| proven-practices | ③ 成功(200) | 待提炼:编码/流程最佳实践 | +| self-reflection-wisdom | ⑥ 经验声明(21) | 自我纠正、诚实边界、调研落地映射 | +|评审质量 | ① 纠正 + ⑥ 经验 | 评审闭环、枚举一致性、三层对照、自我纠正(从 batch2 代码评审知识中提炼) | + +### 黑板 Memory:1 个表 + +- **experiences 表**:容纳所有低频、非通用经验,待积累到阈值后触发二级蒸馏 + +--- + +## 与 moziplus v2.0 架构的对应 + +| moziplus 架构 | 对应扫描模式 | 对应产出 | +|-------------|-------------|---------| +| **L0 铁律(guardrails.yaml)** | ① 纠正(高频明确) | GATE 流程门控、不绕圈子 | +| **L2 引擎注入(prompt_templates/)** | ①⑤ 纠正+决策分歧(中频模糊) | 设计确认模板、角色匹配检查 | +| **L3 Skill(skills/ 目录)** | ②③⑥ 试错+成功+经验声明 | trial-and-error-patterns、proven-practices、self-reflection-wisdom、评审质量 | +| **黑板 experiences** | ④ 协作(低频非通用) | experiences 表 | + +--- + +## 与课题6 闭环的关系 + +| 课题6 阶段 | 对应扫描模式 | 对应产出 | +|-----------|-------------|---------| +| **DISCOVER** | 全部 6 种 | 本次扫描已做 | +| **一级蒸馏(Memory)** | 全部 6 种 | experiences 表(低频/非通用) | +| **二级蒸馏(Skill)** | ②③⑥ | L3 Skill(试错/成功/经验声明) | +| **固化(Rule)** | ① | L0 铁律 + L2 引擎注入 | + +--- + +## 执行计划 + +### Step 3(重新)——按新的组织方式归纳 + +**批次调整**: +- 批次 1:庞统 + 司马懿的 ①⑤ 纠正+决策分歧 → 提炼 L0 铁律 + L2 引擎注入 +- 批次 2:庞统 + 司马懿的 ② 试错 → 提炼 L3 Skill trial-and-error-patterns +- 批次 3:庞统 + 司马懿的 ③ 成功(新增) → 提炼 L3 Skill proven-practices +- 批次 4:庞统 + 司马懿的 ⑥ 经验声明 → 提炼 L3 Skill self-reflection-wisdom +- 批次 5:庞统 + 司马懿的 ④ 协作 → 直接写入 experiences 表(不提炼) + +**并发限制**:主公说并发 ≤ 2,分批跑: +- 先跑批次 1(L0+L2,优先级最高) +- 同时跑批次 2(L3,试错模式重要) +- 完成后跑批次 3-5 + +### Step 4(重新)——按四层金字塔输出 + +**输出目录**: +``` +moziplus_v2/ +├── guardrails/ +│ ├── gate-flow.yaml ← L0 铁律 +│ └── no-circle-jerking.yaml ← L0 铁律 +├── prompt_templates/ +│ ├── design-confirmation.md ← L2 引擎注入 +│ ├── role-match-check.md ← L2 引擎注入 +│ └── inform-lightweight.md ← L2 引擎注入 +├── skills/ +│ ├── trial-and-error-patterns.md ← L3 Skill +│ ├── proven-practices.md ← L3 Skill +│ ├── self-reflection-wisdom.md ← L3 Skill +│ └── review-quality.md ← L3 Skill(从 batch2 代码评审知识复用) +└── docs/research/distill-skills-v2/ + └── README.md(新的汇总) +``` + +--- + +## 主公确认 + +这个新的组织方式: +1. **复用了 moziplus v2.0 的四层金字塔**——经验层次清晰 +2. **6 种扫描模式有明确归宿**——Agent 知道经验从哪来、放哪层 +3. **L0/L2/L3/experiences 四路分流**——高频固化、中频模板化、低频文档化、待观察存表 + +对吗?确认后我重跑 Step 3-4。 \ No newline at end of file