Files
sanguo_moziplus_v2/docs/design/v2.7.2-pipeline-refactor.md
T
2026-05-25 12:04:34 +08:00

301 lines
11 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# v2.7.2 调度管道重构设计
**版本**: v1.0
**日期**: 2026-05-25
**作者**: 庞统
**状态**: 待评审
---
## 1. 问题背景
### 1.1 事件
2026-05-25 00:30,庞统发邮件给司马懿(Mail),触发以下链条:
```
Mail 创建 (assignee=simayi-challenger)
→ ticker dispatch → guardrail 拦截("删掉"关键词)→ 标 blocked
→ _advance_dependencies → blocked→pendingdepends_on=[] 视为 all_deps_done=True
→ _transition_status(pending) → assignee=NULL ← 🔴 Bug
→ 第二次 dispatch → decide(assignee=NULL) → 不走快速路径4 → mode="delegate"
→ broadcast_tasks → Broadcasting 6 idle agents
→ 6 个 Agent 全部 spawn → 6 路并发 API → zhipu rate limit → 全部假死
```
### 1.2 根因
三个独立 Bug 叠加:
**Bug-A(致命)**`_transition_status` 推回 pending 时 `SET assignee=NULL`。Mail 的 assignee 是收件人,清空后路由不知道发给谁 → 走广播。
**Bug-B(严重)**`_advance_dependencies``depends_on=[]` 的 blocked 任务返回 `all_deps_done=True`,立刻推进回 pending。guardrail 刚拦下就被放行。
**Bug-C(触发器)**guardrail 对 Mail 任务检查(应跳过),邮件内容含"删掉"触发 data_deletion 规则。
### 1.3 结构性问题
当前代码有 **41 处** `if is_mail` / `if project_id == "_mail"` 的差异化分支,散落在:
| 文件 | if/_mail 分支数 | 说明 |
|------|----------------|------|
| dispatcher.py | 22 | 路由、spawn message、on_complete、状态流转 |
| ticker.py | 11 | tick 入口、dispatch、guardrail、超时、幻觉门控 |
| spawner.py | 8 | prompt 模板、完成状态 |
**问题本质**:Mail 和 Task 是两种语义完全不同的任务类型,但共享同一个调度链,靠 if/else 做差异化。每次改 Mail 的逻辑,都可能影响 Task,反之亦然。
---
## 2. 业界调研
### 2.1 Celery / RabbitMQ:独立队列路由
不同任务类型路由到不同的 Queue,Worker 按类型订阅。
```python
task_routes = {
'tasks.email.*': {'queue': 'email_queue'},
'tasks.analytics.*': {'queue': 'analytics_queue'},
}
```
**核心思路**:隔离在最底层——不同的队列,不同的 worker。
### 2.2 Temporal:统一调度 + 类型化队列 + 共享执行
Temporal 有 Workflow Task 和 Activity Task,走同一个 Matching Service(调度器),但通过不同的 Task Queue 路由到不同的 Worker。**执行框架统一**poll → execute → report result)。
**核心思路**:调度器统一,队列按类型分,执行机制共享。
### 2.3 Hermes Kanban:一切皆 Task,差异通过 Profile 表达
Hermes 只有一种 Dispatcher,所有任务走同一个 `todo → ready → claimed → running → done` 生命周期。差异通过 Profile(Agent 能力配置)表达,不在调度链里做 if/else。
**核心思路**:统一生命周期,差异下推到 Agent 配置层。
### 2.4 AirflowDAG 定义 vs Executor 执行 彻底分离
DAG 只定义"做什么、依赖什么"(声明式),不关心"怎么执行"。Executor 负责"怎么执行"(本地/远程/GPU)。**通过接口连接,不通过 if/else**。
**核心思路**:任务定义和任务执行是两个独立关注点。
### 2.5 业界共识
> **调度(谁来干、怎么路由)和执行(怎么 spawn、怎么监控)是两个独立的关注点。调度按任务类型差异化,执行是通用共享的。**
---
## 3. Mail vs Task 差异分析
### 3.1 差异点
| 维度 | Task | Mail |
|------|------|------|
| 路由方式 | Router 决定(LLM/广播/能力匹配) | 直接点对点(assignee=收件人) |
| guardrail | 需要检查 | 不需要(Agent 间通信) |
| 依赖推进 | blocked→pending 需要 depends_on | 不需要 |
| assignee 语义 | 执行者(可清空) | 收件人(永不清空) |
| 状态流转 | 完整(pending→claimed→working→review→done | 简化(pending→working→done |
| 完成状态 | review | done |
| prompt 模板 | 标准 SPAWN_PROMPT | 精简 MAIL_TEMPLATE |
### 3.2 共享点
| 维度 | 说明 |
|------|------|
| spawn 机制 | `spawn_full_agent()` 完全共享 |
| monitor 机制 | `_monitor_process()` 完全共享 |
| exit 分类 | `_classify_outcome()` 完全共享 |
| 信封/载荷分离 | 系统管状态,Agent 管业务(未来 Task 也应做) |
| 幻觉门控 | 验证产出是否存在(未来 Task 也应做) |
| 超时检测框架 | 超时扫描逻辑通用,只是超时后处理不同 |
| DB | 共享 SQLite |
### 3.3 关键洞察
信封/载荷分离、幻觉门控、系统管状态——**这些不是 Mail 的专属特性,而是所有任务类型的演进方向**。当前只是 Mail 先做了(因为 Mail 最简单),未来 Task 也应该做。
因此,这些能力应该放在**共享层**,不应绑定到 Mail 管道。
---
## 4. 设计方案
### 方案 A:最小止血(L1,~7 行改动)
只修三个 Bug,不改架构。
**改动 1**`_transition_status``_mail` project 不清空 assignee
```python
# ticker.py _transition_status
if new_status == "pending":
# Mail 的 assignee 是收件人,不能清空
if self._current_project_id == "_mail":
conn.execute("UPDATE tasks SET status=?, updated_at=? WHERE id=?", ...)
else:
conn.execute("UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?", ...)
```
**改动 2**`_advance_dependencies``depends_on=[]` 不推进
```python
# ticker.py _advance_dependencies
for item in blocked:
if not item["depends_on"]: # 无依赖的 blocked 不自动推进
continue
if item["all_deps_done"]:
...
```
**改动 3**dispatcher.dispatch 对 `_mail` project 跳过 guardrail
```python
# dispatcher.py dispatch
is_mail = project_config.get("project_id") == "_mail" if project_config else False
if self.guardrails and not is_mail:
violations = self.guardrails.check_task(task)
...
```
| 优点 | 缺点 |
|------|------|
| 改动最小(~7 行) | 新增 3 个 if is_mail 分支(41→44 |
| 不影响现有 Task 逻辑 | 治标不治本,未来新类型继续加 if |
| 立刻止血 | 不解决结构性问题 |
**风险**:改 `_transition_status` 可能影响 Task 的 blocked→pending 流转(需确认 Task 不会走到这个分支)。
### 方案 BPipeline 分离(L3~200 行改动)
引入 Pipeline 概念,每种任务类型有自己的调度管道。
```python
# ── Pipeline 接口 ──
class TaskPipeline(ABC):
@abstractmethod
async def tick(self, db_path: Path) -> Dict: ...
@abstractmethod
def check_timeouts(self, db_path: Path) -> List[str]: ...
# ── Mail 管道 ──
class MailPipeline(TaskPipeline):
async def tick(self, db_path):
# 1. 超时检测
self.check_timeouts(db_path)
# 2. 读 pending → 直接用 assignee → spawn
for task in pending(db_path):
self._mark_working(task, db_path) # 不走 _transition_status
on_complete = lambda: self._on_complete(task, db_path)
self.spawner.spawn(agent_id=task.assignee, on_complete=on_complete)
return result
# ── Task 管道 ──
class StandardTaskPipeline(TaskPipeline):
async def tick(self, db_path):
# 1. 依赖推进
self.advance_dependencies(db_path)
# 2. 超时检测
self.check_timeouts(db_path)
# 3. 路由 + guardrail + dispatch
for task in pending(db_path):
decision = self.router.decide(task)
if self.guardrails.check(task):
self.dispatch(task, decision)
return result
# ── ticker 只做分发 ──
class Ticker:
def __init__(self):
self.pipelines = {
"_mail": MailPipeline(spawner, counter),
# 其他 project 走默认
}
async def _tick_project(self, project_id, ...):
pipeline = self.pipelines.get(project_id, self.default_pipeline)
return await pipeline.tick(db_path)
```
| 优点 | 缺点 |
|------|------|
| Mail 和 Task 完全隔离 | 改动范围较大(~200 行) |
| 新类型只需加 Pipeline | 需要从 ticker/dispatcher 里抽取方法 |
| 共享层(spawner/DB)不动 | 需要充分测试确保不引入回归 |
| 未来可扩展(CronJob 等) | |
| 去掉 41 个 if/else | |
### 方案 C:A 先止血 + B 后重构
**第一步(立刻)**:方案 A 止血,修三个 Bug。
**第二步(v2.7.2**:方案 B Pipeline 重构。
| 优点 | 缺点 |
|------|------|
| 先止血再治本 | 两次改动,方案 A 的代码在方案 B 后可能被删 |
| 风险最低 | |
---
## 5. 交叉点处理(方案 B 适用)
### 5.1 完全共享(不需要差异化)
| 组件 | 位置 | 说明 |
|------|------|------|
| `spawn_full_agent()` | spawner.py | spawn 进程机制完全一样 |
| `_monitor_process()` | spawner.py | 监控逻辑完全一样 |
| `_classify_outcome()` | spawner.py | exit 分类完全一样 |
| `_handle_exit()` | spawner.py | 结果处理框架一样 |
| `_do_retry()` | spawner.py | 续杯机制完全一样 |
| DB (SQLite) | blackboard/ | 共享存储 |
| counter (ActiveAgentCounter) | counter.py | 共享并发控制 |
### 5.2 共享框架 + 各 Pipeline 注册回调
| 组件 | 共享部分 | 差异部分 |
|------|---------|---------|
| 超时检测 | 扫描 working 任务、计算 elapsed | Task→failed, Mail→先查回复再决定 |
| on_complete | 框架(spawner 调 callback | Task→release counter, Mail→幻觉门控+自动标 done |
| 信封/载荷分离 | 系统管状态的机制 | 未来 Task 也应启用 |
### 5.3 完全独立(各 Pipeline 自己管)
| 组件 | Task Pipeline | Mail Pipeline |
|------|--------------|---------------|
| 路由 | Router (decide) | 直接 assignee |
| guardrail | 检查 | 跳过 |
| 依赖推进 | _advance_dependencies | 不需要 |
| 状态流转 | _transition_status | 自己的 _mark_working / _mark_done |
| prompt 模板 | 标准 SPAWN_PROMPT | MAIL_TEMPLATE |
---
## 6. spawner retry 问题(附带修复)
### 6.1 之前的调查结论
spawner retry 绕过 counterBUG-2a WORKAROUND),导致同一 Agent 并发 spawn,打爆 API rate limit。
### 6.2 修复方案(已评审通过)
| 改动 | 文件 | 说明 |
|------|------|------|
| 撤回 retry_release WORKAROUND | spawner.py | on_complete 传入 _do_retrycounter 贯穿 retry 链 |
| _do_retry 直接 spawn | spawner.py | 不检查 counterretry 是 counter 占用链的延续) |
| dispatcher dispatch 加 is_available 检查 | dispatcher.py | 首次 dispatch 检查 counter |
| ticker catch AgentBusyError | ticker.py | Agent 忙时跳过 |
### 6.3 与 Pipeline 重构的关系
retry 修复属于 spawner 执行层,**不受 Pipeline 重构影响**(执行层是共享的)。可以独立推进。
---
## 7. 请评审
1. **紧急程度**Bug-Aassignee 清空)是否需要立刻止血?还是直接做 Pipeline 重构?
2. **方案选择**A(止血)/ BPipeline/ C(先 A 后 B)?
3. **Pipeline 设计**:接口是否合理?交叉点处理是否完整?
4. **spawner retry 修复**:是否独立推进?
5. **遗漏风险**:有没有我没考虑到的场景?