v2.8 Pipeline Architecture Design

Version: v1.0
Date: 2026-05-26
Author: 庞统 (Pang Tong)
Status: research complete → design proposal, awaiting user confirmation


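1. Design goals

  1. Eliminate the 46 if/_mail branches (ticker 14 + dispatcher 23 + spawner 9)
  2. Let different task_types follow different execution paths (code review needs no planning; data download needs no routing)
  3. Adding a task_type should require only configuration, with no changes to core code (Plugin/Registry)
  4. Fit naturally with the spawner-monitor design (shared execution layer, differentiated scheduling layer)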

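2. Design pattern selection

2.1 Recap of research findings

| Dimension | Requirement | Best-fit pattern |
|---|---|---|
| Dispatch by type | Unified entry point, independent dispatch | Routing + Registry (LangGraph) |
| Shared skeleton + replaceable steps | tick → load → route → spawn → complete | Template Method |
| Zero code changes for new types | Declarative configuration | Plugin/Registry (Argo) |
| Replaceable trigger mechanism | cron / webhook / manual / event | Strategy |
| Chained failure handling | retry → rollback → escalate | Chain of Responsibility |
| Event notification | Trigger dependents on completion | Observer (Phase 2) |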

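2.2 Selected combination

Registry + Template Method + Strategy
  • Registry: a Pipeline registry, looked up by task_type
  • Template Method: the Pipeline base class defines the tick() skeleton; subclasses override the steps that differ
  • Strategy: trigger mechanism, routing policy, and verification policy are injectable strategy objects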
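
The combination above can be sketched in a few lines. This is only an illustration of how the three patterns might interlock; every name in it (`VerifyStrategy`, `ToyPipeline`, `EchoPipeline`, `SilentPipeline`, `REGISTRY`, `run_task`) is hypothetical and does not come from the codebase.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict

# Strategy: a verification policy is an injectable callable (hypothetical).
VerifyStrategy = Callable[[str], bool]


def always_pass(output: str) -> bool:
    return True


def non_empty(output: str) -> bool:
    return bool(output.strip())


class ToyPipeline(ABC):
    """Template Method: run() fixes the skeleton, subclasses supply execute()."""

    def __init__(self, verify: VerifyStrategy):
        self.verify = verify

    def run(self, task: str) -> str:
        output = self.execute(task)  # the step that varies per subclass
        return "done" if self.verify(output) else "failed"

    @abstractmethod
    def execute(self, task: str) -> str:
        ...


class EchoPipeline(ToyPipeline):
    def execute(self, task: str) -> str:
        return task


class SilentPipeline(ToyPipeline):
    def execute(self, task: str) -> str:
        return ""


# Registry: task_type -> pipeline instance.
REGISTRY: Dict[str, ToyPipeline] = {
    "echo": EchoPipeline(verify=non_empty),
    "silent": SilentPipeline(verify=non_empty),
    "lenient": SilentPipeline(verify=always_pass),
}


def run_task(task_type: str, task: str) -> str:
    return REGISTRY[task_type].run(task)
```

Swapping the verification policy (`non_empty` vs `always_pass`) changes the result without touching either pipeline class, which is the reason for keeping such policies as strategies rather than subclass overrides.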

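Not selected:

  • Chain of Responsibility: failure handling currently has only 3 actions (retry/escalate/abort), which does not justify a chain abstraction
  • Observer: dependency triggering (F2) is deferred to Phase 2 and is not implemented now
  • Pure Strategy: Pipelines share too much logic (spawn/monitor/counter); a pure strategy design would duplicate it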
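
With only three failure actions, a plain function is arguably enough. The sketch below is an assumption-laden illustration: the outcome names `timeout` and `api_error` are borrowed from the completion handler later in this document, while `TRANSIENT_OUTCOMES`, `MAX_RETRIES`, and the escalation rule are hypothetical and not part of the design.

```python
from enum import Enum


class FailureAction(Enum):
    RETRY = "retry"
    ESCALATE = "escalate"
    ABORT = "abort"


# Hypothetical policy: outcomes treated as transient and therefore retryable.
TRANSIENT_OUTCOMES = {"timeout", "api_error"}
MAX_RETRIES = 3  # assumed cap, not taken from the design


def decide_failure_action(outcome: str, attempts: int) -> FailureAction:
    """Pick one of the three actions without a handler chain."""
    if outcome in TRANSIENT_OUTCOMES:
        if attempts < MAX_RETRIES:
            return FailureAction.RETRY
        return FailureAction.ESCALATE
    return FailureAction.ABORT
```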

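2.3 Industry precedents

| System | Approach | Our counterpart |
|---|---|---|
| Temporal | Workflow Type registers different handler functions | Pipeline Registry |
| LangGraph | Router → Sub-graph | PipelineRouter → Pipeline |
| Argo | YAML WorkflowTemplate | Pipeline Profile YAML |
| Google ADK | SequentialAgent/ParallelAgent/LoopAgent | SingleStepPipeline/MultiStepPipeline (parallelism added in Phase 2) |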

3. Architecture design

3.1 Overall structure

┌─────────────────────────────────────────────┐
│                   Ticker                     │
│  tick() → each project → _tick_project()    │
└───────────┬─────────────────────────────────┘
            │
            ▼
┌─────────────────────────────────────────────┐
│             PipelineRouter                   │
│  route(task_type) → Pipeline                 │
│  route("_mail")   → MailPipeline             │
│  route("review")  → SingleStepPipeline       │
│  route(None)      → DefaultPipeline          │
└───────────┬─────────────────────────────────┘
            │
     ┌──────┼──────┬──────────────┐
     ▼      ▼      ▼              ▼
┌────────┐┌────────┐┌──────────┐┌────────┐
│  Mail  ││ Single ││ Default  ││ Cron   │
│Pipeline││  Step  ││ (Multi)  ││(Phase2)│
└────────┘└────────┘└──────────┘└────────┘
     │         │         │
     └────┬────┴────┬────┘
          ▼         ▼
    ┌──────────┐ ┌──────────┐
    │ Spawner  │ │ Counter  │
    │ (shared) │ │ (shared) │
    └──────────┘ └──────────┘

3.2 Pipeline base class (Template Method)

from abc import ABC, abstractmethod
from pathlib import Path

# Spawner, ActiveAgentCounter, Router and TickResult come from the existing codebase.

class TaskPipeline(ABC):
    """Pipeline base class; defines the tick() skeleton."""

    def __init__(self, spawner: Spawner, counter: ActiveAgentCounter,
                 router: Router, config: dict):
        self.spawner = spawner
        self.counter = counter
        self.router = router
        self.config = config

    async def tick(self, db_path: Path, project_config: dict) -> TickResult:
        """Entry point for each ticker cycle (Template Method)."""
        # 1. Pre-processing (timeout detection, dependency advancement, etc.)
        await self.pre_tick(db_path)

        # 2. Load pending tasks
        tasks = self.load_pending(db_path)

        # 3. Process them one by one
        dispatched = 0
        for task in tasks:
            if not self.can_dispatch(task, db_path):
                continue
            result = await self.dispatch_one(task, db_path, project_config)
            if result:
                dispatched += 1

        return TickResult(dispatched=dispatched, total=len(tasks))

    # ── Hook methods (overridden by subclasses) ──

    async def pre_tick(self, db_path: Path):
        """Pre-processing. By default only timeout detection is performed."""
        self.check_timeouts(db_path)

    @abstractmethod
    def load_pending(self, db_path: Path) -> list:
        """Load the tasks waiting to be processed."""
        ...

    def can_dispatch(self, task, db_path: Path) -> bool:
        """Whether the task may be dispatched (guardrail, counter, etc.)."""
        return True

    @abstractmethod
    async def dispatch_one(self, task, db_path: Path, project_config: dict) -> bool:
        """Process a single task."""
        ...

    def check_timeouts(self, db_path: Path):
        """Timeout detection (shared logic)."""
        ...
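
To show the order in which the hooks fire, here is a self-contained toy version of the skeleton. It is a sketch under stated assumptions: `TickResult` is assumed to be a small dataclass with `dispatched` and `total` fields (inferred from the `return` statement above), and `MiniPipeline`, `InMemoryPipeline`, and the in-memory task dicts are hypothetical stand-ins for the real classes and the database.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


@dataclass
class TickResult:
    dispatched: int
    total: int


class MiniPipeline(ABC):
    """Stripped-down copy of the tick() skeleton, without db_path or spawner."""

    async def tick(self) -> TickResult:
        await self.pre_tick()
        tasks = self.load_pending()
        dispatched = 0
        for task in tasks:
            if not self.can_dispatch(task):
                continue
            if await self.dispatch_one(task):
                dispatched += 1
        return TickResult(dispatched=dispatched, total=len(tasks))

    async def pre_tick(self) -> None:
        pass

    @abstractmethod
    def load_pending(self) -> List[dict]:
        ...

    def can_dispatch(self, task: dict) -> bool:
        return True

    @abstractmethod
    async def dispatch_one(self, task: dict) -> bool:
        ...


class InMemoryPipeline(MiniPipeline):
    def __init__(self, tasks: List[dict]):
        self.tasks = tasks
        self.calls: List[str] = []  # records hook order for inspection

    async def pre_tick(self) -> None:
        self.calls.append("pre_tick")

    def load_pending(self) -> List[dict]:
        self.calls.append("load_pending")
        return [t for t in self.tasks if t["status"] == "pending"]

    def can_dispatch(self, task: dict) -> bool:
        return not task.get("blocked", False)

    async def dispatch_one(self, task: dict) -> bool:
        self.calls.append(f"dispatch:{task['id']}")
        task["status"] = "working"
        return True


def demo() -> TickResult:
    pipeline = InMemoryPipeline([
        {"id": "t1", "status": "pending"},
        {"id": "t2", "status": "pending", "blocked": True},
        {"id": "t3", "status": "done"},
    ])
    return asyncio.run(pipeline.tick())
```

In `demo()`, two tasks are pending but one is blocked, so the tick reports one dispatched task out of a total of two.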

3.3 Pipeline implementations

MailPipeline

class MailPipeline(TaskPipeline):
    """Mail delivery pipeline."""

    async def pre_tick(self, db_path):
        # Mail also needs timeout detection
        self.check_timeouts(db_path)
        # No dependency advancement is needed

    def load_pending(self, db_path):
        # Load only tasks in the pending state
        return load_tasks(db_path, statuses=["pending"])

    def can_dispatch(self, task, db_path):
        # Mail bypasses the guardrail,
        # but the counter still has to be checked
        return self.counter.can_acquire(task.assignee, "main")

    async def dispatch_one(self, task, db_path, project_config):
        agent_id = task.assignee  # taken directly, no routing

        # Mark as working
        mark_working(db_path, task.id, agent_id)

        # Build the prompt
        message = self.spawner.build_mail_prompt(task)

        # Spawn
        def on_complete(aid, outcome, session_id):
            self._on_mail_complete(task.id, aid, session_id, db_path, task.must_haves)

        return await self.spawner.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task.id,
            use_main_session=True,
            on_complete=on_complete,
            project_id="_mail",
            db_path=db_path,
        )

    def _on_mail_complete(self, task_id, agent_id, session_id, db_path, must_haves):
        # Hallucination gate + automatic mark-as-done + automatic completion of inform mails
        ...

SingleStepPipeline (code review, data download, etc.)

class SingleStepPipeline(TaskPipeline):
    """Single-step pipeline: route → spawn → verify → done"""

    def __init__(self, spawner, counter, router, config, profile):
        super().__init__(spawner, counter, router, config)
        self.profile = profile  # loaded from YAML

    async def pre_tick(self, db_path):
        # No dependency advancement
        self.check_timeouts(db_path)

    def load_pending(self, db_path):
        return load_tasks(db_path, statuses=["pending"],
                          task_type=self.profile.get("task_type"))

    def can_dispatch(self, task, db_path):
        # Guardrail check.
        # Note: self.guardrails is not set in __init__ above; it is assumed
        # to be injected before this method runs.
        if self.config.get("guardrails"):
            violations = self.guardrails.check_task(task)
            if violations:
                mark_blocked(db_path, task.id, reason=violations)
                return False
        return True

    async def dispatch_one(self, task, db_path, project_config):
        # Route (or use a fixed agent)
        agent_id = self._resolve_agent(task)

        # Counter check
        if not self.counter.can_acquire(agent_id, task.id):
            return False

        # Mark as working
        mark_working(db_path, task.id, agent_id)

        # Build the prompt
        message = self.spawner.build_task_prompt(task, self.profile)

        # Spawn
        def on_complete(aid, outcome, session_id):
            self.counter.release(aid, session_id)
            self._handle_completion(task.id, aid, outcome, db_path)

        return await self.spawner.spawn_full_agent(
            agent_id=agent_id,
            message=message,
            task_id=task.id,
            use_main_session=False,
            on_complete=on_complete,
            project_id=project_config["id"],
            db_path=db_path,
        )

    def _resolve_agent(self, task):
        """Resolve the agent according to the profile configuration."""
        sel = self.profile.get("agent_selection", "route")
        if sel == "specific_agent":
            return self.profile["specific_agent"]
        # "route_by_capability" and "route" both delegate to the router for now
        return self.router.decide(task)

    def _handle_completion(self, task_id, agent_id, outcome, db_path):
        if outcome == "completed":
            mark_done(db_path, task_id)
        elif outcome in ("timeout", "api_error"):
            # Back to pending; the ticker retries on a later cycle
            mark_pending(db_path, task_id)
        else:
            mark_failed(db_path, task_id, reason=outcome)

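DefaultPipeline (multi-step Task, the current main flow)

class DefaultPipeline(TaskPipeline):
    """Default multi-step pipeline: planning → executing → review → done"""

    def __init__(self, spawner, counter, router, config, profile=None):
        # profile is optional: the registration logic in 5.3 passes one
        # for multi-step task_types declared in YAML
        super().__init__(spawner, counter, router, config)
        self.profile = profile or {}

    async def pre_tick(self, db_path):
        # Dependency advancement
        self.advance_dependencies(db_path)
        # Timeout detection
        self.check_timeouts(db_path)

    def load_pending(self, db_path):
        # Load pending + assigned + executing (continuation scenario)
        return load_tasks(db_path, statuses=["pending", "assigned", "executing"])

    def can_dispatch(self, task, db_path):
        # Guardrail check
        ...
        return True

    async def dispatch_one(self, task, db_path, project_config):
        # The current full flow: planning → route → spawn → review
        ...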

3.4 PipelineRouter (Registry)

from typing import Dict, Optional

class PipelineRouter:
    """Routes a task_type to its Pipeline."""

    def __init__(self):
        self._pipelines: Dict[str, TaskPipeline] = {}
        self._default: Optional[TaskPipeline] = None

    def register(self, task_type: str, pipeline: TaskPipeline):
        self._pipelines[task_type] = pipeline

    def set_default(self, pipeline: TaskPipeline):
        # Fallback for task_types without a registered pipeline
        self._default = pipeline

    def route(self, task_type: Optional[str] = None, is_mail: bool = False) -> TaskPipeline:
        if is_mail:
            return self._pipelines["_mail"]
        return self._pipelines.get(task_type, self._default)
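
The fallback behaviour can be exercised in isolation. In this sketch, plain strings stand in for pipeline instances and `MiniRouter` is a hypothetical reduction of `PipelineRouter`; the `set_default` method mirrors the call made by the registration logic in 5.3. Raising when no default is configured is an added assumption, not something the design specifies.

```python
from typing import Dict, Optional


class MiniRouter:
    """Registry with a default fallback; strings stand in for pipelines."""

    def __init__(self) -> None:
        self._pipelines: Dict[str, str] = {}
        self._default: Optional[str] = None

    def register(self, task_type: str, pipeline: str) -> None:
        self._pipelines[task_type] = pipeline

    def set_default(self, pipeline: str) -> None:
        self._default = pipeline

    def route(self, task_type: Optional[str] = None) -> str:
        pipeline = self._pipelines.get(task_type, self._default)
        if pipeline is None:
            # Assumed behaviour: fail loudly rather than return None
            raise LookupError(f"no pipeline for {task_type!r} and no default set")
        return pipeline


router = MiniRouter()
router.set_default("default")
router.register("review", "single_step")
```

Here `router.route("review")` resolves to the registered entry, while an unregistered type such as `"coding"` or a missing type falls back to the default.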

3.5 Ticker changes

class Ticker:
    def __init__(self, pipeline_router: PipelineRouter, ...):
        self.pipeline_router = pipeline_router

    async def _tick_project(self, project_id, project_config):
        is_mail = project_id == "_mail"
        db_path = self._get_db_path(project_id)

        if is_mail:
            pipeline = self.pipeline_router.route(is_mail=True)
            return await pipeline.tick(db_path, project_config)

        # Collect every task_type that is active in this project
        task_types = self._get_active_task_types(db_path)

        # Resolve each task_type to its pipeline. A single registered type
        # (e.g. "review") must still reach its own pipeline, and several
        # unregistered types must not tick the default pipeline twice.
        pipelines = []
        for tt in task_types or [None]:
            p = self.pipeline_router.route(tt)
            if p not in pipelines:
                pipelines.append(p)

        if len(pipelines) == 1:
            return await pipelines[0].tick(db_path, project_config)

        results = []
        for p in pipelines:
            results.append(await p.tick(db_path, project_config))
        return merge_results(results)
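
`merge_results` is used above but not defined in this document. One plausible shape, assuming `TickResult` carries only the `dispatched` and `total` counters seen in the base class, is a field-wise sum:

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class TickResult:
    # Assumed fields, inferred from TickResult(dispatched=..., total=...)
    dispatched: int = 0
    total: int = 0


def merge_results(results: Iterable[TickResult]) -> TickResult:
    """Sum the counters of several per-pipeline results into one."""
    dispatched = 0
    total = 0
    for r in results:
        dispatched += r.dispatched
        total += r.total
    return TickResult(dispatched=dispatched, total=total)
```

An empty input yields a zeroed result, so a project with no active pipelines reports nothing dispatched rather than raising.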

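4. Extraction from existing code

4.1 Fully shared (stays in Spawner, unchanged)

| Method | Current location | Notes |
|---|---|---|
| spawn_full_agent() | spawner.py | Process spawn mechanism |
| _monitor_process() | spawner.py | Monitoring + case A/B classification |
| _classify_outcome() | spawner.py | Exit classification |
| _do_retry() | spawner.py | Continuation mechanism |
| build_task_prompt() | spawner.py | Task prompt (can be overridden by a Pipeline) |
| build_mail_prompt() | spawner.py | Mail prompt (moves to MailPipeline) |

4.2 Moves to the Pipeline base class

| Current location | Method | Notes |
|---|---|---|
| ticker.py | check_timeouts() | Timeout detection |
| ticker.py | _advance_dependencies() | Dependency advancement (DefaultPipeline only) |
| ticker.py | _transition_status() | Status transitions (DefaultPipeline only) |
| dispatcher.py | dispatch() as a whole | Split into each Pipeline's dispatch_one() |
| dispatcher.py | decide() | Routing logic → called from inside the Pipeline |

4.3 Moves to MailPipeline only

| Current location | Method | Notes |
|---|---|---|
| dispatcher.py | _mail_auto_working() | Mark Mail as working |
| dispatcher.py | _mail_auto_complete() | Mail auto-completion |
| dispatcher.py | _mail_check_reply() | Mail reply check |
| spawner.py | _build_mail_prompt() | Mail prompt construction |
| ticker.py | 14 _mail conditional branches | All eliminated |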

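5. Pipeline Profile (YAML declaration)

5.1 Design principles

  • Declarative: YAML declares a pipeline's stages, parameters, and strategies
  • Auto-loaded: the config/pipelines/ directory is scanned at startup
  • Extensible without core code changes: a new YAML file = a new pipeline type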

5.2 Profile format

# config/pipelines/review.yaml
name: code-review
task_type: review
pipeline_class: single_step      # single_step | default (multi_step)

agent_selection: specific_agent   # specific_agent | route_by_capability | route
specific_agent: simayi-challenger

timeout_minutes: 20
completion_status: done           # status to set on completion

# Not declared = skipped
skip_guardrail: false
skip_planning: true               # single_step skips planning by nature

# config/pipelines/data-download.yaml
name: data-download
task_type: data
pipeline_class: single_step

agent_selection: specific_agent
specific_agent: zhaoyun-data

timeout_minutes: 60
completion_status: done
verification: check_output_exists  # name of a built-in verification function
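
Because a malformed profile would otherwise only fail at dispatch time, it may be worth validating profiles when they are loaded. The following is a sketch, not part of the design: `PipelineProfile` and `parse_profile` are hypothetical, the defaults are assumptions, and accepting both `default` and `multi_step` as class names is a guess based on the comment in the profile above. It takes an already-parsed dict, so it does not depend on a YAML library.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional

AGENT_SELECTIONS = {"specific_agent", "route_by_capability", "route"}
PIPELINE_CLASSES = {"single_step", "default", "multi_step"}  # assumed set


@dataclass(frozen=True)
class PipelineProfile:
    name: str
    task_type: str
    pipeline_class: str = "single_step"
    agent_selection: str = "route"
    specific_agent: Optional[str] = None
    timeout_minutes: int = 20  # assumed default
    completion_status: str = "done"


def parse_profile(raw: Dict[str, Any]) -> PipelineProfile:
    """Validate a parsed YAML mapping and turn it into a typed profile."""
    for key in ("name", "task_type"):
        if not raw.get(key):
            raise ValueError(f"profile is missing required key: {key}")

    # Ignore keys this sketch does not model (e.g. skip_planning, verification)
    known = {f.name for f in fields(PipelineProfile)}
    profile = PipelineProfile(**{k: v for k, v in raw.items() if k in known})

    if profile.pipeline_class not in PIPELINE_CLASSES:
        raise ValueError(f"unknown pipeline_class: {profile.pipeline_class}")
    if profile.agent_selection not in AGENT_SELECTIONS:
        raise ValueError(f"unknown agent_selection: {profile.agent_selection}")
    if profile.agent_selection == "specific_agent" and not profile.specific_agent:
        raise ValueError("agent_selection=specific_agent requires specific_agent")
    return profile
```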

5.3 Registration logic

# main.py or bootstrap.py
from pathlib import Path

import yaml

def load_pipelines(pipelines_dir: Path, spawner, counter, router, config):
    # Use a distinct name so the agent `router` argument is not shadowed
    pipeline_router = PipelineRouter()

    # Mail pipeline (hard-coded, not loaded from YAML)
    mail_pipeline = MailPipeline(spawner, counter, router, config)
    pipeline_router.register("_mail", mail_pipeline)

    # Default pipeline (multi-step Task)
    default_pipeline = DefaultPipeline(spawner, counter, router, config)
    pipeline_router.set_default(default_pipeline)

    # Load from YAML
    for f in sorted(pipelines_dir.glob("*.yaml")):
        profile = yaml.safe_load(f.read_text())
        cls_name = profile.get("pipeline_class", "single_step")

        if cls_name == "single_step":
            pipeline = SingleStepPipeline(spawner, counter, router, config, profile)
        else:
            # Multi-step uses the default pipeline, with parameters customised by the profile
            pipeline = DefaultPipeline(spawner, counter, router, config, profile)

        pipeline_router.register(profile["task_type"], pipeline)

    return pipeline_router
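
If more pipeline classes appear later, the `if`/`else` on `pipeline_class` could itself become a lookup, in keeping with the Registry idea. A minimal sketch, assuming hypothetical stub classes in place of the real pipelines and keeping the original behaviour of falling back to the default class for any unrecognised name:

```python
from typing import Any, Callable, Dict


class SingleStepStub:
    """Stand-in for SingleStepPipeline (hypothetical)."""

    def __init__(self, profile: Dict[str, Any]):
        self.profile = profile


class DefaultStub:
    """Stand-in for DefaultPipeline (hypothetical)."""

    def __init__(self, profile: Dict[str, Any]):
        self.profile = profile


# pipeline_class name -> factory taking the parsed profile
PIPELINE_FACTORIES: Dict[str, Callable[[Dict[str, Any]], object]] = {
    "single_step": SingleStepStub,
    "default": DefaultStub,
}


def build_pipeline(profile: Dict[str, Any]) -> object:
    cls_name = profile.get("pipeline_class", "single_step")
    # Mirrors the else branch above: unknown names fall back to the default class
    factory = PIPELINE_FACTORIES.get(cls_name, DefaultStub)
    return factory(profile)
```

Adding a new pipeline class then means adding one dictionary entry rather than another branch.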

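6. Implementation roadmap

Phase 1: Pipeline separation (v2.7.2 + v2.8 merged)

Eliminate the 46 _mail branches and establish the Pipeline framework.

| Step | Change | Estimated lines |
|---|---|---|
| 1. Create the pipeline/ directory | base.py, mail.py, default.py, router.py | ~300 lines |
| 2. Extract MailPipeline | Extract Mail logic from dispatcher/ticker/spawner | ~150 lines |
| 3. Extract DefaultPipeline | Extract Task logic from dispatcher/ticker | ~200 lines |
| 4. PipelineRouter | Routing registry | ~50 lines |
| 5. Simplify Ticker | Remove all _mail branches; call pipeline_router instead | -150 lines |
| 6. Slim down Dispatcher | Remove all _mail branches and Mail methods | -200 lines |
| 7. Slim down Spawner | Remove _build_mail_prompt and other Mail methods | -80 lines |
| Net increase | | ~270 lines |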

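Phase 2: Task Type Pipeline

Add SingleStepPipeline + YAML Profile.

| Step | Change |
|---|---|
| 1. SingleStepPipeline | ~100 lines |
| 2. YAML Profile loading | ~50 lines |
| 3. Two profile YAML files (review/data) | ~40 lines |
| 4. Select task_type when creating a task | Frontend + API |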

Phase 3: Advanced features

  • InteractivePipeline (Checkpoint gating)
  • Parallel stage support
  • Event-driven triggering (Observer pattern)
  • Dynamic pipeline selection (AI infers the task_type automatically)

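7. Relationship to other designs

| Design document | Relationship |
|---|---|
| v2.7.2 Pipeline Refactor | This proposal is a superset of it. The MailPipeline from v2.7.2 is implemented directly here |
| v2.8 Task Type Pipeline (old) | This proposal replaces it. The old proposal focused on YAML Profile details; this one adds the architecture design |
| Spawner Monitor Design | Shared execution layer. Spawner's _monitor_process and _classify_outcome are unaffected |
| counter v2.1 | Shared layer. The counter is injected into each Pipeline; its logic is unchanged |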

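8. Risk assessment

| Risk | Probability | Impact | Mitigation |
|---|---|---|---|
| Regressions introduced while extracting methods | | | Migrate method by method + E2E test after each step |
| PipelineRouter routing logic grows complex | | | Mail uses the special key _mail; everything else uses task_type |
| Conflict with the Spawner Monitor design | | | The two change different files; the shared interface is unchanged |
| Phase 1 is a large change | | | Can be split: extract MailPipeline first, verify, then extract Default |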

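9. Open questions

  1. Should Phase 1 and Phase 2 be done together? Recommendation: do Phase 1 (Pipeline separation) first, and start Phase 2 (Task Type) once Phase 1 is stable
  2. Should dispatcher.py be deleted entirely? Once all of its logic has been distributed across the Pipelines, the dispatcher itself can be removed
  3. Should the Mail pipeline also use a YAML Profile? Recommendation: no; hard-coding the Mail logic is safer
  4. Does the current task_type list need adjusting? The DB already has coding/review/data/deploy/research/discuss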