Compare commits

...

73 Commits

Author SHA1 Message Date
cfdaily 17b87290c8 [moz] fix(api): list_tasks 默认排序改为 created_at DESC(最新在前)
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 29s
CI / frontend (pull_request) Successful in 10s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 16:53:35 +08:00
pangtong-fujunshi bd5735f970 Merge PR #76: [moz] refactor(frontend): 工具链 Tab 移入系统设置子页签
Deploy / ci (push) Successful in 9s
Deploy / deploy (push) Successful in 13s
Deploy / notify-deploy-failure (push) Successful in 1s
Deploy / notify-deploy-success (push) Successful in 1s
2026-06-14 08:37:16 +00:00
cfdaily 05f9112fab [moz] refactor(frontend): 工具链 Tab 移入系统设置子页签
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 28s
CI / frontend (pull_request) Successful in 11s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 16:36:34 +08:00
jiangwei-infra b926b35703 Merge PR #75: [moz] fix(ci): 修复 deploy push trigger 不触发问题
Deploy / ci (push) Successful in 9s
Deploy / deploy (push) Successful in 13s
Deploy / notify-deploy-failure (push) Successful in 0s
Deploy / notify-deploy-success (push) Successful in 0s
2026-06-14 08:31:35 +00:00
jiangwei-infra 8df1d4a83c Merge branch 'main' into fix/cd-push-trigger-yaml
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 27s
CI / frontend (pull_request) Successful in 11s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 08:30:20 +00:00
cfdaily aad5a6b317 [moz] fix(ci): 修复 deploy push trigger 不触发问题
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 0s
根因:deploy.yml notify-deploy-success job 中 python3 -c 使用多行字符串,
Python 代码零缩进(column 0)破坏了 YAML literal block scalar (run: |),
导致 Gitea YAML 解析器报错 'line 114: could not find expected :',
在 DetectWorkflows 阶段被静默丢弃,push 事件无法触发 deploy。

Gitea 日志证据:
  ignore invalid workflow "deploy.yml": yaml: line 114: could not find expected ':'

修复:将多行 python3 -c 改为单行,避免零缩进代码行破坏 YAML 块结构。

