Compare commits

...

21 Commits

Author SHA1 Message Date
cfdaily ce1b0902dd fix: S1 handler display_name + S2 import 移顶部 + W1 注释
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 3s
- S1: vp_name 硬编码字典 → handler.display_name 属性
- S2: ticker/spawner 中 TaskTypeRegistry 局部 import → 移文件顶部
- W1: TaskHandler executor verify 失败不调 on_failure 加注释说明
2026-06-10 22:38:55 +08:00
cfdaily 8d72a1fa19 feat: Step 5 引擎接入 + H1-H3/S3 修复 + 审计 D1/D2/D5 修复
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 3s
引擎接入(dispatcher/spawner/ticker → handler 统一路由):
- dispatcher: guardrail/on_checks_passed/on_complete → handler 查询
- spawner: _build_prompt/_build_api_section → handler.build_prompt
- ticker: 虚拟项目扫描/assignee/claimed/review/幻觉门控 → handler 判断

Handler 缺陷修复:
- H1: _mark_task_status 加 3 次重试(防 DB 锁)
- H2: review @mention 加 comment_type='review'
- H3: review 非 approved 保持 review 状态(不标 working)
- S3: 通知链接改 Gitea(PR/Issue/Commit)

审计修复:
- D1: pre_spawn 返回值未检查 → 加 if not 抛 RuntimeError
- D2: PromptContext 缺 from_agent/mail_type → 从 must_haves 解析
- D5: _check_reply 查错表 → 恢复查 tasks 表找 in_reply_to

