Compare commits

...

45 Commits

Author SHA1 Message Date
cfdaily e83ad1de73 docs: #16 知识注入设计 v2 — 对齐 #11 四层架构
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 0s
- 层级命名统一到 #11 体系(L0/L1/L2/L3),不再自创命名
- L0: 新增 wiki 查询铁律(做方案前先查 + 查不到记 gap)
- L1: TOOLS.md 速查表(已完成)+ SOUL.md Red Flags(待实现)
- L2: 三种 handler(task/mail/toolchain)各注入 WikiGuideSection
- L3: wiki-query Skill(已部署,待确认 extraDirs 递归)
- 运维层: gap 闭环 cron job(已有,需完善)
2026-06-14 10:22:16 +08:00
pangtong-fujunshi a27ea8ed89 Merge PR #59: docs: #16 知识注入设计(三层触发机制) 2026-06-13 23:11:37 +00:00
cfdaily 146816303f fix: M1 修复 §07 文件路径引用(24→15) + M2 D16-6 标题引用(#05→#11)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 28s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 07:10:04 +08:00
cfdaily 11349b5225 docs: 新增 #16 知识注入设计(三层触发机制) 2026-06-14 07:10:04 +08:00
pangtong-fujunshi a037497053 Merge PR #66: ci: pip upgrade + --no-cache-dir 2026-06-13 16:38:26 +00:00
cfdaily f6f26d7763 ci: pip upgrade + --no-cache-dir 防旧 pip dist-info 损坏(姜维建议)
CI / lint (pull_request) Successful in 13s
CI / test (pull_request) Successful in 29s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:35:15 +08:00
pangtong-fujunshi 920bc75c53 Merge PR #65: feat: §17 ToolchainHandler 强约束实现 2026-06-13 16:33:18 +00:00
cfdaily 976d9ce7c8 fix: M1 git rm 误提交的安装目录文件 + S1 docstring 修正 + S2 去掉 CHECK 约束(司马懿 Review #111)
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 23s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:22:13 +08:00
cfdaily fd3a889fae ci: 每次清 venv 防止旧缓存损坏
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 22s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:17:44 +08:00
cfdaily 925ebe8556 ci: 加 debug 信息定位 test failure 根因
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 2s
2026-06-14 00:16:11 +08:00
cfdaily 4ef9f68ff3 fix(ci): PYTHONPATH=. 防止 runner 环境加载安装目录旧代码
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Failing after 5s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-14 00:13:01 +08:00
cfdaily 50d1d0b5e6 chore: trigger CI retry
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-14 00:11:23 +08:00
cfdaily 6e6b52fe3b fix: asyncio.Lock 懒加载防 event loop 关闭后 import 失败
CI / lint (pull_request) Successful in 12s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-14 00:09:27 +08:00
cfdaily 3bca794902 fix: M2 on_failure assignee 从 tasks 表读取 + infrastructure 防递归(司马懿 Review #65)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Failing after 9s
CI / notify-on-failure (pull_request) Successful in 1s
M2: on_failure 中 assignee = meta.get('from', '') 读到 'system' 而非实际 Agent
→ 改为 SELECT must_haves, assignee FROM tasks 直接读 tasks.assignee 字段

附带:infrastructure failure 改为直接 DB INSERT,不走 _send_toolchain_task 防递归
2026-06-13 23:47:12 +08:00
cfdaily a5d5d2d974 fix: P0 token 环境变量 + P1 fail_count 逻辑简化(姜维 Review)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 10s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:43:20 +08:00
cfdaily 3b9ad83405 fix(lint): F541 f-string 无占位符去掉 f 前缀
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-13 23:40:56 +08:00
cfdaily c89863a288 feat: §17 ToolchainHandler 强约束实现(Step 1-4)
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 1s
Step 1: 基础设施
- prompt_composer.py: PromptContext 新增 action_type + action_steps 字段
- spawner.py: handler 路径提取 action_type/action_steps 传入 PromptContext
- db.py: comments CHECK 约束加入 action_report

Step 2: ToolchainHandler 强化
- ToolchainContextSection: 加 steps 渲染 + action_hint(按 action_type)
- ToolchainApiSection: 改为 action_report 提交指引 + Gitea 协作指引
- ToolchainConstraintsSection: 5 条强约束 + Red Flags 防self-rationalization
- verify_completion: action_report → output → comment 三层 fallback
  - review_merged 始终通过(纯通知)
  - infrastructure_failure 始终通过(防递归)
  - 修复 LENGTH(content) → LENGTH(body) bug
- on_failure 三分路: 业务→Gitea PR comment / 系统→Gitea Issue / 基础设施→toolchain task

Step 3: toolchain_routes 改造
- 新增 _toolchain_db_path() + _send_toolchain_task()
- 所有 8 个 handler 改为 _send_toolchain_task
- _send_mail 保留但不再被 toolchain handler 调用
- _send_deploy_failure_mail → _send_deploy_failure_task

Step 4: 测试
- 29 个单元测试全部通过
- 全量 456 passed, 3 skipped, 0 failures
2026-06-13 23:36:44 +08:00
pangtong-fujunshi 90f4e3284c Merge PR #64: §17 toolchain/mail 完全分离——Gitea 为唯一协作媒介 2026-06-13 15:09:33 +00:00
cfdaily 946f7e1848 fix(design): §17 M1 §5.2 三分路展开 + M2 约束编号 #5 + S1/S2 一致性
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:08:50 +08:00
cfdaily 409e4ee51d fix(design): §17 清理 3 处遗留 Mail 引用
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:04:30 +08:00
cfdaily d1c0984082 refactor(design): §17 toolchain/mail 完全分离——Gitea 为唯一协作媒介
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 23:03:06 +08:00
pangtong-fujunshi 5be32bd0b8 Merge PR #63: §17 ToolchainHandler 强约束设计 2026-06-13 14:10:19 +00:00
cfdaily 71bab93308 fix(design): §17 M1 verify_completion 列名 content→body(司马懿 Review)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 22:09:18 +08:00
cfdaily 023de9862f feat(design): §17 ToolchainHandler 强约束设计(取代 action Mail 类型)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- 新增 17-toolchain-handler-enforcement.md 完整设计文档
- 标记 17-action-mail-type.md 为 SUPERSEDED
- 方向修正:让 toolchain 事件回归 ToolchainHandler(§14 已有架构)
2026-06-13 22:04:08 +08:00
pangtong-fujunshi 626e13c0d1 Merge PR #62: feat(design): §17 action Mail 类型设计 v2.0 2026-06-13 13:37:58 +00:00
cfdaily 6a7fe37d93 fix(design): §17 Review 反馈修复(M1+S1-S3+姜维关注点)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 21:37:11 +08:00
cfdaily 5bc53192d6 feat(design): §17 action Mail 类型设计 v2.0
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
新增 action Mail 类型(inform/request/action 三分),解决工具链事件
被 inform 语义导致流程断链的问题。

核心设计:
- 五层 Prompt 架构(L0 语义层 → L4 约束层)
- 10 个场景完整提示词设计(8 action + 1 inform + 1 补充)
- Steps API 调用级别,信息层与指令层严格分离
- 两个维度完整性验证(Webhook event × 流程流转)
- 防降级三层设计(语义层 + 结构层 + Red Flag 表)
- action_report comment 验证机制

参考优秀实践:Hermes Tool-Use Enforcement、Superpowers Red Flags、
moziplus L1/L2/L3 三层上下文模型
2026-06-13 21:30:35 +08:00
pangtong-fujunshi 2c2d8b55c9 Merge PR #61: fix(docs): 文件路径引用修正 2026-06-13 06:46:52 +00:00
cfdaily cb81251111 fix(docs): 24-compact-detection-fix → 15-compact-detection-fix 路径引用修正
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 14:45:34 +08:00
pangtong-fujunshi a8a7164335 Merge PR #60: fix synchronize dispatch 2026-06-13 06:44:07 +00:00
cfdaily fe7f914681 fix: _handle_pull_request 补充 synchronize action dispatch
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
姜维排查发现 _handle_pull_request 只处理 opened/closed,
Gitea 发 pull_request + action=synchronize 时被静默丢弃。
_handle_pr_synchronize 已存在但未被 dispatch 到。

修复:加 elif action == synchronize dispatch。
pull_request_sync 注册保留作为双保险。
2026-06-13 14:42:38 +08:00
cfdaily eccb4d2723 docs: 设计文档编号重排(20→14, 24→15) + 已完成文档状态标注更新
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 10:12:39 +08:00
pangtong-fujunshi 9e2145171a Merge PR #57 2026-06-13 01:36:24 +00:00
cfdaily 67cad2dd96 fix: _REASON_MAP 补 agent_error 条目(G2)
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
spawner 会产生 agent_error reason,之前缺映射走到 _default 显示'未知原因'。
2026-06-13 09:35:15 +08:00
pangtong-fujunshi 79da0bd07e Merge PR #56 2026-06-13 01:34:39 +00:00
cfdaily a116f7e6c0 fix: 注释拼写 must_hives → must_haves
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 09:33:59 +08:00
cfdaily 7fb4d988ec fix: lint 修复 + api_error 测试更新
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 0s
- mail_notify: f-string 反斜杠修复、行过长修复、unused import
- test_classify_outcome: api_error should_retry 改 True
2026-06-13 09:29:52 +08:00
cfdaily f4dd9ff78d feat(daemon): Mail 失败通知 v2.0 — api_error retry + 通知增强
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 1s
P1: api_error rate_limit/500/503 改为可恢复 retry(should_retry=True,60s cooldown)
P2: 通知模板动态化(reason 人话翻译 + detail 信息 + 重试情况 + AI Native 知识库)

设计文档:§20.7 (20-task-type-architecture.md)
2026-06-13 09:27:17 +08:00
pangtong-fujunshi 6520e78c0b Merge PR #55 2026-06-13 01:23:33 +00:00
cfdaily 0169823b72 chore(docs): 合并 mail-failure-notification 到 §20,更新设计方案
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- mail-failure-notification.md → archive-3.0/
- §20 新增 §20 Mail 失败通知机制(v2.0 AI Native)
  - 失败场景与重试耗时完整表
  - reason 人话翻译映射
  - 通知模板增强(detail 传入 + 重试情况)
  - api_error rate_limit 待改为可恢复 retry
- §18→§21,§19→§22 编号顺延
2026-06-13 09:22:32 +08:00
pangtong-fujunshi 77252c39c6 Merge PR #54 2026-06-13 00:59:11 +00:00
cfdaily 5a80d6c5cd chore(docs): gateway-watchdog.md 改编号 99
CI / lint (pull_request) Successful in 6s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
2026-06-13 08:58:04 +08:00
pangtong-fujunshi 322263585d Merge PR #53 2026-06-13 00:54:39 +00:00
cfdaily c7b4b262b1 chore(docs): 归档 §13-sim §18 §21 §25 至 archive-3.0
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 9s
CI / notify-on-failure (pull_request) Successful in 0s
- 13-toolchain-and-dev-workflow-simulation.md → archive-3.0/(模拟报告,§16 已覆盖)
- 18-toolchain-e2e-test.md → archive-3.0/(E2E 测试记录,§13 已引用)
- 21-e2e-verification-handler.md → archive-3.0/(Handler 验证,§20 §19 已覆盖)
- 25-gitea-mention-toolchain.md → archive-3.0/(@mention 集成,§13 §16 已覆盖)
2026-06-13 08:53:23 +08:00
pangtong-fujunshi e43d87f3db Merge PR #52 2026-06-13 00:53:09 +00:00
28 changed files with 4597 additions and 170 deletions
+15 -2
View File
@@ -27,6 +27,7 @@ jobs:
- name: Setup Python
run: |
python3 -m venv /tmp/ci-venv-lint
/tmp/ci-venv-lint/bin/pip install --quiet --upgrade pip
/tmp/ci-venv-lint/bin/pip install --quiet flake8
- name: Lint with flake8
@@ -42,12 +43,24 @@ jobs:
- name: Setup Python
run: |
rm -rf /tmp/ci-venv-test
python3 -m venv /tmp/ci-venv-test
/tmp/ci-venv-test/bin/pip install --quiet fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
/tmp/ci-venv-test/bin/pip install --quiet --upgrade pip
/tmp/ci-venv-test/bin/pip install --quiet --no-cache-dir fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Debug environment
run: |
echo "PWD=$(pwd)"
echo "PYTHONPATH=$PYTHONPATH"
python3 -c "import sys; [print(p) for p in sys.path if 'sanguo' in p.lower() or 'openclaw' in p.lower()]"
grep -c "assignee = agent_id" src/daemon/toolchain_handler.py || true
grep -c "_BUSINESS_FAIL_THRESHOLD" src/daemon/toolchain_handler.py || true
- name: Run tests (exclude E2E)
run: |
/tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q
PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q || \
(echo '=== RETRY WITH VERBOSE ===' && \
PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -v 2>&1 | tail -30)
# ── Job 3: CI 失败通知 ───────────────────────────────
# 使用 needs.<job>.result 直接判断,不查询 commit status API
+1 -1
View File
@@ -6,7 +6,7 @@
**基于**: PRD-v3.0 §4 四相架构 + architecture-v3.0.md
**作者**: 庞统(副军师)🐦
**日期**: 2026-05-29
**状态**: 实现完成,待 E2E 验证
**状态**: ✅ 已完成(E2E 验证通过)
**评审**: 司马懿
---
+1 -1
View File
@@ -2,7 +2,7 @@
**日期**: 2026-05-30
**作者**: 庞统
**状态**: 已修订 v1.1(根据司马懿 2026-05-30 评审意见
**状态**: ✅ 已完成(spawner/ticker/dispatcher 全部 use_main_session=True
**前置**: `01-four-phase-loop.md`(四相循环 E2E 验证暴露 session 爆炸问题)
---
@@ -3,7 +3,7 @@
> 版本: v1.1
> 日期: 2026-05-30
> 作者: 庞统(副军师)
> 状态: v1.1 修订(司马懿评审意见已纳入
> 状态: ✅ 已完成(@mention + mention_queue 已实现
> 前置: #02 Main Session + Delegation, #03 Prompt 进化
---
+1 -1
View File
@@ -3,7 +3,7 @@
> 版本: v1.2
> 日期: 2026-06-03
> 作者: 庞统(副军师)
> 状态: 待评审(v1.2
> 状态: ✅ 已完成(_startup_recover 7 个方法已实现
> 前置: spawner-monitor-design.md §5 A0Agent crash 恢复)
> 变更: v1.2 两个关键改进:(1) working→pending 保留 current_agent 让同一 agent 接手;(2) reviewing 精确恢复到前置状态而非硬推 done
+4 -4
View File
@@ -1,6 +1,6 @@
# #07 Spawner Acquire-First 设计
> 状态:#07.1 已实施 ✅ | #07.2 已实施 ✅ | #07.3 设计中
> 状态:✅ 已完成(#07.1-#07.2 已实施)
> 作者:庞统
> 日期:2026-06-01
> 评审:司马懿
@@ -233,9 +233,9 @@ def _revive_session(agent_id: str) -> bool:
pass
```
### 4.5 O5: compact 检测(§24 rotation-only v3
### 4.5 O5: compact 检测(§15 rotation-only v3
§24 设计文档:`docs/design/24-compact-detection-fix.md`
§15 设计文档:`docs/design/15-compact-detection-fix.md`
**检测方法**:读 gateway 日志尾部 2MB,按 sessionKey 过滤 `[compaction] rotated active transcript` 事件。
如果最近的 rotation 事件在 120s 窗口内 → 视为 compact 循环进行中(可能还在 post-compact retry)。
@@ -243,7 +243,7 @@ def _revive_session(agent_id: str) -> bool:
旧方法 `_check_recent_compaction_jsonl`(扫描 session jsonl 的 `type=compaction` 事件)保留作为 fallback。
```python
# §24 v3: compact 检测优先用 gateway 日志 rotation 事件
# §15 v3: compact 检测优先用 gateway 日志 rotation 事件
if result["status"] not in ("idle", "unknown", None):
session_key = f"agent:{agent_id}:main"
result["recent_compact"] = AgentSpawner._check_compact_in_progress_gateway(
+1 -1
View File
@@ -3,7 +3,7 @@
> 版本: v1.1
> 日期: 2026-06-03
> 作者: 庞统(副军师)
> 状态: 待评审(v1.1
> 状态: ✅ 已完成(rebuttal on_complete + goal gate 已实现
> 前置: #04 黑板协作(@mention+ #08 Classify Outcome
> 关联: T4 审查体系完善
> 变更: v1.1 纳入司马懿评审反馈 — verdict 读 reviews 表 + rebuttal mention spawn 带 on_complete 回调
@@ -3,7 +3,7 @@
> 版本: v1.1
> 日期: 2026-06-03
> 作者: 庞统(副军师)
> 状态: 待终审(v1.1
> 状态: ✅ 已完成(SSE + TaskModal 自动刷新已实现
> 前置: #04 黑板协作(@mention + comment
> 关联: architecture-v3.0.md T3
> 变更: v1.1 纳入司马懿评审反馈 — checkpoint SSE 触发文件修正为 checkpoint_routes.pySSE payload 统一含 project_id
+1 -1
View File
@@ -1,6 +1,6 @@
# 三国团队工具链与开发流程设计
> **状态**: v3.3 — #19 上下文四层改造合并 + CI 修复 + A13 修订
> **状态**: ✅ 已完成(E2E 验证通过,所有 8 步 PASS)
> **作者**: 庞统(副军师)🐦
> **评审**: 司马懿(仲达)🗡️
> **日期**: 2026-06-06
@@ -4,6 +4,8 @@ created: 2026-06-10
version: v3.0
---
> 状态: ✅ 已完成(Step 1-5 全部合并,394 passed
# §1 现状分析(v3.0 更新说明:§1-§13 保留原样,新增 §14-§18,更新 §3/§5/§7
# §1 现状分析
@@ -950,7 +952,151 @@ handler.post_complete(task_id, agent_id, outcome, db_path)
---
# §18 设计决策记录
## §14. Mail 失败通知机制
### 20.1 背景
Mail 是 A→B 点对点通信,失败应通知发件人 A,而非统一 @pangtong
当前机制(v1.3 已实现):
- `_mark_task("failed")` 对 _mail 项目:调用 `mail_notify.notify_mail_failed` 通知发件人
- `_mark_task("failed")` 对 Task 项目:@pangtong-fujunshiF2 原逻辑不变)
- `_mail_auto_complete` 的 no_reply_found:标 failed 后通知发件人
- 防递归:`must_haves.system_notify=true` 的邮件失败不再递归通知
### 20.2 失败场景与重试机制
所有可能的失败路径及其重试/等待机制(重试上限 max_retries=3agent_timeout=630s):
| 失败类型 | 机制 | 重试次数 | 每次耗时 | cooldown | 最长总耗时 |
|---|---|---|---|---|---|
| `gateway_timeout` | 续杯 | 3 | 630s | 无 | ~31.5 分钟 |
| `crashed` | ticker 兜底 | 3 | ~2-5 分钟 | 60s + 30s ticker | ~15 分钟 |
| `api_error`rate_limit | 推 pending**待改为续杯** | 3 | ~2.5 分钟 | 120s | ~8 分钟 |
| `compact_interrupted` | 续杯 | 3 | 630s | 60s | ~34 分钟 |
| `gateway_unreachable` | 续杯 | 3 | 630s | 60s | ~34 分钟 |
| `lock_conflict` | 续杯 | 3 | 630s | 60s | ~34 分钟 |
| `fallback_timeout` | 续杯(A3b) | 3 | 630s | 60s | ~34 分钟 |
| `compact_wait` | monitor 等待 | 3 | 630s | 无 | ~31.5 分钟 |
| `compact_hanging` | monitor → release | 3 | 630s | 300s | ~31.5 分钟 + ticker |
| `max_monitor_timeouts` | monitor 上限 | 3 | 630s | 无 | ~31.5 分钟 |
| `session_stuck` | revive 1 次 | 1 | ~30s | 无 | ~30 秒 |
| `compact_failed` | 无重试 | 0 | — | 300s | 立刻 failed |
| `auth_failed` | 无重试 | 0 | — | — | 立刻 failed |
| `agent_error` | 无重试 | 0 | — | 300s | 立刻 failed |
| `no_reply_found` | 无重试 | 0 | — | — | 立刻 failed |
### 20.3 触发点
| 触发点 | 文件 | 说明 |
|---|---|---|
| `_mark_task(failed)` | spawner.py | _mail 项目 → notify_mail_failedTask 项目 → @pangtong |
| `_mail_auto_complete` no_reply_found | dispatcher.py | Agent 正常退出但没回复 request → 标 failed → 通知发件人 |
### 20.4 实现位置
- `src/daemon/mail_notify.py``notify_mail_failed` + `_is_mail_project` + 通知模板
- `src/daemon/spawner.py``_mark_task` 中 _mail/Task 分流
- `src/daemon/dispatcher.py``_mail_auto_complete` 中 no_reply_found 后调 notify
### 20.5 通知设计(v2.0 — AI Native
通知提供充足事实信息,不做硬编码处理建议。收件 AI 自行判断下一步。
**通知结构**
```
邮件投递失败通知
📧 原始邮件:「{title}」
👤 收件人:{to_agent}
❌ 失败原因:{reason_human_readable}{reason_raw}
📊 重试情况:{attempt_info}
📋 上下文信息:
{detail_formatted}
常见失败原因参考:
• no_reply_found:收件人未回复(Agent 未能识别或处理此邮件)
• crashed / max_crash_count:收件人处理时进程崩溃(已自动重试 3 次)
• max_retries:续杯耗尽(已自动重试 3 次,共约 34 分钟)
• max_api_retry_countAPI 连续失败达上限(rate_limit/500/503
• max_monitor_timeouts:处理超时达上限(共约 31.5 分钟)
• gateway_timeoutAgent 执行超时(已续杯重试)
• session_stuckAgent 会话假死(lock PID 死亡,revive 失败)
• revive_failed:会话假死后恢复失败
• auth_failedAgent 认证失败(配置问题)
• fallback_exhausted:主模型和备用模型均失败
• agent_failed:收件人主动标记失败
• compact_failed:上下文压缩失败
• compact_hanging:上下文压缩长时间未完成(等待超 31.5 分钟)
• compact_interrupted:上下文压缩被中断(已自动重试 3 次)
• gateway_unreachableGateway 不可达(已自动重试 3 次)
• lock_conflict:会话锁冲突(已自动重试 3 次)
• 其他:建议排查系统日志
——系统自动通知
```
**reason 人话翻译映射**
| reason_raw | reason_human_readable | detail 提取 |
|---|---|---|
| `no_reply_found` | 收件人未回复 | 无额外信息 |
| `crashed` | 处理时进程崩溃 | stderr_preview 前 200 字 |
| `max_crash_count` | 连续崩溃达上限 | count + stderr_preview |
| `max_retries` | 续杯耗尽 | count + retry_field |
| `max_api_retry_count` | API 连续失败达上限 | count |
| `max_monitor_timeouts` | 处理超时达上限 | count + elapsed_seconds |
| `gateway_timeout` | Agent 执行超时 | retry_count |
| `session_stuck` | 会话假死 | stuck_count |
| `revive_failed` | 假死后恢复失败 | stuck_count |
| `auth_failed` | 认证失败 | stderr_preview |
| `fallback_exhausted` | 模型全部失败 | fallback_count + fallback_reason |
| `agent_failed` | 收件人主动标失败 | 无 |
| `compact_failed` | 上下文压缩失败 | stderr_preview |
| `compact_hanging` | 压缩长时间未完成 | compact_wait_count |
| `compact_interrupted` | 压缩被中断 | 无 |
| `gateway_unreachable` | Gateway 不可达 | stderr_preview |
| `lock_conflict` | 会话锁冲突 | 无 |
| 默认 | 未知原因 | reason + stderr_preview(如有) |
**重试情况格式**
- 有重试:`"已自动重试 {count} 次,共耗时约 {total_time}"`
- 无重试:`"无法重试({reason_human_readable}"`
### 20.6 防递归
系统通知邮件(from=system)本身也可能失败:
- 检查 `must_haves.system_notify=true` → 跳过递归通知
- system 不是有效 Agent → 通知路由到 pangtong-fujunshi 代处理
### 20.7 待实现改动
#### P1api_error rate_limit 改为可恢复 retry
**当前**`_classify_outcome` 中 rate_limit/500/503 → `api_error``should_retry=False`,走推 pending 路径。
**改为**`should_retry=True`,走续杯路径。cooldown 60s。上限仍 3 次。
**改动文件**`src/daemon/spawner.py` `_classify_outcome``api_error` 分支。
**影响**`api_retry_count` 机制可以废弃(统一用 `retry_count`),但保持向后兼容暂不删除。
#### P2:通知模板更新(v2.0
**当前**`mail_notify.py``_NOTIFY_TEMPLATE` 是静态模板,不传 detail。
**改为**:动态模板,根据 reason 选择人话翻译 + 提取 detail 信息 + 格式化重试情况。
**改动文件**`src/daemon/mail_notify.py`
**新增**`_REASON_MAP` 字典(reason → 人话 + detail 提取函数)。
### 20.8 不改的
| 项目 | 原因 |
|---|---|
| F2 @pangtong 对 Task 的逻辑 | Task failed 仍 @pangtong,只对 Mail 不同 |
| no_reply_found 的判定逻辑 | 只在判定后加通知,不改判定本身 |
| inform 类型邮件的完成逻辑 | inform 直接 done,不存在 no_reply_found |
| 外部 API 的 from 校验 | system 不走 HTTP,外部无法伪造 |
---
# §21 设计决策记录
本节记录设计过程中的关键讨论和决策,便于未来回顾。
@@ -1010,7 +1156,7 @@ handler.post_complete(task_id, agent_id, outcome, db_path)
---
## §19. 审查与验证历史
## §22. 审查与验证历史
### Step 2-5 背靠背审查(2026-06-10/11
@@ -1,6 +1,6 @@
# §24 — Compact 检测方案修正
# §15 — Compact 检测方案修正
> 状态:**v5 已实现**gateway log + jsonl 配对)
> 状态:✅ 已完成gateway log + jsonl 配对)
> 作者:庞统
> 日期:2026-06-11v4),2026-06-13v5
> 框架:基于 §07 Spawner Acquire-First
+307
View File
@@ -0,0 +1,307 @@
# #16 知识注入设计
> 状态:v2 设计中
> 作者:庞统
> 日期:2026-06-13v1),2026-06-14v2 对齐 #11 四层架构)
> 评审:待司马懿评审
## 一、问题
### 1.1 现状
Agent(庞统、司马懿、张飞等)在执行任务时,不主动查询已有知识库(wiki-vault)。导致:
1. **重复调研**:赵云查过的数据清洗经验,张飞又从头调研一遍
2. **重复踩坑**wiki-vault 里已有"vnpy load_bar 需要显式指定 end=None"的实践,张飞还是踩了
3. **方案质量低**:做方案时纯靠推理,不查已有的优秀实践
4. **知识 gap 无人管**:查不到相关知识时没记录,下次还是查不到
### 1.2 根因
不是没有知识库(wiki-vault 有 50+ practices 页面),也不是没有检索能力(wiki-query Skill 已存在)。
**根因是注入时机**:Agent 不知道什么时候该查、没有强制机制让 Agent 在关键决策点查。
### 1.3 目标
1. Agent 在关键决策点**主动查询** wiki-vault
2. 查不到相关知识时**自动记录** knowledge gap
3. 定时任务处理 gap + 总结经验,**持续丰富** wiki-vault
4. 不增加 prompt token 负担(不自动注入知识全文,只引导查询)
## 二、调研
### 2.1 Superpowers:强制 Skill 检查(最有效)
**核心设计**session-start hook 注入铁律级指令——
> "If you think there is even a **1% chance** a skill might apply, you **ABSOLUTELY MUST** invoke the skill. This is not negotiable."
配合 **Red Flags 表**防止 Agent 自合理化跳过:
| Agent 的想法 | Red Flag 驳回 |
|---|---|
| "这个问题很简单" | 简单问题也需要查实践 |
| "我需要更多上下文" | Skill 检查在澄清问题之前 |
| "先看看代码" | Skill 告诉你怎么看代码 |
| "我记住了这个 Skill" | Skill 会更新,重新读 |
**为什么有效**:不靠 Agent "想起来",靠铁律强制。Skill 触发在任何响应之前。
### 2.2 Hermes:经验闭环 + Session Search
**经验闭环**:完成复杂任务(5+ tool calls)→ 自动创建 Skill → 下次自然触发。
**Session Search**:系统提示注入——"当用户提及过去内容时,主动搜索而非要求用户重复"。
**为什么有效**:不是"知识查询"而是"行为内化"——经验变成 SkillSkill 有 description 触发词。
### 2.3 结论
综合两个项目的优势:
| 设计点 | 来源 | 我们的做法 |
|--------|------|-----------|
| 铁律级强制 | Superpowers | L0 Hook 注入 + L1 SOUL.md 行为引导 |
| Red Flags 反合理化 | Superpowers | 知识查询 Red Flags 表(L1 SOUL.md |
| 经验内化 | Hermes | 经验→wiki-vault→下次查询 |
| 渐进式披露 | Hermes | 先查 summary,按需读全文 |
## 三、设计决策(对齐 #11 四层架构)
> **层级体系严格对齐 [#11](./11-context-layers-redesign.md)**,不自创命名。
### 总览
| #11 层级 | 知识注入角色 | 本设计覆盖 | 注入方式 |
|----------|------------|-----------|---------|
| **L0 铁律层** | "做方案前先查 wiki-vault" | ✅ D16-1 | Hook 每轮强制注入 |
| **L1 角色层** | TOOLS.md 知识库速查表 + SOUL.md Red Flags | ✅ D16-2 | Workspace 文件自动注入 |
| **L2 引擎注入层** | 三种 handler 各注入 WikiGuideSection | ✅ D16-3 | PromptComposer 拼装 |
| **L3 被动参考层** | wiki-query Skill 按需触发 | ✅ D16-4 | extraDirs Description 匹配 |
| 运维层 | gap 闭环 cron job | ✅ D16-5 | 不属于上下文分层 |
### D16-1L0 铁律层 — 新增一条 wiki 查询铁律
L0 只放跨系统通用的、不可绕过的行为底线。wiki 查询铁律和 GATE 门控同级。
**新增铁律**
```
<wiki-rule>
做方案前先查 wiki-vault,有 1% 相关就要查。查不到记 knowledge-gaps.md。
</wiki-rule>
```
**注入方式**:和 `<gate-rules>` / `<delegation-rule>` 并列,Hook 每轮强制注入。
**覆盖范围**:所有 Agent、所有场景(不限于 moziplus spawn 的子任务)。
### D16-2L1 角色层 — TOOLS.md + SOUL.md
#### TOOLS.md(✅ 已完成)
各 Agent workspace 的 TOOLS.md 中已有「LLM Wiki 知识库」段,包含:
- 速查表(场景 → 怎么做 → 什么时候用)
- 检索原则(index.md → summary → grep → 整页读取,从便宜到昂贵)
- 目录结构(wiki-vault / practices / concepts / skills / ...
- 铁律(做方案前先查、查不到记 gap)
#### SOUL.md Red Flags
在各 Agent 的 SOUL.md 中加入知识查询 Red Flags 表(和 Superpowers 一致):
| Agent 的想法 | 反驳 |
|---|---|
| "这个我以前做过" | 知识库可能已更新,查一下确认 |
| "先做再说" | 做方案前查实践比做错了返工便宜 |
| "这个领域我熟悉" | 熟悉≠知道最新实践,wiki-vault 持续更新 |
| "查知识库浪费时间" | 重复踩坑浪费的时间远大于查询时间 |
### D16-3L2 引擎注入层 — 三种 handler 各注入 WikiGuideSection
L2 是 BootstrapBuilder/PromptComposer 动态拼装的 prompt 段。当前有三种 handler,各有自己的 PromptSection 实现:
#### 当前 handler 结构
| Handler | Sectionspriority | 有 wiki 引导? |
|---------|---------------------|--------------|
| **TaskHandler** | Context(10) → Prior(20) → RoleSkill(30) → API(40) → Constraints(50) | ❌ |
| **MailHandler** | Context(10) → API(40) → Constraints(50) | ❌ |
| **ToolchainHandler** | Context(10) → API(40) → Constraints(50) | ❌ |
#### 新增 WikiGuideSectionpriority=60PRIORITY_EXTENSION
创建一个**通用 PromptSection**,三种 handler 的 `get_sections()` 都注入:
```python
# 可放在 prompt_composer.py 或独立文件,三种 handler 共用
class WikiGuideSection:
"""知识查询引导段 — 引导 Agent 在关键决策点查 wiki-vault。"""
name: str = "wiki_guide"
priority: int = 60 # PRIORITY_EXTENSION
WIKI_GUIDE = (
"## 知识查询引导\n"
"涉及方案设计、编码实现、故障排查时,先查 wiki-vault 相关实践:\n"
"- 路径:/Volumes/KnowledgeBase/wiki-vault/\n"
"- 速查:index.md → grep 关键词 → summary 字段 → 按需读全文\n"
"- 查不到:在 _meta/knowledge-gaps.md 记录"
)
def render(self, context: PromptContext) -> str:
return self.WIKI_GUIDE
def should_include(self, context: PromptContext) -> bool:
return True
```
#### 三种 handler 改动
每种 handler 的 `get_sections()` 末尾加 `WikiGuideSection()`
```python
# TaskHandler
def get_sections(self) -> list:
return [
TaskContextSection(),
PriorOutputsSection(),
RoleSkillSection(),
TaskApiSection(),
TaskConstraintsSection(),
WikiGuideSection(), # ← 新增
]
# MailHandler
def get_sections(self) -> list:
return [
MailContextSection(),
MailApiSection(),
MailConstraintsSection(),
WikiGuideSection(), # ← 新增
]
# ToolchainHandler
def get_sections(self) -> list:
return [
ToolchainContextSection(),
ToolchainApiSection(),
ToolchainConstraintsSection(),
WikiGuideSection(), # ← 新增
]
```
#### 为什么三种 handler 都需要
- **TaskHandler**:executor 做方案/编码,最需要查实践
- **ToolchainHandler**:CI 失败排查、部署问题,有相关运维实践可参考
- **MailHandler**:request 类型回复杂问题时也可能需要查已有经验
#### token 开销
WikiGuideSection 固定 ~60 字(~30 tokens),对 L2 预算影响可忽略。
### D16-4L3 被动参考层 — wiki-query Skill
#### 现状
`wiki-query` Skill 已部署在 `~/.sanguo_projects/sanguo_mozi/skills/wiki/wiki-query/SKILL.md`description 包含中文触发词:
> 调查、研究、分析、优秀实践、最佳实践、经验、怎么做X、有没有X的经验、以前怎么处理的
#### 触发机制
Agent 通过 extraDirs 加载 Skill headername + description),按 Description 匹配自主 `read` 全文。这是标准 L3 行为,和 #11 设计一致。
#### 待确认:extraDirs 子目录递归
wiki-query 在 `skills/wiki/wiki-query/` 子目录下。需确认 moziplus spawn 子 agent 时 extraDirs 是否递归扫描子目录。如果不递归,需要:
- 方案 A:把 wiki-query 移到 `skills/` 顶层
- 方案 B:配置 extraDirs 包含 `skills/wiki/` 子目录
### D16-5:知识 gap 记录 + 定时任务(运维层)
> 不属于上下文分层体系,是独立的运维流程。
#### gap 记录机制(已有基础设施)
- **位置**`/Volumes/KnowledgeBase/wiki-vault/_meta/knowledge-gaps.md`
- **格式**`- [日期] Agent名查"主题" → 待处理`
- **已有 20+ 条历史记录**,处理后标注 `→ 已建立 ✅`
wiki-query Skill 的 Step 5 已内置 gap 记录逻辑。
#### 定时任务(已有 cron 基础)
| 任务 | 时间 | 内容 | 状态 |
|------|------|------|------|
| wiki-daily-update | 每天 04:00 | 处理 knowledge gaps + 当天经验总结 → 写入 wiki-vault | ✅ 已有 cron,需完善 |
| pangtong-vault-sync | 每天 05:00 | 同步 wiki-vault 到 agent workspace | ✅ 已有 |
**wiki-daily-update 完善内容**
1. 读取 knowledge-gaps.md 中"待处理"条目
2. 对每个 gap:搜索 knowledge_base 是否有相关源码/文档 → 有则提炼写入 wiki-vault
3. 搜索最近一天的 jsonl 日志,提取有价值的经验
4. 新建或更新 wiki-vault 页面
5. 更新 knowledge-gaps.md(标记为"已建立 ✅"或"无KB内容,跳过"
### D16-6:和 #11 各层关系总结
| #11 层级 | #11 原始定义 | 知识注入贡献 | 本设计 |
|---------|------------|------------|--------|
| L0 铁律 | GATE 门控 + Delegation + 安全底线 | wiki 查询铁律 | ✅ D16-1 |
| L1 角色 | SOUL.md + AGENTS.md + TOOLS.md + MEMORY.md | TOOLS.md 速查表 + SOUL.md Red Flags | ✅ D16-2 |
| L2 引擎 | 任务上下文 + 角色操作规范 + 硬约束 | WikiGuideSection 通用段 | ✅ D16-3 |
| L3 参考 | A/B/C/D 类 Skill,靠 Description 触发 | wiki-query Skill | ✅ D16-4 |
| 运维 | — | gap 闭环 cron job | ✅ D16-5 |
### D16-7:为什么不做 PromptComposer 自动注入知识全文
1. **token 浪费**:每次任务都注入可能不相关的知识
2. **覆盖范围有限**:只影响 moziplus 子任务 Agent
3. **Agent 主动查询更精准**:知道自己缺什么知识,按需查询
## 四、改动清单
### 4.1 已完成 ✅
| 改动 | 文件 | 层级 | 说明 |
|------|------|------|------|
| TOOLS.md 知识库段 | 各 Agent workspace TOOLS.md | L1 | 速查表 + 检索原则 + 目录结构 + 铁律 |
| wiki-query Skill 部署 | `skills/wiki/wiki-query/SKILL.md` | L3 | 中文触发词 + 渐进式检索协议 |
| knowledge-gaps.md | `_meta/knowledge-gaps.md` | 运维 | 已有 20+ 条记录 |
| wiki-daily-update cron | cron job | 运维 | 每天 04:00,需完善处理逻辑 |
| pangtong-vault-sync cron | cron job | 运维 | 每天 05:00 |
### 4.2 待实现
| 改动 | 文件 | 层级 | 说明 |
|------|------|------|------|
| L0 wiki 铁律 | Hook 注入配置(`prependContext` | L0 | 新增 `<wiki-rule>` 段 |
| SOUL.md Red Flags | 各 Agent workspace SOUL.md | L1 | 知识查询 Red Flags 表 |
| WikiGuideSection | `prompt_composer.py` 或独立文件 | L2 | 通用 PromptSection,三种 handler 共用 |
| TaskHandler 注入 | `task_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| MailHandler 注入 | `mail_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| ToolchainHandler 注入 | `toolchain_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| extraDirs 递归确认 | moziplus spawn 配置 | L3 | 确认 wiki-query 子目录可被发现 |
| wiki-daily-update 完善 | cron job 脚本 | 运维 | gap 处理 + jsonl 经验提取 |
### 4.3 不做
| 项目 | 原因 |
|------|------|
| PromptComposer 知识全文注入 | token 浪费,Agent 主动查询更精准 |
| experiences 表 | wiki-vault 已覆盖,不重复建设 |
| 新 Skill(除 wiki-query 外) | wiki-query 已有,不需要新的 |
## 五、风险
| 风险 | 概率 | 缓解 |
|------|------|------|
| Agent 不主动查 wiki | 中 | L0 铁律强制 + L2 引导 + L3 Description 触发,三层保障 |
| wiki-query 在子目录不被 extraDirs 发现 | 中 | 确认后决定移顶层或配置子目录 |
| wiki-daily-update gap 处理质量不够 | 低 | 人工审核 + 逐步完善 |
| WikiGuideSection 增加 token | 低 | 固定 ~30 tokens,影响可忽略 |
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+286 -17
View File
@@ -50,7 +50,15 @@ router = APIRouter(tags=["toolchain"])
_delivery_cache: Set[str] = set()
_delivery_timestamps: List[Tuple[float, str]] = []
_TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
_idempotency_lock: Optional[asyncio.Lock] = None
def _get_idempotency_lock() -> asyncio.Lock:
"""懒加载 asyncio.Lock,避免模块级创建时 event loop 不存在(Python 3.9)。"""
global _idempotency_lock
if _idempotency_lock is None:
_idempotency_lock = asyncio.Lock()
return _idempotency_lock
def _is_duplicate(event: str, delivery: str,
@@ -189,6 +197,7 @@ def _calc_risk_level(changed_files: List[str]) -> str:
MAIL_PROJECT_ID = "_mail"
TOOLCHAIN_PROJECT_ID = "_toolchain"
def _mail_db_path() -> Path:
@@ -200,6 +209,73 @@ def _mail_db_path() -> Path:
return db
def _toolchain_db_path() -> Path:
"""获取 Toolchain 数据库路径,确保目录和表存在。"""
root = get_data_root()
db = root / TOOLCHAIN_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
return db
def _send_toolchain_task(
to_agent: str,
title: str,
description: str,
event_type: str,
action_type: str,
steps: list,
context_data: dict | None = None,
source: str = "webhook",
) -> str:
"""创建 Toolchain Task 并写入 _toolchain DB。
Args:
to_agent: 收件人 Agent ID
title: 任务标题
description: 任务描述模板渲染后的事件信息
event_type: 事件类型review_result / ci_failure / ...
action_type: 动作分类用于步骤选择和日志统计
steps: 结构化编号步骤列表
context_data: 事件上下文数据PR 仓库名等
source: 来源标识
Returns:
创建的 Task ID
"""
if to_agent not in AGENT_IDS:
logger.warning("Unknown agent: %s, skipping toolchain task", to_agent)
return ""
task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": event_type,
"action_type": action_type,
"steps": steps,
"context": context_data or {},
"from": "system",
"source": source,
}, ensure_ascii=False)
task = Task(
id=task_id,
title=title,
description=description,
assignee=to_agent,
assigned_by="system",
must_haves=must_hives,
task_type="toolchain",
status="pending",
)
bb = Blackboard(_toolchain_db_path())
bb.create_task(task)
logger.info(
"Toolchain task sent: %s%s [%s] action_type=%s",
title[:40], to_agent, task_id, action_type,
)
return task_id
def _send_mail(
to_agent: str,
title: str,
@@ -327,7 +403,25 @@ async def _send_mention_mails(
})
title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})"
_send_mail(agent_id, title, text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=text,
event_type="mention",
action_type="mention",
steps=[
"按上方 mention 模板中的 response_guidance 执行",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"source_type": source_type,
"source_url": source_url,
"commenter": commenter,
"content_snippet": content[:500],
"repo": repo,
"issue_number": issue_number,
},
)
# ---------------------------------------------------------------------------
@@ -342,6 +436,8 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
await _handle_pr_opened(payload)
elif action == "closed":
await _handle_pr_closed(payload)
elif action == "synchronize":
await _handle_pr_synchronize(payload)
async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
@@ -377,7 +473,27 @@ async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
})
title = f"Review 请求: {pr_title} ({repo}#{pr_number})"
_send_mail("simayi-challenger", title, text)
_send_toolchain_task(
to_agent="simayi-challenger",
title=title,
description=text,
event_type="review_request",
action_type="review_request",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"按审查清单审查(参考 code-review Skill",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"branch": branch,
"risk_level": risk_level,
},
)
# S3: PR body @mention 通知
pr_body = pr.get("body", "") or ""
@@ -486,7 +602,25 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
})
title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_comment",
action_type="review_comment",
steps=[
f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments",
"根据评论内容响应(修改代码或在 PR 上回复 comment)",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"reviewer": reviewer,
"comment_body": review_body,
},
)
# S5: Review body @mention 通知(COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -508,7 +642,34 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
})
title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
if state == "APPROVED":
tc_steps = [
f"合并 PRGitea API: POST /repos/{repo}/pulls/{pr_number}/merge",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
else: # REQUEST_CHANGES
tc_steps = [
"按审查意见逐条修改代码",
"push 到原分支 → CI 自动跑",
"CI 通过后等重新 Review",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_result",
action_type="review_result",
steps=tc_steps,
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"result": result,
"reviewer": reviewer,
"review_body": review_body,
},
)
# S5: Review body @mention 通知(非 COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -577,11 +738,31 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
})
title = f"PR 更新: {pr_title} ({repo}#{pr_number})"
_send_mail(reviewer, title, text)
_send_toolchain_task(
to_agent=reviewer,
title=title,
description=text,
event_type="review_updated",
action_type="review_updated",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"重点检查上次 Review 意见的修改部分",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"new_sha": new_sha,
"reviewer": reviewer,
},
)
def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,复用 deploy_failure 模板"""
def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,走 ToolchainHandler。"""
text = render_template("deploy_failure", {
"repo": repo,
"commit_sha": f"PR #{pr_number}",
@@ -589,7 +770,25 @@ def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason:
title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})"
full_text = f"{text}\n\n失败原因: {reason}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_mail(agent_id, title, full_text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=full_text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"pr_number": pr_number,
"pr_title": pr_title,
"reason": reason,
},
)
async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
@@ -621,7 +820,21 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
})
title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_merged",
action_type="review_merged",
steps=[], # 纯通知,无步骤
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"merged_by": merged_by,
},
)
# 自动部署:git pull + rsync + 按需 post_deploy
try:
@@ -674,7 +887,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if rsync_proc.returncode != 0:
logger.error("Auto-deploy: rsync failed: %s", rsync_err.decode())
_send_deploy_failure_mail(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
_send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
return
# Step 3: 判断是否需要执行 post_deploy
@@ -729,7 +942,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if deploy_proc.returncode != 0:
logger.error("Auto-deploy: post_deploy failed: %s", deploy_err.decode())
_send_deploy_failure_mail(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
_send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
break
else:
logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5]))
@@ -738,7 +951,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
except asyncio.TimeoutError:
logger.error("Auto-deploy: timeout for %s", repo)
_send_deploy_failure_mail(repo, pr_number, pr_title, "部署超时")
_send_deploy_failure_task(repo, pr_number, pr_title, "部署超时")
except Exception as e:
logger.error("Auto-deploy: unexpected error: %s", e)
@@ -785,7 +998,29 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
})
title = f"Issue 指派: {issue_title} ({repo}#{issue_number})"
_send_mail(assignee, title, text)
_send_toolchain_task(
to_agent=assignee,
title=title,
description=text,
event_type="issue_assigned",
action_type="issue_assigned",
steps=[
f"创建分支 fix/{issue_number}-{brief}",
"编码 + 写 UT",
"push → 等 CI",
f"CI 通过后创建 PRGitea API: POST /repos/{repo}/pulls",
"等 Review",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"issue_number": issue_number,
"repo": repo,
"issue_title": issue_title,
"labels": labels,
"issue_body": issue_body or "(无描述)",
"brief": brief,
},
)
elif action == "opened":
if "部署失败" in issue_title:
@@ -800,7 +1035,23 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
title = f"部署失败: {repo}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_mail(agent_id, title, text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"commit_sha": commit_sha or "(未知)",
},
)
# Issue body @mentionopened 时检查)
issue_body = issue.get("body", "") or ""
@@ -867,7 +1118,25 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
})
title = f"CI 失败: {repo}#{issue_number}"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="ci_failure",
action_type="ci_failure",
steps=[
"查看完整 CI 日志(PR 页面或 Gitea Actions 页面)",
"修复失败的测试",
"push → CI 自动重跑",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": issue_number,
"repo": repo,
"branch": branch,
"error_summary": error_summary,
},
)
# CI 处理完不 return,继续检查 @mention
# === 路径 2:@mention 通知(新增,独立路径) ===
@@ -958,7 +1227,7 @@ async def gitea_webhook(
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
async with _get_idempotency_lock():
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug(
"Duplicate webhook: %s/%s",
+1 -1
View File
@@ -293,7 +293,7 @@ _SCHEMA_STATEMENTS = [
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(id),
author TEXT NOT NULL,
comment_type TEXT NOT NULL DEFAULT 'general' CHECK (comment_type IN ('general','handoff','observation','review','rebuttal','rebuttal_response','debate_argument','debate_rebuttal','debate_judgment')),
comment_type TEXT NOT NULL DEFAULT 'general',
body TEXT NOT NULL,
mentions TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
+117 -21
View File
@@ -1,4 +1,4 @@
"""Mail 失败通知 — 以 system 身份通知发件人"""
"""Mail 失败通知 v2.0 — 以 system 身份通知发件人AI Native"""
from __future__ import annotations
@@ -6,7 +6,7 @@ import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from typing import Dict, Optional
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
@@ -15,21 +15,121 @@ from src.config.agents import AGENT_IDS
logger = logging.getLogger(__name__)
# 邮件通知正文模板(统一模板,包含所有可能的失败原因和建议)
_NOTIFY_TEMPLATE = """你的邮件投递失败了。
# ── Reason 人话翻译 + detail 提取 ──────────────────────────────
📧 原始邮件{title}
👤 收件人{to_agent}
失败原因{reason}
def _extract_stderr(detail: dict, max_len: int = 200) -> str:
"""从 detail 中提取 stderr_preview"""
preview = (detail or {}).get("stderr_preview", "")
if preview and len(preview) > max_len:
preview = preview[:max_len] + "..."
return preview
常见失败原因及处理建议
no_reply_found收件人未回复建议重发邮件或通过黑板任务方式联系
auth_failed收件人认证失败需检查 Agent 配置联系姜维(jiangwei-infra)排查
crash_limit收件人处理时多次崩溃系统异常建议稍后重试
task_timeout处理超时建议重发或通过其他方式联系
其他原因建议联系副军师(pangtong-fujunshi)排查
系统自动通知"""
def _fmt_retry_info(reason: str, detail: dict) -> str:
"""格式化重试情况描述"""
_NO_RETRY_REASONS = {
"no_reply_found", "auth_failed", "agent_error",
"agent_failed", "compact_failed",
}
if reason in _NO_RETRY_REASONS:
reason_human = _REASON_MAP.get(reason, _REASON_MAP.get("_default", ("未知原因", lambda d: "")))[0]
return f"无法重试({reason_human}"
count = (detail or {}).get("count", 0)
fallback_count = (detail or {}).get("fallback_count", 0)
if count > 0:
return f"已自动重试 {count}"
if fallback_count > 0:
return f"已自动重试 {fallback_count} 次(fallback"
return "系统已尝试恢复,但仍失败"
# reason_raw → (reason_human_readable, detail_format_fn)
_REASON_MAP: Dict[str, tuple] = {
"no_reply_found": ("收件人未回复(Agent 未能识别或处理此邮件)", lambda d: ""),
"crashed": ("处理时进程崩溃", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else "无 stderr 输出"),
"max_crash_count": ("连续崩溃达上限", lambda d: f"崩溃 {d.get('count', '?')}"),
"max_retries": ("续杯耗尽(已自动重试)", lambda d: f"重试 {d.get('count', '?')}"),
"max_api_retry_count": ("API 连续失败达上限", lambda d: f"API 重试 {d.get('count', '?')}"),
"max_monitor_timeouts": (
"处理超时达上限",
lambda d: f"超时 {d.get('count', '?')} 次,"
f"共约 {d.get('elapsed_seconds', 0) // 60} 分钟"),
"gateway_timeout": ("Agent 执行超时(已续杯重试)", lambda d: ""),
"session_stuck": ("会话假死(lock PID 死亡)", lambda d: f"假死 {d.get('stuck_count', '?')}"),
"revive_failed": ("会话恢复失败", lambda d: f"假死 {d.get('stuck_count', '?')}"),
"auth_failed": ("Agent 认证失败(配置问题)", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
"fallback_exhausted": (
"主模型和备用模型均失败",
lambda d: f"fallback {d.get('fallback_count', '?')} 次,"
f"原因: {d.get('fallback_reason', '未知')}"),
"agent_error": (
"Agent 内部错误",
lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
"agent_failed": ("收件人主动标记失败", lambda d: ""),
"compact_failed": ("上下文压缩失败", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
"compact_hanging": ("上下文压缩长时间未完成", lambda d: ""),
"compact_interrupted": ("上下文压缩被中断(已自动重试)", lambda d: ""),
"gateway_unreachable": (
"Gateway 不可达(已自动重试)",
lambda d: f"stderr: {_extract_stderr(d)}"
if _extract_stderr(d) else ""),
"lock_conflict": ("会话锁冲突(已自动重试)", lambda d: ""),
"max_retry_count": ("重试耗尽", lambda d: f"重试 {d.get('count', '?')}"),
"max_lock_retry_count": ("锁冲突重试耗尽", lambda d: f"重试 {d.get('count', '?')}"),
"max_connect_retry_count": ("连接重试耗尽", lambda d: f"重试 {d.get('count', '?')}"),
"_default": ("未知原因", lambda d: f"stderr: {_extract_stderr(d)}" if _extract_stderr(d) else ""),
}
# 常见失败原因参考(AI Native:提供知识库让收件 AI 自行判断)
_REASON_REFERENCE = """常见失败原因参考:
no_reply_found收件人未回复Agent 未能识别或处理此邮件
crashed / max_crash_count收件人处理时进程崩溃已自动重试 3
max_retries续杯耗尽已自动重试 3 共约 34 分钟
max_api_retry_countAPI 连续失败达上限rate_limit/500/503
max_monitor_timeouts处理超时达上限共约 31.5 分钟
gateway_timeoutAgent 执行超时已续杯重试
session_stuckAgent 会话假死lock PID 死亡revive 失败
revive_failed会话假死后恢复失败
auth_failedAgent 认证失败配置问题
fallback_exhausted主模型和备用模型均失败
agent_failed收件人主动标记失败
compact_failed上下文压缩失败
compact_hanging上下文压缩长时间未完成等待超 31.5 分钟
compact_interrupted上下文压缩被中断已自动重试 3
gateway_unreachableGateway 不可达已自动重试 3
lock_conflict会话锁冲突已自动重试 3
其他建议排查系统日志"""
def _build_notify_text(title: str, to_agent: str, reason: str,
detail: Optional[dict] = None) -> str:
"""构建通知正文(v2.0 AI Native"""
reason_human, detail_fn = _REASON_MAP.get(reason, _REASON_MAP["_default"])
detail_info = detail_fn(detail or {})
retry_info = _fmt_retry_info(reason, detail or {})
lines = [
"邮件投递失败通知",
"",
f"📧 原始邮件:「{title}",
f"👤 收件人:{to_agent}",
f"❌ 失败原因:{reason_human}{reason}",
f"📊 重试情况:{retry_info}",
]
if detail_info:
lines.append("📋 上下文信息:")
lines.append(f" {detail_info}")
lines.append("")
lines.append(_REASON_REFERENCE)
lines.append("")
lines.append("——系统自动通知")
return "\n".join(lines)
def _is_mail_project(db_path: Path) -> bool:
@@ -43,7 +143,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
"""Mail 失败后以 system 身份给发件人发通知邮件
直接通过 Blackboard 创建 Task不走 HTTP API
防递归检查原邮件 must_hives.system_notify true 则跳过
防递归检查原邮件 must_haves.system_notify true 则跳过
发件人不是有效 Agent system 通知庞统代处理避免广播风暴
"""
try:
@@ -83,12 +183,8 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
original_mail_id, from_agent)
target_agent = "pangtong-fujunshi"
# 构造通知正文
text = _NOTIFY_TEMPLATE.format(
title=title,
to_agent=to_agent,
reason=reason,
)
# 构造通知正文v2.0 AI Native
text = _build_notify_text(title, to_agent, reason, detail)
# 创建通知邮件 Task
notify_id = f"mail-{int(datetime.now().timestamp() * 1000)}"
+2
View File
@@ -65,6 +65,8 @@ class PromptContext:
# toolchain 专用
event_type: str = "" # ci_failure / review_request / ...
event_data: Dict = field(default_factory=dict)
action_type: str = "" # 动作分类(review_result / ci_failure / ...
action_steps: list = field(default_factory=list) # 结构化编号步骤列表
# 前序产出
depends_on_outputs: Optional[List] = None
+10 -1
View File
@@ -286,10 +286,15 @@ class AgentSpawner:
# 从 must_haves 解析 mail 元数据(from / performative
from_agent = ""
mail_type = ""
action_type = ""
action_steps = []
try:
meta = json.loads(must_haves) if must_haves else {}
from_agent = meta.get("from", "")
mail_type = meta.get("performative", meta.get("type", ""))
# toolchain 字段提取
action_type = meta.get("action_type", "")
action_steps = meta.get("steps", [])
except Exception:
pass
ctx = PromptContext(
@@ -298,6 +303,7 @@ class AgentSpawner:
agent_id=agent_id, role=spawn_type,
spawn_type=spawn_type,
from_agent=from_agent, mail_type=mail_type,
action_type=action_type, action_steps=action_steps,
)
return handler.build_prompt(ctx)
@@ -845,6 +851,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
cls.get("retry_field", "retry_count")
)
elif outcome == "api_error":
# A9: [DEPRECATED] api_error 已改为 should_retry=True 走续杯路径。
# 此分支理论上不再命中,保留作为安全兜底。
# A9: 429/API 错误 → release counter(on_complete)+ 推回 pending + 冷却
# 有上限:api_retry_count 累计达 max_retries 则标 failed
await self._do_on_complete_async(on_complete, agent_id, outcome)
@@ -1842,7 +1850,8 @@ curl -X POST http://{api_host}:{api_port}/api/projects/{project_id}/tasks/{task_
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in [
"rate_limit", "500", "503", "api error"]):
return {"outcome": "api_error", "should_retry": False}
return {"outcome": "api_error", "should_retry": True,
"retry_field": "retry_count", "cooldown_seconds": 60}
if any(kw in stderr_lower for kw in [
"compaction-diag", "context-overflow"]):
return {"outcome": "compact_failed", "should_retry": False}
+355 -111
View File
@@ -1,14 +1,16 @@
"""toolchain_handler.py 工具链事件 handler。
"""toolchain_handler.py - 工具链事件 handler。
处理 Gitea Webhook 事件CI 失败Review 请求Issue 指派等
处理 Gitea Webhook 事件(CI 失败Review 请求Issue 指派等)
L2 引擎层强约束:输入(结构化步骤)+ 执行(Red Flags)+ 输出(action_report 验证)
"""
from __future__ import annotations
import json
import logging
import os
import urllib.request
from pathlib import Path
from typing import Dict
from typing import Dict, List
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext
@@ -17,13 +19,34 @@ from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.toolchain")
# ---------------------------------------------------------------------------
# Gitea API 配置
# ---------------------------------------------------------------------------
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
_GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
# action_type → action_hint 映射
_ACTION_HINTS: Dict[str, str] = {
"review_result": "你收到一个 Review 结果通知,这是一个需要你执行动作的事件(不是纯通知)。",
"review_request": "你收到一个 Review 请求,这是一个需要你审查并提交 Review 的事件。",
"review_updated": "你收到一个 PR 更新通知,这是一个需要你重新审查修改部分的事件。",
"review_comment": "你收到一个 Review 评论,这是一个需要你查看并响应的事件。",
"ci_failure": "你收到一个 CI 失败通知,这是一个需要你修复失败测试的事件。",
"issue_assigned": "你收到一个 Issue 指派,这是一个需要你编码实现的事件。",
"deploy_failure": "你收到一个部署失败通知,这是一个需要你排查并修复的事件。",
"mention": "你收到一个 @mention 通知,这是一个需要你按指引响应的事件。",
"review_merged": "你收到一个 PR 合并通知。这是一条纯通知,阅读即可。",
"infrastructure_failure": "你收到一个基础设施问题报告,请排查并修复。",
}
# ---------------------------------------------------------------------------
# Toolchain PromptSections
# ---------------------------------------------------------------------------
class ToolchainContextSection:
"""事件类型 + 事件详情priority=10"""
"""事件类型 + 事件详情 + 结构化步骤 + action_hint(priority=10)"""
name: str = "toolchain_context"
priority: int = 10
@@ -32,27 +55,44 @@ class ToolchainContextSection:
event_type = context.event_type
event_data: Dict = context.event_data or {}
# Part 1: 事件信息(现有模板引擎)
if event_type in _TEMPLATE_MAP:
# 使用模板引擎渲染已知事件
variables = {k: str(v) for k, v in event_data.items()}
return render_template(event_type, variables)
event_text = render_template(event_type, variables)
else:
lines = ["## 工具链事件", ""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append("- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append("")
event_text = "\n".join(lines)
# fallback:通用事件描述
lines = ["## 工具链事件", ""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append("- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append("")
return "\n".join(lines)
# Part 2: 结构化编号步骤(新增,从 action_steps 渲染)
steps: List[str] = context.action_steps or []
if steps:
step_lines = ["", "### 必须执行的步骤", ""]
for i, step in enumerate(steps, 1):
step_lines.append(f"{i}. {step}")
steps_text = "\n".join(step_lines)
else:
steps_text = ""
# Part 3: action 指引(新增,按 action_type 选择)
action_hint = _ACTION_HINTS.get(
context.action_type,
"你收到一个工具链事件,这是一个需要你执行动作的事件。",
)
return f"{action_hint}\n\n{event_text}{steps_text}"
def should_include(self, context: PromptContext) -> bool:
return True
class ToolchainApiSection:
"""API 操作指令priority=40),success_status=done"""
"""API 操作指令(priority=40)-- action_report 提交指引"""
name: str = "toolchain_api"
priority: int = 40
@@ -60,28 +100,48 @@ class ToolchainApiSection:
API_HOST = "localhost:8083"
def render(self, context: PromptContext) -> str:
task_id = context.task_id
project_id = context.project_id
agent_id = context.agent_id
lines = [
"## API 操作指令",
"",
f"项目 ID: `{context.project_id}`",
f"任务 ID: `{context.task_id}`",
f"项目 ID: `{project_id}`",
f"任务 ID: `{task_id}`",
"",
"### 完成后必须更新任务状态",
"完成后务必通过以下命令将任务标记为 **done**:",
"### 完成后必须提交 action report",
"",
"执行完所有步骤后,必须提交 action report:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\',
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/comments" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"status": "done"}\'',
f' -d \'{{"author": "{agent_id}", "comment_type": "action_report", "body": "简要描述你执行了什么操作及结果"}}\'',
"```",
"",
"⚠️ 不提交 action report 的任务会被标记为 failed。",
"",
"### 提交产出",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\',
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/outputs" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
"```",
"",
"### 需要其他角色支持时",
"",
"如果在执行过程中需要其他角色协助(如缺数据、需要审批等),在关联的 PR/Issue 上创建 comment @对方:",
"```bash",
f'curl -s -X POST "{_GITEA_BASE}/repos/{{repo}}/issues/{{pr_number}}/comments" \\',
' -H "Authorization: token <your-token>" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"body": "@{agent-id} 需要你的支持:{描述问题}"}\'',
"```",
"",
"⚠️ 不要使用 Mail API(飞鸽传书)。所有协作通过 Gitea 留痕。",
"",
]
return "\n".join(lines)
@@ -90,20 +150,50 @@ class ToolchainApiSection:
class ToolchainConstraintsSection:
"""硬约束priority=50"""
"""硬约束 + Red Flags(priority=50)"""
name: str = "toolchain_constraints"
priority: int = 50
def render(self, context: PromptContext) -> str:
lines = [
"## 硬约束",
"## 硬约束(必须遵守)",
"",
"1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成",
"2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态",
"3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务",
"4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done",
"5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务",
"⚠️ 以下是强制要求,不是建议或参考。违反任何一条都会导致任务失败。",
"",
"### 1. 必须按步骤执行",
'- 检查上方“必须执行的步骤”列表',
'- 逐条执行每个步骤,不可跳过',
'- 不要只读不做——这不是纯通知',
"",
"### 2. 必须提交 action report",
'- 执行完所有步骤后,必须提交 action report',
"- 提交方式:POST commentcomment_type='action_report'",
'- 报告内容:简要描述你执行了什么操作、结果如何',
'- ⚠️ 不提交 action report 的任务会被标记为 failed',
"",
"### 3. 不要执行任何状态转换命令",
'- 不要手动标 working/done/review/failed,系统会自动处理',
"",
"### 4. 不需要回复",
'- action report 就是你的完成凭证',
'- 不要发送 Mail(飞鸽传书),你的所有操作在 toolchain 流程内完成',
"",
"### 5. 所有协作通过 Gitea 完成",
'- 如果遇到问题需要其他角色支持,在关联的 PR/Issue 上创建 comment @对方',
'- 不要使用 Mail API(飞鸽传书)发送消息',
'- 你的所有操作都在 toolchain 流程内,通过 Gitea 留痕',
"",
"### Red Flags(如果脑海中出现以下想法,说明你错了)",
"",
'| Agent 想法 | Red Flag 驳回 |',
'|------------|--------------|',
'| “这个通知看看就行了” | ❌ 错!这是 action 指令,必须执行步骤列表中的每一项 |',
'| “我不需要做任何事” | ❌ 错!检查“必须执行的步骤”列表,每一步都要执行 |',
'| “先放着等会处理” | ❌ 错!立即执行,不要推迟 |',
'| “我已经知道了” | ❌ 知道不等于执行。执行步骤 + 提交 action report 才算完成 |',
'| “步骤太多了,选几个做就行” | ❌ 错!必须逐条执行,不可跳过 |',
'| “这个步骤不适用于当前情况” | ❌ 如果确实不适用,在 action report 中说明原因,但其他步骤必须执行 |',
"",
]
return "\n".join(lines)
@@ -127,7 +217,7 @@ class ToolchainHandler(BaseTaskHandler):
return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_workingpending → working"""
"""auto_working:pending → working"""
return self._auto_mark_working(task_id, db_path)
def get_sections(self) -> list:
@@ -145,27 +235,55 @@ class ToolchainHandler(BaseTaskHandler):
return composer.compose(context)
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""检查行动输出(output 或 comment 有实质内容)"""
"""检查 action report(精确验证)+ 三层 fallback"""
try:
conn = get_connection(db_path)
try:
# 检查 output
# 特殊处理:infrastructure_failure 始终通过(防递归)
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
try:
meta = json.loads(row["must_haves"])
except Exception:
meta = {}
if meta.get("action_type") == "infrastructure_failure":
return VerifyResult(True, "infrastructure_passthrough",
"infrastructure_failure auto-pass")
# 特殊处理:review_merged 始终通过(纯通知)
if meta.get("action_type") == "review_merged":
return VerifyResult(True, "merged_passthrough",
"review_merged auto-pass")
# 1. 优先检查 action_report comment
report_row = conn.execute(
"SELECT id FROM comments WHERE task_id=? "
"AND comment_type='action_report' LIMIT 1",
(task_id,)
).fetchone()
if report_row:
return VerifyResult(True, "has_action_report", "action_report found")
# 2. fallback:检查 output(向后兼容)
output_count = conn.execute(
"SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
).fetchone()[0]
if output_count > 0:
return VerifyResult(True, "has_output", f"output_count={output_count}")
# 检查 comment(非系统、有实质内容)
# 3. fallback:检查有实质内容的 comment(向后兼容)
comment_count = conn.execute(
"SELECT COUNT(*) FROM comments WHERE task_id=? "
"AND author != 'system' AND LENGTH(content) >= 20",
"AND author != 'system' AND LENGTH(body) >= 20",
(task_id,)
).fetchone()[0]
if comment_count > 0:
return VerifyResult(True, "has_comment", f"comment_count={comment_count}")
return VerifyResult(False, "no_action", "output=0, comment=0")
return VerifyResult(False, "no_action",
"no action_report, no output, no valid comment")
finally:
conn.close()
except Exception as e:
@@ -174,32 +292,217 @@ class ToolchainHandler(BaseTaskHandler):
def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None:
"""验证失败 → 标 failed + Mail API 通知主公"""
"""验证失败 → 三分路处理(业务/系统/基础设施)"""
self._mark_task_status(db_path, task_id, "failed")
logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason)
logger.info("Toolchain %s: verify failed (%s), marked failed",
task_id, verify.reason)
# 从 db 读取事件上下文
event_type = ""
event_data: Dict = {}
# 读取 must_hives 获取事件上下文 + assignee 从 tasks 表读取
meta = {}
assignee = agent_id
try:
conn = get_connection(db_path)
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
"SELECT must_haves, assignee FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
meta = json.loads(row["must_haves"])
event_type = meta.get("event_type", "")
raw = meta.get("event_data", "{}")
event_data = json.loads(raw) if isinstance(raw, str) else raw
if row:
if row["must_haves"]:
meta = json.loads(row["must_haves"])
assignee = row["assignee"] or agent_id
conn.close()
except Exception:
pass
self._notify_via_mail_api(
task_id, verify.reason, verify.evidence,
event_type, event_data,
action_type = meta.get("action_type", "")
context_data = meta.get("context", {})
# 三分路决策
route = self._classify_failure(verify)
if route == "business":
self._handle_business_failure(
task_id, agent_id, verify, action_type, context_data, assignee, db_path)
elif route == "system":
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
else: # infrastructure
self._handle_infrastructure_failure(
task_id, agent_id, verify, db_path)
def _classify_failure(self, verify: VerifyResult) -> str:
"""分类失败类型:business / infrastructuresystem 通过升级到达)"""
# verify_error 或 DB 不可用 → 基础设施失败
if verify.reason == "verify_error":
return "infrastructure"
# 默认:业务失败
return "business"
def _handle_business_failure(
self, task_id: str, agent_id: str, verify: VerifyResult,
action_type: str, context_data: dict, assignee: str,
db_path: Path,
) -> None:
"""业务失败 → 在关联 PR/Issue 上创建 comment @原始 assignee"""
repo = context_data.get("repo", "")
pr_number = context_data.get("pr_number") or context_data.get("issue_number", "")
if repo and pr_number:
comment_body = (
f"@{assignee or agent_id} 工具链任务执行失败\n\n"
f"任务 ID: {task_id}\n"
f"失败原因: {verify.reason}\n"
f"证据: {verify.evidence}\n\n"
f"请检查黑板任务并处理。"
)
success = self._create_gitea_comment(repo, pr_number, comment_body)
if success:
logger.info("Toolchain %s: business failure → Gitea comment on %s#%s",
task_id, repo, pr_number)
return
# Gitea API failed → escalate to system failure
logger.warning(
"Toolchain %s: Gitea comment failed, escalating to system failure",
task_id)
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
else:
# 没有 PR/Issue 关联 → fallback 到系统失败
logger.warning(
"Toolchain %s: no PR/Issue context for business failure, "
"escalating to system failure", task_id)
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
def _handle_system_failure(
self, task_id: str, agent_id: str, verify: VerifyResult,
action_type: str, context_data: dict, db_path: Path,
) -> None:
"""系统失败 → 创建 Gitea Issue @pangtong-fujunshi"""
repo = context_data.get("repo", "sanguo/sanguo_moziplus_v2")
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
body = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {action_type or '未知'}\n"
f"失败原因: {verify.reason}\n"
f"证据: {verify.evidence}\n\n"
f"@pangtong-fujunshi 请检查黑板任务并手动处理。"
)
# 尝试在 Gitea 创建 Issue
created = self._create_gitea_issue(repo, title, body, ["pangtong-fujunshi"])
if created:
logger.info("Toolchain %s: system failure → Gitea Issue created on %s",
task_id, repo)
else:
# Gitea API 不可用 → 基础设施失败
logger.error(
"Toolchain %s: Gitea API unavailable, escalating to infrastructure failure",
task_id)
self._handle_infrastructure_failure(
task_id, agent_id, verify, db_path)
def _handle_infrastructure_failure(
self, task_id: str, agent_id: str,
verify: VerifyResult, db_path: Path,
) -> None:
"""基础设施失败 → 直接在 _toolchain DB 创建 task @jiangwei-infra(防递归)"""
try:
from datetime import datetime
new_task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": "infrastructure_failure",
"action_type": "infrastructure_failure",
"steps": [
"检查 Gitea 服务状态(http://192.168.2.154:3000)",
"检查网络连通性",
"恢复后提交 action report",
],
"context": {"original_task_id": task_id, "verify_reason": verify.reason},
"from": "system",
"source": "toolchain_handler_on_failure",
}, ensure_ascii=False)
conn = get_connection(db_path)
conn.execute(
"INSERT INTO tasks (id, title, description, assignee, assigned_by, "
"must_haves, task_type, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
new_task_id,
f"[基础设施] Gitea API 不可用 - {task_id}",
f"Gitea API 不可用,原任务 {task_id} 无法通过正常路径处理。\n"
f"请检查 Gitea 服务状态和网络连通性。",
"jiangwei-infra",
"system",
must_hives,
"toolchain",
"pending",
)
)
conn.commit()
conn.close()
logger.info(
"Toolchain %s: infrastructure failure → task %s created for jiangwei-infra",
task_id, new_task_id)
except Exception as e:
logger.error(
"Toolchain %s: failed to create infrastructure_failure task: %s",
task_id, e)
# -----------------------------------------------------------------------
# Gitea API 辅助
# -----------------------------------------------------------------------
def _create_gitea_comment(
self, repo: str, pr_number: int, body: str,
) -> bool:
"""在 PR/Issue 上创建 comment。返回是否成功。"""
if not _GITEA_TOKEN:
return False
payload = json.dumps({"body": body}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
f"{_GITEA_BASE}/repos/{repo}/issues/{pr_number}/comments",
data=payload,
headers={
"Authorization": f"token {_GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
urllib.request.urlopen(req, timeout=5)
return True
except Exception as e:
logger.warning("Gitea comment failed on %s#%s: %s", repo, pr_number, e)
return False
def _create_gitea_issue(
self, repo: str, title: str, body: str,
assignees: list = None,
) -> bool:
"""创建 Gitea Issue。返回是否成功。"""
if not _GITEA_TOKEN:
return False
data = {"title": title, "body": body}
if assignees:
data["assignees"] = assignees
payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
f"{_GITEA_BASE}/repos/{repo}/issues",
data=payload,
headers={
"Authorization": f"token {_GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
urllib.request.urlopen(req, timeout=5)
return True
except Exception as e:
logger.warning("Gitea create issue failed on %s: %s", repo, e)
return False
# -----------------------------------------------------------------------
# 兼容:保留旧方法签名(但不再被 on_failure 调用)
# -----------------------------------------------------------------------
def _build_gitea_links(self, event_type: str, event_data: dict) -> str:
"""根据事件类型构建 Gitea 链接。"""
links = []
@@ -215,63 +518,4 @@ class ToolchainHandler(BaseTaskHandler):
if "branch" in event_data and "commit" not in event_data:
links.append(f"分支: {event_data['branch']}")
return "\n".join(links) if links else "无法提取链接请检查黑板任务详情"
def _notify_via_mail_api(
self,
task_id: str,
reason: str,
evidence: str,
event_type: str,
event_data: Dict,
) -> None:
"""通过 Mail API 发送丰富的失败通知给主公。"""
# 构建行动指引
action_hint = "请检查黑板任务并手动处理。"
et_lower = event_type.lower()
if "ci" in et_lower or "deploy" in et_lower:
action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。"
elif "review" in et_lower:
action_hint = "建议查看 PR review 状态,必要时通知相关开发者。"
elif "issue" in et_lower:
action_hint = "建议创建任务派给对应开发者处理 Issue。"
# 构建事件详情
event_details = ""
if event_data:
event_details = "\n".join(
f" - {k}: {v}" for k, v in event_data.items()
)
# 构建 Gitea 链接
gitea_links = self._build_gitea_links(event_type, event_data)
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
text = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {event_type or '未知'}\n"
f"事件详情:\n{event_details or ' (无)'}\n\n"
f"失败原因: {reason}\n"
f"证据: {evidence}\n\n"
f"{gitea_links}\n\n"
f"行动指引: {action_hint}"
)
payload = json.dumps({
"from": "daemon",
"to": "pangtong-fujunshi",
"title": title,
"text": text,
"type": "inform",
}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
"http://localhost:8083/api/mail",
data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
logger.info("Toolchain %s: sent failure notification via Mail API", task_id)
except Exception as e:
logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e)
return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)"
+4 -2
View File
@@ -165,14 +165,16 @@ class TestClassifyErrorApi:
1, {"status": "error"}, "rate_limit exceeded", None
)
assert result["outcome"] == "api_error"
assert result["should_retry"] is False
assert result["should_retry"] is True
assert result["cooldown_seconds"] == 60
def test_stderr_500(self):
result = Spawner._classify_outcome(
1, {"status": "error"}, "HTTP 500 Internal Server Error", None
)
assert result["outcome"] == "api_error"
assert result["should_retry"] is False
assert result["should_retry"] is True
assert result["cooldown_seconds"] == 60
class TestClassifyErrorCompact:
+525
View File
@@ -0,0 +1,525 @@
"""Unit tests for §17 ToolchainHandler 强约束实现."""
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from src.daemon.prompt_composer import PromptContext, PromptComposer
from src.daemon.toolchain_handler import (
ToolchainHandler,
ToolchainContextSection,
ToolchainApiSection,
ToolchainConstraintsSection,
_ACTION_HINTS,
)
from src.daemon.base_task_handler import VerifyResult
from src.blackboard.db import init_db, get_connection
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_db():
"""Create a temporary _toolchain DB for testing."""
with tempfile.TemporaryDirectory() as d:
db_path = Path(d) / "blackboard.db"
init_db(db_path)
yield db_path
@pytest.fixture
def handler():
return ToolchainHandler()
def _insert_task(db_path, task_id, must_haves_json, status="working"):
"""Insert a task into DB for testing."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO tasks (id, title, description, assignee, assigned_by, "
"must_haves, task_type, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(task_id, "test", "test desc", "zhangfei-dev", "system",
must_haves_json, "toolchain", status)
)
conn.commit()
conn.close()
def _insert_comment(db_path, task_id, author, body, comment_type="general"):
"""Insert a comment into DB."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)",
(task_id, author, comment_type, body)
)
conn.commit()
conn.close()
def _insert_output(db_path, task_id, content="test output"):
"""Insert an output into DB."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO outputs (task_id, agent, output_type, title, summary) "
"VALUES (?, ?, ?, ?, ?)",
(task_id, "zhangfei-dev", "document", "test", content)
)
conn.commit()
conn.close()
# ---------------------------------------------------------------------------
# Step 1a: PromptContext new fields
# ---------------------------------------------------------------------------
class TestPromptContextFields:
def test_action_type_default(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
assert ctx.action_type == ""
def test_action_steps_default(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
assert ctx.action_steps == []
def test_action_type_set(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
action_type="review_result",
)
assert ctx.action_type == "review_result"
def test_action_steps_set(self):
steps = ["step 1", "step 2"]
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
action_steps=steps,
)
assert ctx.action_steps == steps
# ---------------------------------------------------------------------------
# Step 2a: ToolchainContextSection steps rendering + action_hint
# ---------------------------------------------------------------------------
class TestToolchainContextSection:
def test_renders_steps(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="review_result",
event_data={"pr_number": "42", "repo": "sanguo/test"},
action_type="review_result",
action_steps=["合并 PR", "提交 action report"],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "必须执行的步骤" in result
assert "1. 合并 PR" in result
assert "2. 提交 action report" in result
def test_renders_action_hint(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="ci_failure",
action_type="ci_failure",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "CI 失败" in result
assert "需要你修复" in result
def test_renders_default_hint_for_unknown_action_type(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="unknown",
action_type="unknown_type",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "需要你执行动作的事件" in result
def test_no_steps_no_steps_section(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="review_merged",
action_type="review_merged",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "必须执行的步骤" not in result
# ---------------------------------------------------------------------------
# Step 2b: ToolchainApiSection action_report guidance
# ---------------------------------------------------------------------------
class TestToolchainApiSection:
def test_has_action_report_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "action_report" in result
assert "comment_type" in result
assert "tc-123" in result
def test_no_manual_done_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
# Should NOT contain the old "标记为 done" instruction
assert "标记为 **done**" not in result
assert '"status": "done"' not in result
def test_has_outputs_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "outputs" in result
def test_has_gitea_collaboration_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "Gitea" in result
assert "Mail API" in result
# ---------------------------------------------------------------------------
# Step 2c: ToolchainConstraintsSection Red Flags
# ---------------------------------------------------------------------------
class TestToolchainConstraintsSection:
def test_has_red_flags_table(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "Red Flags" in result
assert "" in result
def test_has_all_5_constraints(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "必须按步骤执行" in result
assert "必须提交 action report" in result
assert "不要执行任何状态转换命令" in result
assert "不需要回复" in result
assert "所有协作通过 Gitea 完成" in result
def test_has_strong_language(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "强制要求" in result
assert "不是建议" in result
# ---------------------------------------------------------------------------
# Step 2d: verify_completion tests
# ---------------------------------------------------------------------------
class TestVerifyCompletion:
def test_action_report_passes(self, handler, tmp_db):
"""action_report comment → pass"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t1", must_haves)
_insert_comment(tmp_db, "t1", "zhangfei-dev",
"已修复 CI", comment_type="action_report")
result = handler.verify_completion("t1", tmp_db)
assert result.passed is True
assert result.reason == "has_action_report"
def test_no_action_report_fallback_output(self, handler, tmp_db):
"""No action_report but has output → pass (fallback)"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t2", must_haves)
_insert_output(tmp_db, "t2", "review result content")
result = handler.verify_completion("t2", tmp_db)
assert result.passed is True
assert result.reason == "has_output"
def test_no_action_report_fallback_comment(self, handler, tmp_db):
"""No action_report but has substantial comment → pass (fallback)"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t3", must_haves)
_insert_comment(tmp_db, "t3", "zhangfei-dev",
"This is a sufficiently long comment about the task.")
result = handler.verify_completion("t3", tmp_db)
assert result.passed is True
assert result.reason == "has_comment"
def test_nothing_passes(self, handler, tmp_db):
"""No action_report, no output, no comment → fail"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t4", must_haves)
result = handler.verify_completion("t4", tmp_db)
assert result.passed is False
assert result.reason == "no_action"
def test_short_comment_fails(self, handler, tmp_db):
"""Comment < 20 chars → fail"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t5", must_haves)
_insert_comment(tmp_db, "t5", "zhangfei-dev", "ok")
result = handler.verify_completion("t5", tmp_db)
assert result.passed is False
def test_review_merged_auto_passes(self, handler, tmp_db):
"""review_merged → always pass"""
must_haves = json.dumps({"action_type": "review_merged"})
_insert_task(tmp_db, "t6", must_haves)
result = handler.verify_completion("t6", tmp_db)
assert result.passed is True
assert result.reason == "merged_passthrough"
def test_infrastructure_failure_auto_passes(self, handler, tmp_db):
"""infrastructure_failure → always pass (anti-recursion)"""
must_haves = json.dumps({"action_type": "infrastructure_failure"})
_insert_task(tmp_db, "t7", must_haves)
result = handler.verify_completion("t7", tmp_db)
assert result.passed is True
assert result.reason == "infrastructure_passthrough"
# ---------------------------------------------------------------------------
# Step 3a: _send_toolchain_task tests
# ---------------------------------------------------------------------------
class TestSendToolchainTask:
def test_creates_task_in_toolchain_db(self):
"""_send_toolchain_task creates a task in _toolchain DB."""
from src.api.toolchain_routes import _send_toolchain_task, _toolchain_db_path
with patch("src.api.toolchain_routes.get_data_root") as mock_root:
with tempfile.TemporaryDirectory() as d:
mock_root.return_value = Path(d)
task_id = _send_toolchain_task(
to_agent="zhangfei-dev",
title="Test Task",
description="Test description",
event_type="ci_failure",
action_type="ci_failure",
steps=["Fix test", "Submit report"],
context_data={"pr_number": 42},
)
assert task_id.startswith("tc-")
# Verify task was written to _toolchain DB
db_path = _toolchain_db_path()
conn = get_connection(db_path)
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)
).fetchone()
assert row is not None
assert row["task_type"] == "toolchain"
assert row["assignee"] == "zhangfei-dev"
# Verify must_haves JSON
meta = json.loads(row["must_haves"])
assert meta["event_type"] == "ci_failure"
assert meta["action_type"] == "ci_failure"
assert meta["steps"] == ["Fix test", "Submit report"]
assert meta["context"]["pr_number"] == 42
conn.close()
def test_unknown_agent_returns_empty(self):
"""_send_toolchain_task with unknown agent returns empty string."""
from src.api.toolchain_routes import _send_toolchain_task
task_id = _send_toolchain_task(
to_agent="unknown-agent",
title="Test",
description="desc",
event_type="test",
action_type="test",
steps=[],
)
assert task_id == ""
# ---------------------------------------------------------------------------
# Step 2e: on_failure three-way routing tests
# ---------------------------------------------------------------------------
class TestOnFailureRouting:
def test_business_failure_creates_gitea_comment(self, handler, tmp_db):
"""Business failure → Gitea PR comment @task assignee (not must_hives field)"""
# S4: must_hives does NOT contain assignee — production data doesn't have it
must_haves = json.dumps({
"action_type": "review_result",
"context": {"repo": "sanguo/test", "pr_number": 42},
"from": "system",
})
# assignee is set on the tasks table row (as production code writes it)
_insert_task(tmp_db, "t-fail", must_haves)
with patch.object(handler, "_create_gitea_comment") as mock_comment:
mock_comment.return_value = True
verify = VerifyResult(False, "no_action", "no action_report")
handler.on_failure("t-fail", "zhangfei-dev", tmp_db, verify)
mock_comment.assert_called_once()
call_args = mock_comment.call_args
assert call_args[0][0] == "sanguo/test"
assert call_args[0][1] == 42
# M2: comment body should @ the task's assignee from tasks table
comment_body = call_args[0][2]
assert "@zhangfei-dev" in comment_body
def test_infrastructure_failure_creates_task(self, handler, tmp_db):
"""Infrastructure failure → direct DB task for jiangwei-infra (no reverse dep)"""
must_haves = json.dumps({
"action_type": "review_result",
"context": {"repo": "sanguo/test", "pr_number": 42},
})
_insert_task(tmp_db, "t-infra", must_haves)
with patch.object(handler, "_create_gitea_comment") as mock_comment:
mock_comment.return_value = False # Gitea API down
with patch.object(handler, "_create_gitea_issue") as mock_issue:
mock_issue.return_value = False # Gitea API still down
verify = VerifyResult(False, "no_action", "no action_report")
handler.on_failure("t-infra", "zhangfei-dev", tmp_db, verify)
# S3: should directly INSERT into DB, not call _send_toolchain_task
# Verify a new task was created in DB for jiangwei-infra
conn = get_connection(tmp_db)
rows = conn.execute(
"SELECT * FROM tasks WHERE assignee=?",
("jiangwei-infra",)
).fetchall()
conn.close()
assert len(rows) >= 1, "No infrastructure_failure task created"
infra_task = rows[0]
assert infra_task["task_type"] == "toolchain"
meta = json.loads(infra_task["must_haves"])
assert meta["action_type"] == "infrastructure_failure"
# ---------------------------------------------------------------------------
# Regression: _mail path unaffected
# ---------------------------------------------------------------------------
class TestMailRegression:
def test_send_mail_still_exists(self):
"""_send_mail function is preserved."""
from src.api.toolchain_routes import _send_mail
assert callable(_send_mail)
def test_send_mail_not_called_by_handlers(self):
"""No toolchain handler calls _send_mail."""
import inspect
from src.api import toolchain_routes
# Get source of handler functions
source = inspect.getsource(toolchain_routes)
# _send_mail should appear only in its own definition, not in handler bodies
lines = source.split("\n")
in_handler = False
handler_send_mail_calls = []
for i, line in enumerate(lines):
if line.strip().startswith("async def _handle_") or line.strip().startswith("async def _send_mention_mails"):
in_handler = True
elif line.strip().startswith("async def ") or line.strip().startswith("def _"):
if not line.strip().startswith("async def _handle_") and not line.strip().startswith("async def _send_mention_mails"):
in_handler = False
if in_handler and "_send_mail(" in line and not line.strip().startswith("#"):
handler_send_mail_calls.append((i, line.strip()))
assert len(handler_send_mail_calls) == 0, \
f"_send_mail still called in handlers: {handler_send_mail_calls}"
# ---------------------------------------------------------------------------
# Integration: full prompt build
# ---------------------------------------------------------------------------
class TestFullPromptBuild:
def test_prompt_contains_all_sections(self, handler):
"""Full prompt has context, API, and constraints sections."""
ctx = PromptContext(
task_id="tc-test",
title="CI 失败修复",
description="Fix CI failure",
must_haves=json.dumps({
"event_type": "ci_failure",
"action_type": "ci_failure",
"steps": ["Fix test", "Push", "Submit report"],
"context": {"pr_number": 42},
}),
project_id="_toolchain",
agent_id="zhangfei-dev",
event_type="ci_failure",
event_data={"pr_number": "42", "repo": "sanguo/test"},
action_type="ci_failure",
action_steps=["Fix test", "Push", "Submit report"],
)
prompt = handler.build_prompt(ctx)
# Must have action hint
assert "CI 失败" in prompt
assert "需要你修复" in prompt
# Must have steps
assert "必须执行的步骤" in prompt
assert "1. Fix test" in prompt
# Must have API section with action_report
assert "action_report" in prompt
assert "tc-test" in prompt
# Must have constraints with Red Flags
assert "Red Flags" in prompt
assert "强制要求" in prompt