影响范围:仅 deploy.yml,不影响 ci.yml 和 e2e.yml
验证方式:YAML 解析已通过,合并后观察 push 事件是否触发 Actions
2026-06-14 16:28:41 +08:00
pangtong-fujunshi ad34750075 Merge PR #74: [moz] ci: CI 管道新增 frontend build job 2026-06-14 08:14:15 +00:00
cfdaily cd7e24cd3c [moz] ci: CI 管道新增 frontend build job(tsc + vite build)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / frontend (pull_request) Successful in 40s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 16:12:05 +08:00
pangtong-fujunshi 0521b7b6f0 Merge PR #73: [moz] feat(frontend): 工具链 Tab 2026-06-14 07:24:24 +00:00
cfdaily fc30f91183 [moz] feat(frontend): 新增工具链 Tab — 列表+详情+搜索栏
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 15:22:34 +08:00
pangtong-fujunshi 8c72ff0565 Merge PR #72: [moz] refactor(api): API 拆分 + expand 聚合 + 搜索 2026-06-14 06:55:08 +00:00
cfdaily cc2e5aa64c [moz] fix(api): Review M1 修复 — expand=all 保持旧格式 + _toolchain 加入 _VIRTUAL_PROJECTS
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 30s
CI / notify-on-failure (pull_request) Successful in 0s
- M1: expand=all 保持旧 list 格式(向后兼容 TaskModal .map()/.length)
- 细粒度 expand=comments,events 用新 {items,total_count,limit} 格式
- S1(PR#73): _toolchain 加入 _VIRTUAL_PROJECTS
- S1(PR#72): 移除 _validate_project 未使用 import
2026-06-14 14:22:14 +08:00
cfdaily d09fd4a173 [moz] fix(api): flake8 lint 修复 — 移除未使用 import
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-14 14:20:33 +08:00
cfdaily 5db4c89fe7 [moz] refactor(api): 拆分 blackboard_routes → task_routes + task_relation_routes + shared + expand 细粒度聚合
CI / lint (pull_request) Failing after 9s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 2s
2026-06-14 14:02:59 +08:00
pangtong-fujunshi e70816a69f Merge PR #71: [moz] docs: §18 API 聚合重构 + 工具链 Tab 设计 2026-06-14 05:57:21 +00:00
cfdaily 33521b8b39 [moz] docs: §18 职责分离 — 测试详细代码移入 18-test-design.md
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 27s
CI / notify-on-failure (pull_request) Successful in 0s
- 主文档 §6 只保留概要表格 + 文件指向
- 测试 fixture/完整代码/覆盖矩阵 → 18-test-design.md
- 删除误加的 GATE/委派/wiki 章节
- CI 集成改为表格格式 + 引用
2026-06-14 13:56:47 +08:00
cfdaily f55a037c98 [moz] docs: §18 API 聚合重构 + 工具链 Tab 设计文档
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 1s
- 18-api-refactor-and-toolchain-tab.md: 主设计(9章+实施约束)
  - 后端拆分方案 B(task_routes + task_relation_routes + shared)
  - expand 细粒度聚合(comments/events 带 limit+total_count)
  - 任务列表搜索参数 q
  - 工具链 Tab 设计(仿 MailPanel + 搜索栏)
  - GATE 门控 + 委派原则 + wiki 查询规则
  - 司马懿已审(mail-1781415763066)
- 18-test-design.md: 测试用例详细设计(34 个用例 + CI 集成)
- tests/scripts/verify_api_compat.sh: 路由兼容性验证脚本
2026-06-14 13:53:56 +08:00
pangtong-fujunshi 923971ad92 Merge PR #70: [moz] docs: 重写 §26 Gitea 协作规范设计 2026-06-14 04:09:22 +00:00
cfdaily 3f07c528b6 [moz] docs: 重写 §26 Gitea 协作规范设计(流水账→设计文档风格)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 1s
- §26 从落地流水账重写为正式设计文档
- 新增 26.1 设计目标(三个问题 + 目标)
- 新增 26.2 规范设计(标题规范/Label 体系/模板,含设计决策)
- 新增 26.3 执行机制(四层注入 + L2 priority 设计理由)
- 新增 26.4 Label 迁移策略(旧标签保留,不做批量迁移)
- 新增 26.5 与其他章节的交叉引用关系
- 保留 26.6 实现记录(压缩为记录而非主体)
- §4.5 末尾加交叉引用指向 §26
- P3 色值从 #c5def5 改为 #cfd3d7(解决与 type/impl 混淆)
2026-06-14 12:07:47 +08:00
pangtong-fujunshi 207c2aaaef Merge PR #69: [moz] feat: Gitea 协作规范落地 — 标题前缀+Label+模板+L2注入 2026-06-14 03:53:48 +00:00
cfdaily a89a70a983 feat: Gitea 协作规范落地 — 标题前缀+Label+模板+L2注入
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 30s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 11:51:57 +08:00
pangtong-fujunshi 1c939bfa27 Merge PR #68: impl: #16 知识注入 L2 引擎层 — WikiGuideSection 2026-06-14 02:35:34 +00:00
cfdaily 080d1d0b23 impl: #16 知识注入 L2 引擎层 — WikiGuideSection
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 31s
CI / notify-on-failure (pull_request) Successful in 0s
- prompt_composer.py: 新增 WikiGuideSection 类(priority=60)
- task_handler.py: get_sections() 注入 WikiGuideSection
- mail_handler.py: get_sections() 注入 WikiGuideSection
- toolchain_handler.py: get_sections() 注入 WikiGuideSection

L0(gate-enforcer wiki-rule)和 L1(SOUL.md Red Flags)不在本仓库,
直接在对应文件修改。

设计文档:docs/design/16-knowledge-injection.md(v2 已合并)
2026-06-14 10:32:26 +08:00
pangtong-fujunshi d1ef64b5cc Merge PR #67: docs: #16 知识注入设计 v2 — 对齐 #11 四层架构 2026-06-14 02:25:12 +00:00
cfdaily e83ad1de73 docs: #16 知识注入设计 v2 — 对齐 #11 四层架构
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 0s
- 层级命名统一到 #11 体系(L0/L1/L2/L3),不再自创命名
- L0: 新增 wiki 查询铁律(做方案前先查 + 查不到记 gap)
- L1: TOOLS.md 速查表(已完成)+ SOUL.md Red Flags(待实现)
- L2: 三种 handler(task/mail/toolchain)各注入 WikiGuideSection
- L3: wiki-query Skill(已部署,待确认 extraDirs 递归)
- 运维层: gap 闭环 cron job(已有,需完善)
2026-06-14 10:22:16 +08:00
pangtong-fujunshi a27ea8ed89 Merge PR #59: docs: #16 知识注入设计(三层触发机制) 2026-06-13 23:11:37 +00:00
cfdaily 146816303f fix: M1 修复 §07 文件路径引用(24→15) + M2 D16-6 标题引用(#05→#11)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 07:10:04 +08:00
cfdaily 11349b5225 docs: 新增 #16 知识注入设计(三层触发机制) 2026-06-14 07:10:04 +08:00
pangtong-fujunshi a037497053 Merge PR #66: ci: pip upgrade + --no-cache-dir 2026-06-13 16:38:26 +00:00
cfdaily f6f26d7763 ci: pip upgrade + --no-cache-dir 防旧 pip dist-info 损坏(姜维建议)
CI / lint (pull_request) Successful in 13s
CI / test (pull_request) Successful in 29s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:35:15 +08:00
pangtong-fujunshi 920bc75c53 Merge PR #65: feat: §17 ToolchainHandler 强约束实现 2026-06-13 16:33:18 +00:00
cfdaily 976d9ce7c8 fix: M1 git rm 误提交的安装目录文件 + S1 docstring 修正 + S2 去掉 CHECK 约束(司马懿 Review #111)
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 23s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:22:13 +08:00
cfdaily fd3a889fae ci: 每次清 venv 防止旧缓存损坏
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 22s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:17:44 +08:00
cfdaily 925ebe8556 ci: 加 debug 信息定位 test failure 根因
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 2s
2026-06-14 00:16:11 +08:00
cfdaily 4ef9f68ff3 fix(ci): PYTHONPATH=. 防止 runner 环境加载安装目录旧代码
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Failing after 5s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-14 00:13:01 +08:00
cfdaily 50d1d0b5e6 chore: trigger CI retry
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-14 00:11:23 +08:00
cfdaily 6e6b52fe3b fix: asyncio.Lock 懒加载防 event loop 关闭后 import 失败
CI / lint (pull_request) Successful in 12s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:09:27 +08:00
cfdaily 3bca794902 fix: M2 on_failure assignee 从 tasks 表读取 + infrastructure 防递归(司马懿 Review #65)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Failing after 9s
CI / notify-on-failure (pull_request) Successful in 1s
M2: on_failure 中 assignee = meta.get('from', '') 读到 'system' 而非实际 Agent
→ 改为 SELECT must_haves, assignee FROM tasks 直接读 tasks.assignee 字段

附带:infrastructure failure 改为直接 DB INSERT,不走 _send_toolchain_task 防递归
2026-06-13 23:47:12 +08:00
cfdaily a5d5d2d974 fix: P0 token 环境变量 + P1 fail_count 逻辑简化(姜维 Review)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 10s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:43:20 +08:00
cfdaily 3b9ad83405 fix(lint): F541 f-string 无占位符去掉 f 前缀
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-13 23:40:56 +08:00
cfdaily c89863a288 feat: §17 ToolchainHandler 强约束实现(Step 1-4)
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 1s
Step 1: 基础设施
- prompt_composer.py: PromptContext 新增 action_type + action_steps 字段
- spawner.py: handler 路径提取 action_type/action_steps 传入 PromptContext
- db.py: comments CHECK 约束加入 action_report

Step 2: ToolchainHandler 强化
- ToolchainContextSection: 加 steps 渲染 + action_hint(按 action_type)
- ToolchainApiSection: 改为 action_report 提交指引 + Gitea 协作指引
- ToolchainConstraintsSection: 5 条强约束 + Red Flags 防self-rationalization
- verify_completion: action_report → output → comment 三层 fallback
  - review_merged 始终通过(纯通知)
  - infrastructure_failure 始终通过(防递归)
  - 修复 LENGTH(content) → LENGTH(body) bug
- on_failure 三分路: 业务→Gitea PR comment / 系统→Gitea Issue / 基础设施→toolchain task

Step 3: toolchain_routes 改造
- 新增 _toolchain_db_path() + _send_toolchain_task()
- 所有 8 个 handler 改为 _send_toolchain_task
- _send_mail 保留但不再被 toolchain handler 调用
- _send_deploy_failure_mail → _send_deploy_failure_task

Step 4: 测试
- 29 个单元测试全部通过
- 全量 456 passed, 3 skipped, 0 failures
2026-06-13 23:36:44 +08:00
pangtong-fujunshi 90f4e3284c Merge PR #64: §17 toolchain/mail 完全分离——Gitea 为唯一协作媒介 2026-06-13 15:09:33 +00:00
cfdaily 946f7e1848 fix(design): §17 M1 §5.2 三分路展开 + M2 约束编号 #5 + S1/S2 一致性
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:08:50 +08:00
cfdaily 409e4ee51d fix(design): §17 清理 3 处遗留 Mail 引用
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:04:30 +08:00
cfdaily d1c0984082 refactor(design): §17 toolchain/mail 完全分离——Gitea 为唯一协作媒介
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:03:06 +08:00
pangtong-fujunshi 5be32bd0b8 Merge PR #63: §17 ToolchainHandler 强约束设计 2026-06-13 14:10:19 +00:00
cfdaily 71bab93308 fix(design): §17 M1 verify_completion 列名 content→body(司马懿 Review)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 22:09:18 +08:00
cfdaily 023de9862f feat(design): §17 ToolchainHandler 强约束设计(取代 action Mail 类型)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- 新增 17-toolchain-handler-enforcement.md 完整设计文档
- 标记 17-action-mail-type.md 为 SUPERSEDED
- 方向修正:让 toolchain 事件回归 ToolchainHandler(§14 已有架构)
2026-06-13 22:04:08 +08:00
pangtong-fujunshi 626e13c0d1 Merge PR #62: feat(design): §17 action Mail 类型设计 v2.0 2026-06-13 13:37:58 +00:00
cfdaily 6a7fe37d93 fix(design): §17 Review 反馈修复(M1+S1-S3+姜维关注点)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 21:37:11 +08:00
cfdaily 5bc53192d6 feat(design): §17 action Mail 类型设计 v2.0
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
新增 action Mail 类型(inform/request/action 三分),解决工具链事件
被 inform 语义导致流程断链的问题。

核心设计:
- 五层 Prompt 架构(L0 语义层 → L4 约束层)
- 10 个场景完整提示词设计(8 action + 1 inform + 1 补充)
- Steps API 调用级别,信息层与指令层严格分离
- 两个维度完整性验证(Webhook event × 流程流转)
- 防降级三层设计(语义层 + 结构层 + Red Flag 表)
- action_report comment 验证机制

参考优秀实践:Hermes Tool-Use Enforcement、Superpowers Red Flags、
moziplus L1/L2/L3 三层上下文模型
2026-06-13 21:30:35 +08:00
pangtong-fujunshi 2c2d8b55c9 Merge PR #61: fix(docs): 文件路径引用修正 2026-06-13 06:46:52 +00:00
cfdaily cb81251111 fix(docs): 24-compact-detection-fix → 15-compact-detection-fix 路径引用修正
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 14:45:34 +08:00
pangtong-fujunshi a8a7164335 Merge PR #60: fix synchronize dispatch 2026-06-13 06:44:07 +00:00
cfdaily fe7f914681 fix: _handle_pull_request 补充 synchronize action dispatch
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
姜维排查发现 _handle_pull_request 只处理 opened/closed,
Gitea 发 pull_request + action=synchronize 时被静默丢弃。
_handle_pr_synchronize 已存在但未被 dispatch 到。

修复:加 elif action == synchronize dispatch。
pull_request_sync 注册保留作为双保险。
2026-06-13 14:42:38 +08:00
cfdaily eccb4d2723 docs: 设计文档编号重排(20→14, 24→15) + 已完成文档状态标注更新
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 10:12:39 +08:00
pangtong-fujunshi 9e2145171a Merge PR #57 2026-06-13 01:36:24 +00:00
cfdaily 67cad2dd96 fix: _REASON_MAP 补 agent_error 条目(G2)
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
spawner 会产生 agent_error reason,之前缺映射走到 _default 显示'未知原因'。
2026-06-13 09:35:15 +08:00
pangtong-fujunshi 79da0bd07e Merge PR #56 2026-06-13 01:34:39 +00:00
cfdaily a116f7e6c0 fix: 注释拼写 must_hives → must_haves
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 09:33:59 +08:00
cfdaily 7fb4d988ec fix: lint 修复 + api_error 测试更新
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
- mail_notify: f-string 反斜杠修复、行过长修复、unused import
- test_classify_outcome: api_error should_retry 改 True
2026-06-13 09:29:52 +08:00
cfdaily f4dd9ff78d feat(daemon): Mail 失败通知 v2.0 — api_error retry + 通知增强
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 1s
P1: api_error rate_limit/500/503 改为可恢复 retry(should_retry=True,60s cooldown)
P2: 通知模板动态化(reason 人话翻译 + detail 信息 + 重试情况 + AI Native 知识库)

设计文档:§20.7 (20-task-type-architecture.md)
2026-06-13 09:27:17 +08:00
pangtong-fujunshi 6520e78c0b Merge PR #55 2026-06-13 01:23:33 +00:00
cfdaily 0169823b72 chore(docs): 合并 mail-failure-notification 到 §20,更新设计方案
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- mail-failure-notification.md → archive-3.0/
- §20 新增 §20 Mail 失败通知机制(v2.0 AI Native)
  - 失败场景与重试耗时完整表
  - reason 人话翻译映射
  - 通知模板增强(detail 传入 + 重试情况)
  - api_error rate_limit 待改为可恢复 retry
- §18→§21,§19→§22 编号顺延
2026-06-13 09:22:32 +08:00
pangtong-fujunshi 77252c39c6 Merge PR #54 2026-06-13 00:59:11 +00:00
cfdaily 5a80d6c5cd chore(docs): gateway-watchdog.md 改编号 99
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 08:58:04 +08:00
pangtong-fujunshi 322263585d Merge PR #53 2026-06-13 00:54:39 +00:00
cfdaily c7b4b262b1 chore(docs): 归档 §13-sim §18 §21 §25 至 archive-3.0
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- 13-toolchain-and-dev-workflow-simulation.md → archive-3.0/(模拟报告,§16 已覆盖)
- 18-toolchain-e2e-test.md → archive-3.0/(E2E 测试记录,§13 已引用)
- 21-e2e-verification-handler.md → archive-3.0/(Handler 验证,§20 §19 已覆盖)
- 25-gitea-mention-toolchain.md → archive-3.0/(@mention 集成,§13 §16 已覆盖)
2026-06-13 08:53:23 +08:00
pangtong-fujunshi e43d87f3db Merge PR #52 2026-06-13 00:53:09 +00:00
cfdaily b07e311921 chore(docs): 归档 §22 §23 至 archive-3.0,§13 追加 §7.6
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
- 22-cd-production.md → archive-3.0/(部署成功通知草案)
- 23-toolchain-pr-lifecycle.md → archive-3.0/(PR 全生命周期,已由 §13 §16 覆盖)
- §13 §7 新增 §7.6 部署成功通知(草案引用)
2026-06-13 08:51:35 +08:00
pangtong-fujunshi 6ca9b19876 Merge PR #51 2026-06-13 00:50:36 +00:00
cfdaily 98eb15125d chore(docs): 归档 §20 审查文档至 archive-3.0,追加审查历史
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- review-v3-vs-head-pangtong.md → archive-3.0/
- review-v3-vs-head-simayi.md → archive-3.0/
- step5-audit-report.md → archive-3.0/
- step5-impact-analysis.md → archive-3.0/
- §20 新增 §19 审查与验证历史(关键发现+修复状态汇总)
2026-06-13 08:49:41 +08:00
pangtong-fujunshi a01bedb193 Merge PR #50 2026-06-13 00:35:50 +00:00
51 changed files with 6633 additions and 502 deletions
+34
View File
@@ -0,0 +1,34 @@
name: Bug 报告
about: 报告一个 Bug
title: "[moz] bug: "
labels:
- type/bug
body:
- type: textarea
id: description
attributes:
label: Bug 描述
description: 清晰描述什么行为是错误的
validations:
required: true
- type: textarea
id: reproduce
attributes:
label: 复现步骤
validations:
required: true
- type: textarea
id: expected
attributes:
label: 预期行为
validations:
required: true
- type: dropdown
id: priority
attributes:
label: 优先级
options:
- P0 紧急
- P1 高
- P2 中
- P3 低
+27
View File
@@ -0,0 +1,27 @@
name: 功能需求
about: 提出一个新功能需求
title: "[moz] feat: "
labels:
- type/feat
body:
- type: textarea
id: description
attributes:
label: 需求描述
description: 你希望实现什么功能?为什么需要?
validations:
required: true
- type: textarea
id: solution
attributes:
label: 建议方案
description: 如果有初步想法可以写 here
- type: dropdown
id: priority
attributes:
label: 优先级
options:
- P0 紧急
- P1 高
- P2 中
- P3 低
+20
View File
@@ -0,0 +1,20 @@
name: 测试任务
about: 创建一个测试任务(E2E、集成测试等)
title: "[moz] test: "
labels:
- type/test
body:
- type: textarea
id: description
attributes:
label: 测试目标
description: 要验证什么场景?
validations:
required: true
- type: textarea
id: steps
attributes:
label: 测试步骤
description: 关键步骤或验收标准
validations:
required: true
+26
View File
@@ -0,0 +1,26 @@
## 改动概述
<!-- 一句话说明这个 PR 做了什么 -->
## 关联 Issue
<!-- #issue_number,如果没有关联可删掉 -->
## 改动类型
- [ ] feat: 新功能
- [ ] impl: 实现
- [ ] fix: 修复
- [ ] docs: 文档
- [ ] test: 测试
- [ ] refactor: 重构
- [ ] ci: CI/CD
- [ ] chore: 杂项
## 检查清单
- [ ] 标题格式正确:`[代号] type(scope): 简述`
- [ ] 改动在开发目录(`~/.openclaw/sanguo_projects/`)完成
- [ ] 已同步到安装目录(`~/.sanguo_projects/`
- [ ] 已运行测试(如适用)
- [ ] 已更新相关设计文档(如适用)
+38 -5
View File
@@ -27,6 +27,7 @@ jobs:
- name: Setup Python
run: |
python3 -m venv /tmp/ci-venv-lint
/tmp/ci-venv-lint/bin/pip install --quiet --upgrade pip
/tmp/ci-venv-lint/bin/pip install --quiet flake8
- name: Lint with flake8
@@ -42,19 +43,49 @@ jobs:
- name: Setup Python
run: |
rm -rf /tmp/ci-venv-test
python3 -m venv /tmp/ci-venv-test
/tmp/ci-venv-test/bin/pip install --quiet fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
/tmp/ci-venv-test/bin/pip install --quiet --upgrade pip
/tmp/ci-venv-test/bin/pip install --quiet --no-cache-dir fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Debug environment
run: |
echo "PWD=$(pwd)"
echo "PYTHONPATH=$PYTHONPATH"
python3 -c "import sys; [print(p) for p in sys.path if 'sanguo' in p.lower() or 'openclaw' in p.lower()]"
grep -c "assignee = agent_id" src/daemon/toolchain_handler.py || true
grep -c "_BUSINESS_FAIL_THRESHOLD" src/daemon/toolchain_handler.py || true
- name: Run tests (exclude E2E)
run: |
/tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q
PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q || \
(echo '=== RETRY WITH VERBOSE ===' && \
PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -v 2>&1 | tail -30)
# ── Job 3: CI 失败通知 ───────────────────────────────
# ── Job 3: Frontend Build ───────────────────────────
frontend:
runs-on: macos-arm64
needs: lint
steps:
- uses: actions/checkout@v4
- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: 20
- name: Install & Build
run: |
cd src/frontend
npm ci || npm install
npm run build
# ── Job 4: CI 失败通知 ───────────────────────────────
# 使用 needs.<job>.result 直接判断,不查询 commit status API
# 根因:notify 自身的 pending status 会污染 commit status 查询结果(竞态条件)
notify-on-failure:
runs-on: macos-arm64
needs: [lint, test]
needs: [lint, test, frontend]
if: always()
steps:
- name: Check results and notify
@@ -62,12 +93,13 @@ jobs:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
LINT_RESULT: ${{ needs.lint.result }}
TEST_RESULT: ${{ needs.test.result }}
FRONTEND_RESULT: ${{ needs.frontend.result }}
run: |
echo "Lint result: $LINT_RESULT"
echo "Test result: $TEST_RESULT"
# 只有 lint 或 test 明确失败时才发通知
if [ "$LINT_RESULT" = "failure" ] || [ "$TEST_RESULT" = "failure" ]; then
if [ "$LINT_RESULT" = "failure" ] || [ "$TEST_RESULT" = "failure" ] || [ "$FRONTEND_RESULT" = "failure" ]; then
echo "CI has failures, sending notification..."
# 如果是 PR 事件,写评论通知
@@ -77,6 +109,7 @@ jobs:
FAILED_JOBS=""
[ "$LINT_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}lint "
[ "$TEST_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}test "
[ "$FRONTEND_RESULT" = "failure" ] && FAILED_JOBS="${FAILED_JOBS}frontend "
curl -sf -X POST \
-H "Authorization: token $GITEA_TOKEN" \
+1 -9
View File
@@ -110,15 +110,7 @@ jobs:
PR_AUTHOR=$(curl --max-time 5 -sf \
-H "Authorization: token $GITEA_TOKEN" \
"$API_URL/repos/$REPO/pulls?state=closed&sort=updated&order=desc&limit=10" | \
python3 -c "
import json, sys
sha = '$COMMIT_SHA'
for pr in json.load(sys.stdin):
merge_sha = pr.get('merge_commit_sha', '') or ''
if merge_sha.startswith(sha) or sha.startswith(merge_sha):
print(pr['user']['login'])
break
" 2>/dev/null || echo "")
python3 -c "import json,sys; sha='$COMMIT_SHA'; matches=[pr['user']['login'] for pr in json.load(sys.stdin) if (pr.get('merge_commit_sha','') or '').startswith(sha) or sha.startswith(pr.get('merge_commit_sha','') or '')]; print(matches[0] if matches else '')" 2>/dev/null || echo "")
# 确定通知对象
if [ -n "$PR_AUTHOR" ]; then
+1 -1
View File
@@ -6,7 +6,7 @@
**基于**: PRD-v3.0 §4 四相架构 + architecture-v3.0.md
**作者**: 庞统(副军师)🐦
**日期**: 2026-05-29
**状态**: 实现完成,待 E2E 验证
**状态**: ✅ 已完成(E2E 验证通过)
**评审**: 司马懿
---
+1 -1
View File
@@ -2,7 +2,7 @@
**日期**: 2026-05-30
**作者**: 庞统
**状态**: 已修订 v1.1(根据司马懿 2026-05-30 评审意见
**状态**: ✅ 已完成(spawner/ticker/dispatcher 全部 use_main_session=True
**前置**: `01-four-phase-loop.md`(四相循环 E2E 验证暴露 session 爆炸问题)
---
@@ -3,7 +3,7 @@
> 版本: v1.1
> 日期: 2026-05-30
> 作者: 庞统(副军师)
> 状态: v1.1 修订(司马懿评审意见已纳入
> 状态: ✅ 已完成(@mention + mention_queue 已实现
> 前置: #02 Main Session + Delegation, #03 Prompt 进化
---
+1 -1
View File
@@ -3,7 +3,7 @@
> 版本: v1.2
> 日期: 2026-06-03
> 作者: 庞统(副军师)
> 状态: 待评审(v1.2
> 状态: ✅ 已完成(_startup_recover 7 个方法已实现
> 前置: spawner-monitor-design.md §5 A0Agent crash 恢复)
> 变更: v1.2 两个关键改进:(1) working→pending 保留 current_agent 让同一 agent 接手;(2) reviewing 精确恢复到前置状态而非硬推 done
+4 -4
View File
@@ -1,6 +1,6 @@
# #07 Spawner Acquire-First 设计
> 状态:#07.1 已实施 ✅ | #07.2 已实施 ✅ | #07.3 设计中
> 状态:✅ 已完成(#07.1-#07.2 已实施)
> 作者:庞统
> 日期:2026-06-01
> 评审:司马懿
@@ -233,9 +233,9 @@ def _revive_session(agent_id: str) -> bool:
pass
```
### 4.5 O5: compact 检测(§24 rotation-only v3
### 4.5 O5: compact 检测(§15 rotation-only v3
§24 设计文档:`docs/design/24-compact-detection-fix.md`
§15 设计文档:`docs/design/15-compact-detection-fix.md`
**检测方法**:读 gateway 日志尾部 2MB,按 sessionKey 过滤 `[compaction] rotated active transcript` 事件。
如果最近的 rotation 事件在 120s 窗口内 → 视为 compact 循环进行中(可能还在 post-compact retry)。
@@ -243,7 +243,7 @@ def _revive_session(agent_id: str) -> bool:
旧方法 `_check_recent_compaction_jsonl`(扫描 session jsonl 的 `type=compaction` 事件)保留作为 fallback。
```python
# §24 v3: compact 检测优先用 gateway 日志 rotation 事件
# §15 v3: compact 检测优先用 gateway 日志 rotation 事件
if result["status"] not in ("idle", "unknown", None):
session_key = f"agent:{agent_id}:main"
result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway(
+1 -1
View File
@@ -3,7 +3,7 @@
> 版本: v1.1
> 日期: 2026-06-03
> 作者: 庞统(副军师)
> 状态: 待评审(v1.1
> 状态: ✅ 已完成(rebuttal on_complete + goal gate 已实现
> 前置: #04 黑板协作(@mention+ #08 Classify Outcome
> 关联: T4 审查体系完善
> 变更: v1.1 纳入司马懿评审反馈 — verdict 读 reviews 表 + rebuttal mention spawn 带 on_complete 回调
@@ -3,7 +3,7 @@
> 版本: v1.1
> 日期: 2026-06-03
> 作者: 庞统(副军师)
> 状态: 待终审(v1.1
> 状态: ✅ 已完成(SSE + TaskModal 自动刷新已实现
> 前置: #04 黑板协作(@mention + comment
> 关联: architecture-v3.0.md T3
> 变更: v1.1 纳入司马懿评审反馈 — checkpoint SSE 触发文件修正为 checkpoint_routes.pySSE payload 统一含 project_id
+167 -10
View File
@@ -1,6 +1,6 @@
# 三国团队工具链与开发流程设计
> **状态**: v3.3 — #19 上下文四层改造合并 + CI 修复 + A13 修订
> **状态**: ✅ 已完成(E2E 验证通过,所有 8 步 PASS)
> **作者**: 庞统(副军师)🐦
> **评审**: 司马懿(仲达)🗡️
> **日期**: 2026-06-06
@@ -245,11 +245,14 @@ pm2 show sanguo-act-runner # 详情
|------|------|
| `feat` | 新功能 |
| `fix` | Bug 修复 |
| `impl` | 按设计文档实现 |
| `refactor` | 重构(不改行为) |
| `test` | 测试相关 |
| `docs` | 文档 |
| `chore` | 构建/工具/配置 |
sanguo 组织所有仓库统一使用此 commit 规范。
---
## §4. 问题管理
@@ -266,15 +269,19 @@ pm2 show sanguo-act-runner # 详情
### 4.2 Issue 标签体系
| 标签 | 颜色 | 说明 |
|------|------|------|
| `bug` | 红 | 功能异常 |
| `feature` | 蓝 | 新功能需求 |
| `improvement` | 绿 | 改进优化 |
| `security` | 橙 | 安全相关 |
| `risk:high/standard/low` | 分级色 | 风险级别(见 §6.1 判定规则) |
| `priority:high/medium/low` | 黄/灰 | 优先级 |
| `blocked` | | 阻塞中 |
| 标签 | 颜色 | 色值 | 说明 |
|------|------|------|------|
| `type/bug` | 红 | #ee0701 | Bug 修复 |
| `type/feat` | 蓝 | #84b6eb | 新功能 |
| `type/impl` | 浅蓝 | #c5def5 | 按设计实现 |
| `type/docs` | 黄 | #fbca04 | 文档 |
| `type/test` | 绿 | #0e8a16 | 测试 |
| `type/ci` | 紫 | #d4c5f9 | CI/CD |
| `type/refactor` | | #ff6f00 | 重构 |
| `priority/P0` | 深红 | #b60205 | 紧急 |
| `priority/P1` | 红 | #d93f0b | 高 |
| `priority/P2` | 黄 | #fbca04 | 中 |
| `priority/P3` | 浅蓝 | #c5def5 | 低 |
### 4.3 需求/问题 Review 前置
@@ -296,6 +303,29 @@ Open → In Progress → Review → Closed
└──── Reopened ←───────────────────┘
```
### 4.5 标题规范
所有 Issue 和 PR 标题**必须**包含项目代号前缀,让人类一眼识别项目+类型:
**Issue**: `[代号] type: 简述`
**PR**: `[代号] type(scope): 简述`
项目代号:
| 仓库 | 代号 |
|------|------|
| sanguo_moziplus_v2 | moz |
| sanguo_quant_live | quant |
| sanguo_vnpy | vnpy |
示例:
- `[moz] bug: Mail API 500 when comment_type invalid`
- `[moz] impl(daemon): 知识注入 L2 引擎层 — WikiGuideSection`
- `[quant] feat: 趋势跟踪策略骨架`
此规范通过 L2 引擎层 `GiteaConventionSection`priority=55)自动注入所有 Agent prompt。
完整规范文档:L3 Skill `gitea-conventions`。规范设计原理详见 [§26](#26-gitea-协作规范设计)。
---
## §5. CI/CD 管道设计
@@ -554,6 +584,16 @@ jobs:
- revert 可能产生合并冲突 → 部署失败时人工介入
- 数据库变更回滚需人工介入 → schema 变更必须向前兼容(只加字段不删/不改),违反此规范由 CI 检查拦截(或人工 Review 拦截)
### 7.6 部署成功通知(草案)
> **状态**:草案,未实现。详细方案见 `archive-3.0/22-cd-production.md`。
当前 deploy.yml 缺少部署成功后的 Mail 通知(CI 失败和 Deploy 失败通知已实现)。待实现方案:
- deploy job 末尾追加通知 step
- 查询 Gitea API 获取关联 PR 作者
- 通过 Mail API 发送成功通知给 PR 作者 + pangtong-fujunshi
- direct push 场景通知 jiangwei-infra + pangtong-fujunshi
---
## §8. 验证流程集成
@@ -3287,3 +3327,120 @@ async def _handle_issue_comment(payload):
|------|------|------|
| 2026-06-09 | v1.0 | 初版:E2E 真实场景暴露问题 → 四层改造方案 + @mention 通知 + Mail type 改造 |
---
## §26. Gitea 协作规范设计
> **状态**: ✅ 已实现(PR #69
> **日期**: 2026-06-14
### 26.1 设计目标
团队三个仓库(moziplus_v2 / quant_live / vnpy)在 Gitea 上独立存在,协作时存在三个问题:
1. **标题不可辨识**`Fix mail API` 看不出是哪个项目、什么类型的改动
2. **Label 缺失**:无统一标签体系,无法按类型/优先级筛选
3. **填写无约束**:Issue/PR 内容格式随意,审查时缺少关键信息
本规范的目标:让人类一眼识别项目+类型,让 Agent 可程序化遵循,让模板降低填写门槛。
### 26.2 规范设计
#### 26.2.1 标题规范
**规则**:所有 Issue/PR 标题必须包含项目代号前缀。
| 类型 | 格式 | 示例 |
|------|------|------|
| Issue | `[代号] type: 简述` | `[moz] bug: Mail API 500 when comment_type invalid` |
| PR | `[代号] type(scope): 简述` | `[moz] impl(daemon): WikiGuideSection 注入` |
**项目代号**moz=moziplus_v2, quant=quant_live, vnpy=vnpy
**type 清单**bug / feat / impl / fix / docs / test / ci / refactor / chore
**设计决策**
- 为什么用代号前缀而不是靠仓库隔离?— 团队成员同时在多仓库协作,通知列表(Mail、Gitea dashboard)混合展示时代号前缀提供即时辨识。仓库隔离解决不了跨仓库视图的辨识问题
- PR 加 `scope` 是因为 PR 通常涉及具体模块(daemon / api / frontend),Issue 不需要
#### 26.2.2 Label 体系
采用 `type/*` + `priority/*` 双命名空间,替代旧标签(bug/feature/improvement/security):
| 标签 | 色值 | 说明 |
|------|------|------|
| `type/bug` | #ee0701 | Bug 修复 |
| `type/feat` | #84b6eb | 新功能 |
| `type/impl` | #c5def5 | 按设计实现 |
| `type/docs` | #fbca04 | 文档 |
| `type/test` | #0e8a16 | 测试 |
| `type/ci` | #d4c5f9 | CI/CD |
| `type/refactor` | #ff6f00 | 重构 |
| `priority/P0` | #b60205 | 紧急 |
| `priority/P1` | #d93f0b | 高 |
| `priority/P2` | #fbca04 | 中 |
| `priority/P3` | #cfd3d7 | 低 |
**设计决策**
-`type/` `priority/` 命名空间而非扁平命名,避免标签膨胀时冲突,且在 Gitea UI 中按前缀分组显示
- type 用暖色系(红/橙/黄),priority 用冷→热渐变(灰→蓝→黄→红),视觉上两类标签不混淆
- ⚠️ 已知问题:`type/impl`(#c5def5) 与 `priority/P3` 色值相近。P3 已调整为灰色 #cfd3d7 以区分
#### 26.2.3 Issue/PR 模板
**Issue 模板**3 种):bug.yml / feature.yml / test.yml
覆盖决策:只做最高频的 3 种类型(bug 报告、功能需求、测试任务),其余类型(docs/ci/refactor)频率低,走自由创建。每种模板包含描述、复现/方案、优先级字段。
**PR 模板**1 种):PULL_REQUEST_TEMPLATE.md
改动类型 checklist + 检查清单(标题格式、开发→安装目录同步、测试、设计文档更新)。
### 26.3 执行机制
规范通过四层路径保证执行,每层职责不同:
| 层 | 载体 | 职责 | Token 成本 |
|----|------|------|-----------|
| **L1** | TOOLS.mdAgent workspace | 代号表 + 格式速查,Agent 静态可见 | ~100 |
| **L2** | `GiteaConventionSection`priority=55 | 每次 spawn 动态注入,提醒标题格式 | ~80 |
| **L3** | `gitea-conventions` SkillextraDirs | 完整规范(标题/分支/commit/label),Agent 按需加载 | 按需 |
| **Gitea** | Issue/PR Template + Label(仓库级) | 人类创建时表单引导,标签选择 | — |
**L2 设计决策**
- `GiteaConventionSection` priority=55,排在 Constraints(50) 之后、Extension(60) 之前。标题规范属于约束类,但优先级低于安全/流程约束
- 注入所有 handlerTask/Toolchain/Mail),因为任何 handler 都可能创建 Issue/PR
- ⚠️ L1 文件在各 Agent workspace`~/.openclaw/workspace-*/TOOLS.md`),不在仓库管理。Agent workspace 变更不通过 PR
### 26.4 Label 迁移策略
旧标签(bug/feature/improvement/security/risk:high/priority:high)已由新体系替代:
- **旧标签保留不动**(不删除),避免历史 Issue 丢失标签信息
- **新 Issue/PR 使用新标签**type/* + priority/*
- 当前不做批量迁移。如有需要可后续通过 Gitea API 批量替换
### 26.5 与其他章节的关系
| 章节 | 关系 |
|------|------|
| §4.2 Issue 标签体系 | §26.2.2 Label 设计在问题管理场景的具体应用(已随 PR #69 更新) |
| §4.5 标题规范 | §26.2.1 标题规范的执行层摘要(已随 PR #69 新增) |
| §5 CI/CD 管道 | CI 事件通过标题前缀 `[CI]` 做事件路由(见 §16 事件中枢) |
| §6 代码审查流程 | PR Template 检查清单约束审查前置条件 |
### 26.6 实现记录
| 文件 | 改动 |
|------|------|
| `prompt_composer.py` | 新增 `GiteaConventionSection`priority=55 |
| `task_handler.py` | `get_sections()` 注册 `GiteaConventionSection()` |
| `toolchain_handler.py` | `get_sections()` 注册 `GiteaConventionSection()` |
| `mail_handler.py` | `get_sections()` 注册 `GiteaConventionSection()` |
| `db.py` | `COMMENT_TYPES``action_report`(修 API 500 bug |
| `.gitea/ISSUE_TEMPLATE/` | bug.yml / feature.yml / test.yml |
| `.gitea/PULL_REQUEST_TEMPLATE.md` | PR 检查清单模板 |
| Gitea Labels | 3 仓库各创建 11 个 Labeltype × 7 + priority × 4 |
PR #692026-06-14 合并。
@@ -4,6 +4,8 @@ created: 2026-06-10
version: v3.0
---
> 状态: ✅ 已完成(Step 1-5 全部合并,394 passed
# §1 现状分析(v3.0 更新说明:§1-§13 保留原样,新增 §14-§18,更新 §3/§5/§7
# §1 现状分析
@@ -950,7 +952,151 @@ handler.post_complete(task_id, agent_id, outcome, db_path)
---
# §18 设计决策记录
## §14. Mail 失败通知机制
### 20.1 背景
Mail 是 A→B 点对点通信,失败应通知发件人 A,而非统一 @pangtong
当前机制(v1.3 已实现):
- `_mark_task("failed")` 对 _mail 项目:调用 `mail_notify.notify_mail_failed` 通知发件人
- `_mark_task("failed")` 对 Task 项目:@pangtong-fujunshiF2 原逻辑不变)
- `_mail_auto_complete` 的 no_reply_found:标 failed 后通知发件人
- 防递归:`must_haves.system_notify=true` 的邮件失败不再递归通知
### 20.2 失败场景与重试机制
所有可能的失败路径及其重试/等待机制(重试上限 max_retries=3agent_timeout=630s):
| 失败类型 | 机制 | 重试次数 | 每次耗时 | cooldown | 最长总耗时 |
|---|---|---|---|---|---|
| `gateway_timeout` | 续杯 | 3 | 630s | 无 | ~31.5 分钟 |
| `crashed` | ticker 兜底 | 3 | ~2-5 分钟 | 60s + 30s ticker | ~15 分钟 |
| `api_error`rate_limit | 推 pending**待改为续杯** | 3 | ~2.5 分钟 | 120s | ~8 分钟 |
| `compact_interrupted` | 续杯 | 3 | 630s | 60s | ~34 分钟 |
| `gateway_unreachable` | 续杯 | 3 | 630s | 60s | ~34 分钟 |
| `lock_conflict` | 续杯 | 3 | 630s | 60s | ~34 分钟 |
| `fallback_timeout` | 续杯(A3b) | 3 | 630s | 60s | ~34 分钟 |
| `compact_wait` | monitor 等待 | 3 | 630s | 无 | ~31.5 分钟 |
| `compact_hanging` | monitor → release | 3 | 630s | 300s | ~31.5 分钟 + ticker |
| `max_monitor_timeouts` | monitor 上限 | 3 | 630s | 无 | ~31.5 分钟 |
| `session_stuck` | revive 1 次 | 1 | ~30s | 无 | ~30 秒 |
| `compact_failed` | 无重试 | 0 | — | 300s | 立刻 failed |
| `auth_failed` | 无重试 | 0 | — | — | 立刻 failed |
| `agent_error` | 无重试 | 0 | — | 300s | 立刻 failed |
| `no_reply_found` | 无重试 | 0 | — | — | 立刻 failed |
### 20.3 触发点
| 触发点 | 文件 | 说明 |
|---|---|---|
| `_mark_task(failed)` | spawner.py | _mail 项目 → notify_mail_failedTask 项目 → @pangtong |
| `_mail_auto_complete` no_reply_found | dispatcher.py | Agent 正常退出但没回复 request → 标 failed → 通知发件人 |
### 20.4 实现位置
- `src/daemon/mail_notify.py``notify_mail_failed` + `_is_mail_project` + 通知模板
- `src/daemon/spawner.py``_mark_task` 中 _mail/Task 分流
- `src/daemon/dispatcher.py``_mail_auto_complete` 中 no_reply_found 后调 notify
### 20.5 通知设计(v2.0 — AI Native
通知提供充足事实信息,不做硬编码处理建议。收件 AI 自行判断下一步。
**通知结构**
```
邮件投递失败通知
📧 原始邮件:「{title}」
👤 收件人:{to_agent}
❌ 失败原因:{reason_human_readable}{reason_raw}
📊 重试情况:{attempt_info}
📋 上下文信息:
{detail_formatted}
常见失败原因参考:
• no_reply_found:收件人未回复(Agent 未能识别或处理此邮件)
• crashed / max_crash_count:收件人处理时进程崩溃(已自动重试 3 次)
• max_retries:续杯耗尽(已自动重试 3 次,共约 34 分钟)
• max_api_retry_countAPI 连续失败达上限(rate_limit/500/503
• max_monitor_timeouts:处理超时达上限(共约 31.5 分钟)
• gateway_timeoutAgent 执行超时(已续杯重试)
• session_stuckAgent 会话假死(lock PID 死亡,revive 失败)
• revive_failed:会话假死后恢复失败
• auth_failedAgent 认证失败(配置问题)
• fallback_exhausted:主模型和备用模型均失败
• agent_failed:收件人主动标记失败
• compact_failed:上下文压缩失败
• compact_hanging:上下文压缩长时间未完成(等待超 31.5 分钟)
• compact_interrupted:上下文压缩被中断(已自动重试 3 次)
• gateway_unreachableGateway 不可达(已自动重试 3 次)
• lock_conflict:会话锁冲突(已自动重试 3 次)
• 其他:建议排查系统日志
——系统自动通知
```
**reason 人话翻译映射**
| reason_raw | reason_human_readable | detail 提取 |
|---|---|---|
| `no_reply_found` | 收件人未回复 | 无额外信息 |
| `crashed` | 处理时进程崩溃 | stderr_preview 前 200 字 |
| `max_crash_count` | 连续崩溃达上限 | count + stderr_preview |
| `max_retries` | 续杯耗尽 | count + retry_field |
| `max_api_retry_count` | API 连续失败达上限 | count |
| `max_monitor_timeouts` | 处理超时达上限 | count + elapsed_seconds |
| `gateway_timeout` | Agent 执行超时 | retry_count |
| `session_stuck` | 会话假死 | stuck_count |
| `revive_failed` | 假死后恢复失败 | stuck_count |
| `auth_failed` | 认证失败 | stderr_preview |
| `fallback_exhausted` | 模型全部失败 | fallback_count + fallback_reason |
| `agent_failed` | 收件人主动标失败 | 无 |
| `compact_failed` | 上下文压缩失败 | stderr_preview |
| `compact_hanging` | 压缩长时间未完成 | compact_wait_count |
| `compact_interrupted` | 压缩被中断 | 无 |
| `gateway_unreachable` | Gateway 不可达 | stderr_preview |
| `lock_conflict` | 会话锁冲突 | 无 |
| 默认 | 未知原因 | reason + stderr_preview(如有) |
**重试情况格式**
- 有重试:`"已自动重试 {count} 次,共耗时约 {total_time}"`
- 无重试:`"无法重试({reason_human_readable}"`
### 20.6 防递归
系统通知邮件(from=system)本身也可能失败:
- 检查 `must_haves.system_notify=true` → 跳过递归通知
- system 不是有效 Agent → 通知路由到 pangtong-fujunshi 代处理
### 20.7 待实现改动
#### P1api_error rate_limit 改为可恢复 retry
**当前**`_classify_outcome` 中 rate_limit/500/503 → `api_error``should_retry=False`,走推 pending 路径。
**改为**`should_retry=True`,走续杯路径。cooldown 60s。上限仍 3 次。
**改动文件**`src/daemon/spawner.py` `_classify_outcome``api_error` 分支。
**影响**`api_retry_count` 机制可以废弃(统一用 `retry_count`),但保持向后兼容暂不删除。
#### P2:通知模板更新(v2.0
**当前**`mail_notify.py``_NOTIFY_TEMPLATE` 是静态模板,不传 detail。
**改为**:动态模板,根据 reason 选择人话翻译 + 提取 detail 信息 + 格式化重试情况。
**改动文件**`src/daemon/mail_notify.py`
**新增**`_REASON_MAP` 字典(reason → 人话 + detail 提取函数)。
### 20.8 不改的
| 项目 | 原因 |
|---|---|
| F2 @pangtong 对 Task 的逻辑 | Task failed 仍 @pangtong,只对 Mail 不同 |
| no_reply_found 的判定逻辑 | 只在判定后加通知,不改判定本身 |
| inform 类型邮件的完成逻辑 | inform 直接 done,不存在 no_reply_found |
| 外部 API 的 from 校验 | system 不走 HTTP,外部无法伪造 |
---
# §21 设计决策记录
本节记录设计过程中的关键讨论和决策,便于未来回顾。
@@ -1009,3 +1155,38 @@ handler.post_complete(task_id, agent_id, outcome, db_path)
**结论**L2 的 RoleSkillSection 改为注入索引+引导语(~100 token),引导 Agent 用 `read` 去读 Skill 全文(L3 层)。遵循 Hermes 的渐进式 Skill 加载模式。
---
## §22. 审查与验证历史
### Step 2-5 背靠背审查(2026-06-10/11
Step 2-5(Task 五层架构重构)合并前,庞统和司马懿分别独立完成 v3.0 → HEAD 的背靠背审查。
**审查范围**v3.0 tag → HEAD6 commits, +1584/-134 行, 9 个文件)
**关键发现与修复**
| # | 问题 | 严重度 | 状态 |
|---|------|--------|------|
| A1 | dispatcher review verdict 处理丢失 | 致命 | ✅ 已修复(PR #24 |
| A2 | Handler 注册初始化缺失 | 致命 | ✅ 已修复 |
| D1 | pre_spawn 返回值未检查 | 严重 | ✅ 已修复(H1 3次重试) |
| D2 | PromptContext 缺少 from_agent/mail_type | 严重 | ✅ 已修复 |
| D5 | _check_reply 语义差异 | 严重 | ✅ 已修复(恢复 tasks 表查询) |
| D3 | inform outcome 白名单 | 轻微 | ⚪ 保留(CRASH_OUTCOMES 已覆盖) |
| D4 | retry prompt 硬编码 | 轻微 | ⚪ 保留(旧方法 deprecated |
| D6 | 标 done 重试 | 轻微 | ✅ 已修复(统一 _mark_task_status |
**Handler 缺陷修复(Step 5 前)**
| # | 修复 | 状态 |
|---|------|------|
| H1 | _mark_task_status 3 次重试 | ✅ |
| H2 | review @mention comment_type | ✅ |
| H3 | review 非 approved 保持 review | ✅ |
**背靠背设计-编码一致性检查**:13 个专题(01-13),4 个严重偏差修复,6 个轻微保留。
**详细审查记录**:见 `archive-3.0/` 目录。
---
@@ -1,6 +1,6 @@
# §24 — Compact 检测方案修正
# §15 — Compact 检测方案修正
> 状态:**v5 已实现**gateway log + jsonl 配对)
> 状态:✅ 已完成gateway log + jsonl 配对)
> 作者:庞统
> 日期:2026-06-11v4),2026-06-13v5
> 框架:基于 §07 Spawner Acquire-First
+307
View File
@@ -0,0 +1,307 @@
# #16 知识注入设计
> 状态:v2 设计中
> 作者:庞统
> 日期:2026-06-13v1),2026-06-14v2 对齐 #11 四层架构)
> 评审:待司马懿评审
## 一、问题
### 1.1 现状
Agent(庞统、司马懿、张飞等)在执行任务时,不主动查询已有知识库(wiki-vault)。导致:
1. **重复调研**:赵云查过的数据清洗经验,张飞又从头调研一遍
2. **重复踩坑**wiki-vault 里已有"vnpy load_bar 需要显式指定 end=None"的实践,张飞还是踩了
3. **方案质量低**:做方案时纯靠推理,不查已有的优秀实践
4. **知识 gap 无人管**:查不到相关知识时没记录,下次还是查不到
### 1.2 根因
不是没有知识库(wiki-vault 有 50+ practices 页面),也不是没有检索能力(wiki-query Skill 已存在)。
**根因是注入时机**:Agent 不知道什么时候该查、没有强制机制让 Agent 在关键决策点查。
### 1.3 目标
1. Agent 在关键决策点**主动查询** wiki-vault
2. 查不到相关知识时**自动记录** knowledge gap
3. 定时任务处理 gap + 总结经验,**持续丰富** wiki-vault
4. 不增加 prompt token 负担(不自动注入知识全文,只引导查询)
## 二、调研
### 2.1 Superpowers:强制 Skill 检查(最有效)
**核心设计**session-start hook 注入铁律级指令——
> "If you think there is even a **1% chance** a skill might apply, you **ABSOLUTELY MUST** invoke the skill. This is not negotiable."
配合 **Red Flags 表**防止 Agent 自合理化跳过:
| Agent 的想法 | Red Flag 驳回 |
|---|---|
| "这个问题很简单" | 简单问题也需要查实践 |
| "我需要更多上下文" | Skill 检查在澄清问题之前 |
| "先看看代码" | Skill 告诉你怎么看代码 |
| "我记住了这个 Skill" | Skill 会更新,重新读 |
**为什么有效**:不靠 Agent "想起来",靠铁律强制。Skill 触发在任何响应之前。
### 2.2 Hermes:经验闭环 + Session Search
**经验闭环**:完成复杂任务(5+ tool calls)→ 自动创建 Skill → 下次自然触发。
**Session Search**:系统提示注入——"当用户提及过去内容时,主动搜索而非要求用户重复"。
**为什么有效**:不是"知识查询"而是"行为内化"——经验变成 SkillSkill 有 description 触发词。
### 2.3 结论
综合两个项目的优势:
| 设计点 | 来源 | 我们的做法 |
|--------|------|-----------|
| 铁律级强制 | Superpowers | L0 Hook 注入 + L1 SOUL.md 行为引导 |
| Red Flags 反合理化 | Superpowers | 知识查询 Red Flags 表(L1 SOUL.md |
| 经验内化 | Hermes | 经验→wiki-vault→下次查询 |
| 渐进式披露 | Hermes | 先查 summary,按需读全文 |
## 三、设计决策(对齐 #11 四层架构)
> **层级体系严格对齐 [#11](./11-context-layers-redesign.md)**,不自创命名。
### 总览
| #11 层级 | 知识注入角色 | 本设计覆盖 | 注入方式 |
|----------|------------|-----------|---------|
| **L0 铁律层** | "做方案前先查 wiki-vault" | ✅ D16-1 | Hook 每轮强制注入 |
| **L1 角色层** | TOOLS.md 知识库速查表 + SOUL.md Red Flags | ✅ D16-2 | Workspace 文件自动注入 |
| **L2 引擎注入层** | 三种 handler 各注入 WikiGuideSection | ✅ D16-3 | PromptComposer 拼装 |
| **L3 被动参考层** | wiki-query Skill 按需触发 | ✅ D16-4 | extraDirs Description 匹配 |
| 运维层 | gap 闭环 cron job | ✅ D16-5 | 不属于上下文分层 |
### D16-1L0 铁律层 — 新增一条 wiki 查询铁律
L0 只放跨系统通用的、不可绕过的行为底线。wiki 查询铁律和 GATE 门控同级。
**新增铁律**
```
<wiki-rule>
做方案前先查 wiki-vault,有 1% 相关就要查。查不到记 knowledge-gaps.md。
</wiki-rule>
```
**注入方式**:和 `<gate-rules>` / `<delegation-rule>` 并列,Hook 每轮强制注入。
**覆盖范围**:所有 Agent、所有场景(不限于 moziplus spawn 的子任务)。
### D16-2L1 角色层 — TOOLS.md + SOUL.md
#### TOOLS.md(✅ 已完成)
各 Agent workspace 的 TOOLS.md 中已有「LLM Wiki 知识库」段,包含:
- 速查表(场景 → 怎么做 → 什么时候用)
- 检索原则(index.md → summary → grep → 整页读取,从便宜到昂贵)
- 目录结构(wiki-vault / practices / concepts / skills / ...
- 铁律(做方案前先查、查不到记 gap)
#### SOUL.md Red Flags
在各 Agent 的 SOUL.md 中加入知识查询 Red Flags 表(和 Superpowers 一致):
| Agent 的想法 | 反驳 |
|---|---|
| "这个我以前做过" | 知识库可能已更新,查一下确认 |
| "先做再说" | 做方案前查实践比做错了返工便宜 |
| "这个领域我熟悉" | 熟悉≠知道最新实践,wiki-vault 持续更新 |
| "查知识库浪费时间" | 重复踩坑浪费的时间远大于查询时间 |
### D16-3L2 引擎注入层 — 三种 handler 各注入 WikiGuideSection
L2 是 BootstrapBuilder/PromptComposer 动态拼装的 prompt 段。当前有三种 handler,各有自己的 PromptSection 实现:
#### 当前 handler 结构
| Handler | Sectionspriority | 有 wiki 引导? |
|---------|---------------------|--------------|
| **TaskHandler** | Context(10) → Prior(20) → RoleSkill(30) → API(40) → Constraints(50) | ❌ |
| **MailHandler** | Context(10) → API(40) → Constraints(50) | ❌ |
| **ToolchainHandler** | Context(10) → API(40) → Constraints(50) | ❌ |
#### 新增 WikiGuideSectionpriority=60PRIORITY_EXTENSION
创建一个**通用 PromptSection**,三种 handler 的 `get_sections()` 都注入:
```python
# 可放在 prompt_composer.py 或独立文件,三种 handler 共用
class WikiGuideSection:
"""知识查询引导段 — 引导 Agent 在关键决策点查 wiki-vault。"""
name: str = "wiki_guide"
priority: int = 60 # PRIORITY_EXTENSION
WIKI_GUIDE = (
"## 知识查询引导\n"
"涉及方案设计、编码实现、故障排查时,先查 wiki-vault 相关实践:\n"
"- 路径:/Volumes/KnowledgeBase/wiki-vault/\n"
"- 速查:index.md → grep 关键词 → summary 字段 → 按需读全文\n"
"- 查不到:在 _meta/knowledge-gaps.md 记录"
)
def render(self, context: PromptContext) -> str:
return self.WIKI_GUIDE
def should_include(self, context: PromptContext) -> bool:
return True
```
#### 三种 handler 改动
每种 handler 的 `get_sections()` 末尾加 `WikiGuideSection()`
```python
# TaskHandler
def get_sections(self) -> list:
return [
TaskContextSection(),
PriorOutputsSection(),
RoleSkillSection(),
TaskApiSection(),
TaskConstraintsSection(),
WikiGuideSection(), # ← 新增
]
# MailHandler
def get_sections(self) -> list:
return [
MailContextSection(),
MailApiSection(),
MailConstraintsSection(),
WikiGuideSection(), # ← 新增
]
# ToolchainHandler
def get_sections(self) -> list:
return [
ToolchainContextSection(),
ToolchainApiSection(),
ToolchainConstraintsSection(),
WikiGuideSection(), # ← 新增
]
```
#### 为什么三种 handler 都需要
- **TaskHandler**:executor 做方案/编码,最需要查实践
- **ToolchainHandler**:CI 失败排查、部署问题,有相关运维实践可参考
- **MailHandler**:request 类型回复杂问题时也可能需要查已有经验
#### token 开销
WikiGuideSection 固定 ~60 字(~30 tokens),对 L2 预算影响可忽略。
### D16-4L3 被动参考层 — wiki-query Skill
#### 现状
`wiki-query` Skill 已部署在 `~/.sanguo_projects/sanguo_mozi/skills/wiki/wiki-query/SKILL.md`description 包含中文触发词:
> 调查、研究、分析、优秀实践、最佳实践、经验、怎么做X、有没有X的经验、以前怎么处理的
#### 触发机制
Agent 通过 extraDirs 加载 Skill headername + description),按 Description 匹配自主 `read` 全文。这是标准 L3 行为,和 #11 设计一致。
#### 待确认:extraDirs 子目录递归
wiki-query 在 `skills/wiki/wiki-query/` 子目录下。需确认 moziplus spawn 子 agent 时 extraDirs 是否递归扫描子目录。如果不递归,需要:
- 方案 A:把 wiki-query 移到 `skills/` 顶层
- 方案 B:配置 extraDirs 包含 `skills/wiki/` 子目录
### D16-5:知识 gap 记录 + 定时任务(运维层)
> 不属于上下文分层体系,是独立的运维流程。
#### gap 记录机制(已有基础设施)
- **位置**`/Volumes/KnowledgeBase/wiki-vault/_meta/knowledge-gaps.md`
- **格式**`- [日期] Agent名查"主题" → 待处理`
- **已有 20+ 条历史记录**,处理后标注 `→ 已建立 ✅`
wiki-query Skill 的 Step 5 已内置 gap 记录逻辑。
#### 定时任务(已有 cron 基础)
| 任务 | 时间 | 内容 | 状态 |
|------|------|------|------|
| wiki-daily-update | 每天 04:00 | 处理 knowledge gaps + 当天经验总结 → 写入 wiki-vault | ✅ 已有 cron,需完善 |
| pangtong-vault-sync | 每天 05:00 | 同步 wiki-vault 到 agent workspace | ✅ 已有 |
**wiki-daily-update 完善内容**
1. 读取 knowledge-gaps.md 中"待处理"条目
2. 对每个 gap:搜索 knowledge_base 是否有相关源码/文档 → 有则提炼写入 wiki-vault
3. 搜索最近一天的 jsonl 日志,提取有价值的经验
4. 新建或更新 wiki-vault 页面
5. 更新 knowledge-gaps.md(标记为"已建立 ✅"或"无KB内容,跳过"
### D16-6:和 #11 各层关系总结
| #11 层级 | #11 原始定义 | 知识注入贡献 | 本设计 |
|---------|------------|------------|--------|
| L0 铁律 | GATE 门控 + Delegation + 安全底线 | wiki 查询铁律 | ✅ D16-1 |
| L1 角色 | SOUL.md + AGENTS.md + TOOLS.md + MEMORY.md | TOOLS.md 速查表 + SOUL.md Red Flags | ✅ D16-2 |
| L2 引擎 | 任务上下文 + 角色操作规范 + 硬约束 | WikiGuideSection 通用段 | ✅ D16-3 |
| L3 参考 | A/B/C/D 类 Skill,靠 Description 触发 | wiki-query Skill | ✅ D16-4 |
| 运维 | — | gap 闭环 cron job | ✅ D16-5 |
### D16-7:为什么不做 PromptComposer 自动注入知识全文
1. **token 浪费**:每次任务都注入可能不相关的知识
2. **覆盖范围有限**:只影响 moziplus 子任务 Agent
3. **Agent 主动查询更精准**:知道自己缺什么知识,按需查询
## 四、改动清单
### 4.1 已完成 ✅
| 改动 | 文件 | 层级 | 说明 |
|------|------|------|------|
| TOOLS.md 知识库段 | 各 Agent workspace TOOLS.md | L1 | 速查表 + 检索原则 + 目录结构 + 铁律 |
| wiki-query Skill 部署 | `skills/wiki/wiki-query/SKILL.md` | L3 | 中文触发词 + 渐进式检索协议 |
| knowledge-gaps.md | `_meta/knowledge-gaps.md` | 运维 | 已有 20+ 条记录 |
| wiki-daily-update cron | cron job | 运维 | 每天 04:00,需完善处理逻辑 |
| pangtong-vault-sync cron | cron job | 运维 | 每天 05:00 |
### 4.2 待实现
| 改动 | 文件 | 层级 | 说明 |
|------|------|------|------|
| L0 wiki 铁律 | Hook 注入配置(`prependContext` | L0 | 新增 `<wiki-rule>` 段 |
| SOUL.md Red Flags | 各 Agent workspace SOUL.md | L1 | 知识查询 Red Flags 表 |
| WikiGuideSection | `prompt_composer.py` 或独立文件 | L2 | 通用 PromptSection,三种 handler 共用 |
| TaskHandler 注入 | `task_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| MailHandler 注入 | `mail_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| ToolchainHandler 注入 | `toolchain_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| extraDirs 递归确认 | moziplus spawn 配置 | L3 | 确认 wiki-query 子目录可被发现 |
| wiki-daily-update 完善 | cron job 脚本 | 运维 | gap 处理 + jsonl 经验提取 |
### 4.3 不做
| 项目 | 原因 |
|------|------|
| PromptComposer 知识全文注入 | token 浪费,Agent 主动查询更精准 |
| experiences 表 | wiki-vault 已覆盖,不重复建设 |
| 新 Skill(除 wiki-query 外) | wiki-query 已有,不需要新的 |
## 五、风险
| 风险 | 概率 | 缓解 |
|------|------|------|
| Agent 不主动查 wiki | 中 | L0 铁律强制 + L2 引导 + L3 Description 触发,三层保障 |
| wiki-query 在子目录不被 extraDirs 发现 | 中 | 确认后决定移顶层或配置子目录 |
| wiki-daily-update gap 处理质量不够 | 低 | 人工审核 + 逐步完善 |
| WikiGuideSection 增加 token | 低 | 固定 ~30 tokens,影响可忽略 |
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,503 @@
# API 聚合重构 + 工具链 Tab 设计
> **编号**: §18
> **状态**: 设计中
> **日期**: 2026-06-14
> **作者**: 庞统(副军师)🐦
> **审查**: 司马懿(mail-1781415763066 已回复,方案 B 调整版确认)
---
## 1. 背景与目标
### 1.1 问题
1. **blackboard_routes.py 膨胀**572 行、22 个路由,task/comment/output/review/event/decision/observation/archive 全堆一个文件,维护困难
2. **前端 N+1 请求**:打开 TaskModal 需要 5 次独立请求(task + events + subtasks + progress + comments),影响前端性能
3. **工具链事件无前端展示**`_toolchain` DB 隔离已完成,但前端无对应 Tab,工具链事件只能通过 Agent 收 Mail 感知
### 1.2 目标
1. 按领域拆分 blackboard_routes.py → 3 个文件
2. 实现细粒度 expand 聚合接口,前端 1-2 次请求拿全任务详情
3. 新增工具链 Tab(列表 + 详情 + 搜索栏)
4. 任务列表支持标题搜索
### 1.3 不做
- checkpoint_routes.py 不纳入拆分(已独立)
- mail_routes / toolchain_routes / project_routes 不动
- SQL JOIN / batch query 性能优化(当前 SQLite 单写下多次查询可接受)
---
## 2. 后端 API 文件拆分
### 2.1 拆分方案(方案 B 调整版,司马懿确认)
| 新文件 | 内容 | 预估行数 |
|--------|------|---------|
| `task_routes.py` | task CRUD + create(含 AI 标题) + patch + progress + claim + status(含广播) + archive + archive-done | ~280 |
| `task_relation_routes.py` | comments + outputs(含文件写入) + reviews + decisions + observations + events + experiences + summary | ~250 |
| `shared.py` | `_bb()` / `_q()` / `_validate_project()` / `_task_to_dict()` / `_init_agent_ids()` / `_extract_mentions()` / 常量导入 | ~30 |
### 2.2 路由分配明细
**task_routes.py**10 个路由):
| 路由 | 方法 | 函数 | 说明 |
|------|------|------|------|
| `/tasks` | GET | `list_tasks` | 列表(新增 `q` 搜索参数) |
| `/tasks` | POST | `create_task` | 创建(含 `_generate_title` |
| `/tasks/{tid}` | GET | `get_task` | 详情(含 expand 聚合) |
| `/tasks/{tid}` | PATCH | `patch_task` | 更新 |
| `/tasks/{tid}/progress` | GET | `task_progress` | 进度 |
| `/tasks/{tid}/claim` | POST | `claim_task` | 认领 |
| `/tasks/{tid}/status` | POST | `update_status` | 状态流转(含广播逻辑) |
| `/tasks/{tid}/archive` | POST | `archive_task` | 归档 |
| `/tasks/archive-done` | POST | `archive_done_tasks` | 批量归档 |
**task_relation_routes.py**13 个路由):
| 路由 | 方法 | 函数 | 说明 |
|------|------|------|------|
| `/tasks/{tid}/comments` | GET | `get_comments` | 评论列表 |
| `/tasks/{tid}/comments` | POST | `add_comment` | 添加评论(含 @mention 提取) |
| `/tasks/{tid}/outputs` | GET | `get_outputs` | 产出列表 |
| `/tasks/{tid}/outputs` | POST | `write_output` | 写入产出(含文件写入逻辑) |
| `/tasks/{tid}/decisions` | GET | `get_decisions` | 决策列表 |
| `/tasks/{tid}/decisions` | POST | `add_decision` | 添加决策 |
| `/tasks/{tid}/observations` | POST | `add_observation` | 添加观察 |
| `/tasks/{tid}/reviews` | GET | `get_reviews` | 审查列表 |
| `/tasks/{tid}/reviews` | POST | `add_review` | 添加审查 |
| `/tasks/{tid}/events` | GET | `get_task_events` | 事件列表 |
| `/tasks/{tid}/experiences` | GET | `get_task_experiences` | 经验列表 |
| `/events` | GET | `get_events` | 项目级事件 |
| `/summary` | GET | `task_summary` | 任务汇总 |
### 2.3 shared.py 共享件
从 blackboard_routes.py 提取到 shared.py
| 符号 | 类型 | 说明 |
|------|------|------|
| `_validate_project()` | function | 项目 ID 校验 |
| `_bb()` | function | Blackboard 实例获取 |
| `_q()` | function | Queries 实例获取 |
| `_task_to_dict()` | function | Task → dict 序列化 |
| `_init_agent_ids()` | function | Agent ID 初始化 |
| `_extract_mentions()` | function | @mention 提取 |
| `VALID_STATUSES` | import | 从 db.py 重导出 |
| `OUTPUT_TYPES` | import | 从 db.py 重导出 |
### 2.4 main.py 路由注册变更
```python
# 拆分前
from src.api.blackboard_routes import router as blackboard_router
app.include_router(blackboard_router)
# 拆分后
from src.api.task_routes import router as task_router
from src.api.task_relation_routes import router as task_relation_router
app.include_router(task_router)
app.include_router(task_relation_router)
```
URL prefix 不变:所有路由仍是 `/api/projects/{pid}/...`,前端 URL 零改动。
### 2.5 向后兼容
- 删除 `blackboard_routes.py`,所有引用指向新文件
- `expand=all` 保持兼容(内部映射为全量 expand)
- 不改变任何 API 的请求/响应格式(仅文件组织变化)
---
## 3. expand 聚合接口
### 3.1 设计
`GET /api/projects/{pid}/tasks/{tid}?expand=comments,outputs,reviews,events,decisions`
支持逗号分隔的细粒度选择,替代当前的 `expand=all`
### 3.2 返回格式
```json
{
"task": { "id": "...", "title": "...", "status": "working", ... },
"comments": {
"items": [...],
"total_count": 45,
"limit": 20
},
"events": {
"items": [...],
"total_count": 120,
"limit": 30
},
"outputs": [...],
"reviews": [...],
"decisions": [...]
}
```
### 3.3 limit 策略
| 关联资源 | expand 返回 | 分页支持 | 理由 |
|----------|------------|---------|------|
| comments | 最新 20 条 + total_count | `GET /comments?limit=50&offset=0` | 高频资源,长任务可能积累几十条 |
| events | 最新 30 条 + total_count | `GET /events?limit=100&offset=0` | 运行几天可能上百条 |
| outputs | 全部 | 不需要 | 通常 <5 条 |
| reviews | 全部 | 不需要 | 通常 <5 条 |
| decisions | 全部 | 不需要 | 通常 <5 条 |
前端拿到 `total_count > items.length` 时显示"还有 N 条",按需点击加载。
### 3.4 实现伪码
```python
@router.get("/tasks/{task_id}")
async def get_task(project_id: str, task_id: str,
expand: Optional[str] = None):
bb = _bb(project_id)
task = bb.get_task(task_id)
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
result = _task_to_dict(task)
if not expand:
return result
expand_list = expand.split(",") if expand != "all" else [
"comments", "outputs", "reviews", "events", "decisions"
]
q = _q(project_id)
if "comments" in expand_list:
all_comments = bb.get_comments(task_id)
result["comments"] = {
"items": [dict(c.__dict__) for c in all_comments[-20:]],
"total_count": len(all_comments),
"limit": 20,
}
if "events" in expand_list:
all_events = q.task_events(task_id)
result["events"] = {
"items": all_events[-30:],
"total_count": len(all_events),
"limit": 30,
}
if "outputs" in expand_list:
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
if "reviews" in expand_list:
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
if "decisions" in expand_list:
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
return result
```
### 3.5 性能分析
| 场景 | 当前(无 expand | expand 后 | 改善 |
|------|-----------------|-----------|------|
| 打开 TaskModal | 5 次 HTTP 请求 | 2 次(task+expand + subtasks | -60% 请求 |
| 单次 expand 响应体 | — | ~5-15KB(典型) | 一次大请求 < 五次小请求 |
| DB 查询次数 | 5 次(各端点独立查) | 5 次(expand 内部循环) | 相同,暂不优化 |
---
## 4. 任务列表搜索
### 4.1 设计
`GET /api/projects/{pid}/tasks?q=关键词`
在现有 `list_tasks` 基础上增加 `q` 查询参数,支持标题模糊搜索(SQL LIKE)。
### 4.2 实现
```python
@router.get("/tasks")
async def list_tasks(project_id: str, q: Optional[str] = None, ...):
bb = _bb(project_id)
tasks = bb.list_tasks(status=status, ...)
if q:
q_lower = q.lower()
tasks = [t for t in tasks if q_lower in (t.title or "").lower()]
return {"tasks": [_task_to_dict(t) for t in tasks]}
```
**设计决策**:过滤在 Python 层做而非 SQL 层。
- 理由:当前 `list_tasks` 已在 Python 层做 status 筛选,加一层 title 过滤一致性更好
- 如果后续任务量大(>1000),再改为 SQL LIKE 查询
---
## 5. 前端:工具链 Tab
### 5.1 Tab 定义
```typescript
// store.ts TabKey 新增
| 'toolchain'
// TAB_DEFS 新增(插在 settings 前面)
{ key: 'toolchain', label: '工具链', icon: '⛓️' },
```
### 5.2 数据加载
```typescript
// store.ts 新增
toolchainTasks: any[];
loadToolchain: async () => {
const res = await fetch('/api/projects/_toolchain/tasks');
const data = await res.json();
set({ toolchainTasks: data.tasks || [] });
}
// Tab 切换时加载
if (tab === 'toolchain') s.loadToolchain();
```
### 5.3 ToolchainPanel 组件
仿 MailPanel 结构,三个区域:
**搜索栏**(顶部):
- 文本输入框,输入关键词实时过滤列表
- 调用 `GET /api/projects/_toolchain/tasks?q=关键词`
**列表区**(左侧):
- 工具链事件列表(时间倒序)
- 每条显示:标题 + 时间 + 状态标签
- 点击选中,高亮当前选中项
**详情区**(右侧):
- 选中事件的完整内容
- 调用 `GET /api/projects/_toolchain/tasks/{tid}?expand=comments` 获取详情
- 展示:标题、描述、状态、评论(action_report 等)
### 5.4 和 Mail 的隔离
| 维度 | Mail Tab | 工具链 Tab |
|------|---------|-----------|
| 数据源 | `_mail` 项目 | `_toolchain` 项目 |
| 事件类型 | Agent 间通信(inform/request | 系统事件(CI/PR/部署/Review |
| 搜索 | 无(邮件量不大) | 有(工具链事件频率高) |
---
## 6. 测试设计
### 6.1 后端 API 拆分测试
**目标**:验证拆分后所有路由 URL 不变、行为不变。
**测试文件**`tests/integration/test_api.py`(扩展现有)+ 新增 `tests/unit/test_task_routes.py`
| 测试类 | 测试用例 | 验证点 |
|--------|---------|--------|
| TestTaskRoutes | test_list_tasks | GET /tasks 返回格式不变 |
| | test_list_tasks_with_search | q 参数过滤正确 |
| | test_list_tasks_empty_q | q 为空时返回全部 |
| | test_get_task | GET /tasks/{tid} 基本详情 |
| | test_get_task_expand_comments | expand=comments 返回带 total_count + limit |
| | test_get_task_expand_events | expand=events 返回带 total_count + limit |
| | test_get_task_expand_outputs | expand=outputs 全量返回 |
| | test_get_task_expand_multiple | expand=comments,outputs,reviews 组合 |
| | test_get_task_expand_all | expand=all 向后兼容 |
| | test_get_task_no_expand | 不传 expand 返回基本 task |
| | test_create_task | POST 格式不变 |
| | test_claim_task | 认领行为不变 |
| | test_update_status | 状态流转不变 |
| | test_patch_task | PATCH 不变 |
| | test_archive_task | 归档不变 |
| 测试类 | 测试用例 | 验证点 |
|--------|---------|--------|
| TestTaskRelationRoutes | test_comments_crud | GET/POST comments 不变 |
| | test_outputs_crud | GET/POST outputs 不变 |
| | test_write_output_file | 文件写入逻辑不变 |
| | test_reviews_crud | GET/POST reviews 不变 |
| | test_decisions_crud | GET/POST decisions 不变 |
| | test_observations_add | POST observations 不变 |
| | test_events_list | GET events 不变 |
| | test_experiences_list | GET experiences 不变 |
| | test_project_events | GET /events 不变 |
| | test_summary | GET /summary 不变 |
**兼容性验证脚本**
```bash
#!/bin/bash
# tests/scripts/verify_api_compat.sh
# 对比拆分前后所有路由 URL 和方法,确保零变化
echo "=== 拆分前路由清单 ==="
# 从 git stash 或 main 分支提取
git stash
python -c "
from src.main import app
for route in app.routes:
if hasattr(route, 'methods') and hasattr(route, 'path'):
for m in sorted(route.methods):
if m in ('GET','POST','PATCH','DELETE','PUT'):
print(f'{m} {route.path}')
" | sort > /tmp/routes_before.txt
git stash pop
echo "=== 拆分后路由清单 ==="
python -c "
from src.main import app
for route in app.routes:
if hasattr(route, 'methods') and hasattr(route, 'path'):
for m in sorted(route.methods):
if m in ('GET','POST','PATCH','DELETE','PUT'):
print(f'{m} {route.path}')
" | sort > /tmp/routes_after.txt
echo "=== Diff ==="
diff /tmp/routes_before.txt /tmp/routes_after.txt
if [ $? -eq 0 ]; then
echo "✅ 路由完全一致"
else
echo "❌ 路由有差异"
exit 1
fi
```
### 6.2 expand 聚合测试
**测试文件**`tests/unit/test_expand_api.py`
| 测试用例 | 验证点 |
|---------|--------|
| test_expand_comments_limit | comments 返回最新 20 条 + total_count=25 |
| test_expand_comments_are_latest | 验证返回的是最新 20 条(index 5-24 |
| test_expand_events_limit | events 返回最新 30 条 + total_count=35 |
| test_expand_outputs_full | outputs 全量返回(list 格式,不分页) |
| test_expand_reviews_full | reviews 全量返回 |
| test_expand_decisions_full | decisions 全量返回 |
| test_expand_multiple_fields | expand=comments,outputs,reviews 组合,未请求的不返回 |
| test_expand_all_compat | expand=all 向后兼容 |
| test_no_expand | 不传 expand 只返回基本 task |
| test_expand_invalid_field_ignored | 无效字段静默忽略 |
### 6.3 搜索测试
**测试文件**`tests/unit/test_task_routes.py``TestTaskListRoutes`
| 测试用例 | 验证点 |
|---------|--------|
| test_list_tasks_with_search | q 参数标题模糊搜索 |
| test_list_tasks_search_case_insensitive | 大小写不敏感 |
| test_list_tasks_search_no_match | 无匹配返回空列表 |
| test_list_tasks_search_empty_q | q 为空返回全部 |
### 6.4 前端测试(手动验证)
| 验证点 | 操作 | 预期 |
|--------|------|------|
| 工具链 Tab 出现 | 打开前端 | Tab 栏有 ⛓️ 工具链 |
| 列表加载 | 点击工具链 Tab | 显示 _toolchain 事件列表 |
| 搜索过滤 | 输入关键词 | 列表实时过滤 |
| 详情展示 | 点击某条事件 | 右侧/弹窗显示完整内容 |
| Tab 切换不丢数据 | 切到其他 Tab 再回来 | 数据保持 |
### 6.5 CI 集成
| 命令 | 说明 |
|------|------|
| `bash tests/scripts/verify_api_compat.sh` | 路由兼容性验证(CI 必跑) |
| `pytest tests/unit/test_task_routes.py tests/unit/test_expand_api.py tests/integration/test_api.py -m "not e2e" -v` | 新增单元 + 集成测试 |
> 测试用例详细设计(fixture + 完整代码 + 覆盖矩阵)见 `docs/design/18-test-design.md`
---
## 7. 实施计划
### Phase 1: 后端 API 拆分(不含功能变更)
| 步骤 | 内容 | 验证 |
|------|------|------|
| 1.1 | 创建 `shared.py`,提取共享 helper | import 无报错 |
| 1.2 | 创建 `task_routes.py`,迁移 10 个路由 | 路由注册成功 |
| 1.3 | 创建 `task_relation_routes.py`,迁移 13 个路由 | 路由注册成功 |
| 1.4 | 更新 `main.py` router 注册 | app 启动无报错 |
| 1.5 | 删除 `blackboard_routes.py` | — |
| 1.6 | 运行 `verify_api_compat.sh` | 路由清单 diff = 0 |
| 1.7 | 运行现有测试 | 全量通过 |
### Phase 2: expand 聚合 + 搜索
| 步骤 | 内容 | 验证 |
|------|------|------|
| 2.1 | 重写 `get_task` expand 逻辑(细粒度) | TestExpandAPI 通过 |
| 2.2 | `list_tasks``q` 参数 | TestTaskSearch 通过 |
| 2.3 | 新增测试用例 | 覆盖率达标 |
### Phase 3: 前端工具链 Tab
| 步骤 | 内容 | 验证 |
|------|------|------|
| 3.1 | store.ts 新增 toolchain 数据加载 | — |
| 3.2 | api.ts 新增 expand 调用封装 | — |
| 3.3 | 创建 `ToolchainPanel.tsx` | 组件渲染正常 |
| 3.4 | App.tsx 注册新 Tab | Tab 显示正确 |
| 3.5 | TaskModal 改用 expand 减少 | 请求次数减少 |
### Phase 4: 联调 + 评审
| 步骤 | 内容 |
|------|------|
| 4.1 | 全量测试 `pytest -m "not e2e"` |
| 4.2 | 发评审给司马懿 |
| 4.3 | 前端手动验证 |
---
## 8. 风险评估
| 风险 | 级别 | 缓解 |
|------|------|------|
| 拆分后 import 路径断裂 | 中 | IDE 全局搜索 + 运行时验证 |
| expand 返回体过大 | 低 | comments/events 有 limit |
| 工具链事件量大影响前端 | 低 | 搜索栏 + 分页 |
| expand=all 向后兼容 | 低 | 单独兼容分支处理 |
---
## 9. 评审记录
### 司马懿 mail-17814157630662026-06-14
| 项目 | 结论 |
|------|------|
| 文件拆分 | 方案 B 调整版(task_routes + task_relation_routes + shared |
| expand | 细粒度,events/comments 带 limit+total_count |
| 性能 | 当前 SQLite 多次查询可接受 |
| checkpoint | 不纳入 |
| _generate_title | 留在 task_routes.py |
| write_output | 注意不是简单 CRUD |
---
## 10. 变更记录
| 日期 | 版本 | 内容 |
|------|------|------|
| 2026-06-14 | v1.0 | 初版设计 |
+484
View File
@@ -0,0 +1,484 @@
# §18 测试用例详细设计
> **关联**: `docs/design/18-api-refactor-and-toolchain-tab.md`
> **日期**: 2026-06-14
---
## 1. 测试文件规划
| 文件 | 类型 | 测试数 | 说明 |
|------|------|--------|------|
| `tests/integration/test_api.py` | 集成 | 扩展现有 | 拆分后回归验证 |
| `tests/unit/test_task_routes.py` | 单元 | 14 | task_routes 专项 |
| `tests/unit/test_expand_api.py` | 单元 | 7 | expand 聚合专项 |
| `tests/unit/test_task_search.py` | 单元 | 4 | 搜索专项 |
| `tests/scripts/verify_api_compat.sh` | 脚本 | 1 | CI 路由兼容性 |
**总计**26 个测试 + 1 个 CI 脚本
---
## 2. 预置 Fixture
```python
# tests/conftest.py 新增(如果不存在则补充)
@pytest.fixture
def expand_env(tmp_path):
"""expand 测试环境:1 个 task + 预置关联数据"""
project_root = tmp_path / "projects"
project_root.mkdir()
os.environ["BLACKBOARD_ROOT"] = str(project_root)
reg = ProjectRegistry(project_root)
reg.create_project("test-proj", "Test Project", agents=["agent1"])
bb = Blackboard(project_root / "test-proj" / "blackboard.db")
bb.create_task(Task(id="t1", title="Expand Test Task", task_type="coding"))
# 预置 25 条 comment
for i in range(25):
bb.add_comment("t1", agent="agent1", comment_type="general",
content=f"Comment number {i}")
# 预置 5 条 output
for i in range(5):
bb.write_output("t1", agent="agent1",
output_type="code",
content=f"output content {i}",
filename=f"file_{i}.py")
# 预置 3 条 review
for i in range(3):
bb.add_review("t1", reviewer="agent1",
verdict="APPROVE",
confidence=0.9,
risk_level="low",
summary=f"Review {i}")
# 预置 2 条 decision
for i in range(2):
bb.add_decision("t1", agent="agent1",
decision_type="scope",
rationale=f"Decision {i}")
# 预置 35 条 event
from src.blackboard.queries import Queries
q = Queries(project_root / "test-proj" / "blackboard.db")
for i in range(35):
q.add_event("t1", event_type="status_change",
detail=f"Event {i}")
yield project_root
del os.environ["BLACKBOARD_ROOT"]
```
---
## 3. task_routes.py 测试(test_task_routes.py
```python
"""task_routes.py 路由测试 — 验证拆分后行为不变"""
import pytest
from fastapi.testclient import TestClient
from src.main import app
client = TestClient(app)
pytestmark = pytest.mark.integration
class TestTaskListRoutes:
"""GET /tasks + 搜索"""
def test_list_tasks_basic(self, project_env):
"""列表基本返回格式不变"""
resp = client.get("/api/projects/test-proj/tasks")
assert resp.status_code == 200
data = resp.json()
assert "tasks" in data
assert isinstance(data["tasks"], list)
def test_list_tasks_with_search(self, project_env):
"""q 参数搜索标题"""
resp = client.get("/api/projects/test-proj/tasks?q=Existing")
data = resp.json()
assert len(data["tasks"]) == 1
assert "Existing" in data["tasks"][0]["title"]
def test_list_tasks_search_case_insensitive(self, project_env):
"""大小写不敏感"""
resp = client.get("/api/projects/test-proj/tasks?q=existing")
data = resp.json()
assert len(data["tasks"]) == 1
def test_list_tasks_search_no_match(self, project_env):
"""无匹配返回空列表"""
resp = client.get("/api/projects/test-proj/tasks?q=nonexistent_xyz")
data = resp.json()
assert len(data["tasks"]) == 0
def test_list_tasks_search_empty_q(self, project_env):
"""q 为空返回全部"""
resp = client.get("/api/projects/test-proj/tasks?q=")
data = resp.json()
assert len(data["tasks"]) >= 1
class TestTaskDetailRoutes:
"""GET /tasks/{tid} + expand"""
def test_get_task_basic(self, project_env):
"""无 expand 返回基本 task"""
resp = client.get("/api/projects/test-proj/tasks/t1")
assert resp.status_code == 200
data = resp.json()
assert data["id"] == "t1"
assert "comments" not in data # 无 expand 不含关联数据
def test_get_task_404(self, project_env):
"""不存在的 task 返回 404"""
resp = client.get("/api/projects/test-proj/tasks/nonexistent")
assert resp.status_code == 404
class TestTaskActionRoutes:
"""claim/status/patch/archive 行为不变"""
def test_claim_task(self, project_env):
"""认领行为不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/claim",
json={"agent": "agent1"})
assert resp.status_code == 200
def test_update_status(self, project_env):
"""状态流转不变"""
client.post("/api/projects/test-proj/tasks/t1/claim",
json={"agent": "agent1"})
resp = client.post("/api/projects/test-proj/tasks/t1/status",
json={"agent": "agent1", "status": "working"})
assert resp.status_code == 200
def test_invalid_status_transition(self, project_env):
"""非法状态转换返回 409"""
resp = client.post("/api/projects/test-proj/tasks/t1/status",
json={"agent": "agent1", "status": "done"})
assert resp.status_code == 409
def test_patch_task(self, project_env):
"""PATCH 更新不变"""
resp = client.patch("/api/projects/test-proj/tasks/t1",
json={"priority": 5})
assert resp.status_code == 200
def test_archive_task(self, project_env):
"""归档不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/archive",
json={"agent": "agent1"})
assert resp.status_code == 200
class TestTaskCreateRoute:
"""POST /tasks 创建行为不变"""
def test_create_task(self, project_env):
"""创建格式不变"""
resp = client.post("/api/projects/test-proj/tasks",
json={"title": "New Task", "description": "test"})
assert resp.status_code == 201
data = resp.json()
assert "id" in data
```
---
## 4. expand 聚合测试(test_expand_api.py
```python
"""expand 聚合接口测试"""
import pytest
from fastapi.testclient import TestClient
from src.main import app
client = TestClient(app)
pytestmark = pytest.mark.integration
class TestExpandComments:
"""expand=comments"""
def test_comments_limit_and_count(self, expand_env):
"""返回最新 20 条 + total_count=25"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=comments")
data = resp.json()
comments = data["comments"]
assert isinstance(comments, dict)
assert len(comments["items"]) == 20
assert comments["total_count"] == 25
assert comments["limit"] == 20
def test_comments_are_latest(self, expand_env):
"""返回的是最新 20 条(Comment 5-24"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=comments")
data = resp.json()
first_content = data["comments"]["items"][0]["content"]
last_content = data["comments"]["items"][-1]["content"]
# 最新 20 条 = index 5 到 24
assert "5" in first_content or "24" in last_content
class TestExpandEvents:
"""expand=events"""
def test_events_limit_and_count(self, expand_env):
"""返回最新 30 条 + total_count=35"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=events")
data = resp.json()
events = data["events"]
assert isinstance(events, dict)
assert len(events["items"]) == 30
assert events["total_count"] == 35
assert events["limit"] == 30
class TestExpandFullResources:
"""outputs/reviews/decisions 全量返回"""
def test_expand_outputs_full(self, expand_env):
"""outputs 全量返回(5 条),格式是 list 不是 dict"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=outputs")
data = resp.json()
outputs = data["outputs"]
assert isinstance(outputs, list)
assert len(outputs) == 5
def test_expand_reviews_full(self, expand_env):
"""reviews 全量返回(3 条)"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=reviews")
data = resp.json()
reviews = data["reviews"]
assert isinstance(reviews, list)
assert len(reviews) == 3
def test_expand_decisions_full(self, expand_env):
"""decisions 全量返回(2 条)"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=decisions")
data = resp.json()
decisions = data["decisions"]
assert isinstance(decisions, list)
assert len(decisions) == 2
class TestExpandCombinations:
"""组合 expand"""
def test_expand_multiple_fields(self, expand_env):
"""expand=comments,outputs,reviews 组合"""
resp = client.get(
"/api/projects/test-proj/tasks/t1?expand=comments,outputs,reviews"
)
data = resp.json()
assert "comments" in data
assert "outputs" in data
assert "reviews" in data
assert "events" not in data # 未请求
assert "decisions" not in data
def test_expand_all_compat(self, expand_env):
"""expand=all 向后兼容"""
resp = client.get("/api/projects/test-proj/tasks/t1?expand=all")
data = resp.json()
# all 返回所有关联资源
assert "comments" in data
assert "outputs" in data
assert "reviews" in data
assert "events" in data
assert "decisions" in data
def test_no_expand(self, expand_env):
"""不传 expand 只返回基本 task"""
resp = client.get("/api/projects/test-proj/tasks/t1")
data = resp.json()
assert "comments" not in data
assert "outputs" not in data
assert data["id"] == "t1"
def test_expand_invalid_field_ignored(self, expand_env):
"""无效 expand 字段静默忽略"""
resp = client.get(
"/api/projects/test-proj/tasks/t1?expand=comments,invalid_field"
)
data = resp.json()
assert "comments" in data
assert "invalid_field" not in data
```
---
## 5. task_relation_routes.py 回归测试
```python
"""task_relation_routes.py 路由回归 — 验证拆分后行为不变"""
import pytest
from fastapi.testclient import TestClient
from src.main import app
client = TestClient(app)
pytestmark = pytest.mark.integration
class TestRelationRoutesRegression:
"""关联路由回归测试"""
def test_comments_crud(self, project_env):
"""GET/POST comments 不变"""
# POST
resp = client.post("/api/projects/test-proj/tasks/t1/comments",
json={"agent": "a1", "comment_type": "general",
"content": "test comment"})
assert resp.status_code == 201
# GET
resp = client.get("/api/projects/test-proj/tasks/t1/comments")
assert resp.status_code == 200
assert len(resp.json()["comments"]) >= 1
def test_outputs_crud(self, project_env):
"""GET/POST outputs 不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/outputs",
json={"agent": "a1", "type": "code",
"content": "print('hello')"})
assert resp.status_code == 201
resp = client.get("/api/projects/test-proj/tasks/t1/outputs")
assert resp.status_code == 200
def test_write_output_with_filename(self, project_env):
"""output 含 filename 的文件写入不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/outputs",
json={"agent": "a1", "type": "code",
"content": "x = 1",
"filename": "test.py"})
assert resp.status_code == 201
def test_write_output_invalid_type(self, project_env):
"""output 无效 type 返回 422"""
resp = client.post("/api/projects/test-proj/tasks/t1/outputs",
json={"agent": "a1", "type": "invalid_type",
"content": "x"})
assert resp.status_code == 422
def test_reviews_crud(self, project_env):
"""GET/POST reviews 不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/reviews",
json={"reviewer": "a1", "verdict": "APPROVE",
"confidence": 0.9, "risk_level": "low",
"summary": "LGTM"})
assert resp.status_code == 201
resp = client.get("/api/projects/test-proj/tasks/t1/reviews")
assert resp.status_code == 200
def test_decisions_crud(self, project_env):
"""GET/POST decisions 不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/decisions",
json={"agent": "a1", "decision_type": "scope",
"rationale": "test"})
assert resp.status_code == 201
resp = client.get("/api/projects/test-proj/tasks/t1/decisions")
assert resp.status_code == 200
def test_observations_add(self, project_env):
"""POST observations 不变"""
resp = client.post("/api/projects/test-proj/tasks/t1/observations",
json={"agent": "a1", "observation_type": "note",
"content": "observed"})
assert resp.status_code == 201
def test_events_list(self, project_env):
"""GET events 不变"""
resp = client.get("/api/projects/test-proj/tasks/t1/events")
assert resp.status_code == 200
assert "events" in resp.json()
def test_project_events(self, project_env):
"""GET /events 项目级事件不变"""
resp = client.get("/api/projects/test-proj/events")
assert resp.status_code == 200
def test_summary(self, project_env):
"""GET /summary 不变"""
resp = client.get("/api/projects/test-proj/summary")
assert resp.status_code == 200
```
---
## 6. CI 集成
### .gitea/workflows/ci.yml 新增步骤
```yaml
test:
runs-on: ubuntu-latest
steps:
# ... 现有步骤 ...
# API 兼容性验证
- name: Verify API Compatibility
run: |
cd src/frontend && npm run build
bash tests/scripts/verify_api_compat.sh
# 新增测试
- name: Run API Tests
run: |
pytest tests/unit/test_task_routes.py \
tests/unit/test_expand_api.py \
tests/integration/test_api.py \
-m "not e2e" -v
```
### 本地开发验证流程
```bash
# 1. 改完代码后先跑兼容性验证
bash tests/scripts/verify_api_compat.sh
# 2. 跑全量测试
pytest -m "not e2e" -v
# 3. 跑新增专项测试
pytest tests/unit/test_task_routes.py tests/unit/test_expand_api.py -v
```
---
## 7. 测试覆盖矩阵
| 设计文档章节 | 测试文件 | 测试类 | 用例数 |
|-------------|---------|--------|--------|
| §2 路由拆分(兼容性) | verify_api_compat.sh | — | 1 |
| §3.1 基本详情 | test_task_routes.py | TestTaskDetailRoutes | 2 |
| §3.2 搜索 | test_task_routes.py | TestTaskListRoutes | 5 |
| §3.3 动作路由 | test_task_routes.py | TestTaskActionRoutes | 5 |
| §3.4 创建 | test_task_routes.py | TestTaskCreateRoute | 1 |
| §4 expand comments | test_expand_api.py | TestExpandComments | 2 |
| §4 expand events | test_expand_api.py | TestExpandEvents | 1 |
| §4 expand 全量 | test_expand_api.py | TestExpandFullResources | 3 |
| §4 expand 组合 | test_expand_api.py | TestExpandCombinations | 4 |
| §5.2 关联回归 | test_api.py | TestRelationRoutesRegression | 10 |
| **合计** | | | **34** |
+73
View File
@@ -0,0 +1,73 @@
"""共享 helper 和常量"""
from typing import Any, Dict
from fastapi import HTTPException
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.blackboard.models import Task
from src.blackboard.registry import ProjectRegistry
from src.utils import get_data_root
# 虚拟项目白名单
_VIRTUAL_PROJECTS = frozenset({"_general", "_mail", "_toolchain"})
def _validate_project(project_id: str) -> str:
"""校验 project_id"""
if project_id in _VIRTUAL_PROJECTS:
return project_id
reg = ProjectRegistry(get_data_root())
if reg.get_project(project_id):
return project_id
raise HTTPException(400, {
"ok": False,
"error": "project_not_found",
"detail": f"Project '{project_id}' is not registered.",
"suggestions": [
f"Register first: POST /api/projects with id='{project_id}'",
"Or use '_general' for tasks without a specific project",
],
})
def _bb(project_id: str) -> Blackboard:
_validate_project(project_id)
return Blackboard(get_data_root() / project_id / "blackboard.db")
def _q(project_id: str) -> Queries:
_validate_project(project_id)
return Queries(get_data_root() / project_id / "blackboard.db")
def _task_to_dict(t: Task) -> Dict[str, Any]:
d = {k: v for k, v in t.__dict__.items() if v is not None}
return d
_KNOWN_AGENT_IDS: list = []
def _init_agent_ids():
"""从配置文件加载 Agent ID 列表"""
global _KNOWN_AGENT_IDS
if _KNOWN_AGENT_IDS:
return
try:
import yaml
import os
cfg_path = os.path.join(os.path.dirname(__file__), "..", "..", "config", "default.yaml")
with open(cfg_path) as f:
cfg = yaml.safe_load(f)
_KNOWN_AGENT_IDS = list(cfg.get("daemon", {}).get("agent_profiles", {}).keys())
except Exception:
_KNOWN_AGENT_IDS = []
def _extract_mentions(text: str) -> list:
"""从文本中自动提取 @agent-id 格式的 mention"""
import re
_init_agent_ids()
candidates = set(re.findall(r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)', text))
return [a for a in candidates if a in _KNOWN_AGENT_IDS]
+240
View File
@@ -0,0 +1,240 @@
"""Task 关联路由 — comments / outputs / decisions / observations / reviews / events / experiences / summary"""
from __future__ import annotations
import json
import os
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.models import Review
from src.blackboard.db import OUTPUT_TYPES
from src.api.shared import (
_bb,
_q,
_extract_mentions,
)
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
# --------------------------------------------------------------------------- #
# Comments
# --------------------------------------------------------------------------- #
@router.get("/tasks/{task_id}/comments")
async def get_comments(project_id: str, task_id: str,
comment_type: Optional[str] = None):
bb = _bb(project_id)
comments = bb.get_comments(task_id, comment_type=comment_type)
return {"comments": [dict(c.__dict__) for c in comments]}
@router.post("/tasks/{task_id}/comments")
async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
mentions_raw = body.get("mentions")
comment_body = body["body"]
# #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集
auto_mentions = _extract_mentions(comment_body)
if isinstance(mentions_raw, str):
try:
explicit_mentions = json.loads(mentions_raw)
except Exception:
explicit_mentions = []
elif isinstance(mentions_raw, list):
explicit_mentions = mentions_raw
else:
explicit_mentions = []
merged_mentions = list(set(explicit_mentions + auto_mentions))
cid = bb.add_comment(task_id, body["author"], comment_body,
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
if merged_mentions:
bb.record_mentions(cid, task_id, merged_mentions)
# #10: SSE 通知前端黑板有新 comment
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("comment_added", {
"project_id": project_id,
"task_id": task_id,
"comment_id": cid,
"author": body["author"],
})
except Exception:
pass
return {"ok": True, "comment_id": cid, "mentions": merged_mentions}
# --------------------------------------------------------------------------- #
# Outputs
# --------------------------------------------------------------------------- #
@router.get("/tasks/{task_id}/outputs")
async def get_outputs(project_id: str, task_id: str):
bb = _bb(project_id)
return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]}
@router.post("/tasks/{task_id}/outputs")
async def write_output(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
# 字段校验 + Agent-friendly 错误
agent = body.get("agent")
if not agent:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: agent",
"hint": "Provide your agent ID, e.g. 'zhangfei-dev'",
})
# type 字段:接受 type 或 content_type(别名兼容)
output_type = body.get("type") or body.get("content_type")
valid_types = sorted(OUTPUT_TYPES)
if not output_type:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: type",
"valid_values": {"type": valid_types},
"hint": "Use 'type' field. Also accepts 'content_type' as alias.",
})
if output_type not in OUTPUT_TYPES:
raise HTTPException(422, {
"error": "validation_failed",
"detail": f"Invalid type: '{output_type}'",
"valid_values": {"type": valid_types},
})
title = body.get("title")
if not title:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: title",
"hint": "Provide a brief title describing this output",
})
# 内容模式:content(直传)或 content_path(引用)
content = body.get("content")
content_path = body.get("content_path") or body.get("path")
if content and not content_path:
# 内容直传模式:自动写文件
artifacts_dir = os.path.join(
os.path.dirname(bb.db_path), "artifacts", task_id
)
os.makedirs(artifacts_dir, exist_ok=True)
# 安全文件名
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_" for c in title)
if not safe_name:
safe_name = "output"
file_path = os.path.join(artifacts_dir, safe_name)
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
content_path = file_path
oid = bb.write_output(
task_id, agent, output_type, title,
content_path=content_path,
summary=body.get("summary"),
metadata=body.get("metadata"),
)
return {"ok": True, "output_id": oid}
# --------------------------------------------------------------------------- #
# Decisions
# --------------------------------------------------------------------------- #
@router.get("/tasks/{task_id}/decisions")
async def get_decisions(project_id: str, task_id: str):
bb = _bb(project_id)
return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]}
@router.post("/tasks/{task_id}/decisions")
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
did = bb.add_decision(task_id, body["decider"], body["decision"],
body["rationale"],
alternatives=body.get("alternatives"))
return {"ok": True, "decision_id": did}
# --------------------------------------------------------------------------- #
# Observations
# --------------------------------------------------------------------------- #
@router.post("/tasks/{task_id}/observations")
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
oid = bb.add_observation(task_id, body["observer"], body["body"],
severity=body.get("severity", "info"))
return {"ok": True, "observation_id": oid}
# --------------------------------------------------------------------------- #
# Reviews
# --------------------------------------------------------------------------- #
@router.get("/tasks/{task_id}/reviews")
async def get_reviews(project_id: str, task_id: str):
bb = _bb(project_id)
return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]}
@router.post("/tasks/{task_id}/reviews")
async def add_review(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
review = Review(
id=body["id"], task_id=task_id, reviewer=body["reviewer"],
review_type=body["review_type"], verdict=body["verdict"],
summary=body["summary"], confidence=body.get("confidence"),
round=body.get("round", 1), max_rounds=body.get("max_rounds", 3),
)
bb.add_review(review)
return {"ok": True, "review_id": review.id}
# --------------------------------------------------------------------------- #
# Per-task Events & Experiences
# --------------------------------------------------------------------------- #
@router.get("/tasks/{task_id}/events")
async def get_task_events(project_id: str, task_id: str,
limit: int = Query(50, le=200)):
q = _q(project_id)
return {"events": q.task_events(task_id, limit)}
@router.get("/tasks/{task_id}/experiences")
async def get_task_experiences(project_id: str, task_id: str):
q = _q(project_id)
return {"experiences": q.task_experiences(task_id)}
# --------------------------------------------------------------------------- #
# Global Events
# --------------------------------------------------------------------------- #
@router.get("/events")
async def get_events(project_id: str, limit: int = Query(50, le=200)):
q = _q(project_id)
return {"events": q.recent_events(limit)}
# --------------------------------------------------------------------------- #
# Summary
# --------------------------------------------------------------------------- #
@router.get("/summary")
async def task_summary(project_id: str):
q = _q(project_id)
return {"summary": q.task_summary()}
@@ -1,68 +1,45 @@
"""API 路由 — 黑板 CRUD"""
"""Task 核心路由 — CRUD、状态、归档"""
from __future__ import annotations
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from src.blackboard.operations import Blackboard
from src.blackboard.models import Task, Review
from src.blackboard.queries import Queries
from src.blackboard.db import VALID_STATUSES, OUTPUT_TYPES
from src.blackboard.registry import ProjectRegistry
from fastapi import APIRouter, HTTPException
from src.blackboard.models import Task
from src.blackboard.db import VALID_STATUSES, VALID_TRANSITIONS
from src.utils import get_data_root
from src.api.shared import (
_bb,
_q,
_task_to_dict,
_extract_mentions,
)
router = APIRouter(prefix="/api/projects/{project_id}", tags=["blackboard"])
# 虚拟项目白名单(不需要在 registry 注册)
_VIRTUAL_PROJECTS = frozenset({"_general", "_mail"})
def _validate_project(project_id: str) -> str:
"""校验 project_id,已知项目/虚拟项目放行,未知项目返回 400"""
if project_id in _VIRTUAL_PROJECTS:
return project_id
reg = ProjectRegistry(get_data_root())
if reg.get_project(project_id):
return project_id
raise HTTPException(400, {
"ok": False,
"error": "project_not_found",
"detail": f"Project '{project_id}' is not registered.",
"suggestions": [
f"Register first: POST /api/projects with id='{project_id}'",
"Or use '_general' for tasks without a specific project",
],
})
def _bb(project_id: str) -> Blackboard:
_validate_project(project_id)
return Blackboard(get_data_root() / project_id / "blackboard.db")
def _q(project_id: str) -> Queries:
_validate_project(project_id)
return Queries(get_data_root() / project_id / "blackboard.db")
# --- Tasks ---
# --------------------------------------------------------------------------- #
# Tasks
# --------------------------------------------------------------------------- #
@router.get("/tasks")
async def list_tasks(project_id: str,
status: Optional[str] = None,
assignee: Optional[str] = None,
parent_task: Optional[str] = None):
parent_task: Optional[str] = None,
q: Optional[str] = None):
bb = _bb(project_id)
tasks = bb.list_tasks(
status=status,
assignee=assignee,
parent_task=parent_task)
tasks = bb.list_tasks(status=status, assignee=assignee, parent_task=parent_task)
if q:
q_lower = q.lower()
tasks = [t for t in tasks if q_lower in (t.title or "").lower()]
return {"tasks": [_task_to_dict(t) for t in tasks]}
@@ -74,6 +51,11 @@ async def get_task(project_id: str, task_id: str,
if not task:
raise HTTPException(404, f"Task not found: {task_id}")
result = _task_to_dict(task)
if not expand:
return result
# expand=all: 保持旧格式(list + 聚合字段),向后兼容前端 TaskModal
if expand == "all":
q = _q(project_id)
detail = q.task_detail(task_id)
@@ -90,6 +72,37 @@ async def get_task(project_id: str, task_id: str,
for d in bb.get_decisions(task_id)]
result["events"] = q.task_events(task_id)
result["experiences"] = q.task_experiences(task_id)
return result
# 细粒度 expand: 新格式(comments/events 带 limit + total_count
expand_list = expand.split(",")
q = _q(project_id)
if "comments" in expand_list:
all_comments = bb.get_comments(task_id)
result["comments"] = {
"items": [dict(c.__dict__) for c in all_comments[-20:]],
"total_count": len(all_comments),
"limit": 20,
}
if "events" in expand_list:
all_events = q.task_events(task_id, limit=99999)
result["events"] = {
"items": all_events[-30:],
"total_count": len(all_events),
"limit": 30,
}
if "outputs" in expand_list:
result["outputs"] = [dict(o.__dict__) for o in bb.get_outputs(task_id)]
if "reviews" in expand_list:
result["reviews"] = [dict(r.__dict__) for r in bb.get_reviews(task_id)]
if "decisions" in expand_list:
result["decisions"] = [dict(d.__dict__) for d in bb.get_decisions(task_id)]
return result
@@ -100,11 +113,9 @@ async def create_task(project_id: str, body: Dict[str, Any]):
task_id = body.get("id")
if not task_id:
import re
from datetime import datetime
prefix = re.sub(r'[^a-z0-9]', '-', project_id.lower()).strip('-')[:20]
date_str = datetime.now().strftime('%Y%m%d')
# seq: 查当前项目最大 seq
import sqlite3
db_path = get_data_root() / project_id / "blackboard.db"
try:
conn = sqlite3.connect(str(db_path), timeout=5)
@@ -237,7 +248,6 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
})
# 检查转换是否合法
from src.blackboard.db import VALID_TRANSITIONS
current = old_task.status
allowed = VALID_TRANSITIONS.get(current, set())
if new_status not in allowed:
@@ -271,220 +281,6 @@ async def update_status(project_id: str, task_id: str, body: Dict[str, Any]):
return {"ok": True, "old_status": current, "new_status": new_status}
# --- @mention 自动提取(#04 ---
_KNOWN_AGENT_IDS: list = []
def _init_agent_ids():
"""从配置文件加载 Agent ID 列表"""
global _KNOWN_AGENT_IDS
if _KNOWN_AGENT_IDS:
return
try:
import yaml
cfg_path = os.path.join(
os.path.dirname(__file__),
"..",
"..",
"config",
"default.yaml")
with open(cfg_path) as f:
cfg = yaml.safe_load(f)
_KNOWN_AGENT_IDS = list(
cfg.get(
"daemon",
{}).get(
"agent_profiles",
{}).keys())
except Exception:
_KNOWN_AGENT_IDS = []
def _extract_mentions(text: str) -> list:
"""从文本中自动提取 @agent-id 格式的 mention"""
import re
_init_agent_ids()
candidates = set(
re.findall(
r'@([a-z][a-z0-9]*(?:-[a-z][a-z0-9]*)+)',
text))
return [a for a in candidates if a in _KNOWN_AGENT_IDS]
# --- Comments ---
@router.get("/tasks/{task_id}/comments")
async def get_comments(project_id: str, task_id: str,
comment_type: Optional[str] = None):
bb = _bb(project_id)
comments = bb.get_comments(task_id, comment_type=comment_type)
return {"comments": [dict(c.__dict__) for c in comments]}
@router.post("/tasks/{task_id}/comments")
async def add_comment(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
mentions_raw = body.get("mentions")
comment_body = body["body"]
# #04: 自动从 body 提取 @mention,与显式传的 mentions 取并集
auto_mentions = _extract_mentions(comment_body)
if isinstance(mentions_raw, str):
try:
explicit_mentions = json.loads(mentions_raw)
except Exception:
explicit_mentions = []
elif isinstance(mentions_raw, list):
explicit_mentions = mentions_raw
else:
explicit_mentions = []
merged_mentions = list(set(explicit_mentions + auto_mentions))
cid = bb.add_comment(task_id, body["author"], comment_body,
comment_type=body.get("comment_type", "general"),
mentions=merged_mentions)
if merged_mentions:
bb.record_mentions(cid, task_id, merged_mentions)
# #10: SSE 通知前端黑板有新 comment
try:
from src.api.sse_routes import get_broker
broker = get_broker()
broker.publish_sync("comment_added", {
"project_id": project_id,
"task_id": task_id,
"comment_id": cid,
"author": body["author"],
})
except Exception:
pass
return {"ok": True, "comment_id": cid, "mentions": merged_mentions}
# --- Outputs ---
@router.get("/tasks/{task_id}/outputs")
async def get_outputs(project_id: str, task_id: str):
bb = _bb(project_id)
return {"outputs": [dict(o.__dict__) for o in bb.get_outputs(task_id)]}
@router.post("/tasks/{task_id}/outputs")
async def write_output(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
# 字段校验 + Agent-friendly 错误
agent = body.get("agent")
if not agent:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: agent",
"hint": "Provide your agent ID, e.g. 'zhangfei-dev'",
})
# type 字段:接受 type 或 content_type(别名兼容)
output_type = body.get("type") or body.get("content_type")
valid_types = sorted(OUTPUT_TYPES)
if not output_type:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: type",
"valid_values": {"type": valid_types},
"hint": "Use 'type' field. Also accepts 'content_type' as alias.",
})
if output_type not in OUTPUT_TYPES:
raise HTTPException(422, {
"error": "validation_failed",
"detail": f"Invalid type: '{output_type}'",
"valid_values": {"type": valid_types},
})
title = body.get("title")
if not title:
raise HTTPException(422, {
"error": "validation_failed",
"detail": "Missing required field: title",
"hint": "Provide a brief title describing this output",
})
# 内容模式:content(直传)或 content_path(引用)
content = body.get("content")
content_path = body.get("content_path") or body.get("path")
if content and not content_path:
# 内容直传模式:自动写文件
import os
artifacts_dir = os.path.join(
os.path.dirname(bb.db_path), "artifacts", task_id
)
os.makedirs(artifacts_dir, exist_ok=True)
# 安全文件名
safe_name = "".join(
c if c.isalnum() or c in "._-" else "_" for c in title)
if not safe_name:
safe_name = "output"
file_path = os.path.join(artifacts_dir, safe_name)
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
content_path = file_path
oid = bb.write_output(
task_id, agent, output_type, title,
content_path=content_path,
summary=body.get("summary"),
metadata=body.get("metadata"),
)
return {"ok": True, "output_id": oid}
# --- Decisions ---
@router.get("/tasks/{task_id}/decisions")
async def get_decisions(project_id: str, task_id: str):
bb = _bb(project_id)
return {"decisions": [dict(d.__dict__) for d in bb.get_decisions(task_id)]}
@router.post("/tasks/{task_id}/decisions")
async def add_decision(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
did = bb.add_decision(task_id, body["decider"], body["decision"],
body["rationale"],
alternatives=body.get("alternatives"))
return {"ok": True, "decision_id": did}
# --- Observations ---
@router.post("/tasks/{task_id}/observations")
async def add_observation(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
oid = bb.add_observation(task_id, body["observer"], body["body"],
severity=body.get("severity", "info"))
return {"ok": True, "observation_id": oid}
# --- Reviews ---
@router.get("/tasks/{task_id}/reviews")
async def get_reviews(project_id: str, task_id: str):
bb = _bb(project_id)
return {"reviews": [dict(r.__dict__) for r in bb.get_reviews(task_id)]}
@router.post("/tasks/{task_id}/reviews")
async def add_review(project_id: str, task_id: str, body: Dict[str, Any]):
bb = _bb(project_id)
review = Review(
id=body["id"], task_id=task_id, reviewer=body["reviewer"],
review_type=body["review_type"], verdict=body["verdict"],
summary=body["summary"], confidence=body.get("confidence"),
round=body.get("round", 1), max_rounds=body.get("max_rounds", 3),
)
bb.add_review(review)
return {"ok": True, "review_id": review.id}
@router.patch("/tasks/{task_id}")
async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]):
"""更新任务元数据(归档、标题等)"""
@@ -497,7 +293,6 @@ async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]):
if not updates:
return {"ok": True}
# 直接用 SQL 更新
import sqlite3
conn = sqlite3.connect(str(bb.db_path), timeout=5)
try:
set_clause = ", ".join(f"{k}=?" for k in updates)
@@ -509,38 +304,9 @@ async def patch_task(project_id: str, task_id: str, body: Dict[str, Any]):
return {"ok": True}
# --- Per-task Events & Experiences ---
@router.get("/tasks/{task_id}/events")
async def get_task_events(project_id: str, task_id: str,
limit: int = Query(50, le=200)):
q = _q(project_id)
return {"events": q.task_events(task_id, limit)}
@router.get("/tasks/{task_id}/experiences")
async def get_task_experiences(project_id: str, task_id: str):
q = _q(project_id)
return {"experiences": q.task_experiences(task_id)}
# --- Global Events ---
@router.get("/events")
async def get_events(project_id: str, limit: int = Query(50, le=200)):
q = _q(project_id)
return {"events": q.recent_events(limit)}
# --- Summary ---
@router.get("/summary")
async def task_summary(project_id: str):
q = _q(project_id)
return {"summary": q.task_summary()}
# --- Archive (v2.8) ---
# --------------------------------------------------------------------------- #
# Archive (v2.8)
# --------------------------------------------------------------------------- #
@router.post("/tasks/{task_id}/archive")
async def archive_task(project_id: str, task_id: str,
@@ -563,10 +329,3 @@ async def archive_done_tasks(project_id: str):
bb = _bb(project_id)
count = bb.archive_done_tasks()
return {"ok": True, "archived_count": count}
# --- Helper ---
def _task_to_dict(t: Task) -> Dict[str, Any]:
d = {k: v for k, v in t.__dict__.items() if v is not None}
return d
+286 -17
View File
@@ -50,7 +50,15 @@ router = APIRouter(tags=["toolchain"])
_delivery_cache: Set[str] = set()
_delivery_timestamps: List[Tuple[float, str]] = []
_TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
_idempotency_lock: Optional[asyncio.Lock] = None
def _get_idempotency_lock() -> asyncio.Lock:
"""懒加载 asyncio.Lock,避免模块级创建时 event loop 不存在(Python 3.9)。"""
global _idempotency_lock
if _idempotency_lock is None:
_idempotency_lock = asyncio.Lock()
return _idempotency_lock
def _is_duplicate(event: str, delivery: str,
@@ -189,6 +197,7 @@ def _calc_risk_level(changed_files: List[str]) -> str:
MAIL_PROJECT_ID = "_mail"
TOOLCHAIN_PROJECT_ID = "_toolchain"
def _mail_db_path() -> Path:
@@ -200,6 +209,73 @@ def _mail_db_path() -> Path:
return db
def _toolchain_db_path() -> Path:
"""获取 Toolchain 数据库路径,确保目录和表存在。"""
root = get_data_root()
db = root / TOOLCHAIN_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
return db
def _send_toolchain_task(
to_agent: str,
title: str,
description: str,
event_type: str,
action_type: str,
steps: list,
context_data: dict | None = None,
source: str = "webhook",
) -> str:
"""创建 Toolchain Task 并写入 _toolchain DB。
Args:
to_agent: 收件人 Agent ID
title: 任务标题
description: 任务描述模板渲染后的事件信息
event_type: 事件类型review_result / ci_failure / ...
action_type: 动作分类用于步骤选择和日志统计
steps: 结构化编号步骤列表
context_data: 事件上下文数据PR 仓库名等
source: 来源标识
Returns:
创建的 Task ID
"""
if to_agent not in AGENT_IDS:
logger.warning("Unknown agent: %s, skipping toolchain task", to_agent)
return ""
task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": event_type,
"action_type": action_type,
"steps": steps,
"context": context_data or {},
"from": "system",
"source": source,
}, ensure_ascii=False)
task = Task(
id=task_id,
title=title,
description=description,
assignee=to_agent,
assigned_by="system",
must_haves=must_hives,
task_type="toolchain",
status="pending",
)
bb = Blackboard(_toolchain_db_path())
bb.create_task(task)
logger.info(
"Toolchain task sent: %s%s [%s] action_type=%s",
title[:40], to_agent, task_id, action_type,
)
return task_id
def _send_mail(
to_agent: str,
title: str,
@@ -327,7 +403,25 @@ async def _send_mention_mails(
})
title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})"
_send_mail(agent_id, title, text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=text,
event_type="mention",
action_type="mention",
steps=[
"按上方 mention 模板中的 response_guidance 执行",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"source_type": source_type,
"source_url": source_url,
"commenter": commenter,
"content_snippet": content[:500],
"repo": repo,
"issue_number": issue_number,
},
)
# ---------------------------------------------------------------------------
@@ -342,6 +436,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
await _handle_pr_opened(payload)
elif action == "closed":
await _handle_pr_closed(payload)
elif action == "synchronize":
await _handle_pr_synchronize(payload)
async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
@@ -377,7 +473,27 @@ async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
})
title = f"Review 请求: {pr_title} ({repo}#{pr_number})"
_send_mail("simayi-challenger", title, text)
_send_toolchain_task(
to_agent="simayi-challenger",
title=title,
description=text,
event_type="review_request",
action_type="review_request",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"按审查清单审查(参考 code-review Skill",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"branch": branch,
"risk_level": risk_level,
},
)
# S3: PR body @mention 通知
pr_body = pr.get("body", "") or ""
@@ -486,7 +602,25 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
})
title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_comment",
action_type="review_comment",
steps=[
f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments",
"根据评论内容响应(修改代码或在 PR 上回复 comment)",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"reviewer": reviewer,
"comment_body": review_body,
},
)
# S5: Review body @mention 通知(COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -508,7 +642,34 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
})
title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
if state == "APPROVED":
tc_steps = [
f"合并 PRGitea API: POST /repos/{repo}/pulls/{pr_number}/merge",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
else: # REQUEST_CHANGES
tc_steps = [
"按审查意见逐条修改代码",
"push 到原分支 → CI 自动跑",
"CI 通过后等重新 Review",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_result",
action_type="review_result",
steps=tc_steps,
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"result": result,
"reviewer": reviewer,
"review_body": review_body,
},
)
# S5: Review body @mention 通知(非 COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -577,11 +738,31 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
})
title = f"PR 更新: {pr_title} ({repo}#{pr_number})"
_send_mail(reviewer, title, text)
_send_toolchain_task(
to_agent=reviewer,
title=title,
description=text,
event_type="review_updated",
action_type="review_updated",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"重点检查上次 Review 意见的修改部分",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"new_sha": new_sha,
"reviewer": reviewer,
},
)
def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,复用 deploy_failure 模板"""
def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,走 ToolchainHandler。"""
text = render_template("deploy_failure", {
"repo": repo,
"commit_sha": f"PR #{pr_number}",
@@ -589,7 +770,25 @@ def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason:
title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})"
full_text = f"{text}\n\n失败原因: {reason}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_mail(agent_id, title, full_text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=full_text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"pr_number": pr_number,
"pr_title": pr_title,
"reason": reason,
},
)
async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
@@ -621,7 +820,21 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
})
title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_merged",
action_type="review_merged",
steps=[], # 纯通知,无步骤
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"merged_by": merged_by,
},
)
# 自动部署:git pull + rsync + 按需 post_deploy
try:
@@ -674,7 +887,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if rsync_proc.returncode != 0:
logger.error("Auto-deploy: rsync failed: %s", rsync_err.decode())
_send_deploy_failure_mail(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
_send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
return
# Step 3: 判断是否需要执行 post_deploy
@@ -729,7 +942,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if deploy_proc.returncode != 0:
logger.error("Auto-deploy: post_deploy failed: %s", deploy_err.decode())
_send_deploy_failure_mail(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
_send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
break
else:
logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5]))
@@ -738,7 +951,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
except asyncio.TimeoutError:
logger.error("Auto-deploy: timeout for %s", repo)
_send_deploy_failure_mail(repo, pr_number, pr_title, "部署超时")
_send_deploy_failure_task(repo, pr_number, pr_title, "部署超时")
except Exception as e:
logger.error("Auto-deploy: unexpected error: %s", e)
@@ -785,7 +998,29 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
})
title = f"Issue 指派: {issue_title} ({repo}#{issue_number})"
_send_mail(assignee, title, text)
_send_toolchain_task(
to_agent=assignee,
title=title,
description=text,
event_type="issue_assigned",
action_type="issue_assigned",
steps=[
f"创建分支 fix/{issue_number}-{brief}",
"编码 + 写 UT",
"push → 等 CI",
f"CI 通过后创建 PRGitea API: POST /repos/{repo}/pulls",
"等 Review",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"issue_number": issue_number,
"repo": repo,
"issue_title": issue_title,
"labels": labels,
"issue_body": issue_body or "(无描述)",
"brief": brief,
},
)
elif action == "opened":
if "部署失败" in issue_title:
@@ -800,7 +1035,23 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
title = f"部署失败: {repo}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_mail(agent_id, title, text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"commit_sha": commit_sha or "(未知)",
},
)
# Issue body @mentionopened 时检查)
issue_body = issue.get("body", "") or ""
@@ -867,7 +1118,25 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
})
title = f"CI 失败: {repo}#{issue_number}"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="ci_failure",
action_type="ci_failure",
steps=[
"查看完整 CI 日志(PR 页面或 Gitea Actions 页面)",
"修复失败的测试",
"push → CI 自动重跑",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": issue_number,
"repo": repo,
"branch": branch,
"error_summary": error_summary,
},
)
# CI 处理完不 return,继续检查 @mention
# === 路径 2:@mention 通知(新增,独立路径) ===
@@ -958,7 +1227,7 @@ async def gitea_webhook(
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
async with _get_idempotency_lock():
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug(
"Duplicate webhook: %s/%s",
+2 -1
View File
@@ -209,6 +209,7 @@ VALID_TRANSITIONS = {
COMMENT_TYPES = frozenset({
"general", "handoff", "observation", "review", "rebuttal",
"rebuttal_response", "debate_argument", "debate_rebuttal", "debate_judgment",
"action_report",
})
SEVERITY_LEVELS = frozenset({"blocking", "warning", "info", "audit"})
@@ -293,7 +294,7 @@ _SCHEMA_STATEMENTS = [
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(id),
author TEXT NOT NULL,
comment_type TEXT NOT NULL DEFAULT 'general' CHECK (comment_type IN ('general','handoff','observation','review','rebuttal','rebuttal_response','debate_argument','debate_rebuttal','debate_judgment')),
comment_type TEXT NOT NULL DEFAULT 'general',
body TEXT NOT NULL,
mentions TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
+1 -1
View File
@@ -208,7 +208,7 @@ class Blackboard:
params.append(parent_task)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY priority ASC, created_at ASC"
query += " ORDER BY priority ASC, created_at DESC"
rows = conn.execute(query, params).fetchall()
return [Task.from_row(r) for r in rows]
finally:
+2 -2
View File
@@ -9,7 +9,7 @@ import logging
from pathlib import Path
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.prompt_composer import PromptComposer, PromptContext, GiteaConventionSection, WikiGuideSection
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.mail")
@@ -36,7 +36,7 @@ class MailHandler(BaseTaskHandler):
return composer.compose(context)
def get_sections(self) -> list:
return [MailContextSection(), MailApiSection(), MailConstraintsSection()]
return [MailContextSection(), MailApiSection(), MailConstraintsSection(), GiteaConventionSection(), WikiGuideSection()]
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""Mail 完成验证:区分 inform/request。
+117 -21
View File
@@ -1,4 +1,4 @@
"""Mail 失败通知 — 以 system 身份通知发件人"""
"""Mail 失败通知 v2.0 — 以 system 身份通知发件人AI Native"""
from __future__ import annotations
@@ -6,7 +6,7 @@ import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from typing import Dict, Optional
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
@@ -15,21 +15,121 @@ from src.config.agents import AGENT_IDS
logger = logging.getLogger(__name__)
# 邮件通知正文模板(统一模板,包含所有可能的失败原因和建议)
_NOTIFY_TEMPLATE = """你的邮件投递失败了。
# ── Reason 人话翻译 + detail 提取 ──────────────────────────────
📧 原始邮件{title}
👤 收件人{to_agent}
失败原因{reason}
def _extract_stderr(detail: dict, max_len: int = 200) -> str:
"""从 detail 中提取 stderr_preview"""
preview = (detail or {}).get("stderr_preview", "")
if preview and len(preview) > max_len:
preview = preview[:max_len] + "..."
return preview
常见失败原因及处理建议
no_reply_found收件人未回复建议重发邮件或通过黑板任务方式联系
auth_failed收件人认证失败需检查 Agent 配置联系姜维(jiangwei-infra)排查
crash_limit收件人处理时多次崩溃系统异常建议稍后重试
task_timeout处理超时建议重发或通过其他方式联系
其他原因建议联系副军师(pangtong-fujunshi)排查
系统自动通知"""
def _fmt_retry_info(reason: str, detail: dict) -> str:
"""格式化重试情况描述"""
_NO_RETRY_REASONS = {
"no_reply_found", "auth_failed", "agent_error",
"agent_failed", "compact_failed",
}
if reason in _NO_RETRY_REASONS:
reason_human = _REASON_MAP.get(reason, _REASON_MAP.get("_default", ("未知原因", lambda d: "")))[0]
return f"无法重试({reason_human}"
count = (detail or {}).get("count", 0)
fallback_count = (detail or {}).get("fallback_count", 0)
if count > 0:
return f"已自动重试 {count}"
if fallback_count > 0:
return f"已自动重试 {fallback_count} 次(fallback"
return "系统已尝试恢复,但仍失败"
# reason_raw → (reason_human_readable, detail_format_fn)
_REASON_MAP: Dict[str, tuple] = {
"no_reply_found": ("收件人未回复(Agent 未能识别或处理此邮件)", lambda d: ""),
"crashed": ("处理时进程崩溃", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else "无 stderr 输出"),
"max_crash_count": ("连续崩溃达上限", lambda d: f"崩溃 {d.get('count', '?')}"),
"max_retries": ("续杯耗尽(已自动重试)", lambda d: f"重试 {d.get('count', '?')}"),
"max_api_retry_count": ("API 连续失败达上限", lambda d: f"API 重试 {d.get('count', '?')}"),
"max_monitor_timeouts": (
"处理超时达上限",
lambda d: f"超时 {d.get('count', '?')} 次,"
f"共约 {d.get('elapsed_seconds', 0) // 60} 分钟"),
"gateway_timeout": ("Agent 执行超时(已续杯重试)", lambda d: ""),
"session_stuck": ("会话假死(lock PID 死亡)", lambda d: f"假死 {d.get('stuck_count', '?')}"),
"revive_failed": ("会话恢复失败", lambda d: f"假死 {d.get('stuck_count', '?')}"),
"auth_failed": ("Agent 认证失败(配置问题)", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
"fallback_exhausted": (
"主模型和备用模型均失败",
lambda d: f"fallback {d.get('fallback_count', '?')} 次,"
f"原因: {d.get('fallback_reason', '未知')}"),
"agent_error": (
"Agent 内部错误",
lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
"agent_failed": ("收件人主动标记失败", lambda d: ""),
"compact_failed": ("上下文压缩失败", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
"compact_hanging": ("上下文压缩长时间未完成", lambda d: ""),
"compact_interrupted": ("上下文压缩被中断(已自动重试)", lambda d: ""),
"gateway_unreachable": (
"Gateway 不可达(已自动重试)",
lambda d: f"stderr: {_extract_stderr(d)}"
if _extract_stderr(d) else ""),
"lock_conflict": ("会话锁冲突(已自动重试)", lambda d: ""),
"max_retry_count": ("重试耗尽", lambda d: f"重试 {d.get('count', '?')}"),
"max_lock_retry_count": ("锁冲突重试耗尽", lambda d: f"重试 {d.get('count', '?')}"),
"max_connect_retry_count": ("连接重试耗尽", lambda d: f"重试 {d.get('count', '?')}"),
"_default": ("未知原因", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
}
# 常见失败原因参考(AI Native:提供知识库让收件 AI 自行判断)
_REASON_REFERENCE = """常见失败原因参考:
no_reply_found收件人未回复Agent 未能识别或处理此邮件
crashed / max_crash_count收件人处理时进程崩溃已自动重试 3
max_retries续杯耗尽已自动重试 3 共约 34 分钟
max_api_retry_countAPI 连续失败达上限rate_limit/500/503
max_monitor_timeouts处理超时达上限共约 31.5 分钟
gateway_timeoutAgent 执行超时已续杯重试
session_stuckAgent 会话假死lock PID 死亡revive 失败
revive_failed会话假死后恢复失败
auth_failedAgent 认证失败配置问题
fallback_exhausted主模型和备用模型均失败
agent_failed收件人主动标记失败
compact_failed上下文压缩失败
compact_hanging上下文压缩长时间未完成等待超 31.5 分钟
compact_interrupted上下文压缩被中断已自动重试 3
gateway_unreachableGateway 不可达已自动重试 3
lock_conflict会话锁冲突已自动重试 3
其他建议排查系统日志"""
def _build_notify_text(title: str, to_agent: str, reason: str,
detail: Optional[dict] = None) -> str:
"""构建通知正文(v2.0 AI Native"""
reason_human, detail_fn = _REASON_MAP.get(reason, _REASON_MAP["_default"])
detail_info = detail_fn(detail or {})
retry_info = _fmt_retry_info(reason, detail or {})
lines = [
"邮件投递失败通知",
"",
f"📧 原始邮件:「{title}",
f"👤 收件人:{to_agent}",
f"❌ 失败原因:{reason_human}{reason}",
f"📊 重试情况:{retry_info}",
]
if detail_info:
lines.append("📋 上下文信息:")
lines.append(f" {detail_info}")
lines.append("")
lines.append(_REASON_REFERENCE)
lines.append("")
lines.append("——系统自动通知")
return "\n".join(lines)
def _is_mail_project(db_path: Path) -> bool:
@@ -43,7 +143,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
"""Mail 失败后以 system 身份给发件人发通知邮件
直接通过 Blackboard 创建 Task不走 HTTP API
防递归检查原邮件 must_hives.system_notify true 则跳过
防递归检查原邮件 must_haves.system_notify true 则跳过
发件人不是有效 Agent system 通知庞统代处理避免广播风暴
"""
try:
@@ -83,12 +183,8 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
original_mail_id, from_agent)
target_agent = "pangtong-fujunshi"
# 构造通知正文
text = _NOTIFY_TEMPLATE.format(
title=title,
to_agent=to_agent,
reason=reason,
)
# 构造通知正文v2.0 AI Native
text = _build_notify_text(title, to_agent, reason, detail)
# 创建通知邮件 Task
notify_id = f"mail-{int(datetime.now().timestamp() * 1000)}"
+49
View File
@@ -65,6 +65,8 @@ class PromptContext:
# toolchain 专用
event_type: str = "" # ci_failure / review_request / ...
event_data: Dict = field(default_factory=dict)
action_type: str = "" # 动作分类(review_result / ci_failure / ...
action_steps: list = field(default_factory=list) # 结构化编号步骤列表
# 前序产出
depends_on_outputs: Optional[List] = None
@@ -125,3 +127,50 @@ class PromptComposer:
)
return result
# ---------------------------------------------------------------------------
class GiteaConventionSection:
"""Gitea 标题规范引导段 — 提醒 Agent 创建 Issue/PR 时遵循标题格式。"""
name: str = "gitea_convention"
priority: int = 55 # CONSTRAINTS(50) 和 EXTENSION(60) 之间
CONVENTION_TEXT = (
"## Gitea 标题规范\n"
"创建 Issue/PR 时,标题**必须**包含项目代号前缀:\n"
"- Issue: `[代号] type: 简述`,如 `[moz] bug: Mail API 500`\n"
"- PR: `[代号] type(scope): 简述`,如 `[moz] impl(daemon): WikiGuideSection 注入`\n"
"代号:moz=moziplus_v2, quant=quant_live, vnpy=vnpy\n"
"type: bug/feat/impl/fix/docs/test/ci/refactor/chore"
)
def render(self, context: "PromptContext") -> str:
return self.CONVENTION_TEXT
def should_include(self, context: "PromptContext") -> bool:
return True
# ---------------------------------------------------------------------------
# WikiGuideSection — 知识查询引导段
# ---------------------------------------------------------------------------
class WikiGuideSection:
"""知识查询引导段 — 引导 Agent 在关键决策点查 wiki-vault。"""
name: str = "wiki_guide"
priority: int = 60 # PRIORITY_EXTENSION
WIKI_GUIDE = (
"## 知识查询引导\n"
"涉及方案设计、编码实现、故障排查时,先查 wiki-vault 相关实践:\n"
"- 路径:/Volumes/KnowledgeBase/wiki-vault/\n"
"- 速查:index.md → grep 关键词 → summary 字段 → 按需读全文\n"
"- 查不到:在 _meta/knowledge-gaps.md 记录"
)
def render(self, context: "PromptContext") -> str:
return self.WIKI_GUIDE
def should_include(self, context: "PromptContext") -> bool:
return True
+10 -1
View File
@@ -286,10 +286,15 @@ class AgentSpawner:
# 从 must_haves 解析 mail 元数据(from / performative
from_agent = ""
mail_type = ""
action_type = ""
action_steps = []
try:
meta = json.loads(must_haves) if must_haves else {}
from_agent = meta.get("from", "")
mail_type = meta.get("performative", meta.get("type", ""))
# toolchain 字段提取
action_type = meta.get("action_type", "")
action_steps = meta.get("steps", [])
except Exception:
pass
ctx = PromptContext(
@@ -298,6 +303,7 @@ class AgentSpawner:
agent_id=agent_id, role=spawn_type,
spawn_type=spawn_type,
from_agent=from_agent, mail_type=mail_type,
action_type=action_type, action_steps=action_steps,
)
return handler.build_prompt(ctx)
@@ -845,6 +851,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
cls.get("retry_field", "retry_count")
)
elif outcome == "api_error":
# A9: [DEPRECATED] api_error 已改为 should_retry=True 走续杯路径。
# 此分支理论上不再命中,保留作为安全兜底。
# A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却
# 有上限:api_retry_count 累计达 max_retries 则标 failed
await self._do_on_complete_async(on_complete, agent_id, outcome)
@@ -1842,7 +1850,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in [
"rate_limit", "500", "503", "api error"]):
return {"outcome": "api_error", "should_retry": False}
return {"outcome": "api_error", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in [
"compaction-diag", "context-overflow"]):
return {"outcome": "compact_failed", "should_retry": False}
+4 -2
View File
@@ -10,7 +10,7 @@ from pathlib import Path
from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.prompt_composer import PromptComposer, PromptContext, GiteaConventionSection, WikiGuideSection
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler")
@@ -306,13 +306,15 @@ class TaskHandler(BaseTaskHandler):
return True
def get_sections(self) -> list:
"""返回 5 个 PromptSection 实例。"""
"""返回 PromptSection 实例。"""
return [
TaskContextSection(),
PriorOutputsSection(),
RoleSkillSection(),
TaskApiSection(),
TaskConstraintsSection(),
GiteaConventionSection(),
WikiGuideSection(),
]
def build_prompt(self, context: PromptContext) -> str:
+359 -113
View File
@@ -1,29 +1,52 @@
"""toolchain_handler.py 工具链事件 handler。
"""toolchain_handler.py - 工具链事件 handler。
处理 Gitea Webhook 事件CI 失败Review 请求Issue 指派等
处理 Gitea Webhook 事件(CI 失败Review 请求Issue 指派等)
L2 引擎层强约束:输入(结构化步骤)+ 执行(Red Flags)+ 输出(action_report 验证)
"""
from __future__ import annotations
import json
import logging
import os
import urllib.request
from pathlib import Path
from typing import Dict
from typing import Dict, List
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.prompt_composer import PromptComposer, PromptContext, GiteaConventionSection, WikiGuideSection
from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.toolchain")
# ---------------------------------------------------------------------------
# Gitea API 配置
# ---------------------------------------------------------------------------
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
_GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
# action_type → action_hint 映射
_ACTION_HINTS: Dict[str, str] = {
"review_result": "你收到一个 Review 结果通知,这是一个需要你执行动作的事件(不是纯通知)。",
"review_request": "你收到一个 Review 请求,这是一个需要你审查并提交 Review 的事件。",
"review_updated": "你收到一个 PR 更新通知,这是一个需要你重新审查修改部分的事件。",
"review_comment": "你收到一个 Review 评论,这是一个需要你查看并响应的事件。",
"ci_failure": "你收到一个 CI 失败通知,这是一个需要你修复失败测试的事件。",
"issue_assigned": "你收到一个 Issue 指派,这是一个需要你编码实现的事件。",
"deploy_failure": "你收到一个部署失败通知,这是一个需要你排查并修复的事件。",
"mention": "你收到一个 @mention 通知,这是一个需要你按指引响应的事件。",
"review_merged": "你收到一个 PR 合并通知。这是一条纯通知,阅读即可。",
"infrastructure_failure": "你收到一个基础设施问题报告,请排查并修复。",
}
# ---------------------------------------------------------------------------
# Toolchain PromptSections
# ---------------------------------------------------------------------------
class ToolchainContextSection:
"""事件类型 + 事件详情priority=10"""
"""事件类型 + 事件详情 + 结构化步骤 + action_hint(priority=10)"""
name: str = "toolchain_context"
priority: int = 10
@@ -32,27 +55,44 @@ class ToolchainContextSection:
event_type = context.event_type
event_data: Dict = context.event_data or {}
# Part 1: 事件信息(现有模板引擎)
if event_type in _TEMPLATE_MAP:
# 使用模板引擎渲染已知事件
variables = {k: str(v) for k, v in event_data.items()}
return render_template(event_type, variables)
event_text = render_template(event_type, variables)
else:
lines = ["## 工具链事件", ""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append("- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append("")
event_text = "\n".join(lines)
# fallback:通用事件描述
lines = ["## 工具链事件", ""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append("- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append("")
return "\n".join(lines)
# Part 2: 结构化编号步骤(新增,从 action_steps 渲染)
steps: List[str] = context.action_steps or []
if steps:
step_lines = ["", "### 必须执行的步骤", ""]
for i, step in enumerate(steps, 1):
step_lines.append(f"{i}. {step}")
steps_text = "\n".join(step_lines)
else:
steps_text = ""
# Part 3: action 指引(新增,按 action_type 选择)
action_hint = _ACTION_HINTS.get(
context.action_type,
"你收到一个工具链事件,这是一个需要你执行动作的事件。",
)
return f"{action_hint}\n\n{event_text}{steps_text}"
def should_include(self, context: PromptContext) -> bool:
return True
class ToolchainApiSection:
"""API 操作指令priority=40),success_status=done"""
"""API 操作指令(priority=40)-- action_report 提交指引"""
name: str = "toolchain_api"
priority: int = 40
@@ -60,28 +100,48 @@ class ToolchainApiSection:
API_HOST = "localhost:8083"
def render(self, context: PromptContext) -> str:
task_id = context.task_id
project_id = context.project_id
agent_id = context.agent_id
lines = [
"## API 操作指令",
"",
f"项目 ID: `{context.project_id}`",
f"任务 ID: `{context.task_id}`",
f"项目 ID: `{project_id}`",
f"任务 ID: `{task_id}`",
"",
"### 完成后必须更新任务状态",
"完成后务必通过以下命令将任务标记为 **done**:",
"### 完成后必须提交 action report",
"",
"执行完所有步骤后,必须提交 action report:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\',
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/comments" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"status": "done"}\'',
f' -d \'{{"author": "{agent_id}", "comment_type": "action_report", "body": "简要描述你执行了什么操作及结果"}}\'',
"```",
"",
"⚠️ 不提交 action report 的任务会被标记为 failed。",
"",
"### 提交产出",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\',
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/outputs" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
"```",
"",
"### 需要其他角色支持时",
"",
"如果在执行过程中需要其他角色协助(如缺数据、需要审批等),在关联的 PR/Issue 上创建 comment @对方:",
"```bash",
f'curl -s -X POST "{_GITEA_BASE}/repos/{{repo}}/issues/{{pr_number}}/comments" \\',
' -H "Authorization: token <your-token>" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"body": "@{agent-id} 需要你的支持:{描述问题}"}\'',
"```",
"",
"⚠️ 不要使用 Mail API(飞鸽传书)。所有协作通过 Gitea 留痕。",
"",
]
return "\n".join(lines)
@@ -90,20 +150,50 @@ class ToolchainApiSection:
class ToolchainConstraintsSection:
"""硬约束priority=50"""
"""硬约束 + Red Flags(priority=50)"""
name: str = "toolchain_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str:
lines = [
"## 硬约束",
"## 硬约束(必须遵守)",
"",
"1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成",
"2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态",
"3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务",
"4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done",
"5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务",
"⚠️ 以下是强制要求,不是建议或参考。违反任何一条都会导致任务失败。",
"",
"### 1. 必须按步骤执行",
'- 检查上方“必须执行的步骤”列表',
'- 逐条执行每个步骤,不可跳过',
'- 不要只读不做——这不是纯通知',
"",
"### 2. 必须提交 action report",
'- 执行完所有步骤后,必须提交 action report',
"- 提交方式:POST commentcomment_type='action_report'",
'- 报告内容:简要描述你执行了什么操作、结果如何',
'- ⚠️ 不提交 action report 的任务会被标记为 failed',
"",
"### 3. 不要执行任何状态转换命令",
'- 不要手动标 working/done/review/failed,系统会自动处理',
"",
"### 4. 不需要回复",
'- action report 就是你的完成凭证',
'- 不要发送 Mail(飞鸽传书),你的所有操作在 toolchain 流程内完成',
"",
"### 5. 所有协作通过 Gitea 完成",
'- 如果遇到问题需要其他角色支持,在关联的 PR/Issue 上创建 comment @对方',
'- 不要使用 Mail API(飞鸽传书)发送消息',
'- 你的所有操作都在 toolchain 流程内,通过 Gitea 留痕',
"",
"### Red Flags(如果脑海中出现以下想法,说明你错了)",
"",
'| Agent 想法 | Red Flag 驳回 |',
'|------------|--------------|',
'| “这个通知看看就行了” | ❌ 错!这是 action 指令,必须执行步骤列表中的每一项 |',
'| “我不需要做任何事” | ❌ 错!检查“必须执行的步骤”列表,每一步都要执行 |',
'| “先放着等会处理” | ❌ 错!立即执行,不要推迟 |',
'| “我已经知道了” | ❌ 知道不等于执行。执行步骤 + 提交 action report 才算完成 |',
'| “步骤太多了,选几个做就行” | ❌ 错!必须逐条执行,不可跳过 |',
'| “这个步骤不适用于当前情况” | ❌ 如果确实不适用,在 action report 中说明原因,但其他步骤必须执行 |',
"",
]
return "\n".join(lines)
@@ -127,15 +217,17 @@ class ToolchainHandler(BaseTaskHandler):
return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_workingpending → working"""
"""auto_working:pending → working"""
return self._auto_mark_working(task_id, db_path)
def get_sections(self) -> list:
"""返回 3 个 Toolchain PromptSection 实例"""
"""返回 Toolchain PromptSection 实例"""
return [
ToolchainContextSection(),
ToolchainApiSection(),
ToolchainConstraintsSection(),
GiteaConventionSection(),
WikiGuideSection(),
]
def build_prompt(self, context: PromptContext) -> str:
@@ -145,27 +237,55 @@ class ToolchainHandler(BaseTaskHandler):
return composer.compose(context)
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""检查行动输出(output 或 comment 有实质内容)"""
"""检查 action report(精确验证)+ 三层 fallback"""
try:
conn = get_connection(db_path)
try:
# 检查 output
# 特殊处理:infrastructure_failure 始终通过(防递归)
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
try:
meta = json.loads(row["must_haves"])
except Exception:
meta = {}
if meta.get("action_type") == "infrastructure_failure":
return VerifyResult(True, "infrastructure_passthrough",
"infrastructure_failure auto-pass")
# 特殊处理:review_merged 始终通过(纯通知)
if meta.get("action_type") == "review_merged":
return VerifyResult(True, "merged_passthrough",
"review_merged auto-pass")
# 1. 优先检查 action_report comment
report_row = conn.execute(
"SELECT id FROM comments WHERE task_id=? "
"AND comment_type='action_report' LIMIT 1",
(task_id,)
).fetchone()
if report_row:
return VerifyResult(True, "has_action_report", "action_report found")
# 2. fallback:检查 output(向后兼容)
output_count = conn.execute(
"SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
).fetchone()[0]
if output_count > 0:
return VerifyResult(True, "has_output", f"output_count={output_count}")
# 检查 comment(非系统、有实质内容)
# 3. fallback:检查有实质内容的 comment(向后兼容)
comment_count = conn.execute(
"SELECT COUNT(*) FROM comments WHERE task_id=? "
"AND author != 'system' AND LENGTH(content) >= 20",
"AND author != 'system' AND LENGTH(body) >= 20",
(task_id,)
).fetchone()[0]
if comment_count > 0:
return VerifyResult(True, "has_comment", f"comment_count={comment_count}")
return VerifyResult(False, "no_action", "output=0, comment=0")
return VerifyResult(False, "no_action",
"no action_report, no output, no valid comment")
finally:
conn.close()
except Exception as e:
@@ -174,32 +294,217 @@ class ToolchainHandler(BaseTaskHandler):
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败 → 标 failed + Mail API 通知主公"""
"""验证失败 → 三分路处理(业务/系统/基础设施)"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason)
logger.info("Toolchain %s: verify failed (%s), marked failed",
task_id, verify.reason)
# 从 db 读取事件上下文
event_type = ""
event_data: Dict = {}
# 读取 must_hives 获取事件上下文 + assignee 从 tasks 表读取
meta = {}
assignee = agent_id
try:
conn = get_connection(db_path)
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
"SELECT must_haves, assignee FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
meta = json.loads(row["must_haves"])
event_type = meta.get("event_type", "")
raw = meta.get("event_data", "{}")
event_data = json.loads(raw) if isinstance(raw, str) else raw
if row:
if row["must_haves"]:
meta = json.loads(row["must_haves"])
assignee = row["assignee"] or agent_id
conn.close()
except Exception:
pass
self._notify_via_mail_api(
task_id, verify.reason, verify.evidence,
event_type, event_data,
action_type = meta.get("action_type", "")
context_data = meta.get("context", {})
# 三分路决策
route = self._classify_failure(verify)
if route == "business":
self._handle_business_failure(
task_id, agent_id, verify, action_type, context_data, assignee, db_path)
elif route == "system":
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
else: # infrastructure
self._handle_infrastructure_failure(
task_id, agent_id, verify, db_path)
def _classify_failure(self, verify: VerifyResult) -> str:
"""分类失败类型:business / infrastructuresystem 通过升级到达)"""
# verify_error 或 DB 不可用 → 基础设施失败
if verify.reason == "verify_error":
return "infrastructure"
# 默认:业务失败
return "business"
def _handle_business_failure(
self, task_id: str, agent_id: str, verify: VerifyResult,
action_type: str, context_data: dict, assignee: str,
db_path: Path,
) -> None:
"""业务失败 → 在关联 PR/Issue 上创建 comment @原始 assignee"""
repo = context_data.get("repo", "")
pr_number = context_data.get("pr_number") or context_data.get("issue_number", "")
if repo and pr_number:
comment_body = (
f"@{assignee or agent_id} 工具链任务执行失败\n\n"
f"任务 ID: {task_id}\n"
f"失败原因: {verify.reason}\n"
f"证据: {verify.evidence}\n\n"
f"请检查黑板任务并处理。"
)
success = self._create_gitea_comment(repo, pr_number, comment_body)
if success:
logger.info("Toolchain %s: business failure → Gitea comment on %s#%s",
task_id, repo, pr_number)
return
# Gitea API failed → escalate to system failure
logger.warning(
"Toolchain %s: Gitea comment failed, escalating to system failure",
task_id)
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
else:
# 没有 PR/Issue 关联 → fallback 到系统失败
logger.warning(
"Toolchain %s: no PR/Issue context for business failure, "
"escalating to system failure", task_id)
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
def _handle_system_failure(
self, task_id: str, agent_id: str, verify: VerifyResult,
action_type: str, context_data: dict, db_path: Path,
) -> None:
"""系统失败 → 创建 Gitea Issue @pangtong-fujunshi"""
repo = context_data.get("repo", "sanguo/sanguo_moziplus_v2")
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
body = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {action_type or '未知'}\n"
f"失败原因: {verify.reason}\n"
f"证据: {verify.evidence}\n\n"
f"@pangtong-fujunshi 请检查黑板任务并手动处理。"
)
# 尝试在 Gitea 创建 Issue
created = self._create_gitea_issue(repo, title, body, ["pangtong-fujunshi"])
if created:
logger.info("Toolchain %s: system failure → Gitea Issue created on %s",
task_id, repo)
else:
# Gitea API 不可用 → 基础设施失败
logger.error(
"Toolchain %s: Gitea API unavailable, escalating to infrastructure failure",
task_id)
self._handle_infrastructure_failure(
task_id, agent_id, verify, db_path)
def _handle_infrastructure_failure(
self, task_id: str, agent_id: str,
verify: VerifyResult, db_path: Path,
) -> None:
"""基础设施失败 → 直接在 _toolchain DB 创建 task @jiangwei-infra(防递归)"""
try:
from datetime import datetime
new_task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": "infrastructure_failure",
"action_type": "infrastructure_failure",
"steps": [
"检查 Gitea 服务状态(http://192.168.2.154:3000)",
"检查网络连通性",
"恢复后提交 action report",
],
"context": {"original_task_id": task_id, "verify_reason": verify.reason},
"from": "system",
"source": "toolchain_handler_on_failure",
}, ensure_ascii=False)
conn = get_connection(db_path)
conn.execute(
"INSERT INTO tasks (id, title, description, assignee, assigned_by, "
"must_haves, task_type, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
new_task_id,
f"[基础设施] Gitea API 不可用 - {task_id}",
f"Gitea API 不可用,原任务 {task_id} 无法通过正常路径处理。\n"
f"请检查 Gitea 服务状态和网络连通性。",
"jiangwei-infra",
"system",
must_hives,
"toolchain",
"pending",
)
)
conn.commit()
conn.close()
logger.info(
"Toolchain %s: infrastructure failure → task %s created for jiangwei-infra",
task_id, new_task_id)
except Exception as e:
logger.error(
"Toolchain %s: failed to create infrastructure_failure task: %s",
task_id, e)
# -----------------------------------------------------------------------
# Gitea API 辅助
# -----------------------------------------------------------------------
def _create_gitea_comment(
self, repo: str, pr_number: int, body: str,
) -> bool:
"""在 PR/Issue 上创建 comment。返回是否成功。"""
if not _GITEA_TOKEN:
return False
payload = json.dumps({"body": body}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
f"{_GITEA_BASE}/repos/{repo}/issues/{pr_number}/comments",
data=payload,
headers={
"Authorization": f"token {_GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
urllib.request.urlopen(req, timeout=5)
return True
except Exception as e:
logger.warning("Gitea comment failed on %s#%s: %s", repo, pr_number, e)
return False
def _create_gitea_issue(
self, repo: str, title: str, body: str,
assignees: list = None,
) -> bool:
"""创建 Gitea Issue。返回是否成功。"""
if not _GITEA_TOKEN:
return False
data = {"title": title, "body": body}
if assignees:
data["assignees"] = assignees
payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
f"{_GITEA_BASE}/repos/{repo}/issues",
data=payload,
headers={
"Authorization": f"token {_GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
urllib.request.urlopen(req, timeout=5)
return True
except Exception as e:
logger.warning("Gitea create issue failed on %s: %s", repo, e)
return False
# -----------------------------------------------------------------------
# 兼容:保留旧方法签名(但不再被 on_failure 调用)
# -----------------------------------------------------------------------
def _build_gitea_links(self, event_type: str, event_data: dict) -> str:
"""根据事件类型构建 Gitea 链接。"""
links = []
@@ -215,63 +520,4 @@ class ToolchainHandler(BaseTaskHandler):
if "branch" in event_data and "commit" not in event_data:
links.append(f"分支: {event_data['branch']}")
return "\n".join(links) if links else "无法提取链接请检查黑板任务详情"
def _notify_via_mail_api(
self,
task_id: str,
reason: str,
evidence: str,
event_type: str,
event_data: Dict,
) -> None:
"""通过 Mail API 发送丰富的失败通知给主公。"""
# 构建行动指引
action_hint = "请检查黑板任务并手动处理。"
et_lower = event_type.lower()
if "ci" in et_lower or "deploy" in et_lower:
action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。"
elif "review" in et_lower:
action_hint = "建议查看 PR review 状态,必要时通知相关开发者。"
elif "issue" in et_lower:
action_hint = "建议创建任务派给对应开发者处理 Issue。"
# 构建事件详情
event_details = ""
if event_data:
event_details = "\n".join(
f" - {k}: {v}" for k, v in event_data.items()
)
# 构建 Gitea 链接
gitea_links = self._build_gitea_links(event_type, event_data)
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
text = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {event_type or '未知'}\n"
f"事件详情:\n{event_details or ' (无)'}\n\n"
f"失败原因: {reason}\n"
f"证据: {evidence}\n\n"
f"{gitea_links}\n\n"
f"行动指引: {action_hint}"
)
payload = json.dumps({
"from": "daemon",
"to": "pangtong-fujunshi",
"title": title,
"text": text,
"type": "inform",
}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
"http://localhost:8083/api/mail",
data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
logger.info("Toolchain %s: sent failure notification via Mail API", task_id)
except Exception as e:
logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e)
return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)"
@@ -5,6 +5,7 @@
import { useState, useCallback } from 'react';
import { api, AgentsStatusData } from '../api';
import ToolchainPanel from './ToolchainPanel';
interface ServiceCheckResult {
name: string;
@@ -15,7 +16,7 @@ interface ServiceCheckResult {
}
export default function SettingsPanel() {
const [tab, setTab] = useState<'connections' | 'security' | 'version' | 'logs'>('connections');
const [tab, setTab] = useState<'connections' | 'security' | 'version' | 'logs' | 'toolchain'>('connections');
// 接线状态巡检
const [checking, setChecking] = useState(false);
@@ -95,6 +96,7 @@ export default function SettingsPanel() {
{ key: 'security' as const, label: '🛡️ 安全防务' },
{ key: 'version' as const, label: '📦 版本更新' },
{ key: 'logs' as const, label: '📋 城防日志' },
{ key: 'toolchain' as const, label: '⛓️ 工具链' },
].map((t) => (
<button key={t.key} className={`btn ${tab === t.key ? 'btn-primary' : ''}`} onClick={() => setTab(t.key)}>
{t.label}
@@ -288,6 +290,9 @@ export default function SettingsPanel() {
</div>
</div>
)}
{/* ========== 工具链 ========== */}
{tab === 'toolchain' && <ToolchainPanel />}
</div>
);
}
@@ -0,0 +1,205 @@
/**
* ToolchainPanel
* _toolchain tasksCI/PR//Review
*/
import { useEffect, useState } from 'react';
const STATUS_COLORS: Record<string, string> = {
pending: '#f59e0b22', claimed: '#6a9eff22', working: '#6a9eff22',
review: '#818cf822', done: '#2ecc8a22', failed: '#ef444422',
cancelled: '#6b728022', blocked: '#ef444422',
};
const STATUS_LABELS: Record<string, string> = {
pending: '待处理', claimed: '已认领', working: '处理中',
review: '审查中', done: '已完成', failed: '失败',
cancelled: '已取消', blocked: '已拦截',
};
function fmtTime(iso: string): string {
try {
const d = new Date(iso.includes('T') ? iso : iso.replace(' ', 'T') + 'Z');
const now = Date.now();
const diff = now - d.getTime();
const mins = Math.floor(diff / 60000);
if (mins < 1) return '刚刚';
if (mins < 60) return `${mins}分钟前`;
const hrs = Math.floor(mins / 60);
if (hrs < 24) return `${hrs}小时前`;
return `${d.getMonth() + 1}/${d.getDate()} ${d.getHours()}:${String(d.getMinutes()).padStart(2, '0')}`;
} catch { return iso; }
}
export default function ToolchainPanel() {
const [tasks, setTasks] = useState<any[]>([]);
const [selectedId, setSelectedId] = useState<string | null>(null);
const [detail, setDetail] = useState<any>(null);
const [searchQuery, setSearchQuery] = useState('');
const [loading, setLoading] = useState(false);
const loadTasks = async (q?: string) => {
setLoading(true);
try {
const url = q
? `/api/projects/_toolchain/tasks?q=${encodeURIComponent(q)}`
: `/api/projects/_toolchain/tasks`;
const res = await fetch(url);
if (res.ok) {
const data = await res.json();
setTasks(data.tasks || []);
}
} catch { /* */ }
setLoading(false);
};
useEffect(() => { loadTasks(); }, []);
// 搜索防抖 300ms
useEffect(() => {
const timer = setTimeout(() => {
if (searchQuery !== undefined) loadTasks(searchQuery || undefined);
}, 300);
return () => clearTimeout(timer);
}, [searchQuery]);
useEffect(() => {
if (!selectedId) { setDetail(null); return; }
(async () => {
try {
const res = await fetch(
`/api/projects/_toolchain/tasks/${selectedId}?expand=comments`
);
if (res.ok) setDetail(await res.json());
} catch { /* */ }
})();
}, [selectedId]);
// 渲染评论列表(兼容 expand 和裸 list 格式)
const renderComments = (comments: any[]) => {
if (!comments || comments.length === 0) return null;
return (
<div style={{ marginTop: 16 }}>
<div style={{ fontSize: 11, color: 'var(--muted)', marginBottom: 8, fontWeight: 600 }}>
📋 ({comments.length})
</div>
{comments.map((c: any, i: number) => (
<div key={c.id || i} style={{
padding: '8px 12px', background: 'var(--panel2)', borderRadius: 6, marginBottom: 6,
}}>
<div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 4 }}>
<span style={{ fontSize: 10, color: 'var(--acc)', fontWeight: 600 }}>
{c.author || 'system'}
</span>
<span style={{ fontSize: 9, color: 'var(--muted)' }}>{fmtTime(c.created_at)}</span>
</div>
<div style={{ fontSize: 12, color: '#a0aec0', lineHeight: 1.5 }}>{c.body}</div>
</div>
))}
</div>
);
};
return (
<div style={{ display: 'flex', gap: 0, height: '100%', minHeight: 500 }}>
{/* 左侧列表 */}
<div style={{ width: 380, borderRight: '1px solid var(--line)', display: 'flex', flexDirection: 'column', flexShrink: 0 }}>
{/* 搜索栏 + 刷新 */}
<div style={{ padding: '10px 14px', borderBottom: '1px solid var(--line)', display: 'flex', gap: 6, alignItems: 'center' }}>
<input
type="text"
placeholder="搜索工具链事件..."
value={searchQuery}
onChange={e => setSearchQuery(e.target.value)}
style={{
flex: 1, padding: '4px 8px', borderRadius: 4, fontSize: 11,
border: '1px solid #2a3550', background: '#161b2e', color: '#dde4f8',
outline: 'none',
}}
/>
<button onClick={() => loadTasks(searchQuery || undefined)} style={{
padding: '3px 8px', borderRadius: 4, fontSize: 10,
border: '1px solid #2a3550', background: '#161b2e', color: '#8899aa', cursor: 'pointer',
}}>🔄</button>
<span style={{ fontSize: 10, color: 'var(--muted)' }}>{tasks.length} </span>
</div>
{/* 事件列表 */}
<div style={{ flex: 1, overflowY: 'auto' }}>
{tasks.length === 0 && (
<div style={{ textAlign: 'center', padding: 40, color: 'var(--muted)', fontSize: 12 }}>
{loading ? '加载中...' : '暂无工具链事件'}
</div>
)}
{tasks.map((t: any) => (
<div key={t.id} onClick={() => setSelectedId(t.id)} style={{
padding: '10px 14px', borderBottom: '1px solid var(--line)',
cursor: 'pointer', transition: 'background .15s',
background: selectedId === t.id ? 'var(--panel2)' : 'transparent',
}}
onMouseEnter={e => e.currentTarget.style.background = 'var(--panel2)'}
onMouseLeave={e => e.currentTarget.style.background = selectedId === t.id ? 'var(--panel2)' : 'transparent'}
>
<div style={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', marginBottom: 4 }}>
<span style={{
fontSize: 9, padding: '1px 5px', borderRadius: 3,
background: STATUS_COLORS[t.status] || '#2a3550',
color: '#dde4f8',
}}>{STATUS_LABELS[t.status] || t.status}</span>
<span style={{ fontSize: 9, color: 'var(--muted)' }}>{fmtTime(t.created_at)}</span>
</div>
<div style={{
fontSize: 12, fontWeight: 500, color: '#dde4f8',
overflow: 'hidden', textOverflow: 'ellipsis', whiteSpace: 'nowrap',
}}>{t.title}</div>
</div>
))}
</div>
</div>
{/* 右侧详情 */}
<div style={{ flex: 1, padding: '16px 20px', overflowY: 'auto' }}>
{!detail ? (
<div style={{ textAlign: 'center', padding: 60, color: 'var(--muted)' }}>
<div style={{ fontSize: 36, marginBottom: 12 }}></div>
<div style={{ fontSize: 13 }}></div>
</div>
) : (
<>
{/* 头部 */}
<div style={{ marginBottom: 16 }}>
<div style={{ display: 'flex', alignItems: 'center', gap: 8, marginBottom: 6 }}>
<span style={{ fontSize: 10, padding: '2px 6px', borderRadius: 4, background: STATUS_COLORS[detail.status] || '#2a3550', color: '#dde4f8' }}>
{STATUS_LABELS[detail.status] || detail.status}
</span>
<span style={{ fontSize: 10, color: 'var(--muted)' }}>{detail.id}</span>
</div>
<div style={{ fontSize: 18, fontWeight: 700, lineHeight: 1.3 }}>{detail.title}</div>
<div style={{ fontSize: 12, color: 'var(--muted)', marginTop: 6 }}>
{fmtTime(detail.created_at)}
</div>
</div>
{/* 正文 */}
{detail.description && (
<div style={{
padding: '14px 16px', background: 'var(--panel2)', borderRadius: 10,
fontSize: 13, color: '#a0aec0', lineHeight: 1.7, whiteSpace: 'pre-wrap',
}}>
{detail.description}
</div>
)}
{/* action_report 评论 — expand 格式 {items, total_count} */}
{detail.comments && detail.comments.items && detail.comments.items.length > 0 &&
renderComments(detail.comments.items)
}
{/* 兼容裸 list 格式 */}
{detail.comments && Array.isArray(detail.comments) && detail.comments.length > 0 &&
renderComments(detail.comments)
}
</>
)}
</div>
</div>
);
}
+4 -2
View File
@@ -7,7 +7,8 @@ from src.api.sse_routes import router as sse_router
from src.api.project_routes import router as project_router
from src.api.daemon_routes import router as daemon_router
from src.api.checkpoint_routes import router as checkpoint_router
from src.api.blackboard_routes import router as blackboard_router
from src.api.task_routes import router as task_router
from src.api.task_relation_routes import router as task_relation_router
import logging
from contextlib import asynccontextmanager
@@ -273,7 +274,8 @@ app.add_middleware(
# ---------------------------------------------------------------------------
app.include_router(blackboard_router)
app.include_router(task_router)
app.include_router(task_relation_router)
app.include_router(checkpoint_router)
app.include_router(daemon_router)
app.include_router(project_router)
+70
View File
@@ -0,0 +1,70 @@
#!/bin/bash
# verify_api_compat.sh — 验证 API 拆分前后路由清单完全一致
#
# 用法:
# bash tests/scripts/verify_api_compat.sh
#
# 前置:
# - 当前在开发目录(sanguo_moziplus_v2/
# - git working tree 有拆分改动
# - main 分支是拆分前的基准
#
# 输出:
# ✅ 路由完全一致(exit 0)
# ❌ 路由有差异(exit 1,打印 diff)
set -euo pipefail
BEFORE_FILE="/tmp/routes_before_$$.txt"
AFTER_FILE="/tmp/routes_after_$$.txt"
echo "=== 提取拆分前路由清单(main 分支)==="
# stash 当前改动(如果有 untracked 新文件,--include-untracked
STASHED=0
if ! git diff --quiet || ! git diff --cached --quiet; then
git stash --include-untracked
STASHED=1
fi
python3 -c "
from src.main import app
for route in app.routes:
if hasattr(route, 'methods') and hasattr(route, 'path'):
for m in sorted(route.methods):
if m in ('GET','POST','PATCH','DELETE','PUT'):
print(f'{m} {route.path}')
" | sort > "$BEFORE_FILE"
echo "Routes before: $(wc -l < "$BEFORE_FILE")"
# 恢复改动
if [ "$STASHED" = "1" ]; then
git stash pop
fi
echo "=== 提取拆分后路由清单(当前 working tree==="
python3 -c "
from src.main import app
for route in app.routes:
if hasattr(route, 'methods') and hasattr(route, 'path'):
for m in sorted(route.methods):
if m in ('GET','POST','PATCH','DELETE','PUT'):
print(f'{m} {route.path}')
" | sort > "$AFTER_FILE"
echo "Routes after: $(wc -l < "$AFTER_FILE")"
echo ""
echo "=== Diff ==="
if diff "$BEFORE_FILE" "$AFTER_FILE"; then
echo "✅ 路由完全一致"
rm -f "$BEFORE_FILE" "$AFTER_FILE"
exit 0
else
echo "❌ 路由有差异"
rm -f "$BEFORE_FILE" "$AFTER_FILE"
exit 1
fi
+4 -2
View File
@@ -165,14 +165,16 @@ class TestClassifyErrorApi:
1, {"status": "error"}, "rate_limit exceeded", None
)
assert result["outcome"] == "api_error"
assert result["should_retry"] is False
assert result["should_retry"] is True
assert result["cooldown_seconds"] == 60
def test_stderr_500(self):
result = Spawner._classify_outcome(
1, {"status": "error"}, "HTTP 500 Internal Server Error", None
)
assert result["outcome"] == "api_error"
assert result["should_retry"] is False
assert result["should_retry"] is True
assert result["cooldown_seconds"] == 60
class TestClassifyErrorCompact:
+525
View File
@@ -0,0 +1,525 @@
"""Unit tests for §17 ToolchainHandler 强约束实现."""
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from src.daemon.prompt_composer import PromptContext, PromptComposer
from src.daemon.toolchain_handler import (
ToolchainHandler,
ToolchainContextSection,
ToolchainApiSection,
ToolchainConstraintsSection,
_ACTION_HINTS,
)
from src.daemon.base_task_handler import VerifyResult
from src.blackboard.db import init_db, get_connection
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_db():
"""Create a temporary _toolchain DB for testing."""
with tempfile.TemporaryDirectory() as d:
db_path = Path(d) / "blackboard.db"
init_db(db_path)
yield db_path
@pytest.fixture
def handler():
return ToolchainHandler()
def _insert_task(db_path, task_id, must_haves_json, status="working"):
"""Insert a task into DB for testing."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO tasks (id, title, description, assignee, assigned_by, "
"must_haves, task_type, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(task_id, "test", "test desc", "zhangfei-dev", "system",
must_haves_json, "toolchain", status)
)
conn.commit()
conn.close()
def _insert_comment(db_path, task_id, author, body, comment_type="general"):
"""Insert a comment into DB."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)",
(task_id, author, comment_type, body)
)
conn.commit()
conn.close()
def _insert_output(db_path, task_id, content="test output"):
"""Insert an output into DB."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO outputs (task_id, agent, output_type, title, summary) "
"VALUES (?, ?, ?, ?, ?)",
(task_id, "zhangfei-dev", "document", "test", content)
)
conn.commit()
conn.close()
# ---------------------------------------------------------------------------
# Step 1a: PromptContext new fields
# ---------------------------------------------------------------------------
class TestPromptContextFields:
def test_action_type_default(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
assert ctx.action_type == ""
def test_action_steps_default(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
assert ctx.action_steps == []
def test_action_type_set(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
action_type="review_result",
)
assert ctx.action_type == "review_result"
def test_action_steps_set(self):
steps = ["step 1", "step 2"]
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
action_steps=steps,
)
assert ctx.action_steps == steps
# ---------------------------------------------------------------------------
# Step 2a: ToolchainContextSection steps rendering + action_hint
# ---------------------------------------------------------------------------
class TestToolchainContextSection:
def test_renders_steps(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="review_result",
event_data={"pr_number": "42", "repo": "sanguo/test"},
action_type="review_result",
action_steps=["合并 PR", "提交 action report"],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "必须执行的步骤" in result
assert "1. 合并 PR" in result
assert "2. 提交 action report" in result
def test_renders_action_hint(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="ci_failure",
action_type="ci_failure",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "CI 失败" in result
assert "需要你修复" in result
def test_renders_default_hint_for_unknown_action_type(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="unknown",
action_type="unknown_type",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "需要你执行动作的事件" in result
def test_no_steps_no_steps_section(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="review_merged",
action_type="review_merged",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "必须执行的步骤" not in result
# ---------------------------------------------------------------------------
# Step 2b: ToolchainApiSection action_report guidance
# ---------------------------------------------------------------------------
class TestToolchainApiSection:
def test_has_action_report_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "action_report" in result
assert "comment_type" in result
assert "tc-123" in result
def test_no_manual_done_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
# Should NOT contain the old "标记为 done" instruction
assert "标记为 **done**" not in result
assert '"status": "done"' not in result
def test_has_outputs_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "outputs" in result
def test_has_gitea_collaboration_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "Gitea" in result
assert "Mail API" in result
# ---------------------------------------------------------------------------
# Step 2c: ToolchainConstraintsSection Red Flags
# ---------------------------------------------------------------------------
class TestToolchainConstraintsSection:
def test_has_red_flags_table(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "Red Flags" in result
assert "" in result
def test_has_all_5_constraints(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "必须按步骤执行" in result
assert "必须提交 action report" in result
assert "不要执行任何状态转换命令" in result
assert "不需要回复" in result
assert "所有协作通过 Gitea 完成" in result
def test_has_strong_language(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "强制要求" in result
assert "不是建议" in result
# ---------------------------------------------------------------------------
# Step 2d: verify_completion tests
# ---------------------------------------------------------------------------
class TestVerifyCompletion:
def test_action_report_passes(self, handler, tmp_db):
"""action_report comment → pass"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t1", must_haves)
_insert_comment(tmp_db, "t1", "zhangfei-dev",
"已修复 CI", comment_type="action_report")
result = handler.verify_completion("t1", tmp_db)
assert result.passed is True
assert result.reason == "has_action_report"
def test_no_action_report_fallback_output(self, handler, tmp_db):
"""No action_report but has output → pass (fallback)"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t2", must_haves)
_insert_output(tmp_db, "t2", "review result content")
result = handler.verify_completion("t2", tmp_db)
assert result.passed is True
assert result.reason == "has_output"
def test_no_action_report_fallback_comment(self, handler, tmp_db):
"""No action_report but has substantial comment → pass (fallback)"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t3", must_haves)
_insert_comment(tmp_db, "t3", "zhangfei-dev",
"This is a sufficiently long comment about the task.")
result = handler.verify_completion("t3", tmp_db)
assert result.passed is True
assert result.reason == "has_comment"
def test_nothing_passes(self, handler, tmp_db):
"""No action_report, no output, no comment → fail"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t4", must_haves)
result = handler.verify_completion("t4", tmp_db)
assert result.passed is False
assert result.reason == "no_action"
def test_short_comment_fails(self, handler, tmp_db):
"""Comment < 20 chars → fail"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t5", must_haves)
_insert_comment(tmp_db, "t5", "zhangfei-dev", "ok")
result = handler.verify_completion("t5", tmp_db)
assert result.passed is False
def test_review_merged_auto_passes(self, handler, tmp_db):
"""review_merged → always pass"""
must_haves = json.dumps({"action_type": "review_merged"})
_insert_task(tmp_db, "t6", must_haves)
result = handler.verify_completion("t6", tmp_db)
assert result.passed is True
assert result.reason == "merged_passthrough"
def test_infrastructure_failure_auto_passes(self, handler, tmp_db):
"""infrastructure_failure → always pass (anti-recursion)"""
must_haves = json.dumps({"action_type": "infrastructure_failure"})
_insert_task(tmp_db, "t7", must_haves)
result = handler.verify_completion("t7", tmp_db)
assert result.passed is True
assert result.reason == "infrastructure_passthrough"
# ---------------------------------------------------------------------------
# Step 3a: _send_toolchain_task tests
# ---------------------------------------------------------------------------
class TestSendToolchainTask:
def test_creates_task_in_toolchain_db(self):
"""_send_toolchain_task creates a task in _toolchain DB."""
from src.api.toolchain_routes import _send_toolchain_task, _toolchain_db_path
with patch("src.api.toolchain_routes.get_data_root") as mock_root:
with tempfile.TemporaryDirectory() as d:
mock_root.return_value = Path(d)
task_id = _send_toolchain_task(
to_agent="zhangfei-dev",
title="Test Task",
description="Test description",
event_type="ci_failure",
action_type="ci_failure",
steps=["Fix test", "Submit report"],
context_data={"pr_number": 42},
)
assert task_id.startswith("tc-")
# Verify task was written to _toolchain DB
db_path = _toolchain_db_path()
conn = get_connection(db_path)
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)
).fetchone()
assert row is not None
assert row["task_type"] == "toolchain"
assert row["assignee"] == "zhangfei-dev"
# Verify must_haves JSON
meta = json.loads(row["must_haves"])
assert meta["event_type"] == "ci_failure"
assert meta["action_type"] == "ci_failure"
assert meta["steps"] == ["Fix test", "Submit report"]
assert meta["context"]["pr_number"] == 42
conn.close()
def test_unknown_agent_returns_empty(self):
"""_send_toolchain_task with unknown agent returns empty string."""
from src.api.toolchain_routes import _send_toolchain_task
task_id = _send_toolchain_task(
to_agent="unknown-agent",
title="Test",
description="desc",
event_type="test",
action_type="test",
steps=[],
)
assert task_id == ""
# ---------------------------------------------------------------------------
# Step 2e: on_failure three-way routing tests
# ---------------------------------------------------------------------------
class TestOnFailureRouting:
def test_business_failure_creates_gitea_comment(self, handler, tmp_db):
"""Business failure → Gitea PR comment @task assignee (not must_hives field)"""
# S4: must_hives does NOT contain assignee — production data doesn't have it
must_haves = json.dumps({
"action_type": "review_result",
"context": {"repo": "sanguo/test", "pr_number": 42},
"from": "system",
})
# assignee is set on the tasks table row (as production code writes it)
_insert_task(tmp_db, "t-fail", must_haves)
with patch.object(handler, "_create_gitea_comment") as mock_comment:
mock_comment.return_value = True
verify = VerifyResult(False, "no_action", "no action_report")
handler.on_failure("t-fail", "zhangfei-dev", tmp_db, verify)
mock_comment.assert_called_once()
call_args = mock_comment.call_args
assert call_args[0][0] == "sanguo/test"
assert call_args[0][1] == 42
# M2: comment body should @ the task's assignee from tasks table
comment_body = call_args[0][2]
assert "@zhangfei-dev" in comment_body
def test_infrastructure_failure_creates_task(self, handler, tmp_db):
"""Infrastructure failure → direct DB task for jiangwei-infra (no reverse dep)"""
must_haves = json.dumps({
"action_type": "review_result",
"context": {"repo": "sanguo/test", "pr_number": 42},
})
_insert_task(tmp_db, "t-infra", must_haves)
with patch.object(handler, "_create_gitea_comment") as mock_comment:
mock_comment.return_value = False # Gitea API down
with patch.object(handler, "_create_gitea_issue") as mock_issue:
mock_issue.return_value = False # Gitea API still down
verify = VerifyResult(False, "no_action", "no action_report")
handler.on_failure("t-infra", "zhangfei-dev", tmp_db, verify)
# S3: should directly INSERT into DB, not call _send_toolchain_task
# Verify a new task was created in DB for jiangwei-infra
conn = get_connection(tmp_db)
rows = conn.execute(
"SELECT * FROM tasks WHERE assignee=?",
("jiangwei-infra",)
).fetchall()
conn.close()
assert len(rows) >= 1, "No infrastructure_failure task created"
infra_task = rows[0]
assert infra_task["task_type"] == "toolchain"
meta = json.loads(infra_task["must_haves"])
assert meta["action_type"] == "infrastructure_failure"
# ---------------------------------------------------------------------------
# Regression: _mail path unaffected
# ---------------------------------------------------------------------------
class TestMailRegression:
def test_send_mail_still_exists(self):
"""_send_mail function is preserved."""
from src.api.toolchain_routes import _send_mail
assert callable(_send_mail)
def test_send_mail_not_called_by_handlers(self):
"""No toolchain handler calls _send_mail."""
import inspect
from src.api import toolchain_routes
# Get source of handler functions
source = inspect.getsource(toolchain_routes)
# _send_mail should appear only in its own definition, not in handler bodies
lines = source.split("\n")
in_handler = False
handler_send_mail_calls = []
for i, line in enumerate(lines):
if line.strip().startswith("async def _handle_") or line.strip().startswith("async def _send_mention_mails"):
in_handler = True
elif line.strip().startswith("async def ") or line.strip().startswith("def _"):
if not line.strip().startswith("async def _handle_") and not line.strip().startswith("async def _send_mention_mails"):
in_handler = False
if in_handler and "_send_mail(" in line and not line.strip().startswith("#"):
handler_send_mail_calls.append((i, line.strip()))
assert len(handler_send_mail_calls) == 0, \
f"_send_mail still called in handlers: {handler_send_mail_calls}"
# ---------------------------------------------------------------------------
# Integration: full prompt build
# ---------------------------------------------------------------------------
class TestFullPromptBuild:
def test_prompt_contains_all_sections(self, handler):
"""Full prompt has context, API, and constraints sections."""
ctx = PromptContext(
task_id="tc-test",
title="CI 失败修复",
description="Fix CI failure",
must_haves=json.dumps({
"event_type": "ci_failure",
"action_type": "ci_failure",
"steps": ["Fix test", "Push", "Submit report"],
"context": {"pr_number": 42},
}),
project_id="_toolchain",
agent_id="zhangfei-dev",
event_type="ci_failure",
event_data={"pr_number": "42", "repo": "sanguo/test"},
action_type="ci_failure",
action_steps=["Fix test", "Push", "Submit report"],
)
prompt = handler.build_prompt(ctx)
# Must have action hint
assert "CI 失败" in prompt
assert "需要你修复" in prompt
# Must have steps
assert "必须执行的步骤" in prompt
assert "1. Fix test" in prompt
# Must have API section with action_report
assert "action_report" in prompt
assert "tc-test" in prompt
# Must have constraints with Red Flags
assert "Red Flags" in prompt
assert "强制要求" in prompt