auto-sync: 2026-05-26 22:43:32

This commit is contained in:
cfdaily
2026-05-26 22:43:32 +08:00
parent fb951db382
commit 26fd475d3d
+575
View File
@@ -0,0 +1,575 @@
# Task Type Pipeline 设计文档
**版本**: v1.0
**日期**: 2026-05-26
**作者**: 庞统
**状态**: 调研报告 + 方案,待用户确认
---
## 1. 背景与问题
### 1.1 现状
moziplus v2 所有 task 走同一个 pipeline
```
create → planning → assigned → executing → review → done
```
不同类型的任务(代码开发、数据下载、代码评审、部署等)共享同一套状态机、同一路由逻辑、同一个 prompt 模板。
### 1.2 问题
| 问题 | 影响 |
|------|------|
| 代码评审不需要 planning 阶段,但被强制走 planning | 浪费 token + 时间 |
| 数据下载是确定性的,不需要 LLM 路由 | 浪费路由开销 |
| 不同类型任务的完成标准不同,但走同一个 review | 验证不精准 |
| Mail 和 Task 靠 44 处 if/else 差异化(已规划 Pipeline 重构) | 结构性风险 |
### 1.3 目标
1. 不同 task_type 走不同的执行 pipeline
2. 新增 task_type 只需声明配置,不改核心代码
3. 和已规划的 v2.7.2 Pipeline 重构(Mail vs Task 分离)自然融合
---
## 2. 业界调研
### 2.1 TemporalWorkflow Type + Task Queue
**核心模式**:不同 Workflow Type 注册不同的处理函数,共享同一个 Worker 框架。
```python
@workflow.defn(name="CodeReviewWorkflow")
class CodeReviewWorkflow:
@workflow.run
async def run(self, input):
result = await workflow.execute_activity(review_code, input)
return result
@workflow.defn(name="DataDownloadWorkflow")
class DataDownloadWorkflow:
@workflow.run
async def run(self, input):
result = await workflow.execute_activity(download_data, input)
await workflow.execute_activity(validate_data, result)
return result
```
**启发**
- 每个 Workflow Type 有独立的执行逻辑
- 共享 Activity(原子操作)和 Worker 框架
- 路由在调度层完成,Worker 不知道自己执行的是哪种 Workflow
**来源**: [Temporal Task Routing](https://docs.temporal.io/task-routing)
### 2.2 LangGraphRouting Pattern
**核心模式**:Router 节点分类输入,分发到不同的 sub-graph 执行。
```python
def route_task(state):
task_type = state["task_type"]
if task_type == "coding":
return "coding_pipeline"
elif task_type == "review":
return "review_pipeline"
elif task_type == "data":
return "data_pipeline"
graph.add_node("router", route_task)
graph.add_conditional_edges("router", route_task, {
"coding_pipeline": "coding_node",
"review_pipeline": "review_node",
"data_pipeline": "data_node",
})
```
**启发**
- 分类和路由是第一步
- 每个 pipeline 是独立的 sub-graph
- 执行框架(节点、边、状态)是共享的
**来源**: [LangGraph Routing Pattern](https://www.scalablepath.com/machine-learning/langgraph)
### 2.3 Argo WorkflowsTemplate + DAG
**核心模式**:每种任务类型定义一个 WorkflowTemplate,调用时引用模板名。
```yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: code-review-template
spec:
templates:
- name: main
steps:
- - name: review
template: run-review
- - name: verify
template: verify-output
```
**启发**
- 模板是声明式的(YAML
- 新增类型 = 新增模板,不改引擎
- DAG/Steps 两种编排粒度
**来源**: [Argo Workflows Concepts](https://argo-workflows.readthedocs.io/en/latest/workflow-concepts/)
### 2.4 Google ADKWorkflow Agent 类型
**核心模式**:三种 Workflow Agent 类型——SequentialAgent(顺序)、ParallelAgent(并行)、LoopAgent(循环),加上自定义 Agentoverride run_async)。
```python
# Sequential Pipeline
sequential_agent = SequentialAgent(
name="data_pipeline",
sub_agents=[fetch_agent, parse_agent, validate_agent]
)
# Custom Router Agent
class RouterAgent(BaseAgent):
async def run_async(self, ctx):
task_type = ctx.state.get("task_type")
if task_type == "simple":
return await self.simple_agent.run_async(ctx)
return await self.complex_agent.run_async(ctx)
```
**启发**
- 组合优于继承:用不同 Agent 类型组合 pipeline
- Router 节点做分类,下面挂不同类型的 pipeline
- 共享 session.state 在步骤间传递数据
**来源**: [Google ADK Multi-Agent Patterns](https://developers.googleblog.com/developers-guide-to-multi-agent-patterns-in-adk/)
### 2.5 CrewAIProcess Type
**核心模式**Crew 级别指定 Processsequential / hierarchical),Task 级别指定 Agent。
```python
crew = Crew(
agents=[coder, reviewer],
tasks=[code_task, review_task],
process=Process.sequential # 或 Process.hierarchical
)
```
**启发**
- Process Type 是任务组级别的属性,不是单个任务的
- 但粒度太粗(只有 sequential 和 hierarchical 两种)
**来源**: [CrewAI Tasks](https://docs.crewai.com/en/concepts/tasks)
### 2.6 OpenClaw TaskFlow + Lobster
**核心模式**
- TaskFlow:持久化的多步 flow 管理(跨 gateway 重启)
- Lobster:确定性的 pipeline 运行时(步骤编排 + 审批门控 + resume token
```yaml
name: market-intel-brief
steps:
- id: collect
command: market-intel collect --json
- id: summarize
command: market-intel summarize --json
stdin: $collect.json
- id: approve
command: market-intel deliver --preview
approval: required
- id: deliver
command: market-intel deliver --execute
condition: $approve.approved
```
**启发**
- OpenClaw 自己就有 pipeline 能力(Lobster
- 声明式 YAML 定义步骤
- 审批门控是内置的
- **moziplus 可以直接利用 OpenClaw 的 Lobster/TaskFlow 来实现 task pipeline**
**来源**: `/opt/homebrew/lib/node_modules/openclaw/docs/automation/taskflow.md`
---
## 3. 设计模式分析
### 3.1 Strategy Pattern(策略模式)
**适用场景**:运行时根据 task_type 选择不同的执行策略。
```python
class ExecutionStrategy(ABC):
@abstractmethod
async def execute(self, task, agent_id, ...): ...
class SingleStepStrategy(ExecutionStrategy):
async def execute(self, task, agent_id, ...):
# 直接 spawn,不做 planning
...
class MultiStepStrategy(ExecutionStrategy):
async def execute(self, task, agent_id, ...):
# planning → execute → review
...
```
**优点**:策略独立,新增类型只需新增 Strategy 类
**缺点**:如果 pipeline 有很多共享步骤,Strategy 之间会有重复
### 3.2 Template Method Pattern(模板方法)
**适用场景**:pipeline 骨架固定,具体步骤可替换。
```python
class TaskPipeline(ABC):
async def execute(self, task):
agent_id = await self.route(task) # 路由
await self.pre_spawn(task, agent_id) # 前置处理
result = await self.spawn(task, agent_id) # 执行
await self.post_complete(task, result) # 后置处理
await self.verify(task, result) # 验证
@abstractmethod
async def route(self, task): ...
@abstractmethod
async def verify(self, task, result): ...
```
**优点**:骨架共享,差异点明确
**缺点**:骨架变化影响所有子类
### 3.3 Routing Pattern(路由模式)— LangGraph 风格
**适用场景**:入口统一分类,分发到不同的独立 pipeline。
```python
def route(task):
return PIPELINE_REGISTRY.get(task.task_type, "default")
PIPELINE_REGISTRY = {
"coding": CodingPipeline(),
"review": ReviewPipeline(),
"data": DataPipeline(),
"deploy": DeployPipeline(),
}
```
**优点**:最灵活,每个 pipeline 完全独立
**缺点**:需要 Registry 管理
### 3.4 推荐:Template Method + Strategy 混合
结合 Template Method(共享骨架)和 Routing(按类型分发):
```
Ticker.tick
└─ PipelineRouter.route(task.task_type)
├─ SingleStepPipeline (review, data_download)
│ └─ route → spawn → verify → done
├─ MultiStepPipeline (coding, strategy_dev)
│ └─ route → plan → spawn → verify → review → done
├─ InteractivePipeline (checkpoint_required)
│ └─ route → plan → spawn → checkpoint → wait → resume → done
└─ MailPipeline (已有设计)
└─ assignee → spawn → verify → done
```
---
## 4. 设计方案
### 4.1 核心概念
**Pipeline**:一个 task_type 对应一个 Pipeline,定义该类型的完整生命周期。
**Stage**:Pipeline 内的一个步骤。一个 Stage 对应一次 agent spawn。
**Pipeline Profile**YAML 声明的 Pipeline 配置,定义 stages 列表和各阶段参数。
### 4.2 Pipeline 类型
| 类型 | 适用 task_type | 流程 | 特点 |
|------|---------------|------|------|
| `single_step` | review, data_download, inform_mail | route → spawn → verify → done | 无 planning,直接执行 |
| `multi_step` | coding, strategy_dev, deploy | route → plan → spawn[N] → verify → done | 有 planning,多 stage 串行 |
| `interactive` | checkpoint_required | route → plan → spawn → checkpoint → wait → spawn → done | 有人工审批点 |
| `mail` | mailrequest/inform | assignee → spawn → verify → done | 点对点,无路由(已设计) |
### 4.3 Pipeline ProfileYAML 声明)
```yaml
# config/pipelines/coding.yaml
task_type: coding
pipeline_type: multi_step
risk_level: standard
stages:
- id: plan
description: "分析需求,制定方案"
agent_selection: route_by_capability # 或 specific_agent
capabilities_required: ["coding", "design"]
timeout_minutes: 10
prompt_template: "coding_plan_prompt"
- id: implement
description: "编码实现"
agent_selection: specific_agent
reuse_plan_agent: true # 实施者就是规划者
timeout_minutes: 30
prompt_template: "coding_impl_prompt"
depends_on: [plan]
- id: verify
description: "自动验证"
agent_selection: specific_agent
verify_agent: "simayi-challenger"
timeout_minutes: 15
prompt_template: "coding_verify_prompt"
depends_on: [implement]
completion:
status: done
require_verification: true
```
```yaml
# config/pipelines/review.yaml
task_type: review
pipeline_type: single_step
risk_level: low
stages:
- id: review
description: "代码评审"
agent_selection: specific_agent
specific_agent: "simayi-challenger"
timeout_minutes: 20
prompt_template: "review_prompt"
completion:
status: done
require_verification: false
```
```yaml
# config/pipelines/data_download.yaml
task_type: data
pipeline_type: single_step
risk_level: low
stages:
- id: download
description: "数据下载"
agent_selection: specific_agent
specific_agent: "zhaoyun-data"
timeout_minutes: 60
prompt_template: "data_download_prompt"
completion:
status: done
require_verification: true
verification: "check_output_exists" # 内置验证函数
```
### 4.4 代码架构
```python
# src/daemon/pipeline/base.py
class TaskPipeline(ABC):
"""Pipeline 基类(Template Method"""
def __init__(self, profile: dict, spawner, counter, router):
self.profile = profile
self.spawner = spawner
self.counter = counter
self.router = router
async def tick(self, db_path: Path) -> TickResult:
"""每个 tick 的入口"""
tasks = self._load_pending_tasks(db_path)
for task in tasks:
stage = self._get_current_stage(task)
await self._execute_stage(task, stage, db_path)
return TickResult(dispatched=len(tasks))
@abstractmethod
def _load_pending_tasks(self, db_path) -> list:
"""加载待处理任务(不同 pipeline 查不同状态)"""
...
@abstractmethod
def _get_current_stage(self, task) -> dict:
"""获取当前应该执行的 stage"""
...
async def _execute_stage(self, task, stage, db_path):
"""执行单个 stage(共享逻辑)"""
agent_id = await self._resolve_agent(task, stage)
message = self._build_prompt(task, stage)
await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=message,
task_id=task.id,
use_main_session=(stage.get("agent_selection") == "specific_agent"),
task_db_path=db_path,
)
# src/daemon/pipeline/single_step.py
class SingleStepPipeline(TaskPipeline):
"""单步执行:route → spawn → done"""
def _load_pending_tasks(self, db_path):
return load_tasks(db_path, status="pending", task_type=self.profile["task_type"])
def _get_current_stage(self, task):
return self.profile["stages"][0] # 只有一个 stage
# src/daemon/pipeline/multi_step.py
class MultiStepPipeline(TaskPipeline):
"""多步执行:plan → implement → verify → done"""
def _load_pending_tasks(self, db_path):
# 查 pending(新建)+ executing(下一个 stage 待执行)
...
def _get_current_stage(self, task):
# 从 stages_json 或 subtask 进度推算当前 stage
...
# src/daemon/pipeline/router.py
class PipelineRouter:
"""按 task_type 路由到对应 Pipeline"""
def __init__(self):
self._pipelines: Dict[str, TaskPipeline] = {}
self._default: TaskPipeline = None
def register(self, task_type: str, pipeline: TaskPipeline):
self._pipelines[task_type] = pipeline
def route(self, task_type: str) -> TaskPipeline:
return self._pipelines.get(task_type, self._default)
# src/daemon/ticker.py(改动后)
class Ticker:
def __init__(self, pipeline_router: PipelineRouter, ...):
self.pipeline_router = pipeline_router
# Mail 也是一个 pipeline
self.mail_pipeline = MailPipeline(...)
async def _tick_project(self, project_id, db_path):
if project_id == "_mail":
return await self.mail_pipeline.tick(db_path)
# 查所有活跃 task_type
task_types = self._get_active_task_types(db_path)
results = []
for tt in task_types:
pipeline = self.pipeline_router.route(tt)
result = await pipeline.tick(db_path)
results.append(result)
return merge_results(results)
```
### 4.5 与已规划 v2.7.2 Pipeline 重构的关系
v2.7.2 设计文档(`v2.7.2-pipeline-refactor.md`)规划了 Mail vs Task 的 Pipeline 分离。本方案是它的自然延伸:
```
v2.7.2 Pipeline 重构(Mail vs Task 分离)
v2.8 Task Type PipelineTask 内部按类型细分)
```
具体关系:
- `TaskPipeline` 基类 = v2.7.2 `TaskPipeline` 的扩展版
- `MailPipeline` = v2.7.2 已设计的,不变
- `PipelineRouter` = v2.7.2 ticker 的 pipeline 分发逻辑 + task_type 路由
- `SingleStepPipeline` / `MultiStepPipeline` = 新增
### 4.6 配置加载
```python
# main.py 启动时
pipeline_router = PipelineRouter()
# 自动扫描 config/pipelines/ 目录
for pipeline_file in Path("config/pipelines").glob("*.yaml"):
profile = yaml.safe_load(pipeline_file.read_text())
ptype = profile["pipeline_type"]
if ptype == "single_step":
pipeline = SingleStepPipeline(profile, spawner, counter, router)
elif ptype == "multi_step":
pipeline = MultiStepPipeline(profile, spawner, counter, router)
pipeline_router.register(profile["task_type"], pipeline)
# 默认 pipeline(未匹配 task_type 时使用)
default_pipeline = MultiStepPipeline(default_profile, spawner, counter, router)
pipeline_router.set_default(default_pipeline)
```
### 4.7 前端影响
- TaskModal 显示当前 pipeline 进度(stages 列表 + 当前 stage 高亮)
- 创建任务时选择 task_type(下拉框,从 pipelines/ 配置自动生成)
- 不同 task_type 可以显示不同的标签/颜色
---
## 5. 实施路线
### Phase 1Pipeline 基础框架(v2.7.2 合并)
1. 实现 `TaskPipeline` 基类 + `TickResult`
2. 实现 `MailPipeline`(替换当前 if is_mail 分支)
3. 实现 `PipelineRouter`
4. 重构 ticker 的 `_tick_project`
### Phase 2Task Type Pipeline
1. 实现 `SingleStepPipeline` + `MultiStepPipeline`
2. 创建 pipeline profile YAMLcoding, review, data, deploy
3. 实现 profile 自动加载
4. 更新 DB schematasks 表增加 task_type 列,已有)
5. 前端更新(task_type 选择 + stage 进度)
### Phase 3:高级特性
1. `InteractivePipeline`Checkpoint 门控)
2. 并行 stage 支持(multi_step 内的并行步骤)
3. 动态 pipelineAI 根据需求自动选择 task_type 和 pipeline
4. pipeline 版本管理
---
## 6. 风险与约束
| 风险 | 缓解 |
|------|------|
| Pipeline 重构范围大 | 分 PhasePhase 1 先做 Mail 分离(已设计) |
| Multi-step 的 stage 进度追踪复杂 | 复用已有 SubTask / stages_json 机制 |
| 新增 task_type 需要同时改配置和代码 | Pipeline Profile 纯声明式,代码自动加载 |
| 与 v2.7.2 重构的合并风险 | Phase 1 就是 v2.7.2Phase 2 在其基础上增量 |
---
## 7. 待确认问题
1. **task_type 列表**:当前 DB 有 `coding/review/data/deploy/research/discuss`,是否需要调整?
2. **pipeline_type 分类**single_step / multi_step / interactive 三种是否够用?
3. **agent_selection 策略**route_by_capabilityLLM 路由)vs specific_agent(固定)vs broadcast(广播),是否还有其他模式?
4. **multi_step 的 stage 间数据传递**:通过 DBstages_json)还是通过 prompt 注入前序 stage 的 output
5. **Phase 1 先做还是直接跳到 Phase 2**v2.7.2 的 Mail/Task 分离是否还需要单独做,还是和 Task Type Pipeline 一起做?