diff --git a/docs/design/20-task-type-architecture.md b/docs/design/20-task-type-architecture.md new file mode 100644 index 0000000..38efd00 --- /dev/null +++ b/docs/design/20-task-type-architecture.md @@ -0,0 +1,422 @@ +--- +title: "TaskTypeRegistry + Handler 架构重构" +created: 2026-06-10 +version: v1.0 +--- + +# §1 现状分析 + +moziplus v2 的任务调度系统当前通过 `if/else` 硬编码区分两种 task type:普通任务(task)和邮件(mail)。分支逻辑散落在 dispatcher、spawner、ticker 三个核心模块中,新增 task type 需要同时改动三处。 + +### dispatcher.py 中的分支 + +- **`is_mail` 判断**:`project_id == "_mail"` 作为唯一判据 +- **mail 分支**:跳过 guardrail 检查;有专属的 `on_checks_passed` 回调(`_mail_auto_working`)和 `on_complete` 回调(`_mail_auto_complete`,含幻觉门控 + 失败通知) +- **非 mail 分支**:走正常 guardrail 检查 → `on_checks_passed` → `on_complete` 全链路 + +### spawner.py 中的分支 + +- **`_build_prompt`**:`_mail` 走 `_build_mail_prompt()`,其余走 BootstrapBuilder L0-L3 全链 +- **`_build_api_section`**:`_mail` 的 `success_status = "done"`,其余 `success_status = "review"` +- **retry 逻辑**:mail 用 `MAIL_RETRY_PROMPT`,task 用标准 `RETRY_PROMPT` +- **`classify_outcome` 完成处理**:mail 走幻觉门控(hallucination gate) + +### ticker.py 中的分支 + +- **虚拟项目扫描**:`_general` 和 `_mail` 两个虚拟项目各自硬编码扫描逻辑 +- **mail 专属逻辑**:`_mail_check_reply` 判断邮件回复是否闭环 + +**问题总结**:新增第三种 task type(toolchain)如果继续硬编码,需要在三个文件中各加一套 if/else,且未来每种新类型都重复这个模式,维护成本线性增长。 + +--- + +# §2 三种 Task Type 行为差异表 + +| 维度 | task(普通任务) | mail | toolchain | +|------|---------|------|-----------| +| 存储 | 项目 DB(`projects/{pid}/blackboard.db`) | `_mail` DB(`_mail/blackboard.db`) | `_toolchain` DB | +| 状态流转 | pending→claimed→working→review→done | 跳过 claimed,auto-working→auto-done | auto-working→auto-done | +| prompt 构建 | BootstrapBuilder L0-L3 | MAIL_INFORM / MAIL_REQUEST 精简模板 | TOOLCHAIN 模板 + 事件上下文 | +| guardrail | 正常检查 | 跳过 | 跳过 | +| 完成标准 | 产出物 + review | 回复邮件 / inform done | Gitea 侧闭环(不回 Mail) | +| on_complete | classify_outcome → 状态机 | 幻觉门控 + 失败通知 | auto-done + 可选 escalate | +| 路由 | Router 四条快速路径 + 广播认领 | 直接路由到收件人 | 直接路由到事件相关 agent | +| retry | 标准 `RETRY_PROMPT` | `MAIL_RETRY_PROMPT` | 标准(或专用) | +| 前端展示 | 任务看板 Tab | 飞鸽传书 Tab | 待定 | + +--- + +# §3 Handler 接口定义 + +定义 Python Protocol,所有 task type handler 必须满足此接口: + +```python +from typing import Protocol, Optional, Dict, Any +from pathlib import Path + + +class TaskTypeHandler(Protocol): + """所有 task type handler 的统一接口。""" + + # 属性通过 __init__ 参数设置,Protocol 不强制 property + task_type: str # 类型标识:'task' | 'mail' | 'toolchain' + virtual_project: Optional[str] # 虚拟项目 ID,如 '_mail'、'_toolchain'。普通任务为 None + + def build_prompt( + self, + task_id: str, + title: str, + description: str, + must_haves: str, + project_id: str, + agent_id: str, + task: Optional[Dict] = None, + spawn_type: str = "executor", + spawner: Any = None, + ) -> str: + """构建 Agent prompt。""" + ... + + def build_api_section( + self, project_id: str, task_id: str, agent_id: str + ) -> str: + """构建 API 操作指令(success_status 等)。""" + ... + + def skip_guardrail(self, project_id: str) -> bool: + """是否跳过 guardrail 检查。""" + ... + + def pre_spawn( + self, task_id: str, db_path: Path, dispatcher: Any + ) -> Optional[callable]: + """spawn 前回调,返回 on_checks_passed 回调或 None。""" + ... + + def post_complete( + self, + task_id: str, + agent_id: str, + outcome: str, + db_path: Path, + must_haves: str, + dispatcher: Any, + ) -> None: + """spawn 完成后回调。""" + ... + + def build_retry_prompt( + self, + task_id: str, + agent_id: str, + retry_count: int, + max_retries: int, + retry_field: str, + task_info: Dict, + spawner: Any, + ) -> str: + """构建重试 prompt。""" + ... + + def check_completion(self, task_id: str, db_path: Path) -> bool: + """检查任务是否已完成(如 mail 的回复检查)。""" + ... +``` + +**设计原则**: + +- 每个方法在现有代码中都有明确的对应实现点,不存在"悬空"抽象 +- `pre_spawn` 返回 Optional[callable],普通任务返回 None 即可,不需要回调 +- `spawner` 和 `dispatcher` 参数用 `Any` 类型,避免循环导入;handler 只调用已知方法 + +**兼容期过渡策略**: + +引擎中 handler 查询优先,无 handler 时 fallback 到现有逻辑。具体流程: +1. `TaskTypeRegistry.get_by_project(project_id)` 查到 handler → 走 handler 路径 +2. 未查到 → 走现有 if/else 分支(兼容期) +3. 三种 handler 全部稳定后,Step 5 删除旧分支,统一走 handler +4. 兼容期内新旧路径互斥——同一个 project_id 只会走其中一条,不存在重复执行 + +--- + +# §4 TaskTypeRegistry 注册表 + +```python +class TaskTypeRegistry: + """Task type handler 注册表。启动时一次性加载,运行时只读。""" + + _handlers: Dict[str, TaskTypeHandler] = {} + + @classmethod + def register(cls, handler: TaskTypeHandler) -> None: + """注册一个 handler。启动时调用一次。""" + if handler.task_type in cls._handlers: + raise ValueError( + f"Task type '{handler.task_type}' already registered" + ) + cls._handlers[handler.task_type] = handler + + @classmethod + def get_by_project(cls, project_id: str) -> Optional[TaskTypeHandler]: + """通过 project_id 查找 handler(匹配 virtual_project)。""" + for h in cls._handlers.values(): + if h.virtual_project == project_id: + return h + return None + + @classmethod + def get(cls, task_type: str) -> Optional[TaskTypeHandler]: + """通过 task_type 标识查找 handler。""" + return cls._handlers.get(task_type) + + @classmethod + def virtual_projects(cls) -> list[str]: + """返回所有已注册的虚拟项目 ID(ticker 自动发现用)。""" + return [ + h.virtual_project + for h in cls._handlers.values() + if h.virtual_project is not None + ] +``` + +**使用方式**(daemon 启动时): + +```python +from task_handler import TaskHandler +from mail_handler import MailHandler +from toolchain_handler import ToolchainHandler + +TaskTypeRegistry.register(TaskHandler()) +TaskTypeRegistry.register(MailHandler()) +TaskTypeRegistry.register(ToolchainHandler()) +``` + +--- + +# §5 三个 Handler 的实现边界 + +## TaskHandler(普通任务) + +将现有 default(非 mail)分支封装为 handler,**不替代 BootstrapBuilder**。 + +| 方法 | 实现 | +|------|------| +| `task_type` | `"task"` | +| `virtual_project` | `None` | +| `build_prompt` | 调用 `BootstrapBuilder`(透传参数) | +| `build_api_section` | 现有 default 逻辑,`success_status = "review"` | +| `skip_guardrail` | `False` | +| `pre_spawn` | `None`(不需要回调) | +| `post_complete` | `classify_outcome` → 状态机 | +| `check_completion` | `False`(由状态机管理) | +| `build_retry_prompt` | 标准 `RETRY_PROMPT` | + +**改动量**:~60 行,从 spawner/dispatcher 的 default 分支包一层。 + +## MailHandler + +将分散在 dispatcher / spawner / ticker 三处的 mail 逻辑集中到一个 handler。 + +| 方法 | 实现 | +|------|------| +| `task_type` | `"mail"` | +| `virtual_project` | `"_mail"` | +| `build_prompt` | 复用 `_build_mail_prompt` / `MAIL_INFORM_TEMPLATE` / `MAIL_REQUEST_TEMPLATE` | +| `build_api_section` | `success_status = "done"` | +| `skip_guardrail` | `True` | +| `pre_spawn` | auto_working 回调(从 dispatcher `_mail_on_checks_passed` 搬入) | +| `post_complete` | 幻觉门控 + auto_done + 失败通知(从 dispatcher `_mail_auto_complete` 搬入) | +| `check_completion` | `_mail_check_reply`(从 ticker 搬入) | +| `build_retry_prompt` | `MAIL_RETRY_PROMPT` | + +**改动量**:~150 行,三处逻辑集中。 + +## ToolchainHandler + +全新 handler,处理 Gitea Webhook 事件通知。 + +| 方法 | 实现 | +|------|------| +| `task_type` | `"toolchain"` | +| `virtual_project` | `"_toolchain"` | +| `build_prompt` | 新建 `TOOLCHAIN_TEMPLATE` + 事件上下文注入 | +| `build_api_section` | `success_status = "done"` | +| `skip_guardrail` | `True` | +| `pre_spawn` | auto_working 回调 | +| `post_complete` | auto-done + 可选 escalate | +| `check_completion` | `False` | +| `build_retry_prompt` | 标准 `RETRY_PROMPT`(或后续定制) | + +**改动量**:~100 行,全新代码。 + +--- + +# §6 引擎改动点(一次性) + +引擎代码改动是受控的、一次性的。改完后新增 task type 不再触碰引擎。 + +## dispatcher.py(~20 行改动) + +```python +# dispatch() 中,替换现有的 is_mail 分支 +handler = TaskTypeRegistry.get_by_project(project_id) + +if handler: + if handler.skip_guardrail(project_id): + # 跳过 guardrail,直接 spawn + ... + on_passed = handler.pre_spawn(task_id, db_path, self) + # spawn 后由 on_passed 回调驱动后续 + ... + handler.post_complete(task_id, agent_id, outcome, db_path, must_haves, self) +else: + # 兼容期:保留现有 default 逻辑 + ... +``` + +## spawner.py(~15 行改动) + +```python +# _build_prompt() 中 +handler = TaskTypeRegistry.get_by_project(project_id) +if handler: + return handler.build_prompt( + task_id, title, description, must_haves, + project_id, agent_id, task, spawn_type, self + ) +# else: 现有 BootstrapBuilder 逻辑(兼容期) + +# _build_api_section() 中 +handler = TaskTypeRegistry.get_by_project(project_id) +if handler: + return handler.build_api_section(project_id, task_id, agent_id) + +# retry 逻辑中 +handler = TaskTypeRegistry.get_by_project(project_id) +if handler: + return handler.build_retry_prompt( + task_id, agent_id, retry_count, max_retries, + retry_field, task_info, self + ) +``` + +## ticker.py(~10 行改动) + +```python +# 虚拟项目扫描:从注册表自动发现 +for vp in TaskTypeRegistry.virtual_projects(): + handler = TaskTypeRegistry.get_by_project(vp) + if handler and handler.check_completion: + # 调用 handler.check_completion 检查 + ... + +# 保留 _general 硬编码作为 fallback(非 task type 机制) +``` + +--- + +# §7 实施顺序 + +按风险从低到高排列,每步完成后跑 `pytest -m "not e2e"` 全量回归测试。 + +### Step 1:注册表基础设施 + +- 新建 `src/daemon/task_type_registry.py` +- 定义 `TaskTypeHandler` Protocol +- 定义 `TaskTypeRegistry` 类 +- 编写单元测试验证注册/查询机制 +- **风险**:极低,纯新增文件,不改动现有代码 + +### Step 2:TaskHandler + +- 新建 `src/daemon/task_handler.py` +- 从 spawner/dispatcher 的 default 分支封装 handler 方法 +- 注册到 TaskTypeRegistry +- 运行全量回归测试,验证普通任务路径不变 +- **风险**:低,只是把现有逻辑包一层,不改行为 + +### Step 3:ToolchainHandler + +- 新建 `src/daemon/toolchain_handler.py` +- 全新实现,无迁移成本 +- 注册到 TaskTypeRegistry +- **风险**:低,全新代码,不影响现有路径 + +### Step 4:MailHandler + +- 新建 `src/daemon/mail_handler.py` +- 从 dispatcher / spawner / ticker 三处迁移 mail 逻辑 +- 注册到 TaskTypeRegistry +- 重点回归测试 mail 路径:发送、回复、重试、幻觉门控 +- **风险**:中,逻辑从三处集中,需确保行为一致 + +### Step 5:引擎清理 + +- 删除 dispatcher / spawner / ticker 中的旧 if/else 分支 +- 统一走 handler 路径 +- 全量回归测试 + 手工验证 +- **风险**:中,删除代码需确保无遗漏 + +--- + +# §8 未来新增 Task Type 的步骤 + +以新增 `cron`(定时任务)为例: + +1. **新建 handler 文件**:`src/daemon/cron_handler.py` +2. **实现 `TaskTypeHandler` 接口**:填入每个方法的具体逻辑 +3. **注册**:在 daemon 启动时 `TaskTypeRegistry.register(CronHandler())` +4. **完事**。dispatcher / spawner / ticker 不改一行。 + +可选: +- 如需专用 API,在 `api/` 下新增路由 +- 如需前端展示,在对应 Tab 里加渲染模板 + +--- + +# §9 前后端接口联动方案 + +### 后端 API 设计原则 + +- 每个 task type 可以有自己专用的 API(如 `/api/mail`、`/api/toolchain`) +- 也可以用统一 API 查询:`GET /api/tasks?type=mail|toolchain|task` +- 新增 task type 时,API 层自由选择:复用统一接口或新增专用接口 +- 统一数据协议:`{id, type, status, title, from, to, metadata}` + +### 前端展示原则 + +- 前端 Tab 和后端 task type 解耦——一个 Tab 可以展示多种 type,一种 type 也可以出现在多个 Tab +- 新增 task type 不需要新增前端组件,只需在对应 Tab 里加渲染模板 +- 前端通过 `type` 字段区分渲染逻辑 + +### 本次范围 + +**前端不动**,后端架构先行。ToolchainHandler 的前端展示方案在后续迭代中确定。 + +--- + +# §10 风险评估 + +| 风险 | 概率 | 影响 | 缓解措施 | +|------|------|------|----------| +| MailHandler 迁移遗漏逻辑 | 中 | 高 | 逐步迁移,每步对比迁移前后全量测试;保留旧代码注释期 | +| 注册表查询性能 | 低 | 低 | dict 查询 O(1),handler 数量个位数,忽略不计 | +| ticker 自动发现虚拟项目引入 bug | 低 | 中 | 保留 `_general` 硬编码作为 fallback;`virtual_projects()` 仅返回显式注册的虚拟项目 | +| 并发安全 | 低 | 高 | 注册表启动时一次性加载,运行时只读,无线程安全问题 | +| Handler 接口设计不足 | 低 | 中 | Protocol 可后续扩展方法,默认实现提供合理 fallback | +| 引擎清理删除过早 | 中 | 中 | Step 5 放在最后,确认所有 handler 稳定后再删旧代码 | + +--- + +## 附录:文件结构预览 + +``` +src/daemon/ +├── task_type_registry.py # §3 + §4:Protocol + Registry +├── task_handler.py # §5 TaskHandler +├── mail_handler.py # §5 MailHandler +├── toolchain_handler.py # §5 ToolchainHandler +├── dispatcher.py # §6 改动:handler 查询替代 if/else +├── spawner.py # §6 改动:handler 查询替代 if/else +└── ticker.py # §6 改动:自动发现虚拟项目 +```