auto-sync: 2026-05-25 12:04:34

This commit is contained in:
cfdaily
2026-05-25 12:04:34 +08:00
parent d896eb0d36
commit 2993f921c8
+300
View File
@@ -0,0 +1,300 @@
# v2.7.2 调度管道重构设计
**版本**: v1.0
**日期**: 2026-05-25
**作者**: 庞统
**状态**: 待评审
---
## 1. 问题背景
### 1.1 事件
2026-05-25 00:30,庞统发邮件给司马懿(Mail),触发以下链条:
```
Mail 创建 (assignee=simayi-challenger)
→ ticker dispatch → guardrail 拦截("删掉"关键词)→ 标 blocked
→ _advance_dependencies → blocked→pendingdepends_on=[] 视为 all_deps_done=True
→ _transition_status(pending) → assignee=NULL ← 🔴 Bug
→ 第二次 dispatch → decide(assignee=NULL) → 不走快速路径4 → mode="delegate"
→ broadcast_tasks → Broadcasting 6 idle agents
→ 6 个 Agent 全部 spawn → 6 路并发 API → zhipu rate limit → 全部假死
```
### 1.2 根因
三个独立 Bug 叠加:
**Bug-A(致命)**`_transition_status` 推回 pending 时 `SET assignee=NULL`。Mail 的 assignee 是收件人,清空后路由不知道发给谁 → 走广播。
**Bug-B(严重)**`_advance_dependencies``depends_on=[]` 的 blocked 任务返回 `all_deps_done=True`,立刻推进回 pending。guardrail 刚拦下就被放行。
**Bug-C(触发器)**guardrail 对 Mail 任务检查(应跳过),邮件内容含"删掉"触发 data_deletion 规则。
### 1.3 结构性问题
当前代码有 **41 处** `if is_mail` / `if project_id == "_mail"` 的差异化分支,散落在:
| 文件 | if/_mail 分支数 | 说明 |
|------|----------------|------|
| dispatcher.py | 22 | 路由、spawn message、on_complete、状态流转 |
| ticker.py | 11 | tick 入口、dispatch、guardrail、超时、幻觉门控 |
| spawner.py | 8 | prompt 模板、完成状态 |
**问题本质**:Mail 和 Task 是两种语义完全不同的任务类型,但共享同一个调度链,靠 if/else 做差异化。每次改 Mail 的逻辑,都可能影响 Task,反之亦然。
---
## 2. 业界调研
### 2.1 Celery / RabbitMQ:独立队列路由
不同任务类型路由到不同的 Queue,Worker 按类型订阅。
```python
task_routes = {
'tasks.email.*': {'queue': 'email_queue'},
'tasks.analytics.*': {'queue': 'analytics_queue'},
}
```
**核心思路**:隔离在最底层——不同的队列,不同的 worker。
### 2.2 Temporal:统一调度 + 类型化队列 + 共享执行
Temporal 有 Workflow Task 和 Activity Task,走同一个 Matching Service(调度器),但通过不同的 Task Queue 路由到不同的 Worker。**执行框架统一**poll → execute → report result)。
**核心思路**:调度器统一,队列按类型分,执行机制共享。
### 2.3 Hermes Kanban:一切皆 Task,差异通过 Profile 表达
Hermes 只有一种 Dispatcher,所有任务走同一个 `todo → ready → claimed → running → done` 生命周期。差异通过 Profile(Agent 能力配置)表达,不在调度链里做 if/else。
**核心思路**:统一生命周期,差异下推到 Agent 配置层。
### 2.4 AirflowDAG 定义 vs Executor 执行 彻底分离
DAG 只定义"做什么、依赖什么"(声明式),不关心"怎么执行"。Executor 负责"怎么执行"(本地/远程/GPU)。**通过接口连接,不通过 if/else**。
**核心思路**:任务定义和任务执行是两个独立关注点。
### 2.5 业界共识
> **调度(谁来干、怎么路由)和执行(怎么 spawn、怎么监控)是两个独立的关注点。调度按任务类型差异化,执行是通用共享的。**
---
## 3. Mail vs Task 差异分析
### 3.1 差异点
| 维度 | Task | Mail |
|------|------|------|
| 路由方式 | Router 决定(LLM/广播/能力匹配) | 直接点对点(assignee=收件人) |
| guardrail | 需要检查 | 不需要(Agent 间通信) |
| 依赖推进 | blocked→pending 需要 depends_on | 不需要 |
| assignee 语义 | 执行者(可清空) | 收件人(永不清空) |
| 状态流转 | 完整(pending→claimed→working→review→done | 简化(pending→working→done |
| 完成状态 | review | done |
| prompt 模板 | 标准 SPAWN_PROMPT | 精简 MAIL_TEMPLATE |
### 3.2 共享点
| 维度 | 说明 |
|------|------|
| spawn 机制 | `spawn_full_agent()` 完全共享 |
| monitor 机制 | `_monitor_process()` 完全共享 |
| exit 分类 | `_classify_outcome()` 完全共享 |
| 信封/载荷分离 | 系统管状态,Agent 管业务(未来 Task 也应做) |
| 幻觉门控 | 验证产出是否存在(未来 Task 也应做) |
| 超时检测框架 | 超时扫描逻辑通用,只是超时后处理不同 |
| DB | 共享 SQLite |
### 3.3 关键洞察
信封/载荷分离、幻觉门控、系统管状态——**这些不是 Mail 的专属特性,而是所有任务类型的演进方向**。当前只是 Mail 先做了(因为 Mail 最简单),未来 Task 也应该做。
因此,这些能力应该放在**共享层**,不应绑定到 Mail 管道。
---
## 4. 设计方案
### 方案 A:最小止血(L1,~7 行改动)
只修三个 Bug,不改架构。
**改动 1**`_transition_status``_mail` project 不清空 assignee
```python
# ticker.py _transition_status
if new_status == "pending":
# Mail 的 assignee 是收件人,不能清空
if self._current_project_id == "_mail":
conn.execute("UPDATE tasks SET status=?, updated_at=? WHERE id=?", ...)
else:
conn.execute("UPDATE tasks SET status=?, assignee=NULL, resumed_from=NULL, updated_at=? WHERE id=?", ...)
```
**改动 2**`_advance_dependencies``depends_on=[]` 不推进
```python
# ticker.py _advance_dependencies
for item in blocked:
if not item["depends_on"]: # 无依赖的 blocked 不自动推进
continue
if item["all_deps_done"]:
...
```
**改动 3**dispatcher.dispatch 对 `_mail` project 跳过 guardrail
```python
# dispatcher.py dispatch
is_mail = project_config.get("project_id") == "_mail" if project_config else False
if self.guardrails and not is_mail:
violations = self.guardrails.check_task(task)
...
```
| 优点 | 缺点 |
|------|------|
| 改动最小(~7 行) | 新增 3 个 if is_mail 分支(41→44 |
| 不影响现有 Task 逻辑 | 治标不治本,未来新类型继续加 if |
| 立刻止血 | 不解决结构性问题 |
**风险**:改 `_transition_status` 可能影响 Task 的 blocked→pending 流转(需确认 Task 不会走到这个分支)。
### 方案 BPipeline 分离(L3~200 行改动)
引入 Pipeline 概念,每种任务类型有自己的调度管道。
```python
# ── Pipeline 接口 ──
class TaskPipeline(ABC):
@abstractmethod
async def tick(self, db_path: Path) -> Dict: ...
@abstractmethod
def check_timeouts(self, db_path: Path) -> List[str]: ...
# ── Mail 管道 ──
class MailPipeline(TaskPipeline):
async def tick(self, db_path):
# 1. 超时检测
self.check_timeouts(db_path)
# 2. 读 pending → 直接用 assignee → spawn
for task in pending(db_path):
self._mark_working(task, db_path) # 不走 _transition_status
on_complete = lambda: self._on_complete(task, db_path)
self.spawner.spawn(agent_id=task.assignee, on_complete=on_complete)
return result
# ── Task 管道 ──
class StandardTaskPipeline(TaskPipeline):
async def tick(self, db_path):
# 1. 依赖推进
self.advance_dependencies(db_path)
# 2. 超时检测
self.check_timeouts(db_path)
# 3. 路由 + guardrail + dispatch
for task in pending(db_path):
decision = self.router.decide(task)
if self.guardrails.check(task):
self.dispatch(task, decision)
return result
# ── ticker 只做分发 ──
class Ticker:
def __init__(self):
self.pipelines = {
"_mail": MailPipeline(spawner, counter),
# 其他 project 走默认
}
async def _tick_project(self, project_id, ...):
pipeline = self.pipelines.get(project_id, self.default_pipeline)
return await pipeline.tick(db_path)
```
| 优点 | 缺点 |
|------|------|
| Mail 和 Task 完全隔离 | 改动范围较大(~200 行) |
| 新类型只需加 Pipeline | 需要从 ticker/dispatcher 里抽取方法 |
| 共享层(spawner/DB)不动 | 需要充分测试确保不引入回归 |
| 未来可扩展(CronJob 等) | |
| 去掉 41 个 if/else | |
### 方案 C:A 先止血 + B 后重构
**第一步(立刻)**:方案 A 止血,修三个 Bug。
**第二步(v2.7.2**:方案 B Pipeline 重构。
| 优点 | 缺点 |
|------|------|
| 先止血再治本 | 两次改动,方案 A 的代码在方案 B 后可能被删 |
| 风险最低 | |
---
## 5. 交叉点处理(方案 B 适用)
### 5.1 完全共享(不需要差异化)
| 组件 | 位置 | 说明 |
|------|------|------|
| `spawn_full_agent()` | spawner.py | spawn 进程机制完全一样 |
| `_monitor_process()` | spawner.py | 监控逻辑完全一样 |
| `_classify_outcome()` | spawner.py | exit 分类完全一样 |
| `_handle_exit()` | spawner.py | 结果处理框架一样 |
| `_do_retry()` | spawner.py | 续杯机制完全一样 |
| DB (SQLite) | blackboard/ | 共享存储 |
| counter (ActiveAgentCounter) | counter.py | 共享并发控制 |
### 5.2 共享框架 + 各 Pipeline 注册回调
| 组件 | 共享部分 | 差异部分 |
|------|---------|---------|
| 超时检测 | 扫描 working 任务、计算 elapsed | Task→failed, Mail→先查回复再决定 |
| on_complete | 框架(spawner 调 callback | Task→release counter, Mail→幻觉门控+自动标 done |
| 信封/载荷分离 | 系统管状态的机制 | 未来 Task 也应启用 |
### 5.3 完全独立(各 Pipeline 自己管)
| 组件 | Task Pipeline | Mail Pipeline |
|------|--------------|---------------|
| 路由 | Router (decide) | 直接 assignee |
| guardrail | 检查 | 跳过 |
| 依赖推进 | _advance_dependencies | 不需要 |
| 状态流转 | _transition_status | 自己的 _mark_working / _mark_done |
| prompt 模板 | 标准 SPAWN_PROMPT | MAIL_TEMPLATE |
---
## 6. spawner retry 问题(附带修复)
### 6.1 之前的调查结论
spawner retry 绕过 counterBUG-2a WORKAROUND),导致同一 Agent 并发 spawn,打爆 API rate limit。
### 6.2 修复方案(已评审通过)
| 改动 | 文件 | 说明 |
|------|------|------|
| 撤回 retry_release WORKAROUND | spawner.py | on_complete 传入 _do_retrycounter 贯穿 retry 链 |
| _do_retry 直接 spawn | spawner.py | 不检查 counterretry 是 counter 占用链的延续) |
| dispatcher dispatch 加 is_available 检查 | dispatcher.py | 首次 dispatch 检查 counter |
| ticker catch AgentBusyError | ticker.py | Agent 忙时跳过 |
### 6.3 与 Pipeline 重构的关系
retry 修复属于 spawner 执行层,**不受 Pipeline 重构影响**(执行层是共享的)。可以独立推进。
---
## 7. 请评审
1. **紧急程度**Bug-Aassignee 清空)是否需要立刻止血?还是直接做 Pipeline 重构?
2. **方案选择**A(止血)/ BPipeline/ C(先 A 后 B)?
3. **Pipeline 设计**:接口是否合理?交叉点处理是否完整?
4. **spawner retry 修复**:是否独立推进?
5. **遗漏风险**:有没有我没考虑到的场景?