From 26fd475d3dd80a6d8251ab40444edebff0098610 Mon Sep 17 00:00:00 2001
From: cfdaily
Date: Tue, 26 May 2026 22:43:32 +0800
Subject: [PATCH] auto-sync: 2026-05-26 22:43:32

---
 docs/design/v2.8-task-type-pipeline.md | 575 +++++++++++++++++++++++++
 1 file changed, 575 insertions(+)
 create mode 100644 docs/design/v2.8-task-type-pipeline.md

diff --git a/docs/design/v2.8-task-type-pipeline.md b/docs/design/v2.8-task-type-pipeline.md
new file mode 100644
index 0000000..2694989
--- /dev/null
+++ b/docs/design/v2.8-task-type-pipeline.md
@@ -0,0 +1,575 @@
+# Task Type Pipeline Design Document
+
+**Version**: v1.0
+**Date**: 2026-05-26
+**Author**: 庞统
+**Status**: Research report + proposal, pending user confirmation
+
+---
+
+## 1. Background and Problems
+
+### 1.1 Current State
+
+In moziplus v2, every task goes through the same pipeline:
+
+```
+create → planning → assigned → executing → review → done
+```
+
+Tasks of different types (code development, data download, code review, deployment, and so on) share one state machine, one routing logic, and one prompt template.
+
+### 1.2 Problems
+
+| Problem | Impact |
+|------|------|
+| Code review does not need a planning stage but is forced through one | Wasted tokens and time |
+| Data download is deterministic and does not need LLM routing | Wasted routing overhead |
+| Completion criteria differ by task type, yet every task goes through the same review | Imprecise verification |
+| Mail and Task are told apart by 44 if/else branches (a Pipeline refactor is already planned) | Structural risk |
+
+### 1.3 Goals
+
+1. Each task_type runs its own execution pipeline
+2. Adding a task_type only requires declaring configuration, with no change to core code
+3. The design merges naturally with the planned v2.7.2 Pipeline refactor (Mail vs Task separation)
+
+---
+
+## 2. Industry Survey
+
+### 2.1 Temporal: Workflow Type + Task Queue
+
+**Core pattern**: each Workflow Type registers its own handler, and all of them share one Worker framework.
+
+```python
+from datetime import timedelta
+
+from temporalio import workflow
+
+# review_code, download_data, and validate_data are activities assumed to be
+# defined elsewhere with @activity.defn. The timeout values are illustrative.
+
+@workflow.defn(name="CodeReviewWorkflow")
+class CodeReviewWorkflow:
+    @workflow.run
+    async def run(self, input):
+        # Temporal requires a timeout on every activity call
+        return await workflow.execute_activity(
+            review_code, input, start_to_close_timeout=timedelta(minutes=20)
+        )
+
+@workflow.defn(name="DataDownloadWorkflow")
+class DataDownloadWorkflow:
+    @workflow.run
+    async def run(self, input):
+        result = await workflow.execute_activity(
+            download_data, input, start_to_close_timeout=timedelta(minutes=60)
+        )
+        await workflow.execute_activity(
+            validate_data, result, start_to_close_timeout=timedelta(minutes=5)
+        )
+        return result
+```
+
+**Takeaways**:
+- Each Workflow Type has its own execution logic
+- Activities (atomic operations) and the Worker framework are shared
+- Routing happens at the scheduling layer (Task Queues); the Worker framework stays generic across Workflow Types
+
+**Source**: [Temporal Task Routing](https://docs.temporal.io/task-routing)
+
+### 2.2 LangGraph: Routing Pattern
+
+**Core pattern**: a routing step classifies the input and dispatches it to a different sub-graph.
+
+```python
+from langgraph.graph import START
+
+def route_task(state):
+    """Return the name of the pipeline that matches the task type."""
+    task_type = state["task_type"]
+    if task_type == "coding":
+        return "coding_pipeline"
+    elif task_type == "review":
+        return "review_pipeline"
+    elif task_type == "data":
+        return "data_pipeline"
+    raise ValueError(f"unknown task_type: {task_type}")
+
+# `graph` is a StateGraph on which coding_node, review_node, and data_node are
+# already registered. The router is a conditional edge rather than a node,
+# because a node must return a state update, not a route name.
+graph.add_conditional_edges(START, route_task, {
+    "coding_pipeline": "coding_node",
+    "review_pipeline": "review_node",
+    "data_pipeline": "data_node",
+})
+```
+
+**Takeaways**:
+- Classification and routing come first
+- Each pipeline is an independent sub-graph
+- The execution framework (nodes, edges, state) is shared
+
+**Source**: [LangGraph Routing Pattern](https://www.scalablepath.com/machine-learning/langgraph)
+
+### 2.3 Argo Workflows: Template + DAG
+
+**Core pattern**: each task type defines a WorkflowTemplate, and callers reference the template by name.
+
+```yaml
+apiVersion: argoproj.io/v1alpha1
+kind: WorkflowTemplate
+metadata:
+  name: code-review-template
+spec:
+  templates:
+    - name: main
+      steps:
+        - - name: review
+            template: run-review
+        - - name: verify
+            template: verify-output
+```
+
+**Takeaways**:
+- Templates are declarative (YAML)
+- Adding a type means adding a template, with no change to the engine
+- Two orchestration granularities are available: DAG and Steps
+
+**Source**: [Argo Workflows Concepts](https://argo-workflows.readthedocs.io/en/latest/workflow-concepts/)
+
+### 2.4 Google ADK: Workflow Agent Types
+
+**Core pattern**: three Workflow Agent types, SequentialAgent (sequential), ParallelAgent (parallel), and LoopAgent (loop), plus custom Agents (which override `_run_async_impl`).
+
+```python
+from google.adk.agents import BaseAgent, SequentialAgent
+
+# Sequential pipeline
+sequential_agent = SequentialAgent(
+    name="data_pipeline",
+    sub_agents=[fetch_agent, parse_agent, validate_agent],
+)
+
+# Custom router agent
+class RouterAgent(BaseAgent):
+    # BaseAgent is a Pydantic model, so sub-agents are declared as fields
+    simple_agent: BaseAgent
+    complex_agent: BaseAgent
+
+    async def _run_async_impl(self, ctx):
+        # Custom agents override _run_async_impl and yield events
+        task_type = ctx.session.state.get("task_type")
+        target = self.simple_agent if task_type == "simple" else self.complex_agent
+        async for event in target.run_async(ctx):
+            yield event
+```
+
+**Takeaways**:
+- Composition over inheritance: pipelines are assembled from different Agent types
+- A router classifies the task, and pipelines of different types hang beneath it
+- A shared session.state carries data between steps
+
+**Source**: [Google ADK Multi-Agent Patterns](https://developers.googleblog.com/developers-guide-to-multi-agent-patterns-in-adk/)
+
+### 2.5 CrewAI: Process Type
+
+**Core pattern**: the Process (sequential / hierarchical) is set at the Crew level, and the Agent is set at the Task level.
+
+```python
+from crewai import Crew, Process
+
+crew = Crew(
+    agents=[coder, reviewer],
+    tasks=[code_task, review_task],
+    process=Process.sequential,  # or Process.hierarchical
+)
+```
+
+**Takeaways**:
+- Process Type is a property of the task group, not of a single task
+- The granularity is too coarse (only sequential and hierarchical)
+
+**Source**: [CrewAI Tasks](https://docs.crewai.com/en/concepts/tasks)
+
+### 2.6 OpenClaw TaskFlow + Lobster
+
+**Core pattern**:
+- TaskFlow: durable multi-step flow management (survives gateway restarts)
+- Lobster: a deterministic pipeline runtime (step orchestration + approval gates + resume tokens)
+
+```yaml
+name: market-intel-brief
+steps:
+  - id: collect
+    command: market-intel collect --json
+  - id: summarize
+    command: market-intel summarize --json
+    stdin: $collect.json
+  - id: approve
+    command: market-intel deliver --preview
+    approval: required
+  - id: deliver
+    command: market-intel deliver --execute
+    condition: $approve.approved
+```
+
+**Takeaways**:
+- OpenClaw already has pipeline capability of its own (Lobster)
+- Steps are defined in declarative YAML
+- Approval gates are built in
+- **moziplus can use OpenClaw's Lobster/TaskFlow directly to implement task pipelines**
+
+**Source**: `/opt/homebrew/lib/node_modules/openclaw/docs/automation/taskflow.md`
+
+---
+
+## 3. Design Pattern Analysis
+
+### 3.1 Strategy Pattern
+
+**When it fits**: choosing an execution strategy at runtime based on task_type.
+
+```python
+from abc import ABC, abstractmethod
+
+class ExecutionStrategy(ABC):
+    @abstractmethod
+    async def execute(self, task, agent_id, **kwargs): ...
+
+class SingleStepStrategy(ExecutionStrategy):
+    async def execute(self, task, agent_id, **kwargs):
+        # Spawn directly, no planning
+        ...
+
+class MultiStepStrategy(ExecutionStrategy):
+    async def execute(self, task, agent_id, **kwargs):
+        # planning → execute → review
+        ...
+```
+
+**Pros**: strategies are independent; a new type only needs a new Strategy class
+**Cons**: when pipelines share many steps, the Strategies duplicate each other
+
+### 3.2 Template Method Pattern
+
+**When it fits**: the pipeline skeleton is fixed and individual steps are replaceable.
+
+```python
+from abc import ABC, abstractmethod
+
+class TaskPipeline(ABC):
+    async def execute(self, task):
+        agent_id = await self.route(task)           # routing
+        await self.pre_spawn(task, agent_id)        # pre-processing
+        result = await self.spawn(task, agent_id)   # execution
+        await self.post_complete(task, result)      # post-processing
+        await self.verify(task, result)             # verification
+
+    @abstractmethod
+    async def route(self, task): ...
+
+    @abstractmethod
+    async def verify(self, task, result): ...
+```
+
+**Pros**: the skeleton is shared and the points of variation are explicit
+**Cons**: a change to the skeleton affects every subclass
+
+### 3.3 Routing Pattern (LangGraph style)
+
+**When it fits**: a single entry point classifies tasks and dispatches them to independent pipelines.
+
+```python
+PIPELINE_REGISTRY = {
+    "coding": CodingPipeline(),
+    "review": ReviewPipeline(),
+    "data": DataPipeline(),
+    "deploy": DeployPipeline(),
+}
+# Assumption: unknown task types fall back to the coding pipeline
+DEFAULT_PIPELINE = PIPELINE_REGISTRY["coding"]
+
+def route(task):
+    return PIPELINE_REGISTRY.get(task.task_type, DEFAULT_PIPELINE)
+```
+
+**Pros**: the most flexible option; every pipeline is fully independent
+**Cons**: requires a Registry to manage the pipelines
+
+### 3.4 Recommended: Template Method + Routing Hybrid
+
+Combine Template Method (shared skeleton) with Routing (dispatch by type):
+
+```
+Ticker.tick
+  └─ PipelineRouter.route(task.task_type)
+       ├─ SingleStepPipeline (review, data_download)
+       │    └─ route → spawn → verify → done
+       ├─ MultiStepPipeline (coding, strategy_dev)
+       │    └─ route → plan → spawn → verify → review → done
+       ├─ InteractivePipeline (checkpoint_required)
+       │    └─ route → plan → spawn → checkpoint → wait → resume → done
+       └─ MailPipeline (existing design)
+            └─ assignee → spawn → verify → done
+```
+
+---
+
+## 4. Design Proposal
+
+### 4.1 Core Concepts
+
+**Pipeline**: each task_type maps to one Pipeline, which defines the full lifecycle for that type.
+
+**Stage**: one step inside a Pipeline. One Stage corresponds to one agent spawn.
+
+**Pipeline Profile**: the Pipeline configuration declared in YAML, which defines the list of stages and the parameters of each stage.
+
+### 4.2 Pipeline Types
+
+| Type | Applicable task_type | Flow | Characteristics |
+|------|---------------|------|------|
+| `single_step` | review, data_download, inform_mail | route → spawn → verify → done | No planning; executes directly |
+| `multi_step` | coding, strategy_dev, deploy | route → plan → spawn[N] → verify → done | Has planning; multiple stages run in series |
+| `interactive` | checkpoint_required | route → plan → spawn → checkpoint → wait → spawn → done | Has a human approval point |
+| `mail` | mail (request/inform) | assignee → spawn → verify → done | Point to point, no routing (already designed) |
+
+### 4.3 Pipeline Profile (YAML Declaration)
+
+```yaml
+# config/pipelines/coding.yaml
+task_type: coding
+pipeline_type: multi_step
+risk_level: standard
+
+stages:
+  - id: plan
+    description: "Analyze the requirements and draw up a plan"
+    agent_selection: route_by_capability  # or specific_agent
+    capabilities_required: ["coding", "design"]
+    timeout_minutes: 10
+    prompt_template: "coding_plan_prompt"
+
+  - id: implement
+    description: "Implement the code"
+    agent_selection: specific_agent
+    reuse_plan_agent: true  # the implementer is the planner
+    timeout_minutes: 30
+    prompt_template: "coding_impl_prompt"
+    depends_on: [plan]
+
+  - id: verify
+    description: "Automated verification"
+    agent_selection: specific_agent
+    verify_agent: "simayi-challenger"
+    timeout_minutes: 15
+    prompt_template: "coding_verify_prompt"
+    depends_on: [implement]
+
+completion:
+  status: done
+  require_verification: true
+```
+
+```yaml
+# config/pipelines/review.yaml
+task_type: review
+pipeline_type: single_step
+risk_level: low
+
+stages:
+  - id: review
+    description: "Code review"
+    agent_selection: specific_agent
+    specific_agent: "simayi-challenger"
+    timeout_minutes: 20
+    prompt_template: "review_prompt"
+
+completion:
+  status: done
+  require_verification: false
+```
+
+```yaml
+# config/pipelines/data_download.yaml
+task_type: data
+pipeline_type: single_step
+risk_level: low
+
+stages:
+  - id: download
+    description: "Data download"
+    agent_selection: specific_agent
+    specific_agent: "zhaoyun-data"
+    timeout_minutes: 60
+    prompt_template: "data_download_prompt"
+
+completion:
+  status: done
+  require_verification: true
+  verification: "check_output_exists"  # built-in verification function
+```
+
+### 4.4 Code Architecture
+
+```python
+# src/daemon/pipeline/base.py
+from abc import ABC, abstractmethod
+from pathlib import Path
+
+# TickResult and load_tasks are project helpers defined elsewhere.
+
+class TaskPipeline(ABC):
+    """Pipeline base class (Template Method)."""
+
+    def __init__(self, profile: dict, spawner, counter, router):
+        self.profile = profile
+        self.spawner = spawner
+        self.counter = counter
+        self.router = router
+
+    async def tick(self, db_path: Path) -> TickResult:
+        """Entry point for every tick."""
+        tasks = self._load_pending_tasks(db_path)
+        for task in tasks:
+            stage = self._get_current_stage(task)
+            await self._execute_stage(task, stage, db_path)
+        return TickResult(dispatched=len(tasks))
+
+    @abstractmethod
+    def _load_pending_tasks(self, db_path) -> list:
+        """Load tasks awaiting work (each pipeline queries different statuses)."""
+        ...
+
+    @abstractmethod
+    def _get_current_stage(self, task) -> dict:
+        """Return the stage that should run now."""
+        ...
+
+    async def _execute_stage(self, task, stage, db_path):
+        """Run a single stage (shared logic)."""
+        agent_id = await self._resolve_agent(task, stage)
+        message = self._build_prompt(task, stage)
+        await self.spawner.spawn_full_agent(
+            agent_id=agent_id,
+            message=message,
+            task_id=task.id,
+            use_main_session=(stage.get("agent_selection") == "specific_agent"),
+            task_db_path=db_path,
+        )
+
+
+# src/daemon/pipeline/single_step.py
+class SingleStepPipeline(TaskPipeline):
+    """Single-step execution: route → spawn → done."""
+
+    def _load_pending_tasks(self, db_path):
+        return load_tasks(db_path, status="pending", task_type=self.profile["task_type"])
+
+    def _get_current_stage(self, task):
+        return self.profile["stages"][0]  # there is only one stage
+
+
+# src/daemon/pipeline/multi_step.py
+class MultiStepPipeline(TaskPipeline):
+    """Multi-step execution: plan → implement → verify → done."""
+
+    def _load_pending_tasks(self, db_path):
+        # Query pending (newly created) + executing (next stage waiting to run)
+        ...
+
+    def _get_current_stage(self, task):
+        # Derive the current stage from stages_json or subtask progress
+        ...
+
+
+# src/daemon/pipeline/router.py
+class PipelineRouter:
+    """Route a task_type to its Pipeline."""
+
+    def __init__(self):
+        self._pipelines: dict[str, TaskPipeline] = {}
+        self._default: TaskPipeline | None = None
+
+    def register(self, task_type: str, pipeline: TaskPipeline):
+        self._pipelines[task_type] = pipeline
+
+    def set_default(self, pipeline: TaskPipeline):
+        # Fallback returned by route() for unregistered task types
+        self._default = pipeline
+
+    def route(self, task_type: str) -> TaskPipeline:
+        return self._pipelines.get(task_type, self._default)
+
+
+# src/daemon/ticker.py (after the change)
+class Ticker:
+    def __init__(self, pipeline_router: PipelineRouter, **kwargs):
+        self.pipeline_router = pipeline_router
+        # Mail is a pipeline too
+        self.mail_pipeline = MailPipeline(...)
+
+    async def _tick_project(self, project_id, db_path):
+        if project_id == "_mail":
+            return await self.mail_pipeline.tick(db_path)
+
+        # Look up every active task_type
+        task_types = self._get_active_task_types(db_path)
+        results = []
+        for tt in task_types:
+            pipeline = self.pipeline_router.route(tt)
+            result = await pipeline.tick(db_path)
+            results.append(result)
+        return merge_results(results)
+```
+
+### 4.5 Relationship to the Planned v2.7.2 Pipeline Refactor
+
+The v2.7.2 design document (`v2.7.2-pipeline-refactor.md`) plans the Pipeline separation of Mail vs Task. This proposal is its natural extension:
+
+```
+v2.7.2 Pipeline refactor (Mail vs Task separation)
+    ↓
+v2.8 Task Type Pipeline (per-type split inside Task)
+```
+
+In detail:
+- `TaskPipeline` base class = an extended version of the v2.7.2 `TaskPipeline`
+- `MailPipeline` = as designed in v2.7.2, unchanged
+- `PipelineRouter` = the v2.7.2 ticker's pipeline dispatch logic + task_type routing
+- `SingleStepPipeline` / `MultiStepPipeline` = new
+
+### 4.6 Configuration Loading
+
+```python
+# main.py, at startup
+from pathlib import Path
+
+import yaml
+
+pipeline_router = PipelineRouter()
+
+# Scan the config/pipelines/ directory automatically
+for pipeline_file in Path("config/pipelines").glob("*.yaml"):
+    profile = yaml.safe_load(pipeline_file.read_text(encoding="utf-8"))
+    ptype = profile["pipeline_type"]
+
+    if ptype == "single_step":
+        pipeline = SingleStepPipeline(profile, spawner, counter, router)
+    elif ptype == "multi_step":
+        pipeline = MultiStepPipeline(profile, spawner, counter, router)
+    else:
+        # Other pipeline types are not wired up here yet; fail loudly
+        raise ValueError(f"{pipeline_file}: unsupported pipeline_type {ptype!r}")
+
+    pipeline_router.register(profile["task_type"], pipeline)
+
+# Default pipeline (used when no task_type matches)
+default_pipeline = MultiStepPipeline(default_profile, spawner, counter, router)
+pipeline_router.set_default(default_pipeline)
+```
+
+### 4.7 Frontend Impact
+
+- TaskModal shows the current pipeline progress (list of stages with the current stage highlighted)
+- Task creation offers a task_type choice (a dropdown generated from the pipelines/ configuration)
+- Different task_types can be shown with different labels and colors
+
+---
+
+## 5. Implementation Roadmap
+
+### Phase 1: Pipeline Foundation (merged with v2.7.2)
+
+1. Implement the `TaskPipeline` base class + `TickResult`
+2. Implement `MailPipeline` (replacing the current `if is_mail` branches)
+3. Implement `PipelineRouter`
+4. Refactor the ticker's `_tick_project`
+
+### Phase 2: Task Type Pipeline
+
+1. Implement `SingleStepPipeline` + `MultiStepPipeline`
+2. Create the pipeline profile YAML files (coding, review, data, deploy)
+3. Implement automatic profile loading
+4. Update the DB schema (the tasks table needs a task_type column; it already exists)
+5. Update the frontend (task_type selection + stage progress)
+
+### Phase 3: Advanced Features
+
+1. `InteractivePipeline` (checkpoint gating)
+2. Parallel stage support (parallel steps inside multi_step)
+3. Dynamic pipelines (AI picks the task_type and pipeline from the requirements)
+4. Pipeline version management
+
+---
+
+## 6. Risks and Constraints
+
+| Risk | Mitigation |
+|------|------|
+| The Pipeline refactor has a large scope | Split into phases; Phase 1 does the Mail separation first (already designed) |
+| Tracking stage progress in multi-step pipelines is complex | Reuse the existing SubTask / stages_json mechanism |
+| Adding a task_type could require changing both configuration and code | Pipeline Profiles are purely declarative and the code loads them automatically |
+| Merge risk with the v2.7.2 refactor | Phase 1 is v2.7.2 itself; Phase 2 builds on it incrementally |
+
+---
+
+## 7. Open Questions
+
+1. **task_type list**: the DB currently has `coding/review/data/deploy/research/discuss`. Does it need adjusting?
+2. **pipeline_type categories**: are single_step / multi_step / interactive enough?
+3. **agent_selection strategy**: route_by_capability (LLM routing) vs specific_agent (fixed) vs broadcast. Are other modes needed?
+4. **Passing data between multi_step stages**: through the DB (stages_json), or by injecting the previous stage's output into the prompt?
+5. **Phase 1 first, or straight to Phase 2**: does the v2.7.2 Mail/Task separation still need to be done on its own, or together with the Task Type Pipeline?
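On the question of passing data between multi_step stages, the two options can be combined: the DB holds each stage's output, and the prompt for the next stage is built from it. The following is a minimal sketch under stated assumptions, not the project's implementation. It assumes completed stage outputs are available as a dict keyed by stage id (the actual shape of `stages_json` is not defined in this document), and `next_stage` and `build_prompt` are hypothetical helper names.

```python
def next_stage(stages, outputs):
    """Return the first unfinished stage whose dependencies are all complete.

    stages:  the `stages` list from a Pipeline Profile.
    outputs: dict of stage id -> output text for completed stages
             (an assumed shape for what stages_json would hold).
    Returns None when every stage is complete or none is runnable.
    """
    for stage in stages:
        if stage["id"] in outputs:
            continue  # already completed
        if all(dep in outputs for dep in stage.get("depends_on", [])):
            return stage
    return None


def build_prompt(task_description, stage, outputs):
    """Inject the outputs of the stage's direct dependencies into its prompt."""
    parts = [f"Task: {task_description}", f"Stage: {stage['id']}"]
    for dep in stage.get("depends_on", []):
        parts.append(f"Output of {dep}:\n{outputs[dep]}")
    return "\n\n".join(parts)


# Stage layout mirroring config/pipelines/coding.yaml
stages = [
    {"id": "plan"},
    {"id": "implement", "depends_on": ["plan"]},
    {"id": "verify", "depends_on": ["implement"]},
]
outputs = {"plan": "1. add parser 2. add tests"}

stage = next_stage(stages, outputs)
prompt = build_prompt("Add CSV import", stage, outputs)
```

With this split, the DB remains the source of truth for progress after a restart, while each spawned agent still receives the context it needs in a single message.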