auto-sync: 2026-05-26 23:34:36

This commit is contained in:
cfdaily
2026-05-26 23:34:36 +08:00
parent 80addaf4ec
commit 676ed2c98f
2 changed files with 24639 additions and 0 deletions
+524
View File
@@ -0,0 +1,524 @@
# v2.8 Pipeline 架构设计
**版本**: v1.0
**日期**: 2026-05-26
**作者**: 庞统
**状态**: 调研完成 → 设计提案,待用户确认
---
## 1. 设计目标
1. **消灭 46 处 if/_mail 分支**ticker 14 + dispatcher 23 + spawner 9
2. **不同 task_type 走不同执行路径**(代码评审不需要 planning,数据下载不需要路由)
3. **新增 task_type 只加配置不改核心代码**Plugin/Registry
4. **和 spawner-monitor 设计自然融合**(执行层共享,调度层分化)
## 2. 设计模式选择
### 2.1 调研结论回顾
| 维度 | 需求 | 最适配模式 |
|------|------|-----------|
| 按类型分发 | 入口统一,分发独立 | **Routing + Registry**LangGraph |
| 骨架共享 + 步骤可替换 | tick → load → route → spawn → complete | **Template Method** |
| 新增类型零改动 | 声明式配置 | **Plugin/Registry**Argo |
| 触发方式可替换 | cron / webhook / manual / event | **Strategy** |
| 失败处理链式 | retry → rollback → escalate | **Chain of Responsibility** |
| 事件通知 | 完成后触发依赖 | **Observer**Phase 2 |
### 2.2 选定组合
```
Registry + Template Method + Strategy
```
- **Registry**Pipeline 注册中心,按 task_type 查找
- **Template Method**Pipeline 基类定义 `tick()` 骨架,子类覆写差异步骤
- **Strategy**:触发方式、路由策略、验证策略作为可注入的策略对象
不选:
- **Chain of Responsibility**:当前失败处理只有 3 种(retry/escalate/abort),不值得引入链式抽象
- **Observer**:依赖触发(F2)放 Phase 2,当前不实现
- **纯 Strategy**Pipeline 间共享逻辑太多(spawn/monitor/counter),纯策略会重复
### 2.3 业界印证
| 系统 | 做法 | 我们的对应 |
|------|------|-----------|
| Temporal | Workflow Type 注册不同处理函数 | Pipeline Registry |
| LangGraph | Router → Sub-graph | PipelineRouter → Pipeline |
| Argo | YAML WorkflowTemplate | Pipeline Profile YAML |
| Google ADK | SequentialAgent/ParallelAgent/LoopAgent | SingleStepPipeline/MultiStepPipelinePhase 2 加并行) |
---
## 3. 架构设计
### 3.1 整体结构
```
┌─────────────────────────────────────────────┐
│ Ticker │
│ tick() → 遍历 project → _tick_project() │
└───────────┬─────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ PipelineRouter │
│ route(task_type) → Pipeline │
│ route("_mail") → MailPipeline │
│ route("review") → SingleStepPipeline │
│ route(None) → DefaultPipeline │
└───────────┬─────────────────────────────────┘
┌──────┼──────┬──────────────┐
▼ ▼ ▼ ▼
┌────────┐┌────────┐┌──────────┐┌────────┐
│ Mail ││ Single ││ Default ││ Cron │
│Pipeline││ Step ││ (Multi) ││(Phase2)│
└────────┘└────────┘└──────────┘└────────┘
│ │ │
└────┬────┴────┬────┘
▼ ▼
┌──────────┐ ┌──────────┐
│ Spawner │ │ Counter │
│ (共享) │ │ (共享) │
└──────────┘ └──────────┘
```
### 3.2 Pipeline 基类(Template Method
```python
class TaskPipeline(ABC):
"""Pipeline 基类,定义 tick() 骨架"""
def __init__(self, spawner: Spawner, counter: ActiveAgentCounter,
router: Router, config: dict):
self.spawner = spawner
self.counter = counter
self.router = router
self.config = config
async def tick(self, db_path: Path, project_config: dict) -> TickResult:
"""每个 ticker 周期的入口(Template Method"""
# 1. 前置处理(超时检测、依赖推进等)
await self.pre_tick(db_path)
# 2. 加载待处理任务
tasks = self.load_pending(db_path)
# 3. 逐个处理
dispatched = 0
for task in tasks:
if not self.can_dispatch(task, db_path):
continue
result = await self.dispatch_one(task, db_path, project_config)
if result:
dispatched += 1
return TickResult(dispatched=dispatched, total=len(tasks))
# ── 钩子方法(子类覆写)──
async def pre_tick(self, db_path: Path):
"""前置处理(超时检测、依赖推进)。默认只做超时检测"""
self.check_timeouts(db_path)
@abstractmethod
def load_pending(self, db_path: Path) -> list:
"""加载待处理任务"""
...
def can_dispatch(self, task, db_path: Path) -> bool:
"""是否可以 dispatchguardrail、counter 等)"""
return True
@abstractmethod
async def dispatch_one(self, task, db_path: Path, project_config: dict) -> bool:
"""处理单个任务"""
...
def check_timeouts(self, db_path: Path):
"""超时检测(共享逻辑)"""
...
```
### 3.3 Pipeline 实现
#### MailPipeline
```python
class MailPipeline(TaskPipeline):
"""Mail 投递管道"""
async def pre_tick(self, db_path):
# Mail 也需要超时检测
self.check_timeouts(db_path)
# 不需要依赖推进
def load_pending(self, db_path):
# 只加载 pending 状态
return load_tasks(db_path, statuses=["pending"])
def can_dispatch(self, task, db_path):
# Mail 不走 guardrail
# 但需要检查 counter
return self.counter.can_acquire(task.assignee, "main")
async def dispatch_one(self, task, db_path, project_config):
agent_id = task.assignee # 直取,不路由
# 标记 working
mark_working(db_path, task.id, agent_id)
# 构造 prompt
message = self.spawner.build_mail_prompt(task)
# spawn
def on_complete(aid, outcome, session_id):
self._on_mail_complete(task.id, aid, session_id, db_path, task.must_haves)
return await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=message,
task_id=task.id,
use_main_session=True,
on_complete=on_complete,
project_id="_mail",
db_path=db_path,
)
def _on_mail_complete(self, task_id, agent_id, session_id, db_path, must_haves):
# 幻觉门控 + 自动标 done + inform 自动完成
...
```
#### SingleStepPipeline(代码评审、数据下载等)
```python
class SingleStepPipeline(TaskPipeline):
"""单步执行管道:route → spawn → verify → done"""
def __init__(self, spawner, counter, router, config, profile):
super().__init__(spawner, counter, router, config)
self.profile = profile # 从 YAML 加载
async def pre_tick(self, db_path):
# 不做依赖推进
self.check_timeouts(db_path)
def load_pending(self, db_path):
return load_tasks(db_path, statuses=["pending"],
task_type=self.profile.get("task_type"))
def can_dispatch(self, task, db_path):
# 检查 guardrail
if self.config.get("guardrails"):
violations = self.guardrails.check_task(task)
if violations:
mark_blocked(db_path, task.id, reason=violations)
return False
return True
async def dispatch_one(self, task, db_path, project_config):
# 路由(或固定 agent
agent_id = self._resolve_agent(task)
# counter 检查
if not self.counter.can_acquire(agent_id, task.id):
return False
# 标记 working
mark_working(db_path, task.id, agent_id)
# prompt
message = self.spawner.build_task_prompt(task, self.profile)
# spawn
def on_complete(aid, outcome, session_id):
self.counter.release(aid, session_id)
self._handle_completion(task.id, aid, outcome, db_path)
return await self.spawner.spawn_full_agent(
agent_id=agent_id,
message=message,
task_id=task.id,
use_main_session=False,
on_complete=on_complete,
project_id=project_config["id"],
db_path=db_path,
)
def _resolve_agent(self, task):
"""根据 profile 配置解析 agent"""
sel = self.profile.get("agent_selection", "route")
if sel == "specific_agent":
return self.profile["specific_agent"]
elif sel == "route_by_capability":
return self.router.decide(task)
else:
return self.router.decide(task)
def _handle_completion(self, task_id, agent_id, outcome, db_path):
if outcome == "completed":
mark_done(db_path, task_id)
elif outcome in ("timeout", "api_error"):
# 等 ticker 重试
mark_pending(db_path, task_id)
else:
mark_failed(db_path, task_id, reason=outcome)
```
#### DefaultPipeline(多步 Task,当前主流程)
```python
class DefaultPipeline(TaskPipeline):
"""默认多步管道:planning → executing → review → done"""
async def pre_tick(self, db_path):
# 依赖推进
self.advance_dependencies(db_path)
# 超时检测
self.check_timeouts(db_path)
def load_pending(self, db_path):
# 加载 pending + assigned + executing(续杯场景)
return load_tasks(db_path, statuses=["pending", "assigned", "executing"])
def can_dispatch(self, task, db_path):
# guardrail 检查
...
return True
async def dispatch_one(self, task, db_path, project_config):
# 当前完整流程:planning → route → spawn → review
...
```
### 3.4 PipelineRouterRegistry
```python
class PipelineRouter:
"""按 task_type 路由到对应 Pipeline"""
def __init__(self):
self._pipelines: Dict[str, TaskPipeline] = {}
self._default: TaskPipeline = None
def register(self, task_type: str, pipeline: TaskPipeline):
self._pipelines[task_type] = pipeline
def route(self, task_type: str = None, is_mail: bool = False) -> TaskPipeline:
if is_mail:
return self._pipelines["_mail"]
return self._pipelines.get(task_type, self._default)
```
### 3.5 Ticker 改造
```python
class Ticker:
def __init__(self, pipeline_router: PipelineRouter, ...):
self.pipeline_router = pipeline_router
async def _tick_project(self, project_id, project_config):
is_mail = project_id == "_mail"
db_path = self._get_db_path(project_id)
if is_mail:
pipeline = self.pipeline_router.route(is_mail=True)
else:
# 读取该 project 所有活跃的 task_type
task_types = self._get_active_task_types(db_path)
# 当所有 task_type 走同一个 pipeline 时,直接用默认
if len(task_types) <= 1:
pipeline = self.pipeline_router.route()
else:
# 多种 task_type 需要遍历
results = []
for tt in task_types:
p = self.pipeline_router.route(tt)
r = await p.tick(db_path, project_config)
results.append(r)
return merge_results(results)
return await pipeline.tick(db_path, project_config)
```
---
## 4. 从现有代码提取
### 4.1 完全共享(留在 Spawner,不动)
| 方法 | 当前位置 | 说明 |
|------|---------|------|
| `spawn_full_agent()` | spawner.py | spawn 进程机制 |
| `_monitor_process()` | spawner.py | 监控 + 情况 A/B 分类 |
| `_classify_outcome()` | spawner.py | exit 分类 |
| `_do_retry()` | spawner.py | 续杯机制 |
| `build_task_prompt()` | spawner.py | Task prompt(可被 Pipeline 覆写) |
| `build_mail_prompt()` | spawner.py | Mail prompt(搬到 MailPipeline |
### 4.2 搬到 Pipeline 基类
| 当前位置 | 方法 | 说明 |
|---------|------|------|
| ticker.py | `check_timeouts()` | 超时检测 |
| ticker.py | `_advance_dependencies()` | 依赖推进(只 DefaultPipeline |
| ticker.py | `_transition_status()` | 状态转换(只 DefaultPipeline |
| dispatcher.py | `dispatch()` 整体 | 拆到各 Pipeline 的 `dispatch_one()` |
| dispatcher.py | `decide()` | 路由逻辑 → Pipeline 内调用 |
### 4.3 搬到 MailPipeline 专属
| 当前位置 | 方法 | 说明 |
|---------|------|------|
| dispatcher.py | `_mail_auto_working()` | Mail 标 working |
| dispatcher.py | `_mail_auto_complete()` | Mail 自动完成 |
| dispatcher.py | `_mail_check_reply()` | Mail 检查回复 |
| spawner.py | `_build_mail_prompt()` | Mail prompt 构建 |
| ticker.py | 14 处 `_mail` 条件分支 | 全部消除 |
---
## 5. Pipeline ProfileYAML 声明)
### 5.1 设计原则
- **声明式**YAML 声明 pipeline 的 stages、参数、策略
- **自动加载**:启动时扫描 `config/pipelines/` 目录
- **运行时可扩展**:新增 YAML = 新增 pipeline 类型
### 5.2 Profile 格式
```yaml
# config/pipelines/review.yaml
name: code-review
task_type: review
pipeline_class: single_step # single_step | defaultmulti_step
agent_selection: specific_agent # specific_agent | route_by_capability | route
specific_agent: simayi-challenger
timeout_minutes: 20
completion_status: done # 完成后标什么状态
# 不声明 = 跳过
skip_guardrail: false
skip_planning: true # single_step 天然跳过
```
```yaml
# config/pipelines/data-download.yaml
name: data-download
task_type: data
pipeline_class: single_step
agent_selection: specific_agent
specific_agent: zhaoyun-data
timeout_minutes: 60
completion_status: done
verification: check_output_exists # 内置验证函数名
```
### 5.3 注册逻辑
```python
# main.py 或 bootstrap.py
def load_pipelines(pipelines_dir: Path, spawner, counter, router, config):
router = PipelineRouter()
# Mail pipeline(硬编码,不从 YAML 加载)
mail_pipeline = MailPipeline(spawner, counter, router, config)
router.register("_mail", mail_pipeline)
# 默认 pipeline(多步 Task
default_pipeline = DefaultPipeline(spawner, counter, router, config)
router.set_default(default_pipeline)
# 从 YAML 加载
for f in sorted(pipelines_dir.glob("*.yaml")):
profile = yaml.safe_load(f.read_text())
cls_name = profile.get("pipeline_class", "single_step")
if cls_name == "single_step":
pipeline = SingleStepPipeline(spawner, counter, router, config, profile)
else:
# 多步走 default,但可以用 profile 定制参数
pipeline = DefaultPipeline(spawner, counter, router, config, profile)
router.register(profile["task_type"], pipeline)
return router
```
---
## 6. 实施路线
### Phase 1Pipeline 分离(v2.7.2 + v2.8 合并)
消灭 46 处 `_mail` 分支,建立 Pipeline 框架。
| 步骤 | 改动 | 预估行数 |
|------|------|---------|
| 1. 新建 `pipeline/` 目录 | base.py, mail.py, default.py, router.py | ~300 行 |
| 2. MailPipeline 提取 | 从 dispatcher/ticker/spawner 提取 Mail 逻辑 | ~150 行 |
| 3. DefaultPipeline 提取 | 从 dispatcher/ticker 提取 Task 逻辑 | ~200 行 |
| 4. PipelineRouter | 路由注册中心 | ~50 行 |
| 5. Ticker 简化 | 删除所有 `_mail` 分支,改调 pipeline_router | -150 行 |
| 6. Dispatcher 瘦身 | 删除所有 `_mail` 分支和 Mail 方法 | -200 行 |
| 7. Spawner 瘦身 | 删除 `_build_mail_prompt` 等 Mail 方法 | -80 行 |
| **净增** | | **~270 行** |
### Phase 2Task Type Pipeline
新增 SingleStepPipeline + YAML Profile。
| 步骤 | 改动 |
|------|------|
| 1. SingleStepPipeline | ~100 行 |
| 2. YAML Profile 加载 | ~50 行 |
| 3. review/data 两个 profile YAML | ~40 行 |
| 4. 创建任务时选 task_type | 前端 + API |
### Phase 3:高级特性
- InteractivePipelineCheckpoint 门控)
- 并行 stage 支持
- 事件驱动触发(Observer 模式)
- 动态 pipeline 选择(AI 自动推断 task_type
---
## 7. 与其他设计的关系
| 设计文档 | 关系 |
|---------|------|
| v2.7.2 Pipeline Refactor | **本方案是它的超集**。v2.7.2 的 MailPipeline 在本方案中直接实现 |
| v2.8 Task Type Pipeline(旧) | **本方案替代它**。旧方案偏重 YAML Profile 细节,本方案补上了架构设计 |
| Spawner Monitor Design | **执行层共享**。Spawner 的 `_monitor_process``_classify_outcome` 不受影响 |
| counter v2.1 | **共享层**。counter 在 Pipeline 中注入使用,逻辑不变 |
---
## 8. 风险评估
| 风险 | 概率 | 影响 | 缓解 |
|------|------|------|------|
| 提取方法时引入回归 | 中 | 高 | 逐方法迁移 + 每步 E2E 测试 |
| PipelineRouter 路由逻辑复杂化 | 低 | 中 | Mail 走特殊 key `_mail`,其他走 task_type |
| 和 Spawner Monitor 设计冲突 | 低 | 高 | 两者改不同文件,共享接口不变 |
| Phase 1 改动量大 | 中 | 中 | 可拆:先提取 MailPipeline,验证后再提取 Default |
---
## 9. 待确认
1. **Phase 1 和 Phase 2 是否一起做**?建议先做 Phase 1Pipeline 分离),Phase 2Task Type)等 Phase 1 稳定后
2. **dispatcher.py 是否完全删除**?它的逻辑全部分散到各 Pipeline 后,dispatcher 本身可以删掉
3. **Mail pipeline 是否也走 YAML Profile**?建议不走——Mail 逻辑硬编码更安全
4. **当前 task_type 列表是否需要调整**DB 已有 `coding/review/data/deploy/research/discuss`
File diff suppressed because it is too large Load Diff