From 2993f921c8d10ed829f76d190f3d7e2a85756bcf Mon Sep 17 00:00:00 2001 From: cfdaily Date: Mon, 25 May 2026 12:04:34 +0800 Subject: [PATCH] auto-sync: 2026-05-25 12:04:34 --- docs/design/v2.7.2-pipeline-refactor.md | 300 ++++++++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 docs/design/v2.7.2-pipeline-refactor.md diff --git a/docs/design/v2.7.2-pipeline-refactor.md b/docs/design/v2.7.2-pipeline-refactor.md new file mode 100644 index 0000000..9cfb8e9 --- /dev/null +++ b/docs/design/v2.7.2-pipeline-refactor.md @@ -0,0 +1,300 @@ +# v2.7.2 调度管道重构设计 + +**版本**: v1.0 +**日期**: 2026-05-25 +**作者**: 庞统 +**状态**: 待评审 + +--- + +## 1. 问题背景 + +### 1.1 事件 + +2026-05-25 00:30,庞统发邮件给司马懿(Mail),触发以下链条: + +``` +Mail 创建 (assignee=simayi-challenger) + → ticker dispatch → guardrail 拦截("删掉"关键词)→ 标 blocked + → _advance_dependencies → blocked→pending(depends_on=[] 视为 all_deps_done=True) + → _transition_status(pending) → assignee=NULL ← 🔴 Bug + → 第二次 dispatch → decide(assignee=NULL) → 不走快速路径4 → mode="delegate" + → broadcast_tasks → Broadcasting 6 idle agents + → 6 个 Agent 全部 spawn → 6 路并发 API → zhipu rate limit → 全部假死 +``` + +### 1.2 根因 + +三个独立 Bug 叠加: + +**Bug-A(致命)**:`_transition_status` 推回 pending 时 `SET assignee=NULL`。Mail 的 assignee 是收件人,清空后路由不知道发给谁 → 走广播。 + +**Bug-B(严重)**:`_advance_dependencies` 对 `depends_on=[]` 的 blocked 任务返回 `all_deps_done=True`,立刻推进回 pending。guardrail 刚拦下就被放行。 + +**Bug-C(触发器)**:guardrail 对 Mail 任务检查(应跳过),邮件内容含"删掉"触发 data_deletion 规则。 + +### 1.3 结构性问题 + +当前代码有 **41 处** `if is_mail` / `if project_id == "_mail"` 的差异化分支,散落在: + +| 文件 | if/_mail 分支数 | 说明 | +|------|----------------|------| +| dispatcher.py | 22 | 路由、spawn message、on_complete、状态流转 | +| ticker.py | 11 | tick 入口、dispatch、guardrail、超时、幻觉门控 | +| spawner.py | 8 | prompt 模板、完成状态 | + +**问题本质**:Mail 和 Task 是两种语义完全不同的任务类型,但共享同一个调度链,靠 if/else 做差异化。每次改 Mail 的逻辑,都可能影响 Task,反之亦然。 + +--- + +## 2. 业界调研 + +### 2.1 Celery / RabbitMQ:独立队列路由 + +不同任务类型路由到不同的 Queue,Worker 按类型订阅。 + +```python +task_routes = { + 'tasks.email.*': {'queue': 'email_queue'}, + 'tasks.analytics.*': {'queue': 'analytics_queue'}, +} +``` + +**核心思路**:隔离在最底层——不同的队列,不同的 worker。 + +### 2.2 Temporal:统一调度 + 类型化队列 + 共享执行 + +Temporal 有 Workflow Task 和 Activity Task,走同一个 Matching Service(调度器),但通过不同的 Task Queue 路由到不同的 Worker。**执行框架统一**(poll → execute → report result)。 + +**核心思路**:调度器统一,队列按类型分,执行机制共享。 + +### 2.3 Hermes Kanban:一切皆 Task,差异通过 Profile 表达 + +Hermes 只有一种 Dispatcher,所有任务走同一个 `todo → ready → claimed → running → done` 生命周期。差异通过 Profile(Agent 能力配置)表达,不在调度链里做 if/else。 + +**核心思路**:统一生命周期,差异下推到 Agent 配置层。 + +### 2.4 Airflow:DAG 定义 vs Executor 执行 彻底分离 + +DAG 只定义"做什么、依赖什么"(声明式),不关心"怎么执行"。Executor 负责"怎么执行"(本地/远程/GPU)。**通过接口连接,不通过 if/else**。 + +**核心思路**:任务定义和任务执行是两个独立关注点。 + +### 2.5 业界共识 + +> **调度(谁来干、怎么路由)和执行(怎么 spawn、怎么监控)是两个独立的关注点。调度按任务类型差异化,执行是通用共享的。** + +--- + +## 3. Mail vs Task 差异分析 + +### 3.1 差异点 + +| 维度 | Task | Mail | +|------|------|------| +| 路由方式 | Router 决定(LLM/广播/能力匹配) | 直接点对点(assignee=收件人) | +| guardrail | 需要检查 | 不需要(Agent 间通信) | +| 依赖推进 | blocked→pending 需要 depends_on | 不需要 | +| assignee 语义 | 执行者(可清空) | 收件人(永不清空) | +| 状态流转 | 完整(pending→claimed→working→review→done) | 简化(pending→working→done) | +| 完成状态 | review | done | +| prompt 模板 | 标准 SPAWN_PROMPT | 精简 MAIL_TEMPLATE | + +### 3.2 共享点 + +| 维度 | 说明 | +|------|------| +| spawn 机制 | `spawn_full_agent()` 完全共享 | +| monitor 机制 | `_monitor_process()` 完全共享 | +| exit 分类 | `_classify_outcome()` 完全共享 | +| 信封/载荷分离 | 系统管状态,Agent 管业务(未来 Task 也应做) | +| 幻觉门控 | 验证产出是否存在(未来 Task 也应做) | +| 超时检测框架 | 超时扫描逻辑通用,只是超时后处理不同 | +| DB | 共享 SQLite | + +### 3.3 关键洞察 + +信封/载荷分离、幻觉门控、系统管状态——**这些不是 Mail 的专属特性,而是所有任务类型的演进方向**。当前只是 Mail 先做了(因为 Mail 最简单),未来 Task 也应该做。 + +因此,这些能力应该放在**共享层**,不应绑定到 Mail 管道。 + +--- + +## 4. 设计方案 + +### 方案 A:最小止血(L1,~7 行改动) + +只修三个 Bug,不改架构。 + +**改动 1**:`_transition_status` 对 `_mail` project 不清空 assignee +```python +# ticker.py _transition_status +if new_status == "pending": + # Mail 的 assignee 是收件人,不能清空 + if self._current_project_id == "_mail": + conn.execute("UPDATE tasks SET status=?, updated_at=? WHERE id=?", ...) + else: + conn.execute("UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?", ...) +``` + +**改动 2**:`_advance_dependencies` 对 `depends_on=[]` 不推进 +```python +# ticker.py _advance_dependencies +for item in blocked: + if not item["depends_on"]: # 无依赖的 blocked 不自动推进 + continue + if item["all_deps_done"]: + ... +``` + +**改动 3**:dispatcher.dispatch 对 `_mail` project 跳过 guardrail +```python +# dispatcher.py dispatch +is_mail = project_config.get("project_id") == "_mail" if project_config else False +if self.guardrails and not is_mail: + violations = self.guardrails.check_task(task) + ... +``` + +| 优点 | 缺点 | +|------|------| +| 改动最小(~7 行) | 新增 3 个 if is_mail 分支(41→44) | +| 不影响现有 Task 逻辑 | 治标不治本,未来新类型继续加 if | +| 立刻止血 | 不解决结构性问题 | + +**风险**:改 `_transition_status` 可能影响 Task 的 blocked→pending 流转(需确认 Task 不会走到这个分支)。 + +### 方案 B:Pipeline 分离(L3,~200 行改动) + +引入 Pipeline 概念,每种任务类型有自己的调度管道。 + +```python +# ── Pipeline 接口 ── +class TaskPipeline(ABC): + @abstractmethod + async def tick(self, db_path: Path) -> Dict: ... + + @abstractmethod + def check_timeouts(self, db_path: Path) -> List[str]: ... + +# ── Mail 管道 ── +class MailPipeline(TaskPipeline): + async def tick(self, db_path): + # 1. 超时检测 + self.check_timeouts(db_path) + # 2. 读 pending → 直接用 assignee → spawn + for task in pending(db_path): + self._mark_working(task, db_path) # 不走 _transition_status + on_complete = lambda: self._on_complete(task, db_path) + self.spawner.spawn(agent_id=task.assignee, on_complete=on_complete) + return result + +# ── Task 管道 ── +class StandardTaskPipeline(TaskPipeline): + async def tick(self, db_path): + # 1. 依赖推进 + self.advance_dependencies(db_path) + # 2. 超时检测 + self.check_timeouts(db_path) + # 3. 路由 + guardrail + dispatch + for task in pending(db_path): + decision = self.router.decide(task) + if self.guardrails.check(task): + self.dispatch(task, decision) + return result + +# ── ticker 只做分发 ── +class Ticker: + def __init__(self): + self.pipelines = { + "_mail": MailPipeline(spawner, counter), + # 其他 project 走默认 + } + + async def _tick_project(self, project_id, ...): + pipeline = self.pipelines.get(project_id, self.default_pipeline) + return await pipeline.tick(db_path) +``` + +| 优点 | 缺点 | +|------|------| +| Mail 和 Task 完全隔离 | 改动范围较大(~200 行) | +| 新类型只需加 Pipeline | 需要从 ticker/dispatcher 里抽取方法 | +| 共享层(spawner/DB)不动 | 需要充分测试确保不引入回归 | +| 未来可扩展(CronJob 等) | | +| 去掉 41 个 if/else | | + +### 方案 C:A 先止血 + B 后重构 + +**第一步(立刻)**:方案 A 止血,修三个 Bug。 +**第二步(v2.7.2)**:方案 B Pipeline 重构。 + +| 优点 | 缺点 | +|------|------| +| 先止血再治本 | 两次改动,方案 A 的代码在方案 B 后可能被删 | +| 风险最低 | | + +--- + +## 5. 交叉点处理(方案 B 适用) + +### 5.1 完全共享(不需要差异化) + +| 组件 | 位置 | 说明 | +|------|------|------| +| `spawn_full_agent()` | spawner.py | spawn 进程机制完全一样 | +| `_monitor_process()` | spawner.py | 监控逻辑完全一样 | +| `_classify_outcome()` | spawner.py | exit 分类完全一样 | +| `_handle_exit()` | spawner.py | 结果处理框架一样 | +| `_do_retry()` | spawner.py | 续杯机制完全一样 | +| DB (SQLite) | blackboard/ | 共享存储 | +| counter (ActiveAgentCounter) | counter.py | 共享并发控制 | + +### 5.2 共享框架 + 各 Pipeline 注册回调 + +| 组件 | 共享部分 | 差异部分 | +|------|---------|---------| +| 超时检测 | 扫描 working 任务、计算 elapsed | Task→failed, Mail→先查回复再决定 | +| on_complete | 框架(spawner 调 callback) | Task→release counter, Mail→幻觉门控+自动标 done | +| 信封/载荷分离 | 系统管状态的机制 | 未来 Task 也应启用 | + +### 5.3 完全独立(各 Pipeline 自己管) + +| 组件 | Task Pipeline | Mail Pipeline | +|------|--------------|---------------| +| 路由 | Router (decide) | 直接 assignee | +| guardrail | 检查 | 跳过 | +| 依赖推进 | _advance_dependencies | 不需要 | +| 状态流转 | _transition_status | 自己的 _mark_working / _mark_done | +| prompt 模板 | 标准 SPAWN_PROMPT | MAIL_TEMPLATE | + +--- + +## 6. spawner retry 问题(附带修复) + +### 6.1 之前的调查结论 + +spawner retry 绕过 counter(BUG-2a WORKAROUND),导致同一 Agent 并发 spawn,打爆 API rate limit。 + +### 6.2 修复方案(已评审通过) + +| 改动 | 文件 | 说明 | +|------|------|------| +| 撤回 retry_release WORKAROUND | spawner.py | on_complete 传入 _do_retry,counter 贯穿 retry 链 | +| _do_retry 直接 spawn | spawner.py | 不检查 counter(retry 是 counter 占用链的延续) | +| dispatcher dispatch 加 is_available 检查 | dispatcher.py | 首次 dispatch 检查 counter | +| ticker catch AgentBusyError | ticker.py | Agent 忙时跳过 | + +### 6.3 与 Pipeline 重构的关系 + +retry 修复属于 spawner 执行层,**不受 Pipeline 重构影响**(执行层是共享的)。可以独立推进。 + +--- + +## 7. 请评审 + +1. **紧急程度**:Bug-A(assignee 清空)是否需要立刻止血?还是直接做 Pipeline 重构? +2. **方案选择**:A(止血)/ B(Pipeline)/ C(先 A 后 B)? +3. **Pipeline 设计**:接口是否合理?交叉点处理是否完整? +4. **spawner retry 修复**:是否独立推进? +5. **遗漏风险**:有没有我没考虑到的场景?