From 8d72a1fa19de35aaa51a858a42f8520da30b0839 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Wed, 10 Jun 2026 22:33:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Step=205=20=E5=BC=95=E6=93=8E=E6=8E=A5?= =?UTF-8?q?=E5=85=A5=20+=20H1-H3/S3=20=E4=BF=AE=E5=A4=8D=20+=20=E5=AE=A1?= =?UTF-8?q?=E8=AE=A1=20D1/D2/D5=20=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 引擎接入(dispatcher/spawner/ticker → handler 统一路由): - dispatcher: guardrail/on_checks_passed/on_complete → handler 查询 - spawner: _build_prompt/_build_api_section → handler.build_prompt - ticker: 虚拟项目扫描/assignee/claimed/review/幻觉门控 → handler 判断 Handler 缺陷修复: - H1: _mark_task_status 加 3 次重试(防 DB 锁) - H2: review @mention 加 comment_type='review' - H3: review 非 approved 保持 review 状态(不标 working) - S3: 通知链接改 Gitea(PR/Issue/Commit) 审计修复: - D1: pre_spawn 返回值未检查 → 加 if not 抛 RuntimeError - D2: PromptContext 缺 from_agent/mail_type → 从 must_haves 解析 - D5: _check_reply 查错表 → 恢复查 tasks 表找 in_reply_to 旧方法保留未删(deprecated),确认稳定后再清理。 --- docs/design/step5-audit-report.md | 74 ++++++ docs/design/step5-impact-analysis.md | 324 +++++++++++++++++++++++++++ src/daemon/base_task_handler.py | 53 ++--- src/daemon/dispatcher.py | 176 ++++++--------- src/daemon/mail_handler.py | 19 +- src/daemon/spawner.py | 38 +++- src/daemon/task_handler.py | 54 ++++- src/daemon/ticker.py | 61 ++--- src/daemon/toolchain_handler.py | 22 +- 9 files changed, 648 insertions(+), 173 deletions(-) create mode 100644 docs/design/step5-audit-report.md create mode 100644 docs/design/step5-impact-analysis.md diff --git a/docs/design/step5-audit-report.md b/docs/design/step5-audit-report.md new file mode 100644 index 0000000..46f9a00 --- /dev/null +++ b/docs/design/step5-audit-report.md @@ -0,0 +1,74 @@ +# Step 5 双重审计报告 + +## 摘要 +- 设计一致性检查项: 8 +- 特殊逻辑覆盖检查项: 22 +- 一致/覆盖: 24 +- **偏差/遗漏: 6(严重 3 / 轻微 3)** + +--- + +## 偏差/遗漏清单 + +| # | 维度 | 设计要求 / 旧逻辑 | 代码实际 | 严重程度 | 建议 | +|---|------|-------------------|---------|---------|------| +| **D1** | B1.2 pre_spawn | 旧 `_mail_on_checks_passed`: `if not _mail_auto_working(): raise RuntimeError` — pre_spawn 失败时中止 spawn | 新 `_handler_on_checks_passed`: `_handler.pre_spawn(...)` 返回值未检查,`handler_marked_working = True` 无条件执行 | **严重** | 改为 `if not _handler.pre_spawn(...): raise RuntimeError("handler_pre_spawn_failed")` | +| **D2** | B3.1 PromptContext | 旧 `_build_mail_prompt` 从 must_haves JSON 解析 `from_agent` 和 `performative` 传入模板 | 新 `spawner._build_spawn_message` 构建 PromptContext 时缺少 `from_agent` 和 `mail_type`,均为空字符串 | **严重** | 从 `must_haves` JSON 提取 `from` 和 `performative` 填入 PromptContext | +| **D3** | B1.3 inform outcome 白名单 | 旧 `_mail_auto_complete`: inform 类型有 outcome 白名单 `{"completed", "claimed", "no_reply"}`,不在白名单的 outcome 跳过 auto-done | 新 `MailHandler.verify_completion`: inform 始终返回 True,不检查 outcome | **轻微** | CRASH_OUTCOMES 已被基类处理。剩余异常 outcome(session_revived/api_error/fallback_timeout)极少出现,且旧逻辑不标 done 只是等 ticker 重投,最终效果差异不大。但严格对齐需要加白名单检查 | +| **D4** | A. 设计 §6 retry 逻辑 | 设计文档要求 retry 逻辑中 `handler = TaskTypeRegistry.get_by_project(project_id); if handler: return handler.build_retry_prompt(...)` | spawner L1118-1130 重试 prompt 仍用 `is_mail = project_id == "_mail"` 硬编码 | **轻微** | 当前不影响运行(旧的 `_build_mail_prompt` 仍保留且可用),但与设计文档不一致 | +| **D5** | B1.5 _check_reply 语义差异 | 旧 `_mail_check_reply`: `SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ?` — 检查是否有其他任务的 must_haves 包含当前 task_id(即 in_reply_to 匹配) | 新 `MailHandler._check_reply`: `SELECT COUNT(*) FROM comments WHERE task_id=? AND author != 'daemon' AND comment_type != 'system'` — 检查当前任务是否有非系统 comment | **严重** | 两个查询语义完全不同。旧逻辑检查的是 **mail 表的回复任务**(通过 must_haves 中 in_reply_to 关联),新逻辑检查的是 **当前任务的 comments**。这可能导致 request 类型邮件的幻觉门控行为不同 | +| **D6** | B1.3 标 done 重试机制 | 旧 `_mail_auto_complete`: 标 done 时外层有 `for attempt in range(3)` 循环 | 新 `BaseTaskHandler._mark_task_status`: H1 修复后已有 3 次重试 | **轻微** | ✅ 已修复,但注意旧代码标 done 和标 failed 是分开的重试循环,新代码统一走 `_mark_task_status`。行为等价 | + +--- + +## 一致确认项 + +### A. 设计一致性 + +| # | 维度 | 检查点 | 结果 | +|---|------|--------|------| +| A1 | §6 dispatcher | classify_outcome 后调 handler.post_complete | ✅ on_complete 闭包替换为 handler.post_complete | +| A2 | §6 dispatcher | on_checks_passed → handler.pre_spawn | ✅ _handler_on_checks_passed 调用 handler.pre_spawn(但返回值未检查,见 D1) | +| A3 | §6 dispatcher | guardrail 跳过 → handler 判断 | ✅ `is_handler_task = handler is not None` | +| A4 | §6 spawner | _build_prompt → handler.build_prompt | ✅ handler 路径调用 handler.build_prompt(ctx) | +| A5 | §6 spawner | _build_api_section → handler 查询 | ✅ handler 存在时 success_status 从 handler.target_success_status 获取 | +| A6 | §6 ticker | 虚拟项目扫描 → registry.virtual_projects() | ✅ 循环 `TaskTypeRegistry.virtual_projects()` | +| A7 | §6 ticker | check_completion → handler.check_completion | ✅ 超时检查中调 `handler.check_completion(task.id, db_path)` | +| A8 | §6 兼容期 | 设计说"兼容期保留旧逻辑" | ✅ 无 handler 的项目走旧路径(legacy_on_complete) | + +### B. 特殊逻辑覆盖 + +| # | 维度 | 检查点 | 结果 | +|---|------|--------|------| +| B1 | 1.1 guardrail | handler 项目跳过,_general 等走 guardrail | ✅ | +| B2 | 1.2 _mail_auto_working | `BEGIN IMMEDIATE` + status 检查 + 标 working | ✅ `_auto_mark_working` 完全一致 | +| B3 | 1.3 request 无回复 → 标 failed + notify | ✅ MailHandler.on_failure 调 `_mark_task_status(failed)` + `notify_mail_failed` | +| B4 | 1.4 _mail_revert_to_pending | spawn 失败回退 working → pending | ✅ Exception handler 中有 `BEGIN IMMEDIATE` + 状态检查回退 | +| B5 | 1.6 Task review verdict 读取 | approved → done | ✅ handle_review_complete | +| B6 | 1.6 Task review 非 approved → @mention assignee + 保持 review | ✅ H3 修复后保持 review + INSERT comment with comment_type='review' | +| B7 | 1.6 Task executor 三信号验证 | output/comment/terminal status → review | ✅ verify_completion 完全一致 | +| B8 | 1.7 Legacy dispatch 路径 | handler 替代 is_mail_legacy | ✅ handler_legacy 查注册表 | +| B9 | 2.1 _transition_status assignee 清空 | handler 项目不清空 | ✅ | +| B10 | 2.2 跳过 claimed 状态 | handler 项目跳过 claimed 直接 working | ✅ | +| B11 | 2.3 _dispatch_reviews 跳过 | handler 项目不走 review | ✅ | +| B12 | 2.5 startup recovery | `_general` + virtual_projects() | ✅ 不会重复扫描 | +| B13 | 3.1 _build_api_section | handler 存在时正确获取 success_status | ✅ | +| B14 | B4.1 TaskHandler.post_complete | 区分 executor/review 流程 | ✅ 通过读 DB status 判断 | +| B15 | B4.2 MailHandler.post_complete | 基类统一流程 | ✅ | +| B16 | B4.3 ToolchainHandler.post_complete | 基类统一流程 | ✅ | +| B17 | B1.5 _check_reply 异常保守处理 | 旧: return True(保守)/ 新: return False | 见 D5 | +| B18 | CRASH_OUTCOMES 集合 | 与旧 ROLLBACK_CURRENT_AGENT_OUTCOMES 一致 | ✅ 完全一致 | +| B19 | B2.1 _toolchain ticker 扫描 | _toolchain 会被 ticker 扫描 | ✅ _toolchain 有 blackboard.db 时会被 tick_project 处理 | +| B20 | B2.3 handler 项目都跳过 claimed | _toolchain 也跳过 | ✅ 所有 handler 项目统一处理 | + +--- + +## 修复优先级 + +| 优先级 | # | 修复内容 | +|--------|---|---------| +| **P0** | D1 | dispatcher _handler_on_checks_passed 检查 pre_spawn 返回值 | +| **P0** | D2 | spawner PromptContext 从 must_haves 提取 from_agent 和 mail_type | +| **P0** | D5 | MailHandler._check_reply 恢复旧查询语义(检查 must_haves 中的 in_reply_to) | +| P1 | D3 | inform outcome 白名单(可选,影响极小) | +| P2 | D4 | retry prompt 用 handler 路径替代硬编码 | diff --git a/docs/design/step5-impact-analysis.md b/docs/design/step5-impact-analysis.md new file mode 100644 index 0000000..a729e3d --- /dev/null +++ b/docs/design/step5-impact-analysis.md @@ -0,0 +1,324 @@ +# Step 5 引擎接入 — 影响分析与逐点对照 + +## 方法论 + +逐行审查 dispatcher.py / spawner.py / ticker.py 中所有 `is_mail` / `_mail` / `project_id == "_mail"` 分支, +对照 handler 实现,确认每个特殊处理的去向。 + +--- + +## 一、dispatcher.py(985 行) + +### 1.1 Guardrail 跳过(L127-129) + +```python +is_mail = project_config.get("project_id") == "_mail" if project_config else False +if self.guardrails and not is_mail: + violations = self.guardrails.check_task(task) +``` + +**特殊处理**:Mail 不做 guardrail 检查。 + +**Handler 覆盖**:设计文档 D6 "skip_guardrail 从接口删除,guardrail 自己判断"。Step 5 改为:`if self.guardrails and handler is None`(无 handler 时走 guardrail),或者用 handler.virtual_project 判断。handler 存在时跳过 guardrail。 + +**改动**:`is_mail` → `TaskTypeRegistry.get_by_project(project_id) is not None` + +--- + +### 1.2 Mail on_checks_passed(L194-213) + +```python +on_checks_passed = None +_mail_marked_working = False +if is_mail and db_path: + def _mail_on_checks_passed(): + nonlocal _mail_marked_working + if not _disp._mail_auto_working(_task_id, _mail_db): + raise RuntimeError("mail_auto_working_failed") + _mail_marked_working = True + on_checks_passed = _mail_on_checks_passed +``` + +**特殊处理**:Mail spawn 前通过 on_checks_passed 回调标 working,标记成功后才 spawn,spawn 失败回退。 + +**Handler 覆盖**:MailHandler.pre_spawn 调用 `_auto_mark_working`,和 `_mail_auto_working` 逻辑完全一致。 + +**改动**: +- `on_checks_passed` 改为调用 `handler.pre_spawn(task_id, db_path)` +- `_mail_marked_working` 标记保留,用于 Exception 回退 + +--- + +### 1.3 Mail on_complete(L224-238) + +```python +if is_mail: + def _mail_on_complete(aid, outcome): + _dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves, outcome=outcome) + on_complete = _mail_on_complete +``` + +**特殊处理**:Mail on_complete 调用 `_mail_auto_complete`(含 inform/request 分支、幻觉门控、重试 3 次、失败通知)。 + +**Handler 覆盖**:MailHandler 使用基类 post_complete 统一流程(crash→verify→mark→notify)。但现有 `_mail_auto_complete` 有几个细节差异需要注意: + +| 现有逻辑 | Handler 覆盖 | 差异 | +|---------|-------------|------| +| request 无回复 → 重试 3 次标 failed | on_failure 标 failed + notify | ⚠️ 缺少 3 次重试 | +| inform 只在特定 outcome 标 done | verify 始终返回 True → 基类标 done | ✅ 简化了,合理 | +| 标 done 重试 3 次 | _mark_task_status 单次 | ⚠️ 缺少重试 | +| notify_mail_failed | on_failure 中调用 notify_mail_failed | ✅ 一致 | + +**⚠️ 关键发现**:现有代码标状态时有 **重试 3 次** 机制(防止 DB 锁),handler 的 `_mark_task_status` 只做一次。需要把重试逻辑补到 `_mark_task_status` 或在 handler 层加。 + +**改动**:on_complete 改为调用 `handler.post_complete(task_id, agent_id, outcome, db_path)` + +--- + +### 1.4 Task on_complete(L241-310) + +```python +else: + def _task_on_complete(aid, outcome): + # #07.2: crash 回退 + if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db: + _dispatcher._rollback_current_agent(_task_db, _task_id, aid) + + if _is_review: + if outcome in ("completed", "session_revived"): + # 读 verdict → approved 标 done / 非 approved @mention assignee + else: + logger.warning("review agent outcome=%s, NOT marking done", outcome) + else: + # executor: 三信号验证 → 标 review + _dispatcher._task_auto_complete(_task_id, _task_db) +``` + +**特殊处理清单**: + +1. **#07.2 crash 回退**:executor 和 review 都回退 current_agent → assignee +2. **review 分支**:outcome 必须是 "completed" 或 "session_revived" 才走 verdict 读取 +3. **review verdict 读取**:approved → done,非 approved → @mention assignee + 保持 review +4. **review @mention**:通过 Blackboard.add_comment,comment_type="review" +5. **executor 分支**:走 _task_auto_complete → 三信号验证 → review + +**Handler 覆盖**: +- crash 回退:✅ BaseTaskHandler.post_complete 第一步 +- review verdict:⚠️ **TaskHandler.handle_review_complete 存在但未被 dispatcher 调用**。现有 dispatcher 直接在闭包里做了,不走 handler。 +- @mention:⚠️ handler 用 `conn.execute("INSERT INTO comments")` 直接插入,dispatcher 用 `Blackboard.add_comment`(会做更多处理,如 comment_type="review") +- executor 三信号:✅ TaskHandler.verify_completion + +**⚠️ 关键发现**: +1. dispatcher 的 review @mention 用 `bb.add_comment(..., comment_type="review")`,handler 直接 INSERT 不带 comment_type。需要修复 handler。 +2. dispatcher 对 review outcome 有白名单检查(只处理 "completed"/"session_revived"),handler 的 post_complete 没有 outcome 白名单——crash 已在基类处理,其他 outcome 都会走 verify。 +3. dispatcher review 非 approved 时**保持 review 状态**,handler 的 handle_review_complete 标回 working。这是**行为差异**。 + +**改动**:需要先修复 handler 的 review 分支,再替换 on_complete。 + +--- + +### 1.5 Mail spawn 失败回退(L355-358) + +```python +except Exception as e: + if _mail_marked_working: + self._mail_revert_to_pending(task.id, db_path) +``` + +**特殊处理**:spawn 失败(subprocess 启动失败)回退 working → pending。 + +**Handler 覆盖**:❌ handler 没有这个。这是 dispatcher 级别的异常处理,和 handler 无关。但 toolchain 也需要类似逻辑。 + +**改动**:保留在 dispatcher 中,改为 `_mail_marked_working` → `handler_marked_working`。 + +--- + +### 1.6 Legacy dispatch(L584-660) + +```python +is_mail_legacy = project_config.get("project_id") == "_mail" +if is_mail_legacy: + if not self._mail_auto_working(task.id, db_path_legacy): + return error +``` + +**特殊处理**:legacy 路径(router=None 时触发)也有 mail 特殊处理。 + +**Handler 覆盖**:同 1.2/1.3,用 handler 替代。 + +**改动**:同样用 handler.pre_spawn 和 handler.post_complete 替代。 + +--- + +### 1.7 现有 Mail 辅助方法(L658-870) + +`_mail_auto_working` / `_mail_revert_to_pending` / `_mail_auto_complete` / `_mail_check_reply` + +**改动**:Step 5 不删这些方法(安全起见保留,标记 deprecated),只改调用方。确认稳定后再删。 + +--- + +## 二、spawner.py(1704 行) + +### 2.1 _build_prompt 中的 mail 分支(L282-284) + +```python +if project_id == "_mail": + return self._build_mail_prompt(task_id, title, description, must_haves, agent_id) +``` + +**特殊处理**:Mail 用专用精简模板。 + +**Handler 覆盖**:MailHandler.build_prompt 通过 PromptComposer 拼 3 个 section。 + +**改动**:查注册表 → handler.build_prompt(context)。需要构建 PromptContext 传入。 + +--- + +### 2.2 _build_api_section(L321-325) + +```python +success_status = '"done"' if project_id == "_mail" else '"review"' +``` + +**特殊处理**:Mail 的 success_status 是 done。 + +**Handler 覆盖**:已由 handler 的 PromptSection 处理(TaskApiSection hardcode review,MailApiSection 不含 status 回写指令)。 + +**改动**:如果 handler 存在,跳过 _build_api_section(handler.build_prompt 已包含)。 + +--- + +### 2.3 classify_outcome 中的 handler 调用 + +spawner 在 classify_outcome 后调 on_complete(outcome)。on_complete 是 dispatcher 传入的闭包。 + +**改动**:on_complete 闭包改为调用 handler.post_complete。spawner 本身不直接查注册表。 + +--- + +## 三、ticker.py(1897 行) + +### 3.1 虚拟项目扫描(L218-229) + +```python +mail_db = Path(self.registry.root) / "_mail" / "blackboard.db" +if mail_db.exists() and "_mail" not in active_projects: + pr = await self._tick_project("_mail", {...}) +``` + +**特殊处理**:_mail 硬编码扫描。 + +**Handler 覆盖**:TaskTypeRegistry.virtual_projects() 返回 ["_toolchain", "_mail"]。 + +**改动**:循环 `TaskTypeRegistry.virtual_projects()` 替代硬编码。_toolchain 如果也需要 ticker 扫描就自动发现。但需确认 _toolchain 是否需要 ticker——当前 toolchain 任务创建和完成都在 toolchain_routes.py 中处理,可能不需要 ticker 扫描。 + +--- + +### 3.2 _transition_status 中 mail assignee 不清空(L953-960) + +```python +if new_status == "pending": + if self._current_project_id == "_mail": + # Mail 的 assignee 是收件人,永不清空 + conn.execute("UPDATE tasks SET status=?, updated_at=? WHERE id=?", ...) + else: + conn.execute("UPDATE tasks SET status=?, assignee=NULL, ...", ...) +``` + +**特殊处理**:Mail 重置到 pending 时不清空 assignee(assignee 是收件人)。 + +**Handler 覆盖**:❌ handler 不管 ticker 的状态转换逻辑。这是 ticker 内部逻辑。 + +**改动**:用 `TaskTypeRegistry.get_by_project(project_id)` 判断替代硬编码。 + +--- + +### 3.3 Mail 跳过 claimed 状态(L1029-1043) + +```python +if project_id == "_mail": + conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ...) + # 跳过 claimed,直接 working +``` + +**特殊处理**:Mail 不走 claimed 中间态(已在 dispatcher 中标 working)。 + +**Handler 覆盖**:handler.pre_spawn 的 _auto_mark_working 跳过了 claimed。 + +**改动**:用 handler 判断替代硬编码。 + +--- + +### 3.4 _dispatch_reviews 跳过 mail(L1304) + +```python +if project_id == "_mail": + return [] +``` + +**特殊处理**:Mail 不走 review 流程。 + +**Handler 覆盖**:MailHandler.target_success_status = "done",不走 review。但 ticker 的 _dispatch_reviews 是看项目级。 + +**改动**:用 handler 判断。 + +--- + +### 3.5 Mail 幻觉门控兜底(L1474-1492) + +```python +if self._current_project_id == "_mail": + has_reply = self._mail_check_reply(task.id, db_path) + if has_reply: + # working → done +``` + +**特殊处理**:Ticker 超时检查时,如果 mail 有回复,标 done 而非 failed。 + +**Handler 覆盖**:❌ handler 的 check_completion 只返回 bool,不做状态标记。 + +**改动**:调用 handler.check_completion 替代 _mail_check_reply。状态标记逻辑保留在 ticker 中。 + +--- + +### 3.6 _mail_check_reply(L1555-1575) + +和 dispatcher 版本一致。 + +**改动**:用 handler.check_completion 替代。 + +--- + +### 3.7 虚拟项目 init + recovery 扫描(L1625-1643) + +```python +for virtual_id in ("_general", "_mail"): + ... + # _mail 项目不清空 assignee +``` + +**改动**:virtual_projects() + _general 硬编码。 + +--- + +## 四、Handler 缺陷(需在 Step 5 前修复) + +| # | 缺陷 | 影响 | 修复方案 | +|---|------|------|---------| +| H1 | BaseTaskHandler._mark_task_status 无重试 | DB 锁时标状态失败,任务卡住 | 加 3 次重试(和 dispatcher 现有行为一致) | +| H2 | TaskHandler.handle_review_complete 中 @mention 不带 comment_type="review" | review comment 无类型标记 | INSERT 加 comment_type | +| H3 | dispatcher review 非 approved 保持 review 状态,handler 标 working | **行为差异** | handler 改为保持 review 状态(和 dispatcher 一致) | +| H4 | dispatcher review outcome 有白名单("completed"/"session_revived"),handler 无 | crash 之外的异常 outcome 也会走 verify | handler 的 post_complete 已在基类处理 crash,其余 outcome 走 verify 是合理的 | + +**H3 最关键**——dispatcher review 非 approved 保持 review 状态(等 assignee 自己处理),handler 标 working 会触发 ticker 重新 dispatch executor,这不是预期行为。 + +## 五、改动策略 + +**不删旧代码,只改调用方**: +1. dispatcher 中 is_mail → handler 判断,on_checks_passed/on_complete → handler.pre_spawn/post_complete +2. spawner 中 _build_prompt → handler.build_prompt +3. ticker 中虚拟项目扫描 → registry.virtual_projects(),mail 特殊判断 → handler 判断 +4. 旧方法(_mail_auto_working 等)标记 @deprecated 保留,不删 + +**先修 handler 缺陷(H1-H3),再改引擎**。 diff --git a/src/daemon/base_task_handler.py b/src/daemon/base_task_handler.py index b494dd8..c80a083 100644 --- a/src/daemon/base_task_handler.py +++ b/src/daemon/base_task_handler.py @@ -127,32 +127,35 @@ class BaseTaskHandler: task_id, e) def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None: - """更新任务状态 + 写审计事件。 - 从 dispatcher._mark_task_status 迁移。""" - try: - conn = get_connection(db_path) + """更新任务状态 + 写审计事件(带 3 次重试,防 SQLite DB 锁)。""" + for attempt in range(3): try: - conn.execute("BEGIN IMMEDIATE") - old_row = conn.execute( - "SELECT status FROM tasks WHERE id=?", (task_id,) - ).fetchone() - old_status = old_row["status"] if old_row else "unknown" - conn.execute( - "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", - (status, task_id), - ) - conn.execute( - "INSERT INTO events (task_id, agent, event_type, payload) " - "VALUES (?, 'handler', 'status_change', ?)", - (task_id, - f'{{"from": "{old_status}", "to": "{status}", ' - f'"source": "{self.task_type}_handler"}}'), - ) - conn.commit() - finally: - conn.close() - except Exception as e: - logger.error("Task %s: mark status error: %s", task_id, e) + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + old_row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + old_status = old_row["status"] if old_row else "unknown" + conn.execute( + "UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?", + (status, task_id), + ) + conn.execute( + "INSERT INTO events (task_id, agent, event_type, payload) " + "VALUES (?, 'handler', 'status_change', ?)", + (task_id, + f'{{"from": "{old_status}", "to": "{status}", ' + f'"source": "{self.task_type}_handler"}}'), + ) + conn.commit() + return + finally: + conn.close() + except Exception as e: + logger.warning("Handler: mark %s → %s attempt %d failed: %s", + task_id, status, attempt + 1, e) + logger.error("Handler: mark %s → %s all 3 attempts failed", task_id, status) def _auto_mark_working(self, task_id: str, db_path: Path) -> bool: """pending → working(mail/toolchain 通用)。""" diff --git a/src/daemon/dispatcher.py b/src/daemon/dispatcher.py index 077a8d2..ef7cbb5 100644 --- a/src/daemon/dispatcher.py +++ b/src/daemon/dispatcher.py @@ -22,6 +22,7 @@ from src.blackboard.models import Task from src.blackboard.db import get_connection from src.daemon.spawner import AgentBusyError from src.daemon.router import AgentRouter +from src.daemon.task_type_registry import TaskTypeRegistry logger = logging.getLogger("moziplus-v2.dispatcher") @@ -123,10 +124,11 @@ class Dispatcher: "status": "dispatched"|"skipped"|"error"|"blocked", "reason": str} """ # 安全红线检查(调度前拦截) - # Mail 是 Agent 间通信,不做 guardrail 检查 - is_mail = project_config.get( - "project_id") == "_mail" if project_config else False - if self.guardrails and not is_mail: + # handler 项目(_mail/_toolchain)不做 guardrail 检查 + handler = TaskTypeRegistry.get_by_project( + project_config.get("project_id", "") if project_config else "") + is_handler_task = handler is not None + if self.guardrails and not is_handler_task: violations = self.guardrails.check_task(task) critical = [ v for v in violations if v.action in ( @@ -190,27 +192,26 @@ class Dispatcher: } try: - # [v2.7.1] Mail: 标 working 移到 spawn_full_agent 内部(check 通过后、subprocess 前) - is_mail = project_config.get( - "project_id") == "_mail" if project_config else False - if is_mail: - db_path = Path( - project_config["db_path"]) if project_config and "db_path" in project_config else None + # [Step 5] Handler: pre_spawn + on_checks_passed 统一 + project_id = project_config.get("project_id", "") if project_config else "" + handler = TaskTypeRegistry.get_by_project(project_id) + db_path = Path( + project_config["db_path"]) if project_config and "db_path" in project_config else None - # on_checks_passed: 所有检查通过后才标 working,检查失败不标 + # on_checks_passed: handler 项目在 check 通过后调用 handler.pre_spawn on_checks_passed = None - _mail_marked_working = False - if is_mail and db_path: + handler_marked_working = False + if handler and db_path: _task_id = task.id - _mail_db = db_path - _disp = self + _handler_db = db_path + _handler = handler - def _mail_on_checks_passed(): - nonlocal _mail_marked_working - if not _disp._mail_auto_working(_task_id, _mail_db): - raise RuntimeError("mail_auto_working_failed") - _mail_marked_working = True - on_checks_passed = _mail_on_checks_passed + def _handler_on_checks_passed(): + nonlocal handler_marked_working + if not _handler.pre_spawn(_task_id, _handler_db): + raise RuntimeError("handler_pre_spawn_failed") + handler_marked_working = True + on_checks_passed = _handler_on_checks_passed # 构建 spawn message message = self._build_spawn_message(task, agent_id, project_config, @@ -218,94 +219,46 @@ class Dispatcher: "mode", ""), spawn_type=action_type or "executor") - # v2.7.2: on_complete 只含业务逻辑,不含 counter.release - # counter.release 由 spawn_full_agent 内部的 wrapped_on_complete 保证 + # [Step 5] Handler: on_complete 统一走 handler.post_complete + # 保留旧路径作为 fallback(无 handler 的项目) on_complete = None - if is_mail: + if handler: _task_id = task.id - _mail_db = db_path - _must_haves = task.must_haves or "" - _dispatcher = self + _handler_db = db_path + _handler = handler - def _mail_on_complete(aid, outcome): - # 幻觉门控:检查是否有回复,自动标 done/failed + def _handler_on_complete(aid, outcome): try: - _dispatcher._mail_auto_complete( - _task_id, aid, _mail_db, _must_haves, outcome=outcome) + _handler.post_complete( + _task_id, aid, outcome, _handler_db) except Exception as e: logger.error( - "Mail %s: on_complete error: %s", _task_id, e) - on_complete = _mail_on_complete + "Handler %s: on_complete error: %s", _task_id, e) + on_complete = _handler_on_complete else: - # #02: Task 路径也加 on_complete(幻觉门控) + # 旧路径:无 handler 的项目(_general 等) _task_id = task.id - _task_db = Path( - project_config["db_path"]) if project_config and "db_path" in project_config else None + _task_db = db_path _dispatcher = self _is_review = action_type == "review" - # #07.2: executor/review 统一 crash 回退 ROLLBACK_CURRENT_AGENT_OUTCOMES = frozenset({ "crashed", "compact_failed", "process_crash", "session_stuck", "compact_hanging", }) - def _task_on_complete(aid, outcome): + def _legacy_on_complete(aid, outcome): try: - # #07.2: 统一 crash 回退——executor 和 review 都回退 current_agent if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db: _dispatcher._rollback_current_agent( _task_db, _task_id, aid) - - if _is_review: - if _task_db and outcome in ( - "completed", "session_revived"): - # #09: 读 verdict 决定后续动作 - conn = get_connection(_task_db) - try: - review = conn.execute( - "SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1", - (_task_id,) - ).fetchone() - finally: - conn.close() - - if review and review["verdict"] == "approved": - _dispatcher._mark_task_status( - _task_db, _task_id, "done") - logger.info( - "Task %s: review approved, marking done", _task_id) - else: - # 非 approved → @mention 被审 - # agent(assignee,非 current_agent) - verdict_str = review["verdict"] if review else "未知" - conn2 = get_connection(_task_db) - try: - task_row = conn2.execute( - "SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone() - finally: - conn2.close() - - if task_row and task_row["assignee"]: - from src.blackboard.blackboard import Blackboard - bb = Blackboard(_task_db) - bb.add_comment(_task_id, "daemon", - f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳", - comment_type="review") - logger.info("Task %s: review verdict=%s, notified assignee=%s", - _task_id, verdict_str, task_row["assignee"] if task_row else "?") - # 不标 done,保持 review 状态 - else: - logger.warning( - "Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome) - else: - # executor: 三信号验证 → 标 review + if not _is_review: _dispatcher._task_auto_complete( _task_id, _task_db) except Exception as e: logger.error( - "Task %s: on_complete error: %s", _task_id, e) - on_complete = _task_on_complete + "Legacy %s: on_complete error: %s", _task_id, e) + on_complete = _legacy_on_complete session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, @@ -354,8 +307,26 @@ class Dispatcher: } except Exception as e: # on_checks_passed 已执行但 subprocess 失败 → 回退 working → pending - if _mail_marked_working: - self._mail_revert_to_pending(task.id, db_path) + if handler_marked_working and handler and db_path: + # handler 项目:回退到 pending + try: + conn = get_connection(db_path) + try: + conn.execute("BEGIN IMMEDIATE") + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task.id,)).fetchone() + if row and row["status"] == "working": + conn.execute( + "UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?", + (task.id,)) + conn.commit() + logger.info( + "Task %s: reverted working → pending (spawn failed)", task.id) + finally: + conn.close() + except Exception as revert_err: + logger.error( + "Task %s: failed to revert to pending: %s", task.id, revert_err) self._record_routing( task, decision, "error", str(e), _routing_db) return { @@ -580,17 +551,18 @@ class Dispatcher: try: # NOTE: _legacy_dispatch 仅在 router=None 时触发,当前配置不会进入。 # Mail 永远走 dispatch() 主路径(on_checks_passed 方案),不走此路径。 - # 如果未来 legacy 路径被启用,需同步 on_checks_passed 逻辑。 - is_mail_legacy = project_config.get( - "project_id") == "_mail" if project_config else False - if is_mail_legacy: + # [Step 5] handler 统一:用注册表查 handler + project_id_legacy = project_config.get("project_id", "") if project_config else "" + handler_legacy = TaskTypeRegistry.get_by_project(project_id_legacy) + if handler_legacy: db_path_legacy = Path( project_config["db_path"]) if project_config and "db_path" in project_config else None - if not db_path_legacy or not self._mail_auto_working( - task.id, db_path_legacy): + if db_path_legacy: + handler_legacy.pre_spawn(task.id, db_path_legacy) + else: return {"level": level.value, "agent_id": agent_id, "session_id": None, "status": "error", - "reason": "mail_auto_working_failed"} + "reason": "no db_path for handler"} if hasattr(self.spawner, 'build_spawn_message') and project_config: @@ -612,20 +584,18 @@ class Dispatcher: # v2.7.2: on_complete 只含业务逻辑 on_complete_legacy = None - if is_mail_legacy: + if handler_legacy: _t_id = task.id - _m_db = db_path_legacy - _m_mh = task.must_haves or "" - _disp = self + _h_db = db_path_legacy + _h = handler_legacy - def _mail_oc_legacy(aid, outcome): + def _handler_oc_legacy(aid, outcome): try: - _disp._mail_auto_complete( - _t_id, aid, _m_db, _m_mh, outcome=outcome) + _h.post_complete(_t_id, aid, outcome, _h_db) except Exception as e: logger.error( - "Mail %s: legacy on_complete error: %s", _t_id, e) - on_complete_legacy = _mail_oc_legacy + "Handler %s: legacy on_complete error: %s", _t_id, e) + on_complete_legacy = _handler_oc_legacy session_id = await self.spawner.spawn_full_agent( agent_id=agent_id, message=message, diff --git a/src/daemon/mail_handler.py b/src/daemon/mail_handler.py index d14dadf..91c6e91 100644 --- a/src/daemon/mail_handler.py +++ b/src/daemon/mail_handler.py @@ -93,23 +93,26 @@ class MailHandler(BaseTaskHandler): return "request" def _check_reply(self, task_id: str, db_path: Path) -> bool: - """检查是否已回复(从 dispatcher._mail_check_reply 迁移)""" + """检查是否已回复(查 tasks 表找 in_reply_to 回复邮件) + + 从 dispatcher._mail_check_reply 迁移。 + Mail 回复机制:创建新 task,must_haves JSON 中包含 in_reply_to = original_task_id。 + 不能查 comments 表——回复邮件是独立的 task,不是 comment。 + """ try: conn = get_connection(db_path) try: row = conn.execute( - "SELECT COUNT(*) as cnt FROM comments " - "WHERE task_id=? AND author != 'daemon' " - "AND comment_type != 'system'", - (task_id,) + "SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1", + (task_id, f'%{task_id}%'), ).fetchone() - count = row["cnt"] if row else 0 - return count > 0 + return row is not None finally: conn.close() except Exception as e: logger.error("Mail %s: check reply error: %s", task_id, e) - return False + # 查询失败时保守处理:假设有回复(避免误标 failed) + return True def check_completion(self, task_id: str, db_path: Path) -> bool: """ticker 级别的完成检查:检查是否已回复""" diff --git a/src/daemon/spawner.py b/src/daemon/spawner.py index 915ef07..a67b3c1 100644 --- a/src/daemon/spawner.py +++ b/src/daemon/spawner.py @@ -278,10 +278,30 @@ class AgentSpawner: task_id, title, description, must_haves, project_id, agent_id) - # mail 任务用精简模板 - if project_id == "_mail": - return self._build_mail_prompt( - task_id, title, description, must_haves, agent_id) + # handler 路径:Task/Mail/Toolchain 用各自的 PromptSection 构建 + from src.daemon.task_type_registry import TaskTypeRegistry + handler = TaskTypeRegistry.get_by_project(project_id) + if handler: + from src.daemon.prompt_composer import PromptContext + # 从 must_haves 解析 mail 元数据(from / performative) + from_agent = "" + mail_type = "" + try: + meta = json.loads(must_haves) if must_haves else {} + from_agent = meta.get("from", "") + mail_type = meta.get("performative", meta.get("type", "")) + except Exception: + pass + ctx = PromptContext( + task_id=task_id, title=title, description=description or "", + must_haves=must_haves or "", project_id=project_id, + agent_id=agent_id, role=spawn_type, + spawn_type=spawn_type, + from_agent=from_agent, mail_type=mail_type, + ) + return handler.build_prompt(ctx) + + # 旧路径保留:_general 等非 handler 项目 # 走 BootstrapBuilder 新路径 if self.bootstrap_builder and task is not None: @@ -321,8 +341,14 @@ class AgentSpawner: def _build_api_section(self, project_id: str, task_id: str, agent_id: str) -> str: """构建 API 回写操作指令(BootstrapBuilder 模式下补充)""" - # mail 任务直接 done,不走 review - success_status = '"done"' if project_id == "_mail" else '"review"' + # handler 项目(_mail/_toolchain)的 success_status 由 PromptSection 处理 + # 这里只处理无 handler 的项目(normal task) + from src.daemon.task_type_registry import TaskTypeRegistry + handler = TaskTypeRegistry.get_by_project(project_id) + if handler: + success_status = '"done"' if handler.target_success_status == "done" else '"review"' + else: + success_status = '"review"' return f"""## 操作指令 ### 状态回写 diff --git a/src/daemon/task_handler.py b/src/daemon/task_handler.py index 2a402a1..6a535a5 100644 --- a/src/daemon/task_handler.py +++ b/src/daemon/task_handler.py @@ -185,6 +185,51 @@ class TaskHandler(BaseTaskHandler): # === 子类实现 === + def post_complete(self, task_id: str, agent_id: str, + outcome: str, db_path: Path) -> None: + """Task on_complete:区分 executor 和 review。 + + executor: 基类统一流程(crash → verify → mark review) + review: handle_review_complete(读 verdict → done/keep review) + """ + # crash 处理(所有类型共用) + if outcome in self.CRASH_OUTCOMES: + self._rollback_current_agent(db_path, task_id, agent_id) + return + + # 检查当前任务状态:如果是 review 状态 → review 完成流程 + try: + conn = get_connection(db_path) + try: + row = conn.execute( + "SELECT status FROM tasks WHERE id=?", (task_id,) + ).fetchone() + task_status = row["status"] if row else "unknown" + finally: + conn.close() + except Exception: + task_status = "unknown" + + if task_status == "review": + # review 完成流程:只处理正常 outcome + if outcome in ("completed", "session_revived"): + self.handle_review_complete(task_id, db_path) + else: + logger.warning( + "Task %s: review agent %s abnormal outcome=%s, keeping review", + task_id, agent_id, outcome) + else: + # executor 完成流程:基类统一 verify → mark + result = self.verify_completion(task_id, db_path) + if result.passed: + self._mark_task_status(db_path, task_id, self.target_success_status()) + logger.info("Task %s: verify passed (%s), marked %s", + task_id, result.reason, self.target_success_status()) + else: + logger.info( + "Task %s: verify not passed (%s), leaving working", + task_id, result.reason) + def target_success_status(self) -> str: """task 类型验证通过后进 review。""" return "review" @@ -309,19 +354,18 @@ class TaskHandler(BaseTaskHandler): task_id, reviewer) else: # 非 approved:通过 blackboard comment @mention assignee + # 保持 review 状态,让 assignee 自行决定下一步 conn.execute( - "INSERT INTO comments (task_id, author, content) " - "VALUES (?, 'system', ?)", + "INSERT INTO comments (task_id, author, content, comment_type) " + "VALUES (?, 'system', ?, 'review')", (task_id, f"@{assignee} review 未通过 (verdict={verdict}, " f"reviewer={reviewer}): {review_comment}") ) conn.commit() - # 回到 working 让 assignee 重新处理 - self._mark_task_status(db_path, task_id, "working") logger.info( "Task %s: review not approved (%s by %s), " - "@mentioned assignee %s, back to working", + "@mentioned assignee %s, keeping review status", task_id, verdict, reviewer, assignee ) finally: diff --git a/src/daemon/ticker.py b/src/daemon/ticker.py index 7796bd6..cc89a67 100644 --- a/src/daemon/ticker.py +++ b/src/daemon/ticker.py @@ -215,18 +215,21 @@ class Ticker: logger.exception("Tick %d _general error", tick_num) results["projects"]["_general"] = {"error": str(e)} - # 虚拟项目 _mail:飞鸽传书 - mail_db = Path(self.registry.root) / "_mail" / "blackboard.db" - if mail_db.exists() and "_mail" not in active_projects: - try: - pr = await self._tick_project("_mail", { - "id": "_mail", "name": "飞鸽传书", - "status": "active", "source": "virtual", - }) - results["projects"]["_mail"] = pr - except Exception as e: - logger.exception("Tick %d _mail error", tick_num) - results["projects"]["_mail"] = {"error": str(e)} + # 虚拟项目:从注册表自动发现 + _general 硬编码 + from src.daemon.task_type_registry import TaskTypeRegistry + for vp in TaskTypeRegistry.virtual_projects(): + vp_db = Path(self.registry.root) / vp / "blackboard.db" + if vp_db.exists() and vp not in active_projects: + try: + vp_name = {"_mail": "飞鸽传书", "_toolchain": "工具链事件"}.get(vp, vp) + pr = await self._tick_project(vp, { + "id": vp, "name": vp_name, + "status": "active", "source": "virtual", + }) + results["projects"][vp] = pr + except Exception as e: + logger.exception("Tick %d %s error", tick_num, vp) + results["projects"][vp] = {"error": str(e)} logger.debug( "Tick %d complete: %d projects", @@ -948,9 +951,11 @@ Parent Task ID: {parent_task.id} now = datetime.utcnow().isoformat() # 重置到 pending 时清空 assignee(避免残留导致重复路由到同一 Agent) - # 但 Mail 的 assignee 是收件人,永不清空 + # handler 虚拟项目(_mail 等)的 assignee 是收件人,永不清空 if new_status == "pending": - if self._current_project_id == "_mail": + from src.daemon.task_type_registry import TaskTypeRegistry + handler = TaskTypeRegistry.get_by_project(self._current_project_id) + if handler: conn.execute( "UPDATE tasks SET status=?, updated_at=? WHERE id=?", (new_status, now, task_id), @@ -1025,15 +1030,17 @@ Parent Task ID: {parent_task.id} "full", "escalate"): conn = get_connection(db_path) try: - # [v2.7.1] Mail 已在 dispatcher 中标 working,跳过 claimed - if project_id == "_mail": + # [Step 5] handler 项目已在 dispatcher 中标 working,跳过 claimed + from src.daemon.task_type_registry import TaskTypeRegistry + handler = TaskTypeRegistry.get_by_project(project_id) + if handler: conn.execute( "UPDATE tasks SET current_agent=? WHERE id=?", (result["agent_id"], task.id), ) conn.commit() dispatched.append(task.id) - logger.info("Dispatched %s to %s (session=%s, mail auto-working)", + logger.info("Dispatched %s to %s (session=%s, handler auto-working)", task.id, result["agent_id"], result.get("session_id")) else: @@ -1300,8 +1307,10 @@ Parent Task ID: {parent_task.id} async def _dispatch_reviews(self, db_path: Path, project_id: str) -> List[str]: """扫描 review 状态任务,检查是否有产出,调度审查 Agent""" - # mail 任务不走 review 流程,直接跳过 - if project_id == "_mail": + # handler 项目(_mail/_toolchain)不走 review 流程 + from src.daemon.task_type_registry import TaskTypeRegistry + handler = TaskTypeRegistry.get_by_project(project_id) + if handler: return [] queries = Queries(db_path) @@ -1470,10 +1479,10 @@ Parent Task ID: {parent_task.id} elapsed = (now - start_time).total_seconds() / 60.0 if elapsed > timeout_minutes: - # [v2.7.1] Mail 幻觉门控兜底:有回复 + working → done - if self._current_project_id == "_mail": - has_reply = self._mail_check_reply(task.id, db_path) - if has_reply: + # [Step 5] handler 幻觉门控兜底:check_completion 通过 + working → done + from src.daemon.task_type_registry import TaskTypeRegistry + handler = TaskTypeRegistry.get_by_project(self._current_project_id) + if handler and handler.check_completion(task.id, db_path): conn = get_connection(db_path) try: ok = self._transition_status( @@ -1621,8 +1630,10 @@ Parent Task ID: {parent_task.id} project_dirs[project_id] = self.registry.root / \ project_id / "blackboard.db" - # 虚拟项目 - for virtual_id in ("_general", "_mail"): + # 虚拟项目:_general + 注册表自动发现 + from src.daemon.task_type_registry import TaskTypeRegistry + virtual_ids = ["_general"] + TaskTypeRegistry.virtual_projects() + for virtual_id in virtual_ids: virtual_db = Path(self.registry.root) / \ virtual_id / "blackboard.db" if virtual_db.exists() and virtual_id not in project_dirs: diff --git a/src/daemon/toolchain_handler.py b/src/daemon/toolchain_handler.py index 8e33799..b3d7f1c 100644 --- a/src/daemon/toolchain_handler.py +++ b/src/daemon/toolchain_handler.py @@ -199,6 +199,23 @@ class ToolchainHandler(BaseTaskHandler): event_type, event_data, ) + def _build_gitea_links(self, event_type: str, event_data: dict) -> str: + """根据事件类型构建 Gitea 链接。""" + links = [] + repo = event_data.get("repo", "") + base_url = "http://192.168.2.154:3000" + + if "pr_number" in event_data: + links.append(f"PR: {base_url}/{repo}/pulls/{event_data['pr_number']}") + if "issue_number" in event_data: + links.append(f"Issue: {base_url}/{repo}/issues/{event_data['issue_number']}") + if "commit" in event_data: + links.append(f"Commit: {base_url}/{repo}/commit/{event_data['commit']}") + if "branch" in event_data and "commit" not in event_data: + links.append(f"分支: {event_data['branch']}") + + return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)" + def _notify_via_mail_api( self, task_id: str, @@ -225,6 +242,9 @@ class ToolchainHandler(BaseTaskHandler): f" - {k}: {v}" for k, v in event_data.items() ) + # 构建 Gitea 链接 + gitea_links = self._build_gitea_links(event_type, event_data) + title = f"[toolchain-handler] 工具链事件处理失败: {task_id}" text = ( f"任务 {task_id} 验证失败\n\n" @@ -232,7 +252,7 @@ class ToolchainHandler(BaseTaskHandler): f"事件详情:\n{event_details or ' (无)'}\n\n" f"失败原因: {reason}\n" f"证据: {evidence}\n\n" - f"黑板任务: http://localhost:8083/ → 项目 _toolchain → 任务 {task_id}\n\n" + f"{gitea_links}\n\n" f"行动指引: {action_hint}" )