docs: add 20-task-type-architecture.md - TaskTypeRegistry + Handler 架构重构设计

This commit is contained in:
cfdaily
2026-06-10 11:49:36 +08:00
parent 3071c95629
commit 1e16f63be5
+422
View File
@@ -0,0 +1,422 @@
---
title: "TaskTypeRegistry + Handler 架构重构"
created: 2026-06-10
version: v1.0
---
# §1 现状分析
moziplus v2 的任务调度系统当前通过 `if/else` 硬编码区分两种 task type:普通任务(task)和邮件(mail)。分支逻辑散落在 dispatcher、spawner、ticker 三个核心模块中,新增 task type 需要同时改动三处。
### dispatcher.py 中的分支
- **`is_mail` 判断**`project_id == "_mail"` 作为唯一判据
- **mail 分支**:跳过 guardrail 检查;有专属的 `on_checks_passed` 回调(`_mail_auto_working`)和 `on_complete` 回调(`_mail_auto_complete`,含幻觉门控 + 失败通知)
- **非 mail 分支**:走正常 guardrail 检查 → `on_checks_passed``on_complete` 全链路
### spawner.py 中的分支
- **`_build_prompt`**`_mail``_build_mail_prompt()`,其余走 BootstrapBuilder L0-L3 全链
- **`_build_api_section`**`_mail``success_status = "done"`,其余 `success_status = "review"`
- **retry 逻辑**mail 用 `MAIL_RETRY_PROMPT`task 用标准 `RETRY_PROMPT`
- **`classify_outcome` 完成处理**mail 走幻觉门控(hallucination gate
### ticker.py 中的分支
- **虚拟项目扫描**`_general``_mail` 两个虚拟项目各自硬编码扫描逻辑
- **mail 专属逻辑**`_mail_check_reply` 判断邮件回复是否闭环
**问题总结**:新增第三种 task type(toolchain)如果继续硬编码,需要在三个文件中各加一套 if/else,且未来每种新类型都重复这个模式,维护成本线性增长。
---
# §2 三种 Task Type 行为差异表
| 维度 | task(普通任务) | mail | toolchain |
|------|---------|------|-----------|
| 存储 | 项目 DB`projects/{pid}/blackboard.db` | `_mail` DB`_mail/blackboard.db` | `_toolchain` DB |
| 状态流转 | pending→claimed→working→review→done | 跳过 claimedauto-working→auto-done | auto-working→auto-done |
| prompt 构建 | BootstrapBuilder L0-L3 | MAIL_INFORM / MAIL_REQUEST 精简模板 | TOOLCHAIN 模板 + 事件上下文 |
| guardrail | 正常检查 | 跳过 | 跳过 |
| 完成标准 | 产出物 + review | 回复邮件 / inform done | Gitea 侧闭环(不回 Mail |
| on_complete | classify_outcome → 状态机 | 幻觉门控 + 失败通知 | auto-done + 可选 escalate |
| 路由 | Router 四条快速路径 + 广播认领 | 直接路由到收件人 | 直接路由到事件相关 agent |
| retry | 标准 `RETRY_PROMPT` | `MAIL_RETRY_PROMPT` | 标准(或专用) |
| 前端展示 | 任务看板 Tab | 飞鸽传书 Tab | 待定 |
---
# §3 Handler 接口定义
定义 Python Protocol,所有 task type handler 必须满足此接口:
```python
from typing import Protocol, Optional, Dict, Any
from pathlib import Path
class TaskTypeHandler(Protocol):
"""所有 task type handler 的统一接口。"""
# 属性通过 __init__ 参数设置,Protocol 不强制 property
task_type: str # 类型标识:'task' | 'mail' | 'toolchain'
virtual_project: Optional[str] # 虚拟项目 ID,如 '_mail'、'_toolchain'。普通任务为 None
def build_prompt(
self,
task_id: str,
title: str,
description: str,
must_haves: str,
project_id: str,
agent_id: str,
task: Optional[Dict] = None,
spawn_type: str = "executor",
spawner: Any = None,
) -> str:
"""构建 Agent prompt。"""
...
def build_api_section(
self, project_id: str, task_id: str, agent_id: str
) -> str:
"""构建 API 操作指令(success_status 等)。"""
...
def skip_guardrail(self, project_id: str) -> bool:
"""是否跳过 guardrail 检查。"""
...
def pre_spawn(
self, task_id: str, db_path: Path, dispatcher: Any
) -> Optional[callable]:
"""spawn 前回调,返回 on_checks_passed 回调或 None。"""
...
def post_complete(
self,
task_id: str,
agent_id: str,
outcome: str,
db_path: Path,
must_haves: str,
dispatcher: Any,
) -> None:
"""spawn 完成后回调。"""
...
def build_retry_prompt(
self,
task_id: str,
agent_id: str,
retry_count: int,
max_retries: int,
retry_field: str,
task_info: Dict,
spawner: Any,
) -> str:
"""构建重试 prompt。"""
...
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""检查任务是否已完成(如 mail 的回复检查)。"""
...
```
**设计原则**
- 每个方法在现有代码中都有明确的对应实现点,不存在"悬空"抽象
- `pre_spawn` 返回 Optional[callable],普通任务返回 None 即可,不需要回调
- `spawner``dispatcher` 参数用 `Any` 类型,避免循环导入;handler 只调用已知方法
**兼容期过渡策略**
引擎中 handler 查询优先,无 handler 时 fallback 到现有逻辑。具体流程:
1. `TaskTypeRegistry.get_by_project(project_id)` 查到 handler → 走 handler 路径
2. 未查到 → 走现有 if/else 分支(兼容期)
3. 三种 handler 全部稳定后,Step 5 删除旧分支,统一走 handler
4. 兼容期内新旧路径互斥——同一个 project_id 只会走其中一条,不存在重复执行
---
# §4 TaskTypeRegistry 注册表
```python
class TaskTypeRegistry:
"""Task type handler 注册表。启动时一次性加载,运行时只读。"""
_handlers: Dict[str, TaskTypeHandler] = {}
@classmethod
def register(cls, handler: TaskTypeHandler) -> None:
"""注册一个 handler。启动时调用一次。"""
if handler.task_type in cls._handlers:
raise ValueError(
f"Task type '{handler.task_type}' already registered"
)
cls._handlers[handler.task_type] = handler
@classmethod
def get_by_project(cls, project_id: str) -> Optional[TaskTypeHandler]:
"""通过 project_id 查找 handler(匹配 virtual_project)。"""
for h in cls._handlers.values():
if h.virtual_project == project_id:
return h
return None
@classmethod
def get(cls, task_type: str) -> Optional[TaskTypeHandler]:
"""通过 task_type 标识查找 handler。"""
return cls._handlers.get(task_type)
@classmethod
def virtual_projects(cls) -> list[str]:
"""返回所有已注册的虚拟项目 ID(ticker 自动发现用)。"""
return [
h.virtual_project
for h in cls._handlers.values()
if h.virtual_project is not None
]
```
**使用方式**daemon 启动时):
```python
from task_handler import TaskHandler
from mail_handler import MailHandler
from toolchain_handler import ToolchainHandler
TaskTypeRegistry.register(TaskHandler())
TaskTypeRegistry.register(MailHandler())
TaskTypeRegistry.register(ToolchainHandler())
```
---
# §5 三个 Handler 的实现边界
## TaskHandler(普通任务)
将现有 default(非 mail)分支封装为 handler**不替代 BootstrapBuilder**。
| 方法 | 实现 |
|------|------|
| `task_type` | `"task"` |
| `virtual_project` | `None` |
| `build_prompt` | 调用 `BootstrapBuilder`(透传参数) |
| `build_api_section` | 现有 default 逻辑,`success_status = "review"` |
| `skip_guardrail` | `False` |
| `pre_spawn` | `None`(不需要回调) |
| `post_complete` | `classify_outcome` → 状态机 |
| `check_completion` | `False`(由状态机管理) |
| `build_retry_prompt` | 标准 `RETRY_PROMPT` |
**改动量**~60 行,从 spawner/dispatcher 的 default 分支包一层。
## MailHandler
将分散在 dispatcher / spawner / ticker 三处的 mail 逻辑集中到一个 handler。
| 方法 | 实现 |
|------|------|
| `task_type` | `"mail"` |
| `virtual_project` | `"_mail"` |
| `build_prompt` | 复用 `_build_mail_prompt` / `MAIL_INFORM_TEMPLATE` / `MAIL_REQUEST_TEMPLATE` |
| `build_api_section` | `success_status = "done"` |
| `skip_guardrail` | `True` |
| `pre_spawn` | auto_working 回调(从 dispatcher `_mail_on_checks_passed` 搬入) |
| `post_complete` | 幻觉门控 + auto_done + 失败通知(从 dispatcher `_mail_auto_complete` 搬入) |
| `check_completion` | `_mail_check_reply`(从 ticker 搬入) |
| `build_retry_prompt` | `MAIL_RETRY_PROMPT` |
**改动量**~150 行,三处逻辑集中。
## ToolchainHandler
全新 handler,处理 Gitea Webhook 事件通知。
| 方法 | 实现 |
|------|------|
| `task_type` | `"toolchain"` |
| `virtual_project` | `"_toolchain"` |
| `build_prompt` | 新建 `TOOLCHAIN_TEMPLATE` + 事件上下文注入 |
| `build_api_section` | `success_status = "done"` |
| `skip_guardrail` | `True` |
| `pre_spawn` | auto_working 回调 |
| `post_complete` | auto-done + 可选 escalate |
| `check_completion` | `False` |
| `build_retry_prompt` | 标准 `RETRY_PROMPT`(或后续定制) |
**改动量**~100 行,全新代码。
---
# §6 引擎改动点(一次性)
引擎代码改动是受控的、一次性的。改完后新增 task type 不再触碰引擎。
## dispatcher.py~20 行改动)
```python
# dispatch() 中,替换现有的 is_mail 分支
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
if handler.skip_guardrail(project_id):
# 跳过 guardrail,直接 spawn
...
on_passed = handler.pre_spawn(task_id, db_path, self)
# spawn 后由 on_passed 回调驱动后续
...
handler.post_complete(task_id, agent_id, outcome, db_path, must_haves, self)
else:
# 兼容期:保留现有 default 逻辑
...
```
## spawner.py~15 行改动)
```python
# _build_prompt() 中
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
return handler.build_prompt(
task_id, title, description, must_haves,
project_id, agent_id, task, spawn_type, self
)
# else: 现有 BootstrapBuilder 逻辑(兼容期)
# _build_api_section() 中
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
return handler.build_api_section(project_id, task_id, agent_id)
# retry 逻辑中
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
return handler.build_retry_prompt(
task_id, agent_id, retry_count, max_retries,
retry_field, task_info, self
)
```
## ticker.py~10 行改动)
```python
# 虚拟项目扫描:从注册表自动发现
for vp in TaskTypeRegistry.virtual_projects():
handler = TaskTypeRegistry.get_by_project(vp)
if handler and handler.check_completion:
# 调用 handler.check_completion 检查
...
# 保留 _general 硬编码作为 fallback(非 task type 机制)
```
---
# §7 实施顺序
按风险从低到高排列,每步完成后跑 `pytest -m "not e2e"` 全量回归测试。
### Step 1:注册表基础设施
- 新建 `src/daemon/task_type_registry.py`
- 定义 `TaskTypeHandler` Protocol
- 定义 `TaskTypeRegistry`
- 编写单元测试验证注册/查询机制
- **风险**:极低,纯新增文件,不改动现有代码
### Step 2TaskHandler
- 新建 `src/daemon/task_handler.py`
- 从 spawner/dispatcher 的 default 分支封装 handler 方法
- 注册到 TaskTypeRegistry
- 运行全量回归测试,验证普通任务路径不变
- **风险**:低,只是把现有逻辑包一层,不改行为
### Step 3ToolchainHandler
- 新建 `src/daemon/toolchain_handler.py`
- 全新实现,无迁移成本
- 注册到 TaskTypeRegistry
- **风险**:低,全新代码,不影响现有路径
### Step 4MailHandler
- 新建 `src/daemon/mail_handler.py`
- 从 dispatcher / spawner / ticker 三处迁移 mail 逻辑
- 注册到 TaskTypeRegistry
- 重点回归测试 mail 路径:发送、回复、重试、幻觉门控
- **风险**:中,逻辑从三处集中,需确保行为一致
### Step 5:引擎清理
- 删除 dispatcher / spawner / ticker 中的旧 if/else 分支
- 统一走 handler 路径
- 全量回归测试 + 手工验证
- **风险**:中,删除代码需确保无遗漏
---
# §8 未来新增 Task Type 的步骤
以新增 `cron`(定时任务)为例:
1. **新建 handler 文件**`src/daemon/cron_handler.py`
2. **实现 `TaskTypeHandler` 接口**:填入每个方法的具体逻辑
3. **注册**:在 daemon 启动时 `TaskTypeRegistry.register(CronHandler())`
4. **完事**。dispatcher / spawner / ticker 不改一行。
可选:
- 如需专用 API,在 `api/` 下新增路由
- 如需前端展示,在对应 Tab 里加渲染模板
---
# §9 前后端接口联动方案
### 后端 API 设计原则
- 每个 task type 可以有自己专用的 API(如 `/api/mail``/api/toolchain`
- 也可以用统一 API 查询:`GET /api/tasks?type=mail|toolchain|task`
- 新增 task type 时,API 层自由选择:复用统一接口或新增专用接口
- 统一数据协议:`{id, type, status, title, from, to, metadata}`
### 前端展示原则
- 前端 Tab 和后端 task type 解耦——一个 Tab 可以展示多种 type,一种 type 也可以出现在多个 Tab
- 新增 task type 不需要新增前端组件,只需在对应 Tab 里加渲染模板
- 前端通过 `type` 字段区分渲染逻辑
### 本次范围
**前端不动**,后端架构先行。ToolchainHandler 的前端展示方案在后续迭代中确定。
---
# §10 风险评估
| 风险 | 概率 | 影响 | 缓解措施 |
|------|------|------|----------|
| MailHandler 迁移遗漏逻辑 | 中 | 高 | 逐步迁移,每步对比迁移前后全量测试;保留旧代码注释期 |
| 注册表查询性能 | 低 | 低 | dict 查询 O(1)handler 数量个位数,忽略不计 |
| ticker 自动发现虚拟项目引入 bug | 低 | 中 | 保留 `_general` 硬编码作为 fallback`virtual_projects()` 仅返回显式注册的虚拟项目 |
| 并发安全 | 低 | 高 | 注册表启动时一次性加载,运行时只读,无线程安全问题 |
| Handler 接口设计不足 | 低 | 中 | Protocol 可后续扩展方法,默认实现提供合理 fallback |
| 引擎清理删除过早 | 中 | 中 | Step 5 放在最后,确认所有 handler 稳定后再删旧代码 |
---
## 附录:文件结构预览
```
src/daemon/
├── task_type_registry.py # §3 + §4Protocol + Registry
├── task_handler.py # §5 TaskHandler
├── mail_handler.py # §5 MailHandler
├── toolchain_handler.py # §5 ToolchainHandler
├── dispatcher.py # §6 改动:handler 查询替代 if/else
├── spawner.py # §6 改动:handler 查询替代 if/else
└── ticker.py # §6 改动:自动发现虚拟项目
```