旧方法保留未删(deprecated),确认稳定后再清理。
2026-06-10 22:33:03 +08:00
pangtong-fujunshi 2c970557c8 Merge pull request 'feat: Step 2-4 — Task/Mail/Toolchain Handlers + 11 PromptSections + BaseTaskHandler' (#25) from feat/task-type-handlers-step2-4 into main
Deploy / ci (push) Failing after 7s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 21:47:03 +08:00
cfdaily 4a4e99f738 fix: S1-S3 review suggestions — type annotations unified, urllib replaces curl, rich notification content
CI / lint (pull_request) Failing after 6s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 2s
2026-06-10 21:44:47 +08:00
cfdaily 1b0007f244 feat: Step 2-4 Task/Mail/Toolchain handlers + PromptSections + BaseTaskHandler
CI / lint (pull_request) Failing after 6s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 0s
- base_task_handler.py: 基类统一4步流程(crash→verify→mark→notify)
- task_handler.py: 5 PromptSections + 三信号验证 + review流程
- mail_handler.py: 3 PromptSections + inform/request区分 + 基类统一流程
- toolchain_handler.py: 3 PromptSections + 模板引擎渲染 + Mail API通知
- 背靠背设计-编码一致性检查通过(4严重已修/6轻微保留)
2026-06-10 20:45:06 +08:00
pangtong-fujunshi b953f6da02 Merge pull request 'fix: S1-S4 建议项修复(终验)' (#24) from docs/s-fixes into main
Deploy / ci (push) Failing after 7s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 16:40:41 +08:00
cfdaily cc974bf258 fix: S1-S4 建议项修复 — 类型标注精确化+BaseTaskHandler标注后续PR+token预算说明
CI / lint (pull_request) Failing after 8s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 2s
2026-06-10 16:38:00 +08:00
pangtong-fujunshi 42a28585b8 Merge pull request 'docs: Task 系统架构重构设计 v3.0 — 五层架构+BaseTaskHandler+执行流程+决策记录' (#23) from docs/task-type-architecture into main
Deploy / ci (push) Failing after 6s
Deploy / deploy (push) Has been skipped
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-10 15:42:10 +08:00
cfdaily 0e4d12898d fix: M1-M4 修复 Protocol 签名与设计文档对齐 + §14 去重
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 3s
2026-06-10 15:41:54 +08:00
cfdaily 86504faf1f docs: 20-task-type-architecture.md v3.0 - §14-§18 五层架构+BaseTaskHandler+执行流程+决策记录 2026-06-10 15:41:54 +08:00
cfdaily 2eba38a5a0 feat: Step 1 — TaskTypeRegistry + PromptComposer 基础设施
- task_type_registry.py: TaskTypeHandler Protocol (10方法+2属性) + TaskTypeRegistry 注册表
- prompt_composer.py: PromptSection Protocol + PromptContext dataclass + PromptComposer 拼装器
- 零依赖,纯新增文件,不影响现有功能
2026-06-10 15:41:54 +08:00
pangtong-fujunshi b0b9a72445 Merge pull request 'docs: Task 系统架构重构设计文档 v2.1(纯文档)' (#22) from docs/task-type-architecture into main
Deploy / ci (push) Successful in 9s
Deploy / deploy (push) Successful in 13s
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 12:42:25 +08:00
cfdaily 1c6b66dc63 docs: 20-task-type-architecture.md v2.1 - 修复 review M1-M3 必修项
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 3s
2026-06-10 12:41:43 +08:00
cfdaily 3fa6040b93 docs: 20-task-type-architecture.md v2.0 - 新增 §11-§13 PromptSection 模式 2026-06-10 12:41:43 +08:00
cfdaily 1485719b0e docs: add 20-task-type-architecture.md - TaskTypeRegistry + Handler 架构重构设计 2026-06-10 12:41:43 +08:00
cfdaily b00d43c8ac docs(#13): merge #19 context layers into #13, delete standalone #19
§19 上下文四层改造方案(原独立文档 #19)合并到 #13 工具链设计文档末尾。
v3.1 → v3.3。两个专题本就是一个整体,分开维护增加认知负担。
2026-06-10 12:41:43 +08:00
jiangwei-infra 25e7d46328 Merge pull request 'fix(frontend): resumed_from null→undefined 类型兼容' (#21) from fix/frontend-null-vs-undefined into main
Deploy / ci (push) Successful in 10s
Deploy / deploy (push) Successful in 11s
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 08:12:08 +08:00
cfdaily c1381c0c93 fix(frontend): resumed_from null→undefined 类型兼容
CI / lint (pull_request) Successful in 9s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
TypeScript: resumed_from 是 string|null,StatusButtons 期望 string|undefined。
用 ?? undefined 转换。
2026-06-10 08:10:58 +08:00
jiangwei-infra 9a62a45a12 Merge pull request 'fix(test): e2e test collection crash - 跳过 import 安装目录' (#20) from fix/e2e-collection-crash into main
Deploy / ci (push) Successful in 9s
Deploy / deploy (push) Failing after 8s
Deploy / notify-deploy-failure (push) Successful in 0s
2026-06-10 07:53:58 +08:00
cfdaily b90b7b37c7 fix(test): e2e test 在 collection 阶段跳过(不 import 安装目录)
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
根因: test_e2e_v27.py 的 skipif 只标记了函数级别,pytest collection 阶段
仍会 import 该文件,触发 sys.path.insert 指向安装目录的 spawner.py。
如果安装目录有 merge conflict 残留,整个 test job crash。

修复: 将 skipif 加入 pytestmark 级别,collection 阶段即跳过。
2026-06-10 07:52:41 +08:00
jiangwei-infra 672fadfee4 Merge pull request 'fix: deploy.yml requirements.txt + frontend resumed_from TS编译' (#18) from fix/deploy-workflow into main
Deploy / ci (push) Successful in 10s
Deploy / deploy (push) Failing after 11s
Deploy / notify-deploy-failure (push) Successful in 1s
2026-06-10 07:21:24 +08:00
16 changed files with 3203 additions and 520 deletions
+376 -1
View File
@@ -1,6 +1,6 @@
# 三国团队工具链与开发流程设计
> **状态**: v3.1P3 端到端验证通过 + 调研结论写入 + Review API 枚举值修正
> **状态**: v3.3#19 上下文四层改造合并 + CI 修复 + A13 修订
> **作者**: 庞统(副军师)🐦
> **评审**: 司马懿(仲达)🗡️
> **日期**: 2026-06-06
@@ -2765,3 +2765,378 @@ Gitea v1.23.4 自带完整的 CI 管理界面:
| §16.8 #10 | Gitea v1.23.4 review payload 调研结论(姜维 2026-06-08):Gitea v1.23.4 review payload 只有 `type` + `content`,没有 `state`/`body`/`user`,这不是 org vs repo 差异而是 Gitea 设计。v1.24.0 格式不变。双格式兼容是防御性编码,保持现状 |
| §16.8 #11 | Spawner compact 检测窗口修复:窗口 300s→900s,尾部读取 50KB→1MB。实测长对话中 compact 记录被推出窗口导致漏检 |
| §16.8 #12 | inform 类型 Mail crash 误标 done bug 修复:`_mail_auto_complete` 增加 outcome 感知,inform 用白名单(completed/claimed/no_reply)控制 done 标记。spawner crash cooldown 300s→60s |
---
### 一、问题诊断
#### 1.1 E2E 真实场景测试暴露的三个断层
主公在 moziplus-v2 仓库创建了 Issue #32(添加 /api/stats 端点),指派张飞。链条在第一步就断了。
| 断层 | 现象 | 根因 |
|------|------|------|
| **Agent 不知道该做什么** | 张飞收到 Issue 指派 Mail,回复"已阅"就结束了 | Mail 模板(issue_assigned.md5 行信息,无流程引导;spawn prompt 说"已阅即可" |
| **Agent 去错了仓库** | 张飞去读了 sanguo_moziplus_v2 平台代码,而不是空的实验仓库 moziplus-v2 | Mail 模板没有仓库 clone URL,张飞凭习惯去了开发目录 |
| **Agent 在 Control UI 提问** | 张飞遇到问题直接在 Control UI 问主公,没有去 Gitea Issue 评论 | 没有任何地方引导"有疑问去 Gitea Issue 评论" |
| **Agent 不知道怎么协作** | 张飞判断任务需要澄清,但不知道该怎么请求澄清 | 没有"做不了→在 Issue 评论 / Mail 庞统"的回退路径 |
| **跨 Agent @mention 无法通知** | 张飞在 Issue 评论 @赵云,赵云收不到通知 | issue_comment handler 只处理 [CI] 评论,@mention 被忽略 |
#### 1.2 根因:工具链在四层架构中的断层
| 层 | 应该有 | 实际有 | Gap |
|---|---|---|---|
| **L0 铁律** | — | — | 无需改动 |
| **L1 角色** | 工具链协作行为规范(所有 Agent 共享) | 无 | AGENTS.md 没有工具链相关内容 |
| **L2 引擎注入** | 事件上下文(仓库 clone URL、Gitea API、Issue/PR 详情) | Mail 模板只有 5 行摘要 | 缺仓库信息和流程引导 |
| **L3 被动参考** | 技术细节(分支命名、commit 规范、PR 创建方式) | git-workflow 等 Skill 已存在但没人触发 | Agent 不知道该加载哪个 Skill |
---
### 二、改造方案:四层归属
#### 2.1 分层原则
| 层 | 放什么 | 不放什么 | 理由 |
|---|---|---|---|
| **L0** | 不放 | — | 工具链不是安全底线 |
| **L1** | 协作行为规范:收到什么通知该做什么、遇到问题怎么办 | 技术细节(分支命名、commit 格式) | 行为规范是团队常识,每个 Agent 都要知道 |
| **L2** | 事件上下文:仓库 clone URL、Gitea API URL、Issue/PR 链接、动态信息 | 固定的协作流程 | 动态信息每次不同,由 Mail 模板 + spawn 时注入 |
| **L3** | 技术细节:git-workflow、code-review 等 Skill 全文 | — | 按需加载,Agent 知道"我要提 PR"后自己读 |
#### 2.2 各层具体内容
##### L1AGENTS.md 加工具链协作行为段(所有 Agent 统一)
```markdown
## 工具链协作(Gitea
收到 Gitea 事件通知(Issue 指派、Review 请求、CI 失败等)时,按以下流程操作:
### 基本流程
- **Issue 指派** → clone 仓库 → 开分支 → 编码 → 提 PR(参考 git-workflow Skill
- **Review 请求** → 读 PR diffGitea API)→ 提交 Review(参考 code-review Skill
- **Review 通过** → 等 merge
- **Review 驳回** → 看 review body → 修代码 → 重新 push
- **CI 失败** → 看错误摘要 → 修代码 → push(自动重触发 CI)
- **部署失败** → 查 deploy 日志 → 修复
### 协作规则
- **有疑问?** 在 Gitea Issue 下评论,不要在 Control UI 或 Mail 里问
- **需要别人帮忙?** 在 Issue 评论中 @mention 对应 Agent(如 @zhaoyun-data
- **做不了?** 回复 Mail 说明原因和建议的接手人
- **获取完整上下文** → 用 Gitea API 拉取 Issue 详情和评论,不要只看 Mail 里的快照
### Gitea API 速查
> 其中 `{owner}/{repo}` 替换为实际仓库,如 `sanguo/sanguo_moziplus_v2`
- Issue 详情: GET /api/v1/repos/{owner}/{repo}/issues/{number}
- Issue 评论: GET /api/v1/repos/{owner}/{repo}/issues/{number}/comments
- PR diff: GET /api/v1/repos/{owner}/{repo}/pulls/{number}.diff
- 提交 Review: POST /api/v1/repos/{owner}/{repo}/pulls/{number}/reviews
```
**改动范围**6 个 Agent 的 AGENTS.md 各加一段(内容统一)。
##### L2:Mail 模板精简 + 事件上下文注入
**原则**:模板只放摘要 + 链接 + 仓库信息,不写固定步骤(步骤在 L1)。
**issue_assigned.md** 改为:
```markdown
Issue 指派
Issue: {issue_url}
标题: {issue_title}
标签: {labels}
📋 获取完整上下文(先读再动手):
- Issue 详情: GET {gitea_api}/repos/{repo}/issues/{issue_number}
- Issue 评论: GET {gitea_api}/repos/{repo}/issues/{issue_number}/comments
仓库: {repo_clone_url}
建议分支: feat/issue-{issue_number}-{brief}
```
**review_request.md** 改为:
```markdown
PR Review 请求
PR: {pr_url}
标题: {pr_title}
作者: {pr_author}
分支: {branch}
风险级别: {risk_level}
📋 获取完整上下文:
- PR diff: GET {gitea_api}/repos/{repo}/pulls/{pr_number}.diff
- PR 文件列表: GET {gitea_api}/repos/{repo}/pulls/{pr_number}/files
```
**review_result.md** 改为:
```markdown
Review {result}
PR: {pr_url}
标题: {pr_title}
审查者: {reviewer}
{review_body}
```
**ci_failure.md** 改为:
```markdown
CI 失败
Issue: {issue_url}
分支: {branch}
错误摘要:
{error_summary}
📋 CI 日志: {gitea_url}/{repo}/actions
修复后 push 会自动重触发 CI。
```
**deploy_failure.md** 改为:
```markdown
部署失败
仓库: {repo}
Commit: {commit_sha}
📋 排查步骤:
- CI 日志: {gitea_url}/{repo}/actions
- 服务器: pm2 logs {service_name}
```
**L2 代码改动**`toolchain_routes.py`):
1. 从 Webhook payload 的 `repository` 对象提取 `clone_url``html_url`
2. `render_template()` 传入新变量:`gitea_api``gitea_url``repo_clone_url`
3. 所有模板变量统一补齐
##### L3Skill 按需加载(不改 Skill 本身)
git-workflow、code-review 等 Skill 保持不变。
L1 的协作行为段里会引用 Skill 名称("参考 git-workflow Skill"),Agent 收到 Mail 后根据 L1 的引导自主加载对应 Skill。
**不改 Skill 路由机制**——靠 L1 的文案触发 Agent 的 Skill 路由器匹配。
---
### 三、新增功能:issue_comment @mention 通知
#### 3.1 设计
当前 `_handle_issue_comment` 只处理 `[CI]` 前缀评论。扩展为:
```
issue_comment 事件
├── 含 [CI] / CI 失败 → 原有 CI 失败通知逻辑
└── 含 @username → 解析 @mention → Mail 通知被 @的 Agent
```
#### 3.2 实现
**`toolchain_routes.py` 新增 `_handle_issue_comment_mention()`**
```python
AGENT_IDS = {
"zhangfei-dev", "guanyu-dev", "zhaoyun-data",
"jiangwei-infra", "simayi-challenger", "pangtong-fujunshi",
}
# 前缀映射:@张飞 → zhangfei-dev
# 中文名映射:Agent 在 Gitea Issue 评论中可能用中文名 @mention
# 英文短名映射:Agent 可能用不带 -dev/-infra 后缀的短名
AGENT_ALIAS = {
"张飞": "zhangfei-dev",
"关羽": "guanyu-dev",
"赵云": "zhaoyun-data",
"姜维": "jiangwei-infra",
"司马懿": "simayi-challenger",
"庞统": "pangtong-fujunshi",
"pangtong": "pangtong-fujunshi",
"simayi": "simayi-challenger",
"zhangfei": "zhangfei-dev",
"guanyu": "guanyu-dev",
"zhaoyun": "zhaoyun-data",
"jiangwei": "jiangwei-infra",
}
def extract_mentions(body: str, sender: str) -> list[str]:
"""从评论 body 中提取 @mention 的 Agent ID"""
candidates = re.findall(r"@([a-zA-Z\u4e00-\u9fa5][a-zA-Z0-9\u4e00-\u9fff-]*)", body)
result = set()
for c in candidates:
# 精确匹配
if c in AGENT_IDS:
result.add(c)
# 前缀/别名匹配
elif c in AGENT_ALIAS:
result.add(AGENT_ALIAS[c])
else:
# 前缀模糊匹配:@zhangfei → zhangfei-dev
for aid in AGENT_IDS:
if aid.startswith(c):
result.add(aid)
break
# 过滤掉评论者自己
result.discard(sender)
return list(result)
```
**新增 mention 通知模板** `templates/toolchain/mention.md`
```markdown
你在 Issue 中被 @mention
Issue: {issue_url}
评论者: {commenter}
评论内容:
{comment_body}
📋 获取完整上下文:
- Issue 详情: GET {gitea_api}/repos/{repo}/issues/{issue_number}
- Issue 评论: GET {gitea_api}/repos/{repo}/issues/{issue_number}/comments
```
**改动 `_handle_issue_comment`**
```python
async def _handle_issue_comment(payload):
comment = payload.get("comment", {})
body = comment.get("body", "")
sender = comment.get("user", {}).get("login", "")
repo = _repo_fullname(payload)
issue = payload.get("issue", {})
# 原有 CI 失败逻辑(不变)
if "[CI]" in body or "CI 失败" in body:
# ... 原有逻辑 ...
# 新增:@mention 通知
mentions = extract_mentions(body, sender)
if mentions:
issue_number = issue.get("number", 0)
issue_title = issue.get("title", "")
text = render_template("mention", {
"repo": repo,
"issue_number": str(issue_number),
"issue_url": issue.get("html_url", ""),
"commenter": sender,
"comment_body": body[:500],
"gitea_api": "http://192.168.2.154:3000/api/v1",
})
title = f"@mention: {issue_title} ({repo}#{issue_number})"
for agent_id in mentions:
_send_mail(agent_id, title, text)
```
#### 3.3 去重
- 同一条评论 @多人:每人一封 Mail(不同 to,内容相同)
- 同一事件 org webhook + repo webhook 双触发:现有 delivery UUID 去重机制覆盖
- 同一人被 @多次`extract_mentions` 返回 set,自动去重
---
### 四、Mail Spawn Prompt 改造
#### 4.1 问题
当前工具链 Mail 走 Mail 通道,spawn prompt 是:
```
你收到一封飞鸽传书(纯通知)。
发件者: system
主题: Issue 指派: xxx
内容: [工具链模板]
已阅即可。
```
"已阅即可"直接让 Agent 不做事。
#### 4.2 方案
**不改 MAIL_INFORM_TEMPLATE / MAIL_REQUEST_TEMPLATE 本身**(那是 Mail 系统通用的)。
改为:**工具链 Mail 使用 `type=request`(而不是默认的 inform**。
`_send_mail()` 中,工具链事件创建的 Mail 默认 `performative=request`,这样 Agent 收到时走 `MAIL_REQUEST_TEMPLATE`,知道需要处理。
具体改动在 `_send_mail()` 函数或其调用处:工具链路由调用 `_send_mail` 时传入 `performative="request"`
**⚠️ 验证要点**:改为 request 后,Agent spawn prompt 变为 "请处理以下请求",需确认:
1. Agent 不再把工具链 Mail 当纯通知忽略
2. Agent 能正确处理「已阅型」工具链事件(如 CI 失败通知——不需要回复,但需要知道)
3. 对已关闭 PR/Issue 的延迟通知,Agent 不会尝试去处理
验证方法:部署后发一条 Issue 指派 Mail,观察 Agent 行为是否符合预期。
---
### 五、完整改动清单
| # | 改什么 | 改动内容 | 层 | 风险 |
|---|--------|---------|---|------|
| 1 | 6 个 Agent 的 `AGENTS.md` | 加"工具链协作"段(内容统一) | L1 | 低(纯追加) |
| 2 | `templates/toolchain/issue_assigned.md` | 精简 + 加仓库上下文 + Gitea API 引导 | L2 | 低 |
| 3 | `templates/toolchain/review_request.md` | 精简 + 加 Gitea API 引导 | L2 | 低 |
| 4 | `templates/toolchain/review_result.md` | 精简 | L2 | 低 |
| 5 | `templates/toolchain/ci_failure.md` | 精简 + 加 CI 日志链接 | L2 | 低 |
| 6 | `templates/toolchain/deploy_failure.md` | 精简 + 加排查步骤 | L2 | 低 |
| 7 | **新建** `templates/toolchain/mention.md` | @mention 通知模板 | L2 | 低 |
| 8 | `src/api/toolchain_routes.py` | 提取 clone_url/html_url 传入模板;issue_comment 增加 @mention 解析;工具链 Mail 改为 request 类型 | L2 | 中 |
| 9 | 不改 | git-workflow 等 Skill 保持不变 | L3 | — |
| 10 | 不改 | daemon 核心逻辑、BootstrapBuilder、Skill 路由 | — | — |
---
### 六、验证方案
#### 6.1 单元验证
| 验证点 | 方法 |
|--------|------|
| `extract_mentions()` 提取 `@zhangfei-dev` | unit test |
| `extract_mentions()` 别名匹配 `@张飞` → zhangfei-dev | unit test |
| `extract_mentions()` 前缀匹配 `@zhangfei` → zhangfei-dev | unit test |
| `extract_mentions()` 过滤自己 | unit test |
| 模板渲染新变量不报错 | unit test |
#### 6.2 真实场景 E2E 验证
重复 Issue #32 的场景:
1. 创建 Issue 指派张飞
2. **验证**:张飞收到的 Mail 含 clone URL + Gitea API 引导
3. **验证**:张飞 spawn 后知道该做什么(L1 AGENTS.md 有流程引导)
4. **验证**:张飞有疑问时去 Gitea Issue 评论(而不是 Control UI
5. 在 Issue 评论 @赵云
6. **验证**:赵云收到 @mention Mail
---
### 七、不做的事(标记为后续)
| 标记 | 描述 | 原因 |
|------|------|------|
| 后续-1 | Agent 离开工具链讨论后,是否有意识回到工具链 | 需要更多真实场景观察 |
| 后续-2 | 工具链使用标准在所有 Agent 间的一致性验证 | L1 统一段落是第一步,需要 E2E 验证 |
| 后续-3 | Mail 通道接入 BootstrapBuilder L2 注入 | 改动大,当前方案(L1 统一段落 + 模板引导)够用 |
| 后续-4 | Skill 路由器自动触发(引擎注入) | 改动 daemon 核心,当前靠 L1 文案触发 |
---
### 八、变更记录
| 日期 | 版本 | 变更 |
|------|------|------|
| 2026-06-09 | v1.0 | 初版:E2E 真实场景暴露问题 → 四层改造方案 + @mention 通知 + Mail type 改造 |
-382
View File
@@ -1,382 +0,0 @@
# #19 工具链事件中枢 — 上下文四层改造方案
> 版本: v1.0
> 日期: 2026-06-09
> 作者: 庞统(副军师)
> 状态: 待主公确认
> 前置: #13 工具链与开发流程 §16, #05 上下文四层架构
> 来源: E2E 真实场景测试暴露的三个断层
---
## 一、问题诊断
### 1.1 E2E 真实场景测试暴露的三个断层
主公在 moziplus-v2 仓库创建了 Issue #32(添加 /api/stats 端点),指派张飞。链条在第一步就断了。
| 断层 | 现象 | 根因 |
|------|------|------|
| **Agent 不知道该做什么** | 张飞收到 Issue 指派 Mail,回复"已阅"就结束了 | Mail 模板(issue_assigned.md5 行信息,无流程引导;spawn prompt 说"已阅即可" |
| **Agent 去错了仓库** | 张飞去读了 sanguo_moziplus_v2 平台代码,而不是空的实验仓库 moziplus-v2 | Mail 模板没有仓库 clone URL,张飞凭习惯去了开发目录 |
| **Agent 在 Control UI 提问** | 张飞遇到问题直接在 Control UI 问主公,没有去 Gitea Issue 评论 | 没有任何地方引导"有疑问去 Gitea Issue 评论" |
| **Agent 不知道怎么协作** | 张飞判断任务需要澄清,但不知道该怎么请求澄清 | 没有"做不了→在 Issue 评论 / Mail 庞统"的回退路径 |
| **跨 Agent @mention 无法通知** | 张飞在 Issue 评论 @赵云,赵云收不到通知 | issue_comment handler 只处理 [CI] 评论,@mention 被忽略 |
### 1.2 根因:工具链在四层架构中的断层
| 层 | 应该有 | 实际有 | Gap |
|---|---|---|---|
| **L0 铁律** | — | — | 无需改动 |
| **L1 角色** | 工具链协作行为规范(所有 Agent 共享) | 无 | AGENTS.md 没有工具链相关内容 |
| **L2 引擎注入** | 事件上下文(仓库 clone URL、Gitea API、Issue/PR 详情) | Mail 模板只有 5 行摘要 | 缺仓库信息和流程引导 |
| **L3 被动参考** | 技术细节(分支命名、commit 规范、PR 创建方式) | git-workflow 等 Skill 已存在但没人触发 | Agent 不知道该加载哪个 Skill |
---
## 二、改造方案:四层归属
### 2.1 分层原则
| 层 | 放什么 | 不放什么 | 理由 |
|---|---|---|---|
| **L0** | 不放 | — | 工具链不是安全底线 |
| **L1** | 协作行为规范:收到什么通知该做什么、遇到问题怎么办 | 技术细节(分支命名、commit 格式) | 行为规范是团队常识,每个 Agent 都要知道 |
| **L2** | 事件上下文:仓库 clone URL、Gitea API URL、Issue/PR 链接、动态信息 | 固定的协作流程 | 动态信息每次不同,由 Mail 模板 + spawn 时注入 |
| **L3** | 技术细节:git-workflow、code-review 等 Skill 全文 | — | 按需加载,Agent 知道"我要提 PR"后自己读 |
### 2.2 各层具体内容
#### L1AGENTS.md 加工具链协作行为段(所有 Agent 统一)
```markdown
## 工具链协作(Gitea
收到 Gitea 事件通知(Issue 指派、Review 请求、CI 失败等)时,按以下流程操作:
### 基本流程
- **Issue 指派** → clone 仓库 → 开分支 → 编码 → 提 PR(参考 git-workflow Skill
- **Review 请求** → 读 PR diffGitea API)→ 提交 Review(参考 code-review Skill
- **Review 通过** → 等 merge
- **Review 驳回** → 看 review body → 修代码 → 重新 push
- **CI 失败** → 看错误摘要 → 修代码 → push(自动重触发 CI)
- **部署失败** → 查 deploy 日志 → 修复
### 协作规则
- **有疑问?** 在 Gitea Issue 下评论,不要在 Control UI 或 Mail 里问
- **需要别人帮忙?** 在 Issue 评论中 @mention 对应 Agent(如 @zhaoyun-data
- **做不了?** 回复 Mail 说明原因和建议的接手人
- **获取完整上下文** → 用 Gitea API 拉取 Issue 详情和评论,不要只看 Mail 里的快照
### Gitea API 速查
> 其中 `{owner}/{repo}` 替换为实际仓库,如 `sanguo/sanguo_moziplus_v2`
- Issue 详情: GET /api/v1/repos/{owner}/{repo}/issues/{number}
- Issue 评论: GET /api/v1/repos/{owner}/{repo}/issues/{number}/comments
- PR diff: GET /api/v1/repos/{owner}/{repo}/pulls/{number}.diff
- 提交 Review: POST /api/v1/repos/{owner}/{repo}/pulls/{number}/reviews
```
**改动范围**6 个 Agent 的 AGENTS.md 各加一段(内容统一)。
#### L2:Mail 模板精简 + 事件上下文注入
**原则**:模板只放摘要 + 链接 + 仓库信息,不写固定步骤(步骤在 L1)。
**issue_assigned.md** 改为:
```markdown
Issue 指派
Issue: {issue_url}
标题: {issue_title}
标签: {labels}
📋 获取完整上下文(先读再动手):
- Issue 详情: GET {gitea_api}/repos/{repo}/issues/{issue_number}
- Issue 评论: GET {gitea_api}/repos/{repo}/issues/{issue_number}/comments
仓库: {repo_clone_url}
建议分支: feat/issue-{issue_number}-{brief}
```
**review_request.md** 改为:
```markdown
PR Review 请求
PR: {pr_url}
标题: {pr_title}
作者: {pr_author}
分支: {branch}
风险级别: {risk_level}
📋 获取完整上下文:
- PR diff: GET {gitea_api}/repos/{repo}/pulls/{pr_number}.diff
- PR 文件列表: GET {gitea_api}/repos/{repo}/pulls/{pr_number}/files
```
**review_result.md** 改为:
```markdown
Review {result}
PR: {pr_url}
标题: {pr_title}
审查者: {reviewer}
{review_body}
```
**ci_failure.md** 改为:
```markdown
CI 失败
Issue: {issue_url}
分支: {branch}
错误摘要:
{error_summary}
📋 CI 日志: {gitea_url}/{repo}/actions
修复后 push 会自动重触发 CI。
```
**deploy_failure.md** 改为:
```markdown
部署失败
仓库: {repo}
Commit: {commit_sha}
📋 排查步骤:
- CI 日志: {gitea_url}/{repo}/actions
- 服务器: pm2 logs {service_name}
```
**L2 代码改动**`toolchain_routes.py`):
1. 从 Webhook payload 的 `repository` 对象提取 `clone_url``html_url`
2. `render_template()` 传入新变量:`gitea_api``gitea_url``repo_clone_url`
3. 所有模板变量统一补齐
#### L3Skill 按需加载(不改 Skill 本身)
git-workflow、code-review 等 Skill 保持不变。
L1 的协作行为段里会引用 Skill 名称("参考 git-workflow Skill"),Agent 收到 Mail 后根据 L1 的引导自主加载对应 Skill。
**不改 Skill 路由机制**——靠 L1 的文案触发 Agent 的 Skill 路由器匹配。
---
## 三、新增功能:issue_comment @mention 通知
### 3.1 设计
当前 `_handle_issue_comment` 只处理 `[CI]` 前缀评论。扩展为:
```
issue_comment 事件
├── 含 [CI] / CI 失败 → 原有 CI 失败通知逻辑
└── 含 @username → 解析 @mention → Mail 通知被 @的 Agent
```
### 3.2 实现
**`toolchain_routes.py` 新增 `_handle_issue_comment_mention()`**
```python
AGENT_IDS = {
"zhangfei-dev", "guanyu-dev", "zhaoyun-data",
"jiangwei-infra", "simayi-challenger", "pangtong-fujunshi",
}
# 前缀映射:@张飞 → zhangfei-dev
# 中文名映射:Agent 在 Gitea Issue 评论中可能用中文名 @mention
# 英文短名映射:Agent 可能用不带 -dev/-infra 后缀的短名
AGENT_ALIAS = {
"张飞": "zhangfei-dev",
"关羽": "guanyu-dev",
"赵云": "zhaoyun-data",
"姜维": "jiangwei-infra",
"司马懿": "simayi-challenger",
"庞统": "pangtong-fujunshi",
"pangtong": "pangtong-fujunshi",
"simayi": "simayi-challenger",
"zhangfei": "zhangfei-dev",
"guanyu": "guanyu-dev",
"zhaoyun": "zhaoyun-data",
"jiangwei": "jiangwei-infra",
}
def extract_mentions(body: str, sender: str) -> list[str]:
"""从评论 body 中提取 @mention 的 Agent ID"""
candidates = re.findall(r"@([a-zA-Z\u4e00-\u9fa5][a-zA-Z0-9\u4e00-\u9fff-]*)", body)
result = set()
for c in candidates:
# 精确匹配
if c in AGENT_IDS:
result.add(c)
# 前缀/别名匹配
elif c in AGENT_ALIAS:
result.add(AGENT_ALIAS[c])
else:
# 前缀模糊匹配:@zhangfei → zhangfei-dev
for aid in AGENT_IDS:
if aid.startswith(c):
result.add(aid)
break
# 过滤掉评论者自己
result.discard(sender)
return list(result)
```
**新增 mention 通知模板** `templates/toolchain/mention.md`
```markdown
你在 Issue 中被 @mention
Issue: {issue_url}
评论者: {commenter}
评论内容:
{comment_body}
📋 获取完整上下文:
- Issue 详情: GET {gitea_api}/repos/{repo}/issues/{issue_number}
- Issue 评论: GET {gitea_api}/repos/{repo}/issues/{issue_number}/comments
```
**改动 `_handle_issue_comment`**
```python
async def _handle_issue_comment(payload):
comment = payload.get("comment", {})
body = comment.get("body", "")
sender = comment.get("user", {}).get("login", "")
repo = _repo_fullname(payload)
issue = payload.get("issue", {})
# 原有 CI 失败逻辑(不变)
if "[CI]" in body or "CI 失败" in body:
# ... 原有逻辑 ...
# 新增:@mention 通知
mentions = extract_mentions(body, sender)
if mentions:
issue_number = issue.get("number", 0)
issue_title = issue.get("title", "")
text = render_template("mention", {
"repo": repo,
"issue_number": str(issue_number),
"issue_url": issue.get("html_url", ""),
"commenter": sender,
"comment_body": body[:500],
"gitea_api": "http://192.168.2.154:3000/api/v1",
})
title = f"@mention: {issue_title} ({repo}#{issue_number})"
for agent_id in mentions:
_send_mail(agent_id, title, text)
```
### 3.3 去重
- 同一条评论 @多人:每人一封 Mail(不同 to,内容相同)
- 同一事件 org webhook + repo webhook 双触发:现有 delivery UUID 去重机制覆盖
- 同一人被 @多次`extract_mentions` 返回 set,自动去重
---
## 四、Mail Spawn Prompt 改造
### 4.1 问题
当前工具链 Mail 走 Mail 通道,spawn prompt 是:
```
你收到一封飞鸽传书(纯通知)。
发件者: system
主题: Issue 指派: xxx
内容: [工具链模板]
已阅即可。
```
"已阅即可"直接让 Agent 不做事。
### 4.2 方案
**不改 MAIL_INFORM_TEMPLATE / MAIL_REQUEST_TEMPLATE 本身**(那是 Mail 系统通用的)。
改为:**工具链 Mail 使用 `type=request`(而不是默认的 inform**。
`_send_mail()` 中,工具链事件创建的 Mail 默认 `performative=request`,这样 Agent 收到时走 `MAIL_REQUEST_TEMPLATE`,知道需要处理。
具体改动在 `_send_mail()` 函数或其调用处:工具链路由调用 `_send_mail` 时传入 `performative="request"`
**⚠️ 验证要点**:改为 request 后,Agent spawn prompt 变为 "请处理以下请求",需确认:
1. Agent 不再把工具链 Mail 当纯通知忽略
2. Agent 能正确处理「已阅型」工具链事件(如 CI 失败通知——不需要回复,但需要知道)
3. 对已关闭 PR/Issue 的延迟通知,Agent 不会尝试去处理
验证方法:部署后发一条 Issue 指派 Mail,观察 Agent 行为是否符合预期。
---
## 五、完整改动清单
| # | 改什么 | 改动内容 | 层 | 风险 |
|---|--------|---------|---|------|
| 1 | 6 个 Agent 的 `AGENTS.md` | 加"工具链协作"段(内容统一) | L1 | 低(纯追加) |
| 2 | `templates/toolchain/issue_assigned.md` | 精简 + 加仓库上下文 + Gitea API 引导 | L2 | 低 |
| 3 | `templates/toolchain/review_request.md` | 精简 + 加 Gitea API 引导 | L2 | 低 |
| 4 | `templates/toolchain/review_result.md` | 精简 | L2 | 低 |
| 5 | `templates/toolchain/ci_failure.md` | 精简 + 加 CI 日志链接 | L2 | 低 |
| 6 | `templates/toolchain/deploy_failure.md` | 精简 + 加排查步骤 | L2 | 低 |
| 7 | **新建** `templates/toolchain/mention.md` | @mention 通知模板 | L2 | 低 |
| 8 | `src/api/toolchain_routes.py` | 提取 clone_url/html_url 传入模板;issue_comment 增加 @mention 解析;工具链 Mail 改为 request 类型 | L2 | 中 |
| 9 | 不改 | git-workflow 等 Skill 保持不变 | L3 | — |
| 10 | 不改 | daemon 核心逻辑、BootstrapBuilder、Skill 路由 | — | — |
---
## 六、验证方案
### 6.1 单元验证
| 验证点 | 方法 |
|--------|------|
| `extract_mentions()` 提取 `@zhangfei-dev` | unit test |
| `extract_mentions()` 别名匹配 `@张飞` → zhangfei-dev | unit test |
| `extract_mentions()` 前缀匹配 `@zhangfei` → zhangfei-dev | unit test |
| `extract_mentions()` 过滤自己 | unit test |
| 模板渲染新变量不报错 | unit test |
### 6.2 真实场景 E2E 验证
重复 Issue #32 的场景:
1. 创建 Issue 指派张飞
2. **验证**:张飞收到的 Mail 含 clone URL + Gitea API 引导
3. **验证**:张飞 spawn 后知道该做什么(L1 AGENTS.md 有流程引导)
4. **验证**:张飞有疑问时去 Gitea Issue 评论(而不是 Control UI
5. 在 Issue 评论 @赵云
6. **验证**:赵云收到 @mention Mail
---
## 七、不做的事(标记为后续)
| 标记 | 描述 | 原因 |
|------|------|------|
| 后续-1 | Agent 离开工具链讨论后,是否有意识回到工具链 | 需要更多真实场景观察 |
| 后续-2 | 工具链使用标准在所有 Agent 间的一致性验证 | L1 统一段落是第一步,需要 E2E 验证 |
| 后续-3 | Mail 通道接入 BootstrapBuilder L2 注入 | 改动大,当前方案(L1 统一段落 + 模板引导)够用 |
| 后续-4 | Skill 路由器自动触发(引擎注入) | 改动 daemon 核心,当前靠 L1 文案触发 |
---
## 八、变更记录
| 日期 | 版本 | 变更 |
|------|------|------|
| 2026-06-09 | v1.0 | 初版:E2E 真实场景暴露问题 → 四层改造方案 + @mention 通知 + Mail type 改造 |
File diff suppressed because it is too large Load Diff
+74
View File
@@ -0,0 +1,74 @@
# Step 5 双重审计报告
## 摘要
- 设计一致性检查项: 8
- 特殊逻辑覆盖检查项: 22
- 一致/覆盖: 24
- **偏差/遗漏: 6(严重 3 / 轻微 3)**
---
## 偏差/遗漏清单
| # | 维度 | 设计要求 / 旧逻辑 | 代码实际 | 严重程度 | 建议 |
|---|------|-------------------|---------|---------|------|
| **D1** | B1.2 pre_spawn | 旧 `_mail_on_checks_passed`: `if not _mail_auto_working(): raise RuntimeError` — pre_spawn 失败时中止 spawn | 新 `_handler_on_checks_passed`: `_handler.pre_spawn(...)` 返回值未检查,`handler_marked_working = True` 无条件执行 | **严重** | 改为 `if not _handler.pre_spawn(...): raise RuntimeError("handler_pre_spawn_failed")` |
| **D2** | B3.1 PromptContext | 旧 `_build_mail_prompt` 从 must_haves JSON 解析 `from_agent``performative` 传入模板 | 新 `spawner._build_spawn_message` 构建 PromptContext 时缺少 `from_agent``mail_type`,均为空字符串 | **严重** | 从 `must_haves` JSON 提取 `from``performative` 填入 PromptContext |
| **D3** | B1.3 inform outcome 白名单 | 旧 `_mail_auto_complete`: inform 类型有 outcome 白名单 `{"completed", "claimed", "no_reply"}`,不在白名单的 outcome 跳过 auto-done | 新 `MailHandler.verify_completion`: inform 始终返回 True,不检查 outcome | **轻微** | CRASH_OUTCOMES 已被基类处理。剩余异常 outcomesession_revived/api_error/fallback_timeout)极少出现,且旧逻辑不标 done 只是等 ticker 重投,最终效果差异不大。但严格对齐需要加白名单检查 |
| **D4** | A. 设计 §6 retry 逻辑 | 设计文档要求 retry 逻辑中 `handler = TaskTypeRegistry.get_by_project(project_id); if handler: return handler.build_retry_prompt(...)` | spawner L1118-1130 重试 prompt 仍用 `is_mail = project_id == "_mail"` 硬编码 | **轻微** | 当前不影响运行(旧的 `_build_mail_prompt` 仍保留且可用),但与设计文档不一致 |
| **D5** | B1.5 _check_reply 语义差异 | 旧 `_mail_check_reply`: `SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ?` — 检查是否有其他任务的 must_haves 包含当前 task_id(即 in_reply_to 匹配) | 新 `MailHandler._check_reply`: `SELECT COUNT(*) FROM comments WHERE task_id=? AND author != 'daemon' AND comment_type != 'system'` — 检查当前任务是否有非系统 comment | **严重** | 两个查询语义完全不同。旧逻辑检查的是 **mail 表的回复任务**(通过 must_haves 中 in_reply_to 关联),新逻辑检查的是 **当前任务的 comments**。这可能导致 request 类型邮件的幻觉门控行为不同 |
| **D6** | B1.3 标 done 重试机制 | 旧 `_mail_auto_complete`: 标 done 时外层有 `for attempt in range(3)` 循环 | 新 `BaseTaskHandler._mark_task_status`: H1 修复后已有 3 次重试 | **轻微** | ✅ 已修复,但注意旧代码标 done 和标 failed 是分开的重试循环,新代码统一走 `_mark_task_status`。行为等价 |
---
## 一致确认项
### A. 设计一致性
| # | 维度 | 检查点 | 结果 |
|---|------|--------|------|
| A1 | §6 dispatcher | classify_outcome 后调 handler.post_complete | ✅ on_complete 闭包替换为 handler.post_complete |
| A2 | §6 dispatcher | on_checks_passed → handler.pre_spawn | ✅ _handler_on_checks_passed 调用 handler.pre_spawn(但返回值未检查,见 D1) |
| A3 | §6 dispatcher | guardrail 跳过 → handler 判断 | ✅ `is_handler_task = handler is not None` |
| A4 | §6 spawner | _build_prompt → handler.build_prompt | ✅ handler 路径调用 handler.build_prompt(ctx) |
| A5 | §6 spawner | _build_api_section → handler 查询 | ✅ handler 存在时 success_status 从 handler.target_success_status 获取 |
| A6 | §6 ticker | 虚拟项目扫描 → registry.virtual_projects() | ✅ 循环 `TaskTypeRegistry.virtual_projects()` |
| A7 | §6 ticker | check_completion → handler.check_completion | ✅ 超时检查中调 `handler.check_completion(task.id, db_path)` |
| A8 | §6 兼容期 | 设计说"兼容期保留旧逻辑" | ✅ 无 handler 的项目走旧路径(legacy_on_complete |
### B. 特殊逻辑覆盖
| # | 维度 | 检查点 | 结果 |
|---|------|--------|------|
| B1 | 1.1 guardrail | handler 项目跳过,_general 等走 guardrail | ✅ |
| B2 | 1.2 _mail_auto_working | `BEGIN IMMEDIATE` + status 检查 + 标 working | ✅ `_auto_mark_working` 完全一致 |
| B3 | 1.3 request 无回复 → 标 failed + notify | ✅ MailHandler.on_failure 调 `_mark_task_status(failed)` + `notify_mail_failed` |
| B4 | 1.4 _mail_revert_to_pending | spawn 失败回退 working → pending | ✅ Exception handler 中有 `BEGIN IMMEDIATE` + 状态检查回退 |
| B5 | 1.6 Task review verdict 读取 | approved → done | ✅ handle_review_complete |
| B6 | 1.6 Task review 非 approved → @mention assignee + 保持 review | ✅ H3 修复后保持 review + INSERT comment with comment_type='review' |
| B7 | 1.6 Task executor 三信号验证 | output/comment/terminal status → review | ✅ verify_completion 完全一致 |
| B8 | 1.7 Legacy dispatch 路径 | handler 替代 is_mail_legacy | ✅ handler_legacy 查注册表 |
| B9 | 2.1 _transition_status assignee 清空 | handler 项目不清空 | ✅ |
| B10 | 2.2 跳过 claimed 状态 | handler 项目跳过 claimed 直接 working | ✅ |
| B11 | 2.3 _dispatch_reviews 跳过 | handler 项目不走 review | ✅ |
| B12 | 2.5 startup recovery | `_general` + virtual_projects() | ✅ 不会重复扫描 |
| B13 | 3.1 _build_api_section | handler 存在时正确获取 success_status | ✅ |
| B14 | B4.1 TaskHandler.post_complete | 区分 executor/review 流程 | ✅ 通过读 DB status 判断 |
| B15 | B4.2 MailHandler.post_complete | 基类统一流程 | ✅ |
| B16 | B4.3 ToolchainHandler.post_complete | 基类统一流程 | ✅ |
| B17 | B1.5 _check_reply 异常保守处理 | 旧: return True(保守)/ 新: return False | 见 D5 |
| B18 | CRASH_OUTCOMES 集合 | 与旧 ROLLBACK_CURRENT_AGENT_OUTCOMES 一致 | ✅ 完全一致 |
| B19 | B2.1 _toolchain ticker 扫描 | _toolchain 会被 ticker 扫描 | ✅ _toolchain 有 blackboard.db 时会被 tick_project 处理 |
| B20 | B2.3 handler 项目都跳过 claimed | _toolchain 也跳过 | ✅ 所有 handler 项目统一处理 |
---
## 修复优先级
| 优先级 | # | 修复内容 |
|--------|---|---------|
| **P0** | D1 | dispatcher _handler_on_checks_passed 检查 pre_spawn 返回值 |
| **P0** | D2 | spawner PromptContext 从 must_haves 提取 from_agent 和 mail_type |
| **P0** | D5 | MailHandler._check_reply 恢复旧查询语义(检查 must_haves 中的 in_reply_to |
| P1 | D3 | inform outcome 白名单(可选,影响极小) |
| P2 | D4 | retry prompt 用 handler 路径替代硬编码 |
+324
View File
@@ -0,0 +1,324 @@
# Step 5 引擎接入 — 影响分析与逐点对照
## 方法论
逐行审查 dispatcher.py / spawner.py / ticker.py 中所有 `is_mail` / `_mail` / `project_id == "_mail"` 分支,
对照 handler 实现,确认每个特殊处理的去向。
---
## 一、dispatcher.py985 行)
### 1.1 Guardrail 跳过(L127-129
```python
is_mail = project_config.get("project_id") == "_mail" if project_config else False
if self.guardrails and not is_mail:
violations = self.guardrails.check_task(task)
```
**特殊处理**Mail 不做 guardrail 检查。
**Handler 覆盖**:设计文档 D6 "skip_guardrail 从接口删除,guardrail 自己判断"。Step 5 改为:`if self.guardrails and handler is None`(无 handler 时走 guardrail),或者用 handler.virtual_project 判断。handler 存在时跳过 guardrail。
**改动**`is_mail``TaskTypeRegistry.get_by_project(project_id) is not None`
---
### 1.2 Mail on_checks_passedL194-213
```python
on_checks_passed = None
_mail_marked_working = False
if is_mail and db_path:
def _mail_on_checks_passed():
nonlocal _mail_marked_working
if not _disp._mail_auto_working(_task_id, _mail_db):
raise RuntimeError("mail_auto_working_failed")
_mail_marked_working = True
on_checks_passed = _mail_on_checks_passed
```
**特殊处理**Mail spawn 前通过 on_checks_passed 回调标 working,标记成功后才 spawnspawn 失败回退。
**Handler 覆盖**MailHandler.pre_spawn 调用 `_auto_mark_working`,和 `_mail_auto_working` 逻辑完全一致。
**改动**
- `on_checks_passed` 改为调用 `handler.pre_spawn(task_id, db_path)`
- `_mail_marked_working` 标记保留,用于 Exception 回退
---
### 1.3 Mail on_completeL224-238
```python
if is_mail:
def _mail_on_complete(aid, outcome):
_dispatcher._mail_auto_complete(_task_id, aid, _mail_db, _must_haves, outcome=outcome)
on_complete = _mail_on_complete
```
**特殊处理**Mail on_complete 调用 `_mail_auto_complete`(含 inform/request 分支、幻觉门控、重试 3 次、失败通知)。
**Handler 覆盖**MailHandler 使用基类 post_complete 统一流程(crash→verify→mark→notify)。但现有 `_mail_auto_complete` 有几个细节差异需要注意:
| 现有逻辑 | Handler 覆盖 | 差异 |
|---------|-------------|------|
| request 无回复 → 重试 3 次标 failed | on_failure 标 failed + notify | ⚠️ 缺少 3 次重试 |
| inform 只在特定 outcome 标 done | verify 始终返回 True → 基类标 done | ✅ 简化了,合理 |
| 标 done 重试 3 次 | _mark_task_status 单次 | ⚠️ 缺少重试 |
| notify_mail_failed | on_failure 中调用 notify_mail_failed | ✅ 一致 |
**⚠️ 关键发现**:现有代码标状态时有 **重试 3 次** 机制(防止 DB 锁),handler 的 `_mark_task_status` 只做一次。需要把重试逻辑补到 `_mark_task_status` 或在 handler 层加。
**改动**on_complete 改为调用 `handler.post_complete(task_id, agent_id, outcome, db_path)`
---
### 1.4 Task on_completeL241-310
```python
else:
def _task_on_complete(aid, outcome):
# #07.2: crash 回退
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db:
_dispatcher._rollback_current_agent(_task_db, _task_id, aid)
if _is_review:
if outcome in ("completed", "session_revived"):
# 读 verdict → approved 标 done / 非 approved @mention assignee
else:
logger.warning("review agent outcome=%s, NOT marking done", outcome)
else:
# executor: 三信号验证 → 标 review
_dispatcher._task_auto_complete(_task_id, _task_db)
```
**特殊处理清单**
1. **#07.2 crash 回退**executor 和 review 都回退 current_agent → assignee
2. **review 分支**outcome 必须是 "completed" 或 "session_revived" 才走 verdict 读取
3. **review verdict 读取**approved → done,非 approved → @mention assignee + 保持 review
4. **review @mention**:通过 Blackboard.add_commentcomment_type="review"
5. **executor 分支**:走 _task_auto_complete → 三信号验证 → review
**Handler 覆盖**
- crash 回退:✅ BaseTaskHandler.post_complete 第一步
- review verdict:⚠️ **TaskHandler.handle_review_complete 存在但未被 dispatcher 调用**。现有 dispatcher 直接在闭包里做了,不走 handler。
- @mention:⚠️ handler 用 `conn.execute("INSERT INTO comments")` 直接插入,dispatcher 用 `Blackboard.add_comment`(会做更多处理,如 comment_type="review"
- executor 三信号:✅ TaskHandler.verify_completion
**⚠️ 关键发现**
1. dispatcher 的 review @mention`bb.add_comment(..., comment_type="review")`handler 直接 INSERT 不带 comment_type。需要修复 handler。
2. dispatcher 对 review outcome 有白名单检查(只处理 "completed"/"session_revived"),handler 的 post_complete 没有 outcome 白名单——crash 已在基类处理,其他 outcome 都会走 verify。
3. dispatcher review 非 approved 时**保持 review 状态**handler 的 handle_review_complete 标回 working。这是**行为差异**。
**改动**:需要先修复 handler 的 review 分支,再替换 on_complete。
---
### 1.5 Mail spawn 失败回退(L355-358
```python
except Exception as e:
if _mail_marked_working:
self._mail_revert_to_pending(task.id, db_path)
```
**特殊处理**spawn 失败(subprocess 启动失败)回退 working → pending。
**Handler 覆盖**:❌ handler 没有这个。这是 dispatcher 级别的异常处理,和 handler 无关。但 toolchain 也需要类似逻辑。
**改动**:保留在 dispatcher 中,改为 `_mail_marked_working``handler_marked_working`
---
### 1.6 Legacy dispatchL584-660
```python
is_mail_legacy = project_config.get("project_id") == "_mail"
if is_mail_legacy:
if not self._mail_auto_working(task.id, db_path_legacy):
return error
```
**特殊处理**legacy 路径(router=None 时触发)也有 mail 特殊处理。
**Handler 覆盖**:同 1.2/1.3,用 handler 替代。
**改动**:同样用 handler.pre_spawn 和 handler.post_complete 替代。
---
### 1.7 现有 Mail 辅助方法(L658-870
`_mail_auto_working` / `_mail_revert_to_pending` / `_mail_auto_complete` / `_mail_check_reply`
**改动**:Step 5 不删这些方法(安全起见保留,标记 deprecated),只改调用方。确认稳定后再删。
---
## 二、spawner.py1704 行)
### 2.1 _build_prompt 中的 mail 分支(L282-284
```python
if project_id == "_mail":
return self._build_mail_prompt(task_id, title, description, must_haves, agent_id)
```
**特殊处理**Mail 用专用精简模板。
**Handler 覆盖**MailHandler.build_prompt 通过 PromptComposer 拼 3 个 section。
**改动**:查注册表 → handler.build_prompt(context)。需要构建 PromptContext 传入。
---
### 2.2 _build_api_sectionL321-325
```python
success_status = '"done"' if project_id == "_mail" else '"review"'
```
**特殊处理**Mail 的 success_status 是 done。
**Handler 覆盖**:已由 handler 的 PromptSection 处理(TaskApiSection hardcode reviewMailApiSection 不含 status 回写指令)。
**改动**:如果 handler 存在,跳过 _build_api_sectionhandler.build_prompt 已包含)。
---
### 2.3 classify_outcome 中的 handler 调用
spawner 在 classify_outcome 后调 on_complete(outcome)。on_complete 是 dispatcher 传入的闭包。
**改动**on_complete 闭包改为调用 handler.post_complete。spawner 本身不直接查注册表。
---
## 三、ticker.py1897 行)
### 3.1 虚拟项目扫描(L218-229
```python
mail_db = Path(self.registry.root) / "_mail" / "blackboard.db"
if mail_db.exists() and "_mail" not in active_projects:
pr = await self._tick_project("_mail", {...})
```
**特殊处理**_mail 硬编码扫描。
**Handler 覆盖**TaskTypeRegistry.virtual_projects() 返回 ["_toolchain", "_mail"]。
**改动**:循环 `TaskTypeRegistry.virtual_projects()` 替代硬编码。_toolchain 如果也需要 ticker 扫描就自动发现。但需确认 _toolchain 是否需要 ticker——当前 toolchain 任务创建和完成都在 toolchain_routes.py 中处理,可能不需要 ticker 扫描。
---
### 3.2 _transition_status 中 mail assignee 不清空(L953-960
```python
if new_status == "pending":
if self._current_project_id == "_mail":
# Mail 的 assignee 是收件人,永不清空
conn.execute("UPDATE tasks SET status=?, updated_at=? WHERE id=?", ...)
else:
conn.execute("UPDATE tasks SET status=?, assignee=NULL, ...", ...)
```
**特殊处理**Mail 重置到 pending 时不清空 assigneeassignee 是收件人)。
**Handler 覆盖**:❌ handler 不管 ticker 的状态转换逻辑。这是 ticker 内部逻辑。
**改动**:用 `TaskTypeRegistry.get_by_project(project_id)` 判断替代硬编码。
---
### 3.3 Mail 跳过 claimed 状态(L1029-1043
```python
if project_id == "_mail":
conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ...)
# 跳过 claimed,直接 working
```
**特殊处理**Mail 不走 claimed 中间态(已在 dispatcher 中标 working)。
**Handler 覆盖**handler.pre_spawn 的 _auto_mark_working 跳过了 claimed。
**改动**:用 handler 判断替代硬编码。
---
### 3.4 _dispatch_reviews 跳过 mailL1304
```python
if project_id == "_mail":
return []
```
**特殊处理**Mail 不走 review 流程。
**Handler 覆盖**MailHandler.target_success_status = "done",不走 review。但 ticker 的 _dispatch_reviews 是看项目级。
**改动**:用 handler 判断。
---
### 3.5 Mail 幻觉门控兜底(L1474-1492
```python
if self._current_project_id == "_mail":
has_reply = self._mail_check_reply(task.id, db_path)
if has_reply:
# working → done
```
**特殊处理**:Ticker 超时检查时,如果 mail 有回复,标 done 而非 failed。
**Handler 覆盖**:❌ handler 的 check_completion 只返回 bool,不做状态标记。
**改动**:调用 handler.check_completion 替代 _mail_check_reply。状态标记逻辑保留在 ticker 中。
---
### 3.6 _mail_check_replyL1555-1575
和 dispatcher 版本一致。
**改动**:用 handler.check_completion 替代。
---
### 3.7 虚拟项目 init + recovery 扫描(L1625-1643
```python
for virtual_id in ("_general", "_mail"):
...
# _mail 项目不清空 assignee
```
**改动**virtual_projects() + _general 硬编码。
---
## 四、Handler 缺陷(需在 Step 5 前修复)
| # | 缺陷 | 影响 | 修复方案 |
|---|------|------|---------|
| H1 | BaseTaskHandler._mark_task_status 无重试 | DB 锁时标状态失败,任务卡住 | 加 3 次重试(和 dispatcher 现有行为一致) |
| H2 | TaskHandler.handle_review_complete 中 @mention 不带 comment_type="review" | review comment 无类型标记 | INSERT 加 comment_type |
| H3 | dispatcher review 非 approved 保持 review 状态,handler 标 working | **行为差异** | handler 改为保持 review 状态(和 dispatcher 一致) |
| H4 | dispatcher review outcome 有白名单("completed"/"session_revived"),handler 无 | crash 之外的异常 outcome 也会走 verify | handler 的 post_complete 已在基类处理 crash,其余 outcome 走 verify 是合理的 |
**H3 最关键**——dispatcher review 非 approved 保持 review 状态(等 assignee 自己处理),handler 标 working 会触发 ticker 重新 dispatch executor,这不是预期行为。
## 五、改动策略
**不删旧代码,只改调用方**
1. dispatcher 中 is_mail → handler 判断,on_checks_passed/on_complete → handler.pre_spawn/post_complete
2. spawner 中 _build_prompt → handler.build_prompt
3. ticker 中虚拟项目扫描 → registry.virtual_projects()mail 特殊判断 → handler 判断
4. 旧方法(_mail_auto_working 等)标记 @deprecated 保留,不删
**先修 handler 缺陷(H1-H3),再改引擎**
+183
View File
@@ -0,0 +1,183 @@
"""base_task_handler.py — Task type handler 基类。
收敛合理的共性能力(crash rollback + verify + mark + notify),
子类只实现差异点。
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from src.daemon.prompt_composer import PromptContext, PromptComposer, PromptSection
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler")
@dataclass
class VerifyResult:
"""验证结果"""
passed: bool
reason: str # "has_output" / "no_reply" / "no_signal" / ...
evidence: str # "output_count=1, comment_count=0"
can_retry: bool = True
retry_count: int = 0
class BaseTaskHandler:
"""所有 task type handler 的基类。
职责:L2 引擎注入层的业务逻辑——prompt 构建、完成验证、状态标记。
不管:进程生命周期、exit 分类、重试决策(这些归 spawner)。
"""
# crash 类 outcome(进程级异常,需要 rollback)
CRASH_OUTCOMES = frozenset({
"crashed", "compact_failed", "process_crash",
"session_stuck", "compact_hanging",
})
task_type: str = ""
virtual_project: Optional[str] = None
display_name: str = "" # 中文展示名(ticker 扫描日志用)
# === 子类必须实现 ===
def build_prompt(self, context: PromptContext) -> str:
"""构建 L2 prompt(通过 PromptComposer 拼 section)。子类实现。"""
raise NotImplementedError
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""验证任务完成质量。每个 handler 自己的验证逻辑。子类实现。"""
raise NotImplementedError
def target_success_status(self) -> str:
"""验证通过后的目标状态。task='review', mail/toolchain='done'"""
return "review"
def get_sections(self) -> list[PromptSection]:
"""返回此 handler 的 prompt section 列表。子类实现。"""
return []
# === 基类提供统一流程 ===
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""spawn 前业务准备。默认 True。
mail/toolchain override 为 auto_working。"""
return True
def post_complete(self, task_id: str, agent_id: str,
outcome: str, db_path: Path) -> None:
"""spawn 完成后的业务处理。统一 4 步流程:
1. crash 处理 → rollback current_agent
2. verify → 验证产出
3. mark → 标目标状态
4. notify → 失败时 on_failure
"""
# 1. crash 处理(基类提供,所有 handler 继承)
if outcome in self.CRASH_OUTCOMES:
self._rollback_current_agent(db_path, task_id, agent_id)
return
# 2. verify
result = self.verify_completion(task_id, db_path)
# 3. mark
if result.passed:
self._mark_task_status(db_path, task_id, self.target_success_status())
logger.info("Task %s: verify passed (%s), marked %s",
task_id, result.reason, self.target_success_status())
else:
# 4. notify
self.on_failure(task_id, agent_id, db_path, result)
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败处理。默认:标 failed。子类可 override。"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Task %s: verify failed (%s), marked failed",
task_id, verify.reason)
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""ticker 级别的完成检查。默认:False。"""
return False
# === 内部工具方法 ===
def _rollback_current_agent(self, db_path: Path, task_id: str, agent_id: str) -> None:
"""crash 后回退 current_agent → assignee,避免 exclude_current 卡死。
从 dispatcher._rollback_current_agent 迁移。"""
try:
conn = get_connection(db_path)
try:
conn.execute(
"UPDATE tasks SET current_agent = "
"(SELECT assignee FROM tasks WHERE id=?) "
"WHERE id=? AND current_agent=?",
(task_id, task_id, agent_id)
)
conn.commit()
finally:
conn.close()
logger.info("Task %s: rolled back current_agent from %s to assignee",
task_id, agent_id)
except Exception as e:
logger.warning("Task %s: failed to rollback current_agent: %s",
task_id, e)
def _mark_task_status(self, db_path: Path, task_id: str, status: str) -> None:
"""更新任务状态 + 写审计事件(带 3 次重试,防 SQLite DB 锁)。"""
for attempt in range(3):
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
old_row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
old_status = old_row["status"] if old_row else "unknown"
conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now') WHERE id=?",
(status, task_id),
)
conn.execute(
"INSERT INTO events (task_id, agent, event_type, payload) "
"VALUES (?, 'handler', 'status_change', ?)",
(task_id,
f'{{"from": "{old_status}", "to": "{status}", '
f'"source": "{self.task_type}_handler"}}'),
)
conn.commit()
return
finally:
conn.close()
except Exception as e:
logger.warning("Handler: mark %s%s attempt %d failed: %s",
task_id, status, attempt + 1, e)
logger.error("Handler: mark %s%s all 3 attempts failed", task_id, status)
def _auto_mark_working(self, task_id: str, db_path: Path) -> bool:
"""pending → workingmail/toolchain 通用)。"""
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row or row["status"] not in ("pending", "claimed"):
logger.warning("Task %s: cannot mark working (status=%s)",
task_id, row["status"] if row else "not found")
return False
conn.execute(
"UPDATE tasks SET status='working', updated_at=datetime('now') "
"WHERE id=?", (task_id,))
conn.commit()
logger.info("Task %s: auto-marked working", task_id)
return True
finally:
conn.close()
except Exception as e:
logger.error("Task %s: failed to mark working: %s", task_id, e)
return False
+73 -103
View File
@@ -22,6 +22,7 @@ from src.blackboard.models import Task
from src.blackboard.db import get_connection
from src.daemon.spawner import AgentBusyError
from src.daemon.router import AgentRouter
from src.daemon.task_type_registry import TaskTypeRegistry
logger = logging.getLogger("moziplus-v2.dispatcher")
@@ -123,10 +124,11 @@ class Dispatcher:
"status": "dispatched"|"skipped"|"error"|"blocked", "reason": str}
"""
# 安全红线检查(调度前拦截)
# Mail 是 Agent 间通信,不做 guardrail 检查
is_mail = project_config.get(
"project_id") == "_mail" if project_config else False
if self.guardrails and not is_mail:
# handler 项目(_mail/_toolchain不做 guardrail 检查
handler = TaskTypeRegistry.get_by_project(
project_config.get("project_id", "") if project_config else "")
is_handler_task = handler is not None
if self.guardrails and not is_handler_task:
violations = self.guardrails.check_task(task)
critical = [
v for v in violations if v.action in (
@@ -190,27 +192,26 @@ class Dispatcher:
}
try:
# [v2.7.1] Mail: 标 working 移到 spawn_full_agent 内部(check 通过后、subprocess 前)
is_mail = project_config.get(
"project_id") == "_mail" if project_config else False
if is_mail:
db_path = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
# [Step 5] Handler: pre_spawn + on_checks_passed 统一
project_id = project_config.get("project_id", "") if project_config else ""
handler = TaskTypeRegistry.get_by_project(project_id)
db_path = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
# on_checks_passed: 所有检查通过后才标 working,检查失败不标
# on_checks_passed: handler 项目在 check 通过后调用 handler.pre_spawn
on_checks_passed = None
_mail_marked_working = False
if is_mail and db_path:
handler_marked_working = False
if handler and db_path:
_task_id = task.id
_mail_db = db_path
_disp = self
_handler_db = db_path
_handler = handler
def _mail_on_checks_passed():
nonlocal _mail_marked_working
if not _disp._mail_auto_working(_task_id, _mail_db):
raise RuntimeError("mail_auto_working_failed")
_mail_marked_working = True
on_checks_passed = _mail_on_checks_passed
def _handler_on_checks_passed():
nonlocal handler_marked_working
if not _handler.pre_spawn(_task_id, _handler_db):
raise RuntimeError("handler_pre_spawn_failed")
handler_marked_working = True
on_checks_passed = _handler_on_checks_passed
# 构建 spawn message
message = self._build_spawn_message(task, agent_id, project_config,
@@ -218,94 +219,46 @@ class Dispatcher:
"mode", ""),
spawn_type=action_type or "executor")
# v2.7.2: on_complete 只含业务逻辑,不含 counter.release
# counter.release 由 spawn_full_agent 内部的 wrapped_on_complete 保证
# [Step 5] Handler: on_complete 统一走 handler.post_complete
# 保留旧路径作为 fallback(无 handler 的项目)
on_complete = None
if is_mail:
if handler:
_task_id = task.id
_mail_db = db_path
_must_haves = task.must_haves or ""
_dispatcher = self
_handler_db = db_path
_handler = handler
def _mail_on_complete(aid, outcome):
# 幻觉门控:检查是否有回复,自动标 done/failed
def _handler_on_complete(aid, outcome):
try:
_dispatcher._mail_auto_complete(
_task_id, aid, _mail_db, _must_haves, outcome=outcome)
_handler.post_complete(
_task_id, aid, outcome, _handler_db)
except Exception as e:
logger.error(
"Mail %s: on_complete error: %s", _task_id, e)
on_complete = _mail_on_complete
"Handler %s: on_complete error: %s", _task_id, e)
on_complete = _handler_on_complete
else:
# #02: Task 路径也加 on_complete(幻觉门控
# 旧路径:无 handler 的项目(_general 等
_task_id = task.id
_task_db = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
_task_db = db_path
_dispatcher = self
_is_review = action_type == "review"
# #07.2: executor/review 统一 crash 回退
ROLLBACK_CURRENT_AGENT_OUTCOMES = frozenset({
"crashed", "compact_failed", "process_crash",
"session_stuck", "compact_hanging",
})
def _task_on_complete(aid, outcome):
def _legacy_on_complete(aid, outcome):
try:
# #07.2: 统一 crash 回退——executor 和 review 都回退 current_agent
if outcome in ROLLBACK_CURRENT_AGENT_OUTCOMES and _task_db:
_dispatcher._rollback_current_agent(
_task_db, _task_id, aid)
if _is_review:
if _task_db and outcome in (
"completed", "session_revived"):
# #09: 读 verdict 决定后续动作
conn = get_connection(_task_db)
try:
review = conn.execute(
"SELECT verdict FROM reviews WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
(_task_id,)
).fetchone()
finally:
conn.close()
if review and review["verdict"] == "approved":
_dispatcher._mark_task_status(
_task_db, _task_id, "done")
logger.info(
"Task %s: review approved, marking done", _task_id)
else:
# 非 approved → @mention 被审
# agentassignee,非 current_agent
verdict_str = review["verdict"] if review else "未知"
conn2 = get_connection(_task_db)
try:
task_row = conn2.execute(
"SELECT assignee FROM tasks WHERE id=?", (_task_id,)).fetchone()
finally:
conn2.close()
if task_row and task_row["assignee"]:
from src.blackboard.blackboard import Blackboard
bb = Blackboard(_task_db)
bb.add_comment(_task_id, "daemon",
f"@{task_row['assignee']} 审查结论: {verdict_str},请查看详情并决定接受或反驳",
comment_type="review")
logger.info("Task %s: review verdict=%s, notified assignee=%s",
_task_id, verdict_str, task_row["assignee"] if task_row else "?")
# 不标 done,保持 review 状态
else:
logger.warning(
"Task %s: review agent %s (%s), NOT marking done", _task_id, aid, outcome)
else:
# executor: 三信号验证 → 标 review
if not _is_review:
_dispatcher._task_auto_complete(
_task_id, _task_db)
except Exception as e:
logger.error(
"Task %s: on_complete error: %s", _task_id, e)
on_complete = _task_on_complete
"Legacy %s: on_complete error: %s", _task_id, e)
on_complete = _legacy_on_complete
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id,
@@ -354,8 +307,26 @@ class Dispatcher:
}
except Exception as e:
# on_checks_passed 已执行但 subprocess 失败 → 回退 working → pending
if _mail_marked_working:
self._mail_revert_to_pending(task.id, db_path)
if handler_marked_working and handler and db_path:
# handler 项目:回退到 pending
try:
conn = get_connection(db_path)
try:
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task.id,)).fetchone()
if row and row["status"] == "working":
conn.execute(
"UPDATE tasks SET status='pending', updated_at=datetime('now') WHERE id=?",
(task.id,))
conn.commit()
logger.info(
"Task %s: reverted working → pending (spawn failed)", task.id)
finally:
conn.close()
except Exception as revert_err:
logger.error(
"Task %s: failed to revert to pending: %s", task.id, revert_err)
self._record_routing(
task, decision, "error", str(e), _routing_db)
return {
@@ -580,17 +551,18 @@ class Dispatcher:
try:
# NOTE: _legacy_dispatch 仅在 router=None 时触发,当前配置不会进入。
# Mail 永远走 dispatch() 主路径(on_checks_passed 方案),不走此路径。
# 如果未来 legacy 路径被启用,需同步 on_checks_passed 逻辑。
is_mail_legacy = project_config.get(
"project_id") == "_mail" if project_config else False
if is_mail_legacy:
# [Step 5] handler 统一:用注册表查 handler
project_id_legacy = project_config.get("project_id", "") if project_config else ""
handler_legacy = TaskTypeRegistry.get_by_project(project_id_legacy)
if handler_legacy:
db_path_legacy = Path(
project_config["db_path"]) if project_config and "db_path" in project_config else None
if not db_path_legacy or not self._mail_auto_working(
task.id, db_path_legacy):
if db_path_legacy:
handler_legacy.pre_spawn(task.id, db_path_legacy)
else:
return {"level": level.value, "agent_id": agent_id,
"session_id": None, "status": "error",
"reason": "mail_auto_working_failed"}
"reason": "no db_path for handler"}
if hasattr(self.spawner,
'build_spawn_message') and project_config:
@@ -612,20 +584,18 @@ class Dispatcher:
# v2.7.2: on_complete 只含业务逻辑
on_complete_legacy = None
if is_mail_legacy:
if handler_legacy:
_t_id = task.id
_m_db = db_path_legacy
_m_mh = task.must_haves or ""
_disp = self
_h_db = db_path_legacy
_h = handler_legacy
def _mail_oc_legacy(aid, outcome):
def _handler_oc_legacy(aid, outcome):
try:
_disp._mail_auto_complete(
_t_id, aid, _m_db, _m_mh, outcome=outcome)
_h.post_complete(_t_id, aid, outcome, _h_db)
except Exception as e:
logger.error(
"Mail %s: legacy on_complete error: %s", _t_id, e)
on_complete_legacy = _mail_oc_legacy
"Handler %s: legacy on_complete error: %s", _t_id, e)
on_complete_legacy = _handler_oc_legacy
session_id = await self.spawner.spawn_full_agent(
agent_id=agent_id, message=message,
+210
View File
@@ -0,0 +1,210 @@
"""mail_handler.py — Mail 任务 handler。
处理 Agent 间通信飞鸽传书 inform request 两种类型
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.mail")
class MailHandler(BaseTaskHandler):
"""Mail 任务 handler。"""
task_type = "mail"
virtual_project = "_mail"
display_name = "飞鸽传书"
def target_success_status(self) -> str:
return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_workingpending → working"""
return self._auto_mark_working(task_id, db_path)
def build_prompt(self, context: PromptContext) -> str:
"""通过 PromptComposer 拼装 3 个 section。"""
composer = PromptComposer()
composer.add_many(self.get_sections())
return composer.compose(context)
def get_sections(self) -> list:
return [MailContextSection(), MailApiSection(), MailConstraintsSection()]
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""Mail 完成验证:区分 inform/request。
- inform: 始终通过通知已阅即 done不需要检查产出
- request: 检查是否已回复
"""
performative = self._parse_performative(task_id, db_path)
if performative == "inform":
return VerifyResult(True, "inform_auto", f"performative={performative}")
# request: 检查是否已回复
has_reply = self._check_reply(task_id, db_path)
if has_reply:
return VerifyResult(True, "has_reply", f"performative={performative}")
return VerifyResult(False, "no_reply", f"performative={performative}")
# post_complete 由基类 BaseTaskHandler 统一处理(crash→verify→mark→notify
# inform: verify 始终通过 → 基类 mark done ✅
# request 有回复: verify 通过 → 基类 mark done ✅
# request 无回复: verify 失败 → 基类调 on_failure ✅
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""request 验证失败 → 标 failed + 通知发件人"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Mail %s: request verify failed (%s), marked failed",
task_id, verify.reason)
# 通知发件人
try:
from src.daemon.mail_notify import notify_mail_failed
notify_mail_failed(db_path, task_id, "no_reply_found")
except Exception as e:
logger.warning("Mail %s: failed to send notification: %s", task_id, e)
# === 内部方法 ===
def _parse_performative(self, task_id: str, db_path: Path) -> str:
"""解析 mail 类型(inform/request"""
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
meta = json.loads(row["must_haves"])
return meta.get("performative", meta.get("type", "request"))
finally:
conn.close()
except Exception:
pass
return "request"
def _check_reply(self, task_id: str, db_path: Path) -> bool:
"""检查是否已回复(查 tasks 表找 in_reply_to 回复邮件)
dispatcher._mail_check_reply 迁移
Mail 回复机制创建新 taskmust_haves JSON 中包含 in_reply_to = original_task_id
不能查 comments 回复邮件是独立的 task不是 comment
"""
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT id FROM tasks WHERE id != ? AND must_haves LIKE ? LIMIT 1",
(task_id, f'%{task_id}%'),
).fetchone()
return row is not None
finally:
conn.close()
except Exception as e:
logger.error("Mail %s: check reply error: %s", task_id, e)
# 查询失败时保守处理:假设有回复(避免误标 failed)
return True
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""ticker 级别的完成检查:检查是否已回复"""
return self._check_reply(task_id, db_path)
# ===================================================================
# Mail PromptSections
# ===================================================================
class MailContextSection:
"""邮件上下文段 — 发件人/收件人/主题/内容,区分 inform/request。"""
name: str = "mail_context"
priority: int = 10
def render(self, context: PromptContext) -> str:
if context.mail_type == "inform":
return self._render_inform(context)
return self._render_request(context)
def should_include(self, context: PromptContext) -> bool: # noqa: ARG002
return True
@staticmethod
def _render_inform(context: PromptContext) -> str:
return (
f"你收到一封飞鸽传书(纯通知)。\n\n"
f"发件者: {context.from_agent}\n"
f"主题: {context.title}\n"
f"内容: {context.description}\n\n"
f"已阅即可。如需回复,用 in_reply_to 回复发件者(不需要填 to)。\n"
f"⚠️ 不要执行任何状态转换命令。"
)
@staticmethod
def _render_request(context: PromptContext) -> str:
return (
f"你收到一封飞鸽传书,需要你处理并回复。\n\n"
f"发件者: {context.from_agent}\n"
f"主题: {context.title}\n"
f"内容: {context.description}\n\n"
f"### 如何回复发件者\n\n"
f'curl -s -X POST http://localhost:8083/api/mail \\\n'
f" -H 'Content-Type: application/json' \\\n"
f' -d \'{{"from": "{context.agent_id}", '
f'"in_reply_to": "{context.task_id}", '
f'"title": "回复: {context.title}", '
f'"text": "你的回复内容"}}\'\n\n'
f"⚠️ 不需要填 \"to\",系统自动回复给发件者。"
)
class MailApiSection:
"""Mail API 操作指令段。"""
name: str = "mail_api"
priority: int = 40
def render(self, context: PromptContext) -> str:
return (
f"### 如何给其他人发新邮件\n\n"
f'curl -s -X POST http://localhost:8083/api/mail \\\n'
f" -H 'Content-Type: application/json' \\\n"
f' -d \'{{"from": "{context.agent_id}", '
f'"to": "对方agent-id", '
f'"title": "标题", '
f'"text": "正文", '
f'"type": "inform"}}\'\n\n'
f"⚠️ to 必须是有效的 agent id\n"
f"⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)"
)
def should_include(self, context: PromptContext) -> bool:
return context.mail_type == "request"
class MailConstraintsSection:
"""Mail 硬约束段。"""
name: str = "mail_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str: # noqa: ARG002
return (
"## 硬约束\n\n"
"1. ⚠️ 不要执行任何状态转换命令(标 working/done/review/failed 等),系统会自动处理。\n"
"2. ⚠️ 不能给自己发邮件\n"
"3. ⚠️ 发邮件时 to 必须是有效的 agent id\n"
"4. ⚠️ 纯通知用 type=inform,需要对方回复不填 type(默认 request)"
)
def should_include(self, context: PromptContext) -> bool: # noqa: ARG002
return True
+127
View File
@@ -0,0 +1,127 @@
"""
prompt_composer.py PromptSection Protocol + PromptContext + PromptComposer
拼装器有序管理 prompt 段落按优先级排序后合并为最终 prompt
"""
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
logger = logging.getLogger("moziplus-v2.prompt_composer")
# ---------------------------------------------------------------------------
# Section 优先级范围约定
# ---------------------------------------------------------------------------
PRIORITY_CONTEXT = 10 # 任务上下文
PRIORITY_PRIOR = 20 # 前序信息
PRIORITY_ROLE = 30 # 角色规范
PRIORITY_API = 40 # API 操作指令
PRIORITY_CONSTRAINTS = 50 # 硬约束
PRIORITY_EXTENSION = 60 # 扩展段
# ---------------------------------------------------------------------------
# PromptSection Protocol
# ---------------------------------------------------------------------------
@runtime_checkable
class PromptSection(Protocol):
"""一个 prompt 段"""
name: str # 段名(去重用,同名覆盖)
priority: int # 排序优先级(小数字=靠前)
def render(self, context: "PromptContext") -> str:
"""渲染此段的文本内容。返回空字符串表示不注入。"""
...
def should_include(self, context: "PromptContext") -> bool:
"""是否注入此段(默认 True,条件段可覆盖)。"""
...
# ---------------------------------------------------------------------------
# PromptContext 数据对象
# ---------------------------------------------------------------------------
@dataclass
class PromptContext:
"""Prompt 渲染的统一上下文"""
task_id: str
title: str
description: str
must_haves: str
project_id: str
agent_id: str
task: Optional[Dict] = None
role: str = "executor"
spawn_type: str = "executor"
# mail 专用
from_agent: str = ""
mail_type: str = "" # inform / request
# toolchain 专用
event_type: str = "" # ci_failure / review_request / ...
event_data: Dict = field(default_factory=dict)
# 前序产出
depends_on_outputs: Optional[List] = None
# ---------------------------------------------------------------------------
# PromptComposer 拼装器
# ---------------------------------------------------------------------------
class PromptComposer:
"""有序拼装 prompt sections"""
SEPARATOR = "\n\n---\n\n"
TOKEN_BUDGET_WARN = 800 # token 预算警告阈值
CHARS_PER_TOKEN = 3.5 # 估算比率
def __init__(self) -> None:
self._sections: List[PromptSection] = []
def add(self, section: PromptSection) -> None:
"""添加一个 section(同名覆盖)"""
self._sections = [s for s in self._sections if s.name != section.name]
self._sections.append(section)
def add_many(self, sections: List[PromptSection]) -> None:
"""批量添加"""
for s in sections:
self.add(s)
def compose(self, context: PromptContext) -> str:
"""拼装最终 prompt
1. 过滤 should_include=False 的段
2. priority 排序
3. 逐段 render
4. 过滤空段
5. 用分隔符连接
6. Token 预算警告不截断
"""
active = [s for s in self._sections if s.should_include(context)]
active.sort(key=lambda s: s.priority)
parts = [s.render(context) for s in active]
parts = [p for p in parts if p.strip()]
result = self.SEPARATOR.join(parts)
# Token 估算
tokens = max(1, int(len(result) / self.CHARS_PER_TOKEN))
logger.debug(
"Composed prompt from %d sections, %d tokens",
len(parts), tokens,
)
if tokens > self.TOKEN_BUDGET_WARN:
logger.warning(
"Prompt exceeds %d token budget: %d tokens (task_id=%s)",
self.TOKEN_BUDGET_WARN, tokens, context.task_id,
)
return result
+32 -6
View File
@@ -19,6 +19,8 @@ from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.spawner")
from src.daemon.task_type_registry import TaskTypeRegistry
# ── Prompt 模板 ──
@@ -278,10 +280,29 @@ class AgentSpawner:
task_id, title, description, must_haves,
project_id, agent_id)
# mail 任务用精简模板
if project_id == "_mail":
return self._build_mail_prompt(
task_id, title, description, must_haves, agent_id)
# handler 路径:Task/Mail/Toolchain 用各自的 PromptSection 构建
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
from src.daemon.prompt_composer import PromptContext
# 从 must_haves 解析 mail 元数据(from / performative
from_agent = ""
mail_type = ""
try:
meta = json.loads(must_haves) if must_haves else {}
from_agent = meta.get("from", "")
mail_type = meta.get("performative", meta.get("type", ""))
except Exception:
pass
ctx = PromptContext(
task_id=task_id, title=title, description=description or "",
must_haves=must_haves or "", project_id=project_id,
agent_id=agent_id, role=spawn_type,
spawn_type=spawn_type,
from_agent=from_agent, mail_type=mail_type,
)
return handler.build_prompt(ctx)
# 旧路径保留:_general 等非 handler 项目
# 走 BootstrapBuilder 新路径
if self.bootstrap_builder and task is not None:
@@ -321,8 +342,13 @@ class AgentSpawner:
def _build_api_section(self, project_id: str, task_id: str,
agent_id: str) -> str:
"""构建 API 回写操作指令(BootstrapBuilder 模式下补充)"""
# mail 任务直接 done,不走 review
success_status = '"done"' if project_id == "_mail" else '"review"'
# handler 项目(_mail/_toolchain)的 success_status 由 PromptSection 处理
# 这里只处理无 handler 的项目(normal task
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
success_status = '"done"' if handler.target_success_status == "done" else '"review"'
else:
success_status = '"review"'
return f"""## 操作指令
### 状态回写
+378
View File
@@ -0,0 +1,378 @@
"""task_handler.py — 黑板任务 handlertask_type='task')。
标准黑板任务三信号验证 review 状态
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Dict, List, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler")
TERMINAL_STATES = frozenset({"review", "done", "failed", "cancelled"})
# ---------------------------------------------------------------------------
# Role → Skill 映射(D8 决策:L2 只给索引+引导语,不注全文)
# ---------------------------------------------------------------------------
ROLE_SKILL_MAP: Dict[str, str] = {
"executor": "blackboard-executor",
"reviewer": "blackboard-reviewer",
"reviewer-simayi": "blackboard-reviewer-simayi",
"reviewer-pangtong": "blackboard-reviewer-pangtong",
"planner": "blackboard-planner",
"claim": "blackboard-claim",
}
SKILL_BASE_PATH = "/Users/chufeng/.sanguo_projects/sanguo_mozi/skills"
# ---------------------------------------------------------------------------
# PromptSection 实现
# ---------------------------------------------------------------------------
class TaskContextSection:
"""段 1:任务上下文(title / desc / must_haves / status)。"""
name: str = "task_context"
priority: int = 10
def render(self, context: PromptContext) -> str:
parts = ["## 任务上下文"]
if context.task_id:
parts.append(f"任务ID: {context.task_id}")
if context.title:
parts.append(f"标题: {context.title}")
if context.description:
parts.append(f"描述: {context.description}")
if context.must_haves:
parts.append(f"必须完成: {context.must_haves}")
if context.task and context.task.get("status"):
parts.append(f"当前状态: {context.task['status']}")
return "\n".join(parts)
def should_include(self, context: PromptContext) -> bool:
return bool(context.task_id or context.title)
class PriorOutputsSection:
"""段 2:前序产出摘要(depends_on 非空时注入)。"""
name: str = "prior_outputs"
priority: int = 20
def render(self, context: PromptContext) -> str:
outputs = context.depends_on_outputs or []
parts = ["## 前序产出"]
for out in outputs:
tid = out.get("task_id", "?")
summary = out.get("summary", "无摘要")
parts.append(f"- [{tid}] {summary}")
return "\n".join(parts)
def should_include(self, context: PromptContext) -> bool:
return bool(context.depends_on_outputs)
class RoleSkillSection:
"""段 3:角色 Skill 索引+引导语(D8 决策:不注全文)。"""
name: str = "role_skill"
priority: int = 30
def render(self, context: PromptContext) -> str:
skill_name = ROLE_SKILL_MAP.get(context.role, "")
lines = [
"## 角色操作规范",
f"你的角色:{context.role}",
]
if skill_name:
lines.append(f"对应 Skill{skill_name}")
lines.append(
f"请用 read 工具读取 {SKILL_BASE_PATH}/{skill_name}/SKILL.md "
"获取完整操作规范。"
)
else:
lines.append("无对应 Skill 文件,按通用规范执行。")
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
class TaskApiSection:
"""段 4API 操作指令。"""
name: str = "task_api"
priority: int = 40
API_HOST = "localhost"
API_PORT = 8083
def render(self, context: PromptContext) -> str:
pid = context.project_id
tid = context.task_id
aid = context.agent_id
success_status = '"review"'
base = f"http://{self.API_HOST}:{self.API_PORT}/api/projects/{pid}/tasks/{tid}"
return (
"## 操作指令\n"
"### 状态回写\n"
f"开始工作:\n"
f'curl -X POST {base}/status \\\n'
f' -H "Content-Type: application/json" \\\n'
f' -d \'{{"status": "working", "agent": "{aid}"}}\'\n\n'
"### 写入产出\n"
f'curl -X POST {base}/outputs \\\n'
f' -H "Content-Type: application/json" \\\n'
f" -d '{{\"type\": \"text\", \"content\": \"<your output>\"}}'\n\n"
"### 完成后\n"
f"成功: status → {success_status} | 失败: status → \"failed\""
)
def should_include(self, context: PromptContext) -> bool:
return True
class TaskConstraintsSection:
"""段 5:硬约束。"""
name: str = "task_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str:
constraints = ["## 硬约束"]
role = context.role
if role == "executor":
constraints.extend([
"- 完成后必须标 review",
"- 产出物不能为空(系统会验证)",
"- handoff comment ≥ 50 字符",
])
elif role.startswith("reviewer"):
constraints.extend([
"- 审查结果必须明确 pass/fail",
"- 评审意见须附证据(文件:行号)",
])
elif role == "planner":
constraints.extend([
"- 需求不清时提问,不要猜",
"- 子任务必须有明确的终态定义",
])
else:
constraints.append("- 按规范完成 assigned 任务")
return "\n".join(constraints)
def should_include(self, context: PromptContext) -> bool:
return True
class TaskHandler(BaseTaskHandler):
"""黑板标准任务 handler。
- verify: 三信号检查output / comment / terminal status
- 成功 review
- 失败 保持 working ticker 重试
- review 完成 读取 verdictapproved mark done
"""
task_type: str = "task"
virtual_project: Optional[str] = None
display_name = "黑板任务"
# === 子类实现 ===
def post_complete(self, task_id: str, agent_id: str,
outcome: str, db_path: Path) -> None:
"""Task on_complete:区分 executor 和 review。
executor: 基类统一流程crash verify mark review
review: handle_review_complete verdict done/keep review
"""
# crash 处理(所有类型共用)
if outcome in self.CRASH_OUTCOMES:
self._rollback_current_agent(db_path, task_id, agent_id)
return
# 检查当前任务状态:如果是 review 状态 → review 完成流程
try:
conn = get_connection(db_path)
try:
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
task_status = row["status"] if row else "unknown"
finally:
conn.close()
except Exception:
task_status = "unknown"
if task_status == "review":
# review 完成流程:只处理正常 outcome
if outcome in ("completed", "session_revived"):
self.handle_review_complete(task_id, db_path)
else:
logger.warning(
"Task %s: review agent %s abnormal outcome=%s, keeping review",
task_id, agent_id, outcome)
else:
# executor 完成流程:基类统一 verify → mark
result = self.verify_completion(task_id, db_path)
if result.passed:
self._mark_task_status(db_path, task_id, self.target_success_status())
logger.info("Task %s: verify passed (%s), marked %s",
task_id, result.reason, self.target_success_status())
else:
logger.info(
"Task %s: verify not passed (%s), leaving working",
task_id, result.reason)
# NOTE: executor verify 不通过时不标 failed,留 working。
# 原因:Agent 可能还在产出中(幻觉门控的后续轮次),
# ticker 超时检查会兜底处理。不调 on_failure 避免误判。
def target_success_status(self) -> str:
"""task 类型验证通过后进 review。"""
return "review"
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""三信号验证:output / comment / terminal status。"""
try:
conn = get_connection(db_path)
try:
# 信号 1terminal status
row = conn.execute(
"SELECT status FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not row:
return VerifyResult(False, "not_found", "task not found",
can_retry=False)
status = row["status"]
if status in TERMINAL_STATES:
return VerifyResult(
True, "terminal_status",
f"status={status}", can_retry=False
)
# 信号 2outputs
output_count = conn.execute(
"SELECT COUNT(*) as cnt FROM outputs WHERE task_id=?",
(task_id,)
).fetchone()["cnt"]
if output_count > 0:
return VerifyResult(
True, "has_output",
f"output_count={output_count}"
)
# 信号 3:非 system 且内容 >= 50 字的 comment
comment_count = conn.execute(
"SELECT COUNT(*) as cnt FROM comments "
"WHERE task_id=? AND author != 'system' "
"AND LENGTH(content) >= 50",
(task_id,)
).fetchone()["cnt"]
if comment_count > 0:
return VerifyResult(
True, "has_comment",
f"comment_count={comment_count}"
)
# 无信号
return VerifyResult(
False, "no_signal",
f"output=0, comment=0, status={status}"
)
finally:
conn.close()
except Exception as e:
logger.error("Task %s: verify error: %s", task_id, e)
return VerifyResult(False, "verify_error", str(e))
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""task 类型不需要 pre_spawn 逻辑。"""
return True
def get_sections(self) -> list:
"""返回 5 个 PromptSection 实例。"""
return [
TaskContextSection(),
PriorOutputsSection(),
RoleSkillSection(),
TaskApiSection(),
TaskConstraintsSection(),
]
def build_prompt(self, context: PromptContext) -> str:
"""通过 PromptComposer 拼装 prompt sections。"""
composer = PromptComposer()
composer.add_many(self.get_sections())
return composer.compose(context)
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败:不标 failed,保持 working 让 ticker 重试。"""
logger.info(
"Task %s: verify failed (%s, evidence=%s), leaving working for ticker retry",
task_id, verify.reason, verify.evidence
)
# === Review 流程 ===
def handle_review_complete(self, task_id: str, db_path: Path) -> None:
"""Review 完成后处理:读取 verdict → approved 则 mark done
否则 @mention assignee via blackboard comment"""
try:
conn = get_connection(db_path)
try:
# 读取最新 review
review_row = conn.execute(
"SELECT verdict, reviewer, comment FROM reviews "
"WHERE task_id=? ORDER BY created_at DESC LIMIT 1",
(task_id,)
).fetchone()
if not review_row:
logger.warning("Task %s: no review found", task_id)
return
verdict = review_row["verdict"]
reviewer = review_row["reviewer"]
review_comment = review_row["comment"] or ""
# 获取 assignee
task_row = conn.execute(
"SELECT assignee FROM tasks WHERE id=?", (task_id,)
).fetchone()
if not task_row:
logger.warning("Task %s: task not found for review", task_id)
return
assignee = task_row["assignee"]
if verdict == "approved":
self._mark_task_status(db_path, task_id, "done")
logger.info("Task %s: review approved by %s, marked done",
task_id, reviewer)
else:
# 非 approved:通过 blackboard comment @mention assignee
# 保持 review 状态,让 assignee 自行决定下一步
conn.execute(
"INSERT INTO comments (task_id, author, content, comment_type) "
"VALUES (?, 'system', ?, 'review')",
(task_id,
f"@{assignee} review 未通过 (verdict={verdict}, "
f"reviewer={reviewer}): {review_comment}")
)
conn.commit()
logger.info(
"Task %s: review not approved (%s by %s), "
"@mentioned assignee %s, keeping review status",
task_id, verdict, reviewer, assignee
)
finally:
conn.close()
except Exception as e:
logger.error("Task %s: handle_review_complete error: %s", task_id, e)
+102
View File
@@ -0,0 +1,102 @@
"""
task_type_registry.py Task type handler Protocol + Registry.
启动时一次性加载 handler运行时只读
零依赖不导入项目内其他模块
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, runtime_checkable
if TYPE_CHECKING:
from src.daemon.prompt_composer import PromptContext
logger = logging.getLogger("moziplus-v2.registry")
# ---------------------------------------------------------------------------
# Protocol
# ---------------------------------------------------------------------------
@runtime_checkable
class TaskTypeHandler(Protocol):
"""所有 task type handler 的统一接口。"""
# 属性(通过 __init__ 设置)
task_type: str # 类型标识:'task' | 'mail' | 'toolchain'
virtual_project: Optional[str] # 虚拟项目 ID,如 '_mail'、'_toolchain'。普通任务为 None
def build_prompt(self, context: "PromptContext") -> str:
"""构建 Agent prompt(通过 PromptComposer 拼 section)。"""
...
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""spawn 前业务准备。默认 Truemail/toolchain override 为 auto_working。"""
...
def post_complete(
self,
task_id: str,
agent_id: str,
outcome: str,
db_path: Path,
) -> None:
"""spawn 完成后的业务处理。统一流程:crash→verify→mark→notify。"""
...
def check_completion(self, task_id: str, db_path: Path) -> bool:
"""ticker 级别的完成检查。"""
...
def get_sections(self) -> list:
"""返回此 handler 的 prompt section 列表。"""
...
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
class TaskTypeRegistry:
"""Task type handler 注册表。启动时一次性加载,运行时只读。"""
_handlers: Dict[str, TaskTypeHandler] = {}
@classmethod
def register(cls, handler: TaskTypeHandler) -> None:
"""注册一个 handler。启动时调用一次。"""
if handler.task_type in cls._handlers:
raise ValueError(f"Task type '{handler.task_type}' already registered")
cls._handlers[handler.task_type] = handler
vp = getattr(handler, "virtual_project", None)
logger.info("Registered task type handler: %s (virtual_project=%s)", handler.task_type, vp)
@classmethod
def get_by_project(cls, project_id: str) -> Optional[TaskTypeHandler]:
"""通过 project_id 查找 handler(匹配 virtual_project)。"""
for h in cls._handlers.values():
if h.virtual_project == project_id:
return h
return None
@classmethod
def get(cls, task_type: str) -> Optional[TaskTypeHandler]:
"""通过 task_type 标识查找 handler。"""
return cls._handlers.get(task_type)
@classmethod
def virtual_projects(cls) -> list[str]:
"""返回所有已注册的虚拟项目 ID(ticker 自动发现用)。"""
return [
h.virtual_project
for h in cls._handlers.values()
if h.virtual_project is not None
]
@classmethod
def clear(cls) -> None:
"""清空注册表(仅测试用)。"""
cls._handlers = {}
+33 -25
View File
@@ -19,6 +19,8 @@ from typing import Any, Callable, Coroutine, Dict, List, Optional
from dataclasses import dataclass, field as dc_field
from src.daemon.task_type_registry import TaskTypeRegistry
from src.blackboard.operations import Blackboard
from src.blackboard.db import get_connection
from src.daemon.spawner import AgentBusyError
@@ -215,18 +217,21 @@ class Ticker:
logger.exception("Tick %d _general error", tick_num)
results["projects"]["_general"] = {"error": str(e)}
# 虚拟项目 _mail:飞鸽传书
mail_db = Path(self.registry.root) / "_mail" / "blackboard.db"
if mail_db.exists() and "_mail" not in active_projects:
try:
pr = await self._tick_project("_mail", {
"id": "_mail", "name": "飞鸽传书",
"status": "active", "source": "virtual",
})
results["projects"]["_mail"] = pr
except Exception as e:
logger.exception("Tick %d _mail error", tick_num)
results["projects"]["_mail"] = {"error": str(e)}
# 虚拟项目:从注册表自动发现 + _general 硬编码
for vp in TaskTypeRegistry.virtual_projects():
vp_db = Path(self.registry.root) / vp / "blackboard.db"
if vp_db.exists() and vp not in active_projects:
try:
vp_handler = TaskTypeRegistry.get_by_project(vp)
vp_name = vp_handler.display_name if vp_handler and vp_handler.display_name else vp
pr = await self._tick_project(vp, {
"id": vp, "name": vp_name,
"status": "active", "source": "virtual",
})
results["projects"][vp] = pr
except Exception as e:
logger.exception("Tick %d %s error", tick_num, vp)
results["projects"][vp] = {"error": str(e)}
logger.debug(
"Tick %d complete: %d projects",
@@ -948,9 +953,10 @@ Parent Task ID: {parent_task.id}
now = datetime.utcnow().isoformat()
# 重置到 pending 时清空 assignee(避免残留导致重复路由到同一 Agent)
# 但 Mail 的 assignee 是收件人,永不清空
# handler 虚拟项目(_mail 等)的 assignee 是收件人,永不清空
if new_status == "pending":
if self._current_project_id == "_mail":
handler = TaskTypeRegistry.get_by_project(self._current_project_id)
if handler:
conn.execute(
"UPDATE tasks SET status=?, updated_at=? WHERE id=?",
(new_status, now, task_id),
@@ -1025,15 +1031,16 @@ Parent Task ID: {parent_task.id}
"full", "escalate"):
conn = get_connection(db_path)
try:
# [v2.7.1] Mail 已在 dispatcher 中标 working,跳过 claimed
if project_id == "_mail":
# [Step 5] handler 项目已在 dispatcher 中标 working,跳过 claimed
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
conn.execute(
"UPDATE tasks SET current_agent=? WHERE id=?",
(result["agent_id"], task.id),
)
conn.commit()
dispatched.append(task.id)
logger.info("Dispatched %s to %s (session=%s, mail auto-working)",
logger.info("Dispatched %s to %s (session=%s, handler auto-working)",
task.id, result["agent_id"],
result.get("session_id"))
else:
@@ -1300,8 +1307,9 @@ Parent Task ID: {parent_task.id}
async def _dispatch_reviews(self, db_path: Path,
project_id: str) -> List[str]:
"""扫描 review 状态任务,检查是否有产出,调度审查 Agent"""
# mail 任务不走 review 流程,直接跳过
if project_id == "_mail":
# handler 项目(_mail/_toolchain不走 review 流程
handler = TaskTypeRegistry.get_by_project(project_id)
if handler:
return []
queries = Queries(db_path)
@@ -1470,10 +1478,9 @@ Parent Task ID: {parent_task.id}
elapsed = (now - start_time).total_seconds() / 60.0
if elapsed > timeout_minutes:
# [v2.7.1] Mail 幻觉门控兜底:有回复 + working → done
if self._current_project_id == "_mail":
has_reply = self._mail_check_reply(task.id, db_path)
if has_reply:
# [Step 5] handler 幻觉门控兜底:check_completion 通过 + working → done
handler = TaskTypeRegistry.get_by_project(self._current_project_id)
if handler and handler.check_completion(task.id, db_path):
conn = get_connection(db_path)
try:
ok = self._transition_status(
@@ -1621,8 +1628,9 @@ Parent Task ID: {parent_task.id}
project_dirs[project_id] = self.registry.root / \
project_id / "blackboard.db"
# 虚拟项目
for virtual_id in ("_general", "_mail"):
# 虚拟项目_general + 注册表自动发现
virtual_ids = ["_general"] + TaskTypeRegistry.virtual_projects()
for virtual_id in virtual_ids:
virtual_db = Path(self.registry.root) / \
virtual_id / "blackboard.db"
if virtual_db.exists() and virtual_id not in project_dirs:
+277
View File
@@ -0,0 +1,277 @@
"""toolchain_handler.py — 工具链事件 handler。
处理 Gitea Webhook 事件CI 失败Review 请求Issue 指派等
"""
from __future__ import annotations
import json
import logging
import urllib.request
from pathlib import Path
from typing import Dict
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.toolchain")
# ---------------------------------------------------------------------------
# Toolchain PromptSections
# ---------------------------------------------------------------------------
class ToolchainContextSection:
"""事件类型 + 事件详情(priority=10"""
name: str = "toolchain_context"
priority: int = 10
def render(self, context: PromptContext) -> str:
event_type = context.event_type
event_data: Dict = context.event_data or {}
if event_type in _TEMPLATE_MAP:
# 使用模板引擎渲染已知事件
variables = {k: str(v) for k, v in event_data.items()}
return render_template(event_type, variables)
# fallback:通用事件描述
lines = [f"## 工具链事件", f""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append(f"- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append(f"")
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
class ToolchainApiSection:
"""API 操作指令(priority=40),success_status=done"""
name: str = "toolchain_api"
priority: int = 40
API_HOST = "localhost:8083"
def render(self, context: PromptContext) -> str:
lines = [
"## API 操作指令",
"",
f"项目 ID: `{context.project_id}`",
f"任务 ID: `{context.task_id}`",
"",
"### 完成后必须更新任务状态",
"完成后务必通过以下命令将任务标记为 **done**:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"status": "done"}\'',
"```",
"",
"### 提交产出",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
"```",
"",
]
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
class ToolchainConstraintsSection:
"""硬约束(priority=50"""
name: str = "toolchain_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str:
lines = [
"## 硬约束",
"",
"1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成",
"2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态",
"3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务",
"4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done",
"5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务",
"",
]
return "\n".join(lines)
def should_include(self, context: PromptContext) -> bool:
return True
# ---------------------------------------------------------------------------
# ToolchainHandler
# ---------------------------------------------------------------------------
class ToolchainHandler(BaseTaskHandler):
"""工具链事件 handler。"""
task_type = "toolchain"
virtual_project = "_toolchain"
display_name = "工具链事件"
def target_success_status(self) -> str:
return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_workingpending → working"""
return self._auto_mark_working(task_id, db_path)
def get_sections(self) -> list:
"""返回 3 个 Toolchain PromptSection 实例"""
return [
ToolchainContextSection(),
ToolchainApiSection(),
ToolchainConstraintsSection(),
]
def build_prompt(self, context: PromptContext) -> str:
"""通过 PromptComposer 拼装 sections 为最终 prompt"""
composer = PromptComposer()
composer.add_many(self.get_sections())
return composer.compose(context)
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""检查行动输出(output 或 comment 有实质内容)"""
try:
conn = get_connection(db_path)
try:
# 检查 output
output_count = conn.execute(
"SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
).fetchone()[0]
if output_count > 0:
return VerifyResult(True, "has_output", f"output_count={output_count}")
# 检查 comment(非系统、有实质内容)
comment_count = conn.execute(
"SELECT COUNT(*) FROM comments WHERE task_id=? "
"AND author != 'system' AND LENGTH(content) >= 20",
(task_id,)
).fetchone()[0]
if comment_count > 0:
return VerifyResult(True, "has_comment", f"comment_count={comment_count}")
return VerifyResult(False, "no_action", "output=0, comment=0")
finally:
conn.close()
except Exception as e:
logger.error("Toolchain %s: verify error: %s", task_id, e)
return VerifyResult(False, "verify_error", str(e))
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败 → 标 failed + Mail API 通知主公"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason)
# 从 db 读取事件上下文
event_type = ""
event_data: Dict = {}
try:
conn = get_connection(db_path)
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
meta = json.loads(row["must_haves"])
event_type = meta.get("event_type", "")
raw = meta.get("event_data", "{}")
event_data = json.loads(raw) if isinstance(raw, str) else raw
conn.close()
except Exception:
pass
self._notify_via_mail_api(
task_id, verify.reason, verify.evidence,
event_type, event_data,
)
def _build_gitea_links(self, event_type: str, event_data: dict) -> str:
"""根据事件类型构建 Gitea 链接。"""
links = []
repo = event_data.get("repo", "")
base_url = "http://192.168.2.154:3000"
if "pr_number" in event_data:
links.append(f"PR: {base_url}/{repo}/pulls/{event_data['pr_number']}")
if "issue_number" in event_data:
links.append(f"Issue: {base_url}/{repo}/issues/{event_data['issue_number']}")
if "commit" in event_data:
links.append(f"Commit: {base_url}/{repo}/commit/{event_data['commit']}")
if "branch" in event_data and "commit" not in event_data:
links.append(f"分支: {event_data['branch']}")
return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)"
def _notify_via_mail_api(
self,
task_id: str,
reason: str,
evidence: str,
event_type: str,
event_data: Dict,
) -> None:
"""通过 Mail API 发送丰富的失败通知给主公。"""
# 构建行动指引
action_hint = "请检查黑板任务并手动处理。"
et_lower = event_type.lower()
if "ci" in et_lower or "deploy" in et_lower:
action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。"
elif "review" in et_lower:
action_hint = "建议查看 PR review 状态,必要时通知相关开发者。"
elif "issue" in et_lower:
action_hint = "建议创建任务派给对应开发者处理 Issue。"
# 构建事件详情
event_details = ""
if event_data:
event_details = "\n".join(
f" - {k}: {v}" for k, v in event_data.items()
)
# 构建 Gitea 链接
gitea_links = self._build_gitea_links(event_type, event_data)
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
text = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {event_type or '未知'}\n"
f"事件详情:\n{event_details or ' (无)'}\n\n"
f"失败原因: {reason}\n"
f"证据: {evidence}\n\n"
f"{gitea_links}\n\n"
f"行动指引: {action_hint}"
)
payload = json.dumps({
"from": "daemon",
"to": "pangtong-fujunshi",
"title": title,
"text": text,
"type": "inform",
}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
"http://localhost:8083/api/mail",
data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
logger.info("Toolchain %s: sent failure notification via Mail API", task_id)
except Exception as e:
logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e)
+1 -1
View File
@@ -426,7 +426,7 @@ export default function TaskModal() {
{/* 状态操作 */}
<div style={{ marginBottom: 16 }}>
<SectionLabel icon="🔄" title="状态操作" />
<StatusButtons status={task.status} taskId={task.id} resumedFrom={task.resumed_from} />
<StatusButtons status={task.status} taskId={task.id} resumedFrom={task.resumed_from ?? undefined} />
</div>
{/* v2.7: 子 Task 进度 + 列表 */}
+2 -2
View File
@@ -1,12 +1,12 @@
import pytest
pytestmark = pytest.mark.e2e
skip_no_integration = pytest.mark.skipif(
not __import__("os").environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
)
pytestmark = [pytest.mark.e2e, skip_no_integration]
"""v2.7 端到端测试 — 全链路真实环境
覆盖项目管理 Task CRUD SubTask Stage进度 状态聚合 依赖链 超时 Mail 真实Agent调度