Compare commits

..

1 Commits

Author SHA1 Message Date
cfdaily 178818bb15 fix: 修复 §07 中 compact 文件路径引用 (24→15)
CI / lint (pull_request) Successful in 7s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
2026-06-13 10:14:45 +08:00
12 changed files with 135 additions and 4341 deletions
+2 -15
View File
@@ -27,7 +27,6 @@ jobs:
- name: Setup Python - name: Setup Python
run: | run: |
python3 -m venv /tmp/ci-venv-lint python3 -m venv /tmp/ci-venv-lint
/tmp/ci-venv-lint/bin/pip install --quiet --upgrade pip
/tmp/ci-venv-lint/bin/pip install --quiet flake8 /tmp/ci-venv-lint/bin/pip install --quiet flake8
- name: Lint with flake8 - name: Lint with flake8
@@ -43,24 +42,12 @@ jobs:
- name: Setup Python - name: Setup Python
run: | run: |
rm -rf /tmp/ci-venv-test
python3 -m venv /tmp/ci-venv-test python3 -m venv /tmp/ci-venv-test
/tmp/ci-venv-test/bin/pip install --quiet --upgrade pip /tmp/ci-venv-test/bin/pip install --quiet fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
/tmp/ci-venv-test/bin/pip install --quiet --no-cache-dir fastapi pydantic pyyaml uvicorn requests pytest pytest-asyncio httpx
- name: Debug environment
run: |
echo "PWD=$(pwd)"
echo "PYTHONPATH=$PYTHONPATH"
python3 -c "import sys; [print(p) for p in sys.path if 'sanguo' in p.lower() or 'openclaw' in p.lower()]"
grep -c "assignee = agent_id" src/daemon/toolchain_handler.py || true
grep -c "_BUSINESS_FAIL_THRESHOLD" src/daemon/toolchain_handler.py || true
- name: Run tests (exclude E2E) - name: Run tests (exclude E2E)
run: | run: |
PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q || \ /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -q
(echo '=== RETRY WITH VERBOSE ===' && \
PYTHONPATH=$(pwd) /tmp/ci-venv-test/bin/pytest tests/ -m "not e2e" -x -v 2>&1 | tail -30)
# ── Job 3: CI 失败通知 ─────────────────────────────── # ── Job 3: CI 失败通知 ───────────────────────────────
# 使用 needs.<job>.result 直接判断,不查询 commit status API # 使用 needs.<job>.result 直接判断,不查询 commit status API
-307
View File
@@ -1,307 +0,0 @@
# #16 知识注入设计
> 状态:v2 设计中
> 作者:庞统
> 日期:2026-06-13v1),2026-06-14v2 对齐 #11 四层架构)
> 评审:待司马懿评审
## 一、问题
### 1.1 现状
Agent(庞统、司马懿、张飞等)在执行任务时,不主动查询已有知识库(wiki-vault)。导致:
1. **重复调研**:赵云查过的数据清洗经验,张飞又从头调研一遍
2. **重复踩坑**wiki-vault 里已有"vnpy load_bar 需要显式指定 end=None"的实践,张飞还是踩了
3. **方案质量低**:做方案时纯靠推理,不查已有的优秀实践
4. **知识 gap 无人管**:查不到相关知识时没记录,下次还是查不到
### 1.2 根因
不是没有知识库(wiki-vault 有 50+ practices 页面),也不是没有检索能力(wiki-query Skill 已存在)。
**根因是注入时机**:Agent 不知道什么时候该查、没有强制机制让 Agent 在关键决策点查。
### 1.3 目标
1. Agent 在关键决策点**主动查询** wiki-vault
2. 查不到相关知识时**自动记录** knowledge gap
3. 定时任务处理 gap + 总结经验,**持续丰富** wiki-vault
4. 不增加 prompt token 负担(不自动注入知识全文,只引导查询)
## 二、调研
### 2.1 Superpowers:强制 Skill 检查(最有效)
**核心设计**session-start hook 注入铁律级指令——
> "If you think there is even a **1% chance** a skill might apply, you **ABSOLUTELY MUST** invoke the skill. This is not negotiable."
配合 **Red Flags 表**防止 Agent 自合理化跳过:
| Agent 的想法 | Red Flag 驳回 |
|---|---|
| "这个问题很简单" | 简单问题也需要查实践 |
| "我需要更多上下文" | Skill 检查在澄清问题之前 |
| "先看看代码" | Skill 告诉你怎么看代码 |
| "我记住了这个 Skill" | Skill 会更新,重新读 |
**为什么有效**:不靠 Agent "想起来",靠铁律强制。Skill 触发在任何响应之前。
### 2.2 Hermes:经验闭环 + Session Search
**经验闭环**:完成复杂任务(5+ tool calls)→ 自动创建 Skill → 下次自然触发。
**Session Search**:系统提示注入——"当用户提及过去内容时,主动搜索而非要求用户重复"。
**为什么有效**:不是"知识查询"而是"行为内化"——经验变成 SkillSkill 有 description 触发词。
### 2.3 结论
综合两个项目的优势:
| 设计点 | 来源 | 我们的做法 |
|--------|------|-----------|
| 铁律级强制 | Superpowers | L0 Hook 注入 + L1 SOUL.md 行为引导 |
| Red Flags 反合理化 | Superpowers | 知识查询 Red Flags 表(L1 SOUL.md |
| 经验内化 | Hermes | 经验→wiki-vault→下次查询 |
| 渐进式披露 | Hermes | 先查 summary,按需读全文 |
## 三、设计决策(对齐 #11 四层架构)
> **层级体系严格对齐 [#11](./11-context-layers-redesign.md)**,不自创命名。
### 总览
| #11 层级 | 知识注入角色 | 本设计覆盖 | 注入方式 |
|----------|------------|-----------|---------|
| **L0 铁律层** | "做方案前先查 wiki-vault" | ✅ D16-1 | Hook 每轮强制注入 |
| **L1 角色层** | TOOLS.md 知识库速查表 + SOUL.md Red Flags | ✅ D16-2 | Workspace 文件自动注入 |
| **L2 引擎注入层** | 三种 handler 各注入 WikiGuideSection | ✅ D16-3 | PromptComposer 拼装 |
| **L3 被动参考层** | wiki-query Skill 按需触发 | ✅ D16-4 | extraDirs Description 匹配 |
| 运维层 | gap 闭环 cron job | ✅ D16-5 | 不属于上下文分层 |
### D16-1L0 铁律层 — 新增一条 wiki 查询铁律
L0 只放跨系统通用的、不可绕过的行为底线。wiki 查询铁律和 GATE 门控同级。
**新增铁律**
```
<wiki-rule>
做方案前先查 wiki-vault,有 1% 相关就要查。查不到记 knowledge-gaps.md。
</wiki-rule>
```
**注入方式**:和 `<gate-rules>` / `<delegation-rule>` 并列,Hook 每轮强制注入。
**覆盖范围**:所有 Agent、所有场景(不限于 moziplus spawn 的子任务)。
### D16-2L1 角色层 — TOOLS.md + SOUL.md
#### TOOLS.md(✅ 已完成)
各 Agent workspace 的 TOOLS.md 中已有「LLM Wiki 知识库」段,包含:
- 速查表(场景 → 怎么做 → 什么时候用)
- 检索原则(index.md → summary → grep → 整页读取,从便宜到昂贵)
- 目录结构(wiki-vault / practices / concepts / skills / ...
- 铁律(做方案前先查、查不到记 gap)
#### SOUL.md Red Flags
在各 Agent 的 SOUL.md 中加入知识查询 Red Flags 表(和 Superpowers 一致):
| Agent 的想法 | 反驳 |
|---|---|
| "这个我以前做过" | 知识库可能已更新,查一下确认 |
| "先做再说" | 做方案前查实践比做错了返工便宜 |
| "这个领域我熟悉" | 熟悉≠知道最新实践,wiki-vault 持续更新 |
| "查知识库浪费时间" | 重复踩坑浪费的时间远大于查询时间 |
### D16-3L2 引擎注入层 — 三种 handler 各注入 WikiGuideSection
L2 是 BootstrapBuilder/PromptComposer 动态拼装的 prompt 段。当前有三种 handler,各有自己的 PromptSection 实现:
#### 当前 handler 结构
| Handler | Sectionspriority | 有 wiki 引导? |
|---------|---------------------|--------------|
| **TaskHandler** | Context(10) → Prior(20) → RoleSkill(30) → API(40) → Constraints(50) | ❌ |
| **MailHandler** | Context(10) → API(40) → Constraints(50) | ❌ |
| **ToolchainHandler** | Context(10) → API(40) → Constraints(50) | ❌ |
#### 新增 WikiGuideSectionpriority=60PRIORITY_EXTENSION
创建一个**通用 PromptSection**,三种 handler 的 `get_sections()` 都注入:
```python
# 可放在 prompt_composer.py 或独立文件,三种 handler 共用
class WikiGuideSection:
"""知识查询引导段 — 引导 Agent 在关键决策点查 wiki-vault。"""
name: str = "wiki_guide"
priority: int = 60 # PRIORITY_EXTENSION
WIKI_GUIDE = (
"## 知识查询引导\n"
"涉及方案设计、编码实现、故障排查时,先查 wiki-vault 相关实践:\n"
"- 路径:/Volumes/KnowledgeBase/wiki-vault/\n"
"- 速查:index.md → grep 关键词 → summary 字段 → 按需读全文\n"
"- 查不到:在 _meta/knowledge-gaps.md 记录"
)
def render(self, context: PromptContext) -> str:
return self.WIKI_GUIDE
def should_include(self, context: PromptContext) -> bool:
return True
```
#### 三种 handler 改动
每种 handler 的 `get_sections()` 末尾加 `WikiGuideSection()`
```python
# TaskHandler
def get_sections(self) -> list:
return [
TaskContextSection(),
PriorOutputsSection(),
RoleSkillSection(),
TaskApiSection(),
TaskConstraintsSection(),
WikiGuideSection(), # ← 新增
]
# MailHandler
def get_sections(self) -> list:
return [
MailContextSection(),
MailApiSection(),
MailConstraintsSection(),
WikiGuideSection(), # ← 新增
]
# ToolchainHandler
def get_sections(self) -> list:
return [
ToolchainContextSection(),
ToolchainApiSection(),
ToolchainConstraintsSection(),
WikiGuideSection(), # ← 新增
]
```
#### 为什么三种 handler 都需要
- **TaskHandler**executor 做方案/编码,最需要查实践
- **ToolchainHandler**CI 失败排查、部署问题,有相关运维实践可参考
- **MailHandler**request 类型回复杂问题时也可能需要查已有经验
#### token 开销
WikiGuideSection 固定 ~60 字(~30 tokens),对 L2 预算影响可忽略。
### D16-4L3 被动参考层 — wiki-query Skill
#### 现状
`wiki-query` Skill 已部署在 `~/.sanguo_projects/sanguo_mozi/skills/wiki/wiki-query/SKILL.md`description 包含中文触发词:
> 调查、研究、分析、优秀实践、最佳实践、经验、怎么做X、有没有X的经验、以前怎么处理的
#### 触发机制
Agent 通过 extraDirs 加载 Skill headername + description),按 Description 匹配自主 `read` 全文。这是标准 L3 行为,和 #11 设计一致。
#### 待确认:extraDirs 子目录递归
wiki-query 在 `skills/wiki/wiki-query/` 子目录下。需确认 moziplus spawn 子 agent 时 extraDirs 是否递归扫描子目录。如果不递归,需要:
- 方案 A:把 wiki-query 移到 `skills/` 顶层
- 方案 B:配置 extraDirs 包含 `skills/wiki/` 子目录
### D16-5:知识 gap 记录 + 定时任务(运维层)
> 不属于上下文分层体系,是独立的运维流程。
#### gap 记录机制(已有基础设施)
- **位置**`/Volumes/KnowledgeBase/wiki-vault/_meta/knowledge-gaps.md`
- **格式**`- [日期] Agent名查"主题" → 待处理`
- **已有 20+ 条历史记录**,处理后标注 `→ 已建立 ✅`
wiki-query Skill 的 Step 5 已内置 gap 记录逻辑。
#### 定时任务(已有 cron 基础)
| 任务 | 时间 | 内容 | 状态 |
|------|------|------|------|
| wiki-daily-update | 每天 04:00 | 处理 knowledge gaps + 当天经验总结 → 写入 wiki-vault | ✅ 已有 cron,需完善 |
| pangtong-vault-sync | 每天 05:00 | 同步 wiki-vault 到 agent workspace | ✅ 已有 |
**wiki-daily-update 完善内容**
1. 读取 knowledge-gaps.md 中"待处理"条目
2. 对每个 gap:搜索 knowledge_base 是否有相关源码/文档 → 有则提炼写入 wiki-vault
3. 搜索最近一天的 jsonl 日志,提取有价值的经验
4. 新建或更新 wiki-vault 页面
5. 更新 knowledge-gaps.md(标记为"已建立 ✅"或"无KB内容,跳过"
### D16-6:和 #11 各层关系总结
| #11 层级 | #11 原始定义 | 知识注入贡献 | 本设计 |
|---------|------------|------------|--------|
| L0 铁律 | GATE 门控 + Delegation + 安全底线 | wiki 查询铁律 | ✅ D16-1 |
| L1 角色 | SOUL.md + AGENTS.md + TOOLS.md + MEMORY.md | TOOLS.md 速查表 + SOUL.md Red Flags | ✅ D16-2 |
| L2 引擎 | 任务上下文 + 角色操作规范 + 硬约束 | WikiGuideSection 通用段 | ✅ D16-3 |
| L3 参考 | A/B/C/D 类 Skill,靠 Description 触发 | wiki-query Skill | ✅ D16-4 |
| 运维 | — | gap 闭环 cron job | ✅ D16-5 |
### D16-7:为什么不做 PromptComposer 自动注入知识全文
1. **token 浪费**:每次任务都注入可能不相关的知识
2. **覆盖范围有限**:只影响 moziplus 子任务 Agent
3. **Agent 主动查询更精准**:知道自己缺什么知识,按需查询
## 四、改动清单
### 4.1 已完成 ✅
| 改动 | 文件 | 层级 | 说明 |
|------|------|------|------|
| TOOLS.md 知识库段 | 各 Agent workspace TOOLS.md | L1 | 速查表 + 检索原则 + 目录结构 + 铁律 |
| wiki-query Skill 部署 | `skills/wiki/wiki-query/SKILL.md` | L3 | 中文触发词 + 渐进式检索协议 |
| knowledge-gaps.md | `_meta/knowledge-gaps.md` | 运维 | 已有 20+ 条记录 |
| wiki-daily-update cron | cron job | 运维 | 每天 04:00,需完善处理逻辑 |
| pangtong-vault-sync cron | cron job | 运维 | 每天 05:00 |
### 4.2 待实现
| 改动 | 文件 | 层级 | 说明 |
|------|------|------|------|
| L0 wiki 铁律 | Hook 注入配置(`prependContext` | L0 | 新增 `<wiki-rule>` 段 |
| SOUL.md Red Flags | 各 Agent workspace SOUL.md | L1 | 知识查询 Red Flags 表 |
| WikiGuideSection | `prompt_composer.py` 或独立文件 | L2 | 通用 PromptSection,三种 handler 共用 |
| TaskHandler 注入 | `task_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| MailHandler 注入 | `mail_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| ToolchainHandler 注入 | `toolchain_handler.py` `get_sections()` | L2 | 末尾加 `WikiGuideSection()` |
| extraDirs 递归确认 | moziplus spawn 配置 | L3 | 确认 wiki-query 子目录可被发现 |
| wiki-daily-update 完善 | cron job 脚本 | 运维 | gap 处理 + jsonl 经验提取 |
### 4.3 不做
| 项目 | 原因 |
|------|------|
| PromptComposer 知识全文注入 | token 浪费,Agent 主动查询更精准 |
| experiences 表 | wiki-vault 已覆盖,不重复建设 |
| 新 Skill(除 wiki-query 外) | wiki-query 已有,不需要新的 |
## 五、风险
| 风险 | 概率 | 缓解 |
|------|------|------|
| Agent 不主动查 wiki | 中 | L0 铁律强制 + L2 引导 + L3 Description 触发,三层保障 |
| wiki-query 在子目录不被 extraDirs 发现 | 中 | 确认后决定移顶层或配置子目录 |
| wiki-daily-update gap 处理质量不够 | 低 | 人工审核 + 逐步完善 |
| WikiGuideSection 增加 token | 低 | 固定 ~30 tokens,影响可忽略 |
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+17 -286
View File
@@ -50,15 +50,7 @@ router = APIRouter(tags=["toolchain"])
_delivery_cache: Set[str] = set() _delivery_cache: Set[str] = set()
_delivery_timestamps: List[Tuple[float, str]] = [] _delivery_timestamps: List[Tuple[float, str]] = []
_TTL_SECONDS = 7 * 24 * 3600 _TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock: Optional[asyncio.Lock] = None _idempotency_lock = asyncio.Lock()
def _get_idempotency_lock() -> asyncio.Lock:
"""懒加载 asyncio.Lock,避免模块级创建时 event loop 不存在(Python 3.9)。"""
global _idempotency_lock
if _idempotency_lock is None:
_idempotency_lock = asyncio.Lock()
return _idempotency_lock
def _is_duplicate(event: str, delivery: str, def _is_duplicate(event: str, delivery: str,
@@ -197,7 +189,6 @@ def _calc_risk_level(changed_files: List[str]) -> str:
MAIL_PROJECT_ID = "_mail" MAIL_PROJECT_ID = "_mail"
TOOLCHAIN_PROJECT_ID = "_toolchain"
def _mail_db_path() -> Path: def _mail_db_path() -> Path:
@@ -209,73 +200,6 @@ def _mail_db_path() -> Path:
return db return db
def _toolchain_db_path() -> Path:
"""获取 Toolchain 数据库路径,确保目录和表存在。"""
root = get_data_root()
db = root / TOOLCHAIN_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
return db
def _send_toolchain_task(
to_agent: str,
title: str,
description: str,
event_type: str,
action_type: str,
steps: list,
context_data: dict | None = None,
source: str = "webhook",
) -> str:
"""创建 Toolchain Task 并写入 _toolchain DB。
Args:
to_agent: 收件人 Agent ID
title: 任务标题
description: 任务描述模板渲染后的事件信息
event_type: 事件类型review_result / ci_failure / ...
action_type: 动作分类用于步骤选择和日志统计
steps: 结构化编号步骤列表
context_data: 事件上下文数据PR 仓库名等
source: 来源标识
Returns:
创建的 Task ID
"""
if to_agent not in AGENT_IDS:
logger.warning("Unknown agent: %s, skipping toolchain task", to_agent)
return ""
task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": event_type,
"action_type": action_type,
"steps": steps,
"context": context_data or {},
"from": "system",
"source": source,
}, ensure_ascii=False)
task = Task(
id=task_id,
title=title,
description=description,
assignee=to_agent,
assigned_by="system",
must_haves=must_hives,
task_type="toolchain",
status="pending",
)
bb = Blackboard(_toolchain_db_path())
bb.create_task(task)
logger.info(
"Toolchain task sent: %s%s [%s] action_type=%s",
title[:40], to_agent, task_id, action_type,
)
return task_id
def _send_mail( def _send_mail(
to_agent: str, to_agent: str,
title: str, title: str,
@@ -403,25 +327,7 @@ async def _send_mention_mails(
}) })
title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})" title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})"
_send_toolchain_task( _send_mail(agent_id, title, text)
to_agent=agent_id,
title=title,
description=text,
event_type="mention",
action_type="mention",
steps=[
"按上方 mention 模板中的 response_guidance 执行",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"source_type": source_type,
"source_url": source_url,
"commenter": commenter,
"content_snippet": content[:500],
"repo": repo,
"issue_number": issue_number,
},
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -436,8 +342,6 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
await _handle_pr_opened(payload) await _handle_pr_opened(payload)
elif action == "closed": elif action == "closed":
await _handle_pr_closed(payload) await _handle_pr_closed(payload)
elif action == "synchronize":
await _handle_pr_synchronize(payload)
async def _handle_pr_opened(payload: Dict[str, Any]) -> None: async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
@@ -473,27 +377,7 @@ async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
}) })
title = f"Review 请求: {pr_title} ({repo}#{pr_number})" title = f"Review 请求: {pr_title} ({repo}#{pr_number})"
_send_toolchain_task( _send_mail("simayi-challenger", title, text)
to_agent="simayi-challenger",
title=title,
description=text,
event_type="review_request",
action_type="review_request",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"按审查清单审查(参考 code-review Skill",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"branch": branch,
"risk_level": risk_level,
},
)
# S3: PR body @mention 通知 # S3: PR body @mention 通知
pr_body = pr.get("body", "") or "" pr_body = pr.get("body", "") or ""
@@ -602,25 +486,7 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
}) })
title = f"Review 评论: {pr_title} ({repo}#{pr_number})" title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
_send_toolchain_task( _send_mail(pr_author, title, text)
to_agent=pr_author,
title=title,
description=text,
event_type="review_comment",
action_type="review_comment",
steps=[
f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments",
"根据评论内容响应(修改代码或在 PR 上回复 comment)",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"reviewer": reviewer,
"comment_body": review_body,
},
)
# S5: Review body @mention 通知(COMMENTED 路径) # S5: Review body @mention 通知(COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number) await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -642,34 +508,7 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
}) })
title = f"Review {result}: {pr_title} ({repo}#{pr_number})" title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
if state == "APPROVED": _send_mail(pr_author, title, text)
tc_steps = [
f"合并 PRGitea API: POST /repos/{repo}/pulls/{pr_number}/merge",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
else: # REQUEST_CHANGES
tc_steps = [
"按审查意见逐条修改代码",
"push 到原分支 → CI 自动跑",
"CI 通过后等重新 Review",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_result",
action_type="review_result",
steps=tc_steps,
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"result": result,
"reviewer": reviewer,
"review_body": review_body,
},
)
# S5: Review body @mention 通知(非 COMMENTED 路径) # S5: Review body @mention 通知(非 COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number) await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -738,31 +577,11 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
}) })
title = f"PR 更新: {pr_title} ({repo}#{pr_number})" title = f"PR 更新: {pr_title} ({repo}#{pr_number})"
_send_toolchain_task( _send_mail(reviewer, title, text)
to_agent=reviewer,
title=title,
description=text,
event_type="review_updated",
action_type="review_updated",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"重点检查上次 Review 意见的修改部分",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"new_sha": new_sha,
"reviewer": reviewer,
},
)
def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None: def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,走 ToolchainHandler。""" """CD 部署失败通知,复用 deploy_failure 模板"""
text = render_template("deploy_failure", { text = render_template("deploy_failure", {
"repo": repo, "repo": repo,
"commit_sha": f"PR #{pr_number}", "commit_sha": f"PR #{pr_number}",
@@ -770,25 +589,7 @@ def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason:
title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})" title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})"
full_text = f"{text}\n\n失败原因: {reason}" full_text = f"{text}\n\n失败原因: {reason}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"): for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_toolchain_task( _send_mail(agent_id, title, full_text)
to_agent=agent_id,
title=title,
description=full_text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"pr_number": pr_number,
"pr_title": pr_title,
"reason": reason,
},
)
async def _handle_pr_closed(payload: Dict[str, Any]) -> None: async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
@@ -820,21 +621,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
}) })
title = f"PR 已合并: {pr_title} ({repo}#{pr_number})" title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
_send_toolchain_task( _send_mail(pr_author, title, text)
to_agent=pr_author,
title=title,
description=text,
event_type="review_merged",
action_type="review_merged",
steps=[], # 纯通知,无步骤
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"merged_by": merged_by,
},
)
# 自动部署:git pull + rsync + 按需 post_deploy # 自动部署:git pull + rsync + 按需 post_deploy
try: try:
@@ -887,7 +674,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if rsync_proc.returncode != 0: if rsync_proc.returncode != 0:
logger.error("Auto-deploy: rsync failed: %s", rsync_err.decode()) logger.error("Auto-deploy: rsync failed: %s", rsync_err.decode())
_send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}") _send_deploy_failure_mail(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
return return
# Step 3: 判断是否需要执行 post_deploy # Step 3: 判断是否需要执行 post_deploy
@@ -942,7 +729,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if deploy_proc.returncode != 0: if deploy_proc.returncode != 0:
logger.error("Auto-deploy: post_deploy failed: %s", deploy_err.decode()) logger.error("Auto-deploy: post_deploy failed: %s", deploy_err.decode())
_send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}") _send_deploy_failure_mail(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
break break
else: else:
logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5])) logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5]))
@@ -951,7 +738,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error("Auto-deploy: timeout for %s", repo) logger.error("Auto-deploy: timeout for %s", repo)
_send_deploy_failure_task(repo, pr_number, pr_title, "部署超时") _send_deploy_failure_mail(repo, pr_number, pr_title, "部署超时")
except Exception as e: except Exception as e:
logger.error("Auto-deploy: unexpected error: %s", e) logger.error("Auto-deploy: unexpected error: %s", e)
@@ -998,29 +785,7 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
}) })
title = f"Issue 指派: {issue_title} ({repo}#{issue_number})" title = f"Issue 指派: {issue_title} ({repo}#{issue_number})"
_send_toolchain_task( _send_mail(assignee, title, text)
to_agent=assignee,
title=title,
description=text,
event_type="issue_assigned",
action_type="issue_assigned",
steps=[
f"创建分支 fix/{issue_number}-{brief}",
"编码 + 写 UT",
"push → 等 CI",
f"CI 通过后创建 PRGitea API: POST /repos/{repo}/pulls",
"等 Review",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"issue_number": issue_number,
"repo": repo,
"issue_title": issue_title,
"labels": labels,
"issue_body": issue_body or "(无描述)",
"brief": brief,
},
)
elif action == "opened": elif action == "opened":
if "部署失败" in issue_title: if "部署失败" in issue_title:
@@ -1035,23 +800,7 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
title = f"部署失败: {repo}" title = f"部署失败: {repo}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"): for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_toolchain_task( _send_mail(agent_id, title, text)
to_agent=agent_id,
title=title,
description=text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"commit_sha": commit_sha or "(未知)",
},
)
# Issue body @mentionopened 时检查) # Issue body @mentionopened 时检查)
issue_body = issue.get("body", "") or "" issue_body = issue.get("body", "") or ""
@@ -1118,25 +867,7 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
}) })
title = f"CI 失败: {repo}#{issue_number}" title = f"CI 失败: {repo}#{issue_number}"
_send_toolchain_task( _send_mail(pr_author, title, text)
to_agent=pr_author,
title=title,
description=text,
event_type="ci_failure",
action_type="ci_failure",
steps=[
"查看完整 CI 日志(PR 页面或 Gitea Actions 页面)",
"修复失败的测试",
"push → CI 自动重跑",
"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": issue_number,
"repo": repo,
"branch": branch,
"error_summary": error_summary,
},
)
# CI 处理完不 return,继续检查 @mention # CI 处理完不 return,继续检查 @mention
# === 路径 2:@mention 通知(新增,独立路径) === # === 路径 2:@mention 通知(新增,独立路径) ===
@@ -1227,7 +958,7 @@ async def gitea_webhook(
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重) # 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery: if x_gitea_event and x_gitea_delivery:
async with _get_idempotency_lock(): async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload): if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug( logger.debug(
"Duplicate webhook: %s/%s", "Duplicate webhook: %s/%s",
+1 -1
View File
@@ -293,7 +293,7 @@ _SCHEMA_STATEMENTS = [
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL REFERENCES tasks(id), task_id TEXT NOT NULL REFERENCES tasks(id),
author TEXT NOT NULL, author TEXT NOT NULL,
comment_type TEXT NOT NULL DEFAULT 'general', comment_type TEXT NOT NULL DEFAULT 'general' CHECK (comment_type IN ('general','handoff','observation','review','rebuttal','rebuttal_response','debate_argument','debate_rebuttal','debate_judgment')),
body TEXT NOT NULL, body TEXT NOT NULL,
mentions TEXT, mentions TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')) created_at TEXT NOT NULL DEFAULT (datetime('now'))
+2 -2
View File
@@ -9,7 +9,7 @@ import logging
from pathlib import Path from pathlib import Path
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext, WikiGuideSection from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.mail") logger = logging.getLogger("moziplus-v2.handler.mail")
@@ -36,7 +36,7 @@ class MailHandler(BaseTaskHandler):
return composer.compose(context) return composer.compose(context)
def get_sections(self) -> list: def get_sections(self) -> list:
return [MailContextSection(), MailApiSection(), MailConstraintsSection(), WikiGuideSection()] return [MailContextSection(), MailApiSection(), MailConstraintsSection()]
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""Mail 完成验证:区分 inform/request。 """Mail 完成验证:区分 inform/request。
-26
View File
@@ -65,8 +65,6 @@ class PromptContext:
# toolchain 专用 # toolchain 专用
event_type: str = "" # ci_failure / review_request / ... event_type: str = "" # ci_failure / review_request / ...
event_data: Dict = field(default_factory=dict) event_data: Dict = field(default_factory=dict)
action_type: str = "" # 动作分类(review_result / ci_failure / ...
action_steps: list = field(default_factory=list) # 结构化编号步骤列表
# 前序产出 # 前序产出
depends_on_outputs: Optional[List] = None depends_on_outputs: Optional[List] = None
@@ -127,27 +125,3 @@ class PromptComposer:
) )
return result return result
# ---------------------------------------------------------------------------
# WikiGuideSection — 知识查询引导段
# ---------------------------------------------------------------------------
class WikiGuideSection:
"""知识查询引导段 — 引导 Agent 在关键决策点查 wiki-vault。"""
name: str = "wiki_guide"
priority: int = 60 # PRIORITY_EXTENSION
WIKI_GUIDE = (
"## 知识查询引导\n"
"涉及方案设计、编码实现、故障排查时,先查 wiki-vault 相关实践:\n"
"- 路径:/Volumes/KnowledgeBase/wiki-vault/\n"
"- 速查:index.md → grep 关键词 → summary 字段 → 按需读全文\n"
"- 查不到:在 _meta/knowledge-gaps.md 记录"
)
def render(self, context: "PromptContext") -> str:
return self.WIKI_GUIDE
def should_include(self, context: "PromptContext") -> bool:
return True
-6
View File
@@ -286,15 +286,10 @@ class AgentSpawner:
# 从 must_haves 解析 mail 元数据(from / performative # 从 must_haves 解析 mail 元数据(from / performative
from_agent = "" from_agent = ""
mail_type = "" mail_type = ""
action_type = ""
action_steps = []
try: try:
meta = json.loads(must_haves) if must_haves else {} meta = json.loads(must_haves) if must_haves else {}
from_agent = meta.get("from", "") from_agent = meta.get("from", "")
mail_type = meta.get("performative", meta.get("type", "")) mail_type = meta.get("performative", meta.get("type", ""))
# toolchain 字段提取
action_type = meta.get("action_type", "")
action_steps = meta.get("steps", [])
except Exception: except Exception:
pass pass
ctx = PromptContext( ctx = PromptContext(
@@ -303,7 +298,6 @@ class AgentSpawner:
agent_id=agent_id, role=spawn_type, agent_id=agent_id, role=spawn_type,
spawn_type=spawn_type, spawn_type=spawn_type,
from_agent=from_agent, mail_type=mail_type, from_agent=from_agent, mail_type=mail_type,
action_type=action_type, action_steps=action_steps,
) )
return handler.build_prompt(ctx) return handler.build_prompt(ctx)
+1 -2
View File
@@ -10,7 +10,7 @@ from pathlib import Path
from typing import Dict, Optional from typing import Dict, Optional
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext, WikiGuideSection from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.blackboard.db import get_connection from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler") logger = logging.getLogger("moziplus-v2.handler")
@@ -313,7 +313,6 @@ class TaskHandler(BaseTaskHandler):
RoleSkillSection(), RoleSkillSection(),
TaskApiSection(), TaskApiSection(),
TaskConstraintsSection(), TaskConstraintsSection(),
WikiGuideSection(),
] ]
def build_prompt(self, context: PromptContext) -> str: def build_prompt(self, context: PromptContext) -> str:
+112 -357
View File
@@ -1,52 +1,29 @@
"""toolchain_handler.py - 工具链事件 handler。 """toolchain_handler.py 工具链事件 handler。
处理 Gitea Webhook 事件(CI 失败Review 请求Issue 指派等) 处理 Gitea Webhook 事件CI 失败Review 请求Issue 指派等
L2 引擎层强约束:输入(结构化步骤)+ 执行(Red Flags)+ 输出(action_report 验证)
""" """
from __future__ import annotations from __future__ import annotations
import json import json
import logging import logging
import os
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from typing import Dict, List from typing import Dict
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext, WikiGuideSection from src.daemon.prompt_composer import PromptComposer, PromptContext
from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP
from src.blackboard.db import get_connection from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.toolchain") logger = logging.getLogger("moziplus-v2.handler.toolchain")
# ---------------------------------------------------------------------------
# Gitea API 配置
# ---------------------------------------------------------------------------
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
_GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
# action_type → action_hint 映射
_ACTION_HINTS: Dict[str, str] = {
"review_result": "你收到一个 Review 结果通知,这是一个需要你执行动作的事件(不是纯通知)。",
"review_request": "你收到一个 Review 请求,这是一个需要你审查并提交 Review 的事件。",
"review_updated": "你收到一个 PR 更新通知,这是一个需要你重新审查修改部分的事件。",
"review_comment": "你收到一个 Review 评论,这是一个需要你查看并响应的事件。",
"ci_failure": "你收到一个 CI 失败通知,这是一个需要你修复失败测试的事件。",
"issue_assigned": "你收到一个 Issue 指派,这是一个需要你编码实现的事件。",
"deploy_failure": "你收到一个部署失败通知,这是一个需要你排查并修复的事件。",
"mention": "你收到一个 @mention 通知,这是一个需要你按指引响应的事件。",
"review_merged": "你收到一个 PR 合并通知。这是一条纯通知,阅读即可。",
"infrastructure_failure": "你收到一个基础设施问题报告,请排查并修复。",
}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Toolchain PromptSections # Toolchain PromptSections
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class ToolchainContextSection: class ToolchainContextSection:
"""事件类型 + 事件详情 + 结构化步骤 + action_hint(priority=10)""" """事件类型 + 事件详情priority=10"""
name: str = "toolchain_context" name: str = "toolchain_context"
priority: int = 10 priority: int = 10
@@ -55,44 +32,27 @@ class ToolchainContextSection:
event_type = context.event_type event_type = context.event_type
event_data: Dict = context.event_data or {} event_data: Dict = context.event_data or {}
# Part 1: 事件信息(现有模板引擎)
if event_type in _TEMPLATE_MAP: if event_type in _TEMPLATE_MAP:
# 使用模板引擎渲染已知事件
variables = {k: str(v) for k, v in event_data.items()} variables = {k: str(v) for k, v in event_data.items()}
event_text = render_template(event_type, variables) return render_template(event_type, variables)
else:
lines = ["## 工具链事件", ""]
lines.append(f"- **事件类型**: {event_type or '未知'}")
if event_data:
lines.append("- **事件详情**:")
for key, value in event_data.items():
lines.append(f" - {key}: {value}")
lines.append("")
event_text = "\n".join(lines)
# Part 2: 结构化编号步骤(新增,从 action_steps 渲染) # fallback:通用事件描述
steps: List[str] = context.action_steps or [] lines = ["## 工具链事件", ""]
if steps: lines.append(f"- **事件类型**: {event_type or '未知'}")
step_lines = ["", "### 必须执行的步骤", ""] if event_data:
for i, step in enumerate(steps, 1): lines.append("- **事件详情**:")
step_lines.append(f"{i}. {step}") for key, value in event_data.items():
steps_text = "\n".join(step_lines) lines.append(f" - {key}: {value}")
else: lines.append("")
steps_text = "" return "\n".join(lines)
# Part 3: action 指引(新增,按 action_type 选择)
action_hint = _ACTION_HINTS.get(
context.action_type,
"你收到一个工具链事件,这是一个需要你执行动作的事件。",
)
return f"{action_hint}\n\n{event_text}{steps_text}"
def should_include(self, context: PromptContext) -> bool: def should_include(self, context: PromptContext) -> bool:
return True return True
class ToolchainApiSection: class ToolchainApiSection:
"""API 操作指令(priority=40)-- action_report 提交指引""" """API 操作指令priority=40),success_status=done"""
name: str = "toolchain_api" name: str = "toolchain_api"
priority: int = 40 priority: int = 40
@@ -100,48 +60,28 @@ class ToolchainApiSection:
API_HOST = "localhost:8083" API_HOST = "localhost:8083"
def render(self, context: PromptContext) -> str: def render(self, context: PromptContext) -> str:
task_id = context.task_id
project_id = context.project_id
agent_id = context.agent_id
lines = [ lines = [
"## API 操作指令", "## API 操作指令",
"", "",
f"项目 ID: `{project_id}`", f"项目 ID: `{context.project_id}`",
f"任务 ID: `{task_id}`", f"任务 ID: `{context.task_id}`",
"", "",
"### 完成后必须提交 action report", "### 完成后必须更新任务状态",
"", "完成后务必通过以下命令将任务标记为 **done**:",
"执行完所有步骤后,必须提交 action report:",
"```bash", "```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/comments" \\', f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/status" \\',
' -H "Content-Type: application/json" \\', ' -H "Content-Type: application/json" \\',
f' -d \'{{"author": "{agent_id}", "comment_type": "action_report", "body": "简要描述你执行了什么操作及结果"}}\'', ' -d \'{"status": "done"}\'',
"```", "```",
"", "",
"⚠️ 不提交 action report 的任务会被标记为 failed。",
"",
"### 提交产出", "### 提交产出",
"", "如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"如有产出(如 review 结果、修复方案),提交到任务 outputs:",
"```bash", "```bash",
f'curl -s -X POST "http://{self.API_HOST}/api/projects/{project_id}/tasks/{task_id}/outputs" \\', f'curl -s -X POST "http://{self.API_HOST}/api/projects/{context.project_id}/tasks/{context.task_id}/outputs" \\',
' -H "Content-Type: application/json" \\', ' -H "Content-Type: application/json" \\',
' -d \'{"content": "<你的产出内容>", "type": "text"}\'', ' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
"```", "```",
"", "",
"### 需要其他角色支持时",
"",
"如果在执行过程中需要其他角色协助(如缺数据、需要审批等),在关联的 PR/Issue 上创建 comment @对方:",
"```bash",
f'curl -s -X POST "{_GITEA_BASE}/repos/{{repo}}/issues/{{pr_number}}/comments" \\',
' -H "Authorization: token <your-token>" \\',
' -H "Content-Type: application/json" \\',
' -d \'{"body": "@{agent-id} 需要你的支持:{描述问题}"}\'',
"```",
"",
"⚠️ 不要使用 Mail API(飞鸽传书)。所有协作通过 Gitea 留痕。",
"",
] ]
return "\n".join(lines) return "\n".join(lines)
@@ -150,50 +90,20 @@ class ToolchainApiSection:
class ToolchainConstraintsSection: class ToolchainConstraintsSection:
"""硬约束 + Red Flags(priority=50)""" """硬约束priority=50"""
name: str = "toolchain_constraints" name: str = "toolchain_constraints"
priority: int = 50 priority: int = 50
def render(self, context: PromptContext) -> str: def render(self, context: PromptContext) -> str:
lines = [ lines = [
"## 硬约束(必须遵守)", "## 硬约束",
"", "",
"⚠️ 以下是强制要求,不是建议或参考。违反任何一条都会导致任务失败。", "1. **必须标 done**:处理完成后必须通过 API 将任务状态更新为 `done`,否则视为未完成",
"", "2. **产出不能为空**:必须提交有意义的产出(output 或 comment),不能只改状态",
"### 1. 必须按步骤执行", "3. **单一职责**:只处理本次事件相关的操作,不要越界执行无关任务",
'- 检查上方“必须执行的步骤”列表', "4. **出错即报告**:如果无法处理(如权限不足、资源不存在),在 comment 中说明原因并标 done",
'- 逐条执行每个步骤,不可跳过', "5. **不要创建新任务**:工具链事件只处理当前事件,不衍生新任务",
'- 不要只读不做——这不是纯通知',
"",
"### 2. 必须提交 action report",
'- 执行完所有步骤后,必须提交 action report',
"- 提交方式:POST commentcomment_type='action_report'",
'- 报告内容:简要描述你执行了什么操作、结果如何',
'- ⚠️ 不提交 action report 的任务会被标记为 failed',
"",
"### 3. 不要执行任何状态转换命令",
'- 不要手动标 working/done/review/failed,系统会自动处理',
"",
"### 4. 不需要回复",
'- action report 就是你的完成凭证',
'- 不要发送 Mail(飞鸽传书),你的所有操作在 toolchain 流程内完成',
"",
"### 5. 所有协作通过 Gitea 完成",
'- 如果遇到问题需要其他角色支持,在关联的 PR/Issue 上创建 comment @对方',
'- 不要使用 Mail API(飞鸽传书)发送消息',
'- 你的所有操作都在 toolchain 流程内,通过 Gitea 留痕',
"",
"### Red Flags(如果脑海中出现以下想法,说明你错了)",
"",
'| Agent 想法 | Red Flag 驳回 |',
'|------------|--------------|',
'| “这个通知看看就行了” | ❌ 错!这是 action 指令,必须执行步骤列表中的每一项 |',
'| “我不需要做任何事” | ❌ 错!检查“必须执行的步骤”列表,每一步都要执行 |',
'| “先放着等会处理” | ❌ 错!立即执行,不要推迟 |',
'| “我已经知道了” | ❌ 知道不等于执行。执行步骤 + 提交 action report 才算完成 |',
'| “步骤太多了,选几个做就行” | ❌ 错!必须逐条执行,不可跳过 |',
'| “这个步骤不适用于当前情况” | ❌ 如果确实不适用,在 action report 中说明原因,但其他步骤必须执行 |',
"", "",
] ]
return "\n".join(lines) return "\n".join(lines)
@@ -217,7 +127,7 @@ class ToolchainHandler(BaseTaskHandler):
return "done" return "done"
def pre_spawn(self, task_id: str, db_path: Path) -> bool: def pre_spawn(self, task_id: str, db_path: Path) -> bool:
"""auto_working:pending → working""" """auto_workingpending → working"""
return self._auto_mark_working(task_id, db_path) return self._auto_mark_working(task_id, db_path)
def get_sections(self) -> list: def get_sections(self) -> list:
@@ -226,7 +136,6 @@ class ToolchainHandler(BaseTaskHandler):
ToolchainContextSection(), ToolchainContextSection(),
ToolchainApiSection(), ToolchainApiSection(),
ToolchainConstraintsSection(), ToolchainConstraintsSection(),
WikiGuideSection(),
] ]
def build_prompt(self, context: PromptContext) -> str: def build_prompt(self, context: PromptContext) -> str:
@@ -236,55 +145,27 @@ class ToolchainHandler(BaseTaskHandler):
return composer.compose(context) return composer.compose(context)
def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult: def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
"""检查 action report(精确验证)+ 三层 fallback""" """检查行动输出(output 或 comment 有实质内容)"""
try: try:
conn = get_connection(db_path) conn = get_connection(db_path)
try: try:
# 特殊处理:infrastructure_failure 始终通过(防递归) # 检查 output
row = conn.execute(
"SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone()
if row and row["must_haves"]:
try:
meta = json.loads(row["must_haves"])
except Exception:
meta = {}
if meta.get("action_type") == "infrastructure_failure":
return VerifyResult(True, "infrastructure_passthrough",
"infrastructure_failure auto-pass")
# 特殊处理:review_merged 始终通过(纯通知)
if meta.get("action_type") == "review_merged":
return VerifyResult(True, "merged_passthrough",
"review_merged auto-pass")
# 1. 优先检查 action_report comment
report_row = conn.execute(
"SELECT id FROM comments WHERE task_id=? "
"AND comment_type='action_report' LIMIT 1",
(task_id,)
).fetchone()
if report_row:
return VerifyResult(True, "has_action_report", "action_report found")
# 2. fallback:检查 output(向后兼容)
output_count = conn.execute( output_count = conn.execute(
"SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,) "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
).fetchone()[0] ).fetchone()[0]
if output_count > 0: if output_count > 0:
return VerifyResult(True, "has_output", f"output_count={output_count}") return VerifyResult(True, "has_output", f"output_count={output_count}")
# 3. fallback:检查有实质内容的 comment(向后兼容) # 检查 comment(非系统、有实质内容)
comment_count = conn.execute( comment_count = conn.execute(
"SELECT COUNT(*) FROM comments WHERE task_id=? " "SELECT COUNT(*) FROM comments WHERE task_id=? "
"AND author != 'system' AND LENGTH(body) >= 20", "AND author != 'system' AND LENGTH(content) >= 20",
(task_id,) (task_id,)
).fetchone()[0] ).fetchone()[0]
if comment_count > 0: if comment_count > 0:
return VerifyResult(True, "has_comment", f"comment_count={comment_count}") return VerifyResult(True, "has_comment", f"comment_count={comment_count}")
return VerifyResult(False, "no_action", return VerifyResult(False, "no_action", "output=0, comment=0")
"no action_report, no output, no valid comment")
finally: finally:
conn.close() conn.close()
except Exception as e: except Exception as e:
@@ -293,217 +174,32 @@ class ToolchainHandler(BaseTaskHandler):
def on_failure(self, task_id: str, agent_id: str, def on_failure(self, task_id: str, agent_id: str,
db_path: Path, verify: VerifyResult) -> None: db_path: Path, verify: VerifyResult) -> None:
"""验证失败 → 三分路处理(业务/系统/基础设施)""" """验证失败 → 标 failed + Mail API 通知主公"""
self._mark_task_status(db_path, task_id, "failed") self._mark_task_status(db_path, task_id, "failed")
logger.info("Toolchain %s: verify failed (%s), marked failed", logger.info("Toolchain %s: verify failed (%s), marked failed", task_id, verify.reason)
task_id, verify.reason)
# 读取 must_hives 获取事件上下文 + assignee 从 tasks 表读取 # 从 db 读取事件上下文
meta = {} event_type = ""
assignee = agent_id event_data: Dict = {}
try: try:
conn = get_connection(db_path) conn = get_connection(db_path)
row = conn.execute( row = conn.execute(
"SELECT must_haves, assignee FROM tasks WHERE id=?", (task_id,) "SELECT must_haves FROM tasks WHERE id=?", (task_id,)
).fetchone() ).fetchone()
if row: if row and row["must_haves"]:
if row["must_haves"]: meta = json.loads(row["must_haves"])
meta = json.loads(row["must_haves"]) event_type = meta.get("event_type", "")
assignee = row["assignee"] or agent_id raw = meta.get("event_data", "{}")
event_data = json.loads(raw) if isinstance(raw, str) else raw
conn.close() conn.close()
except Exception: except Exception:
pass pass
action_type = meta.get("action_type", "") self._notify_via_mail_api(
context_data = meta.get("context", {}) task_id, verify.reason, verify.evidence,
event_type, event_data,
# 三分路决策
route = self._classify_failure(verify)
if route == "business":
self._handle_business_failure(
task_id, agent_id, verify, action_type, context_data, assignee, db_path)
elif route == "system":
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
else: # infrastructure
self._handle_infrastructure_failure(
task_id, agent_id, verify, db_path)
def _classify_failure(self, verify: VerifyResult) -> str:
"""分类失败类型:business / infrastructuresystem 通过升级到达)"""
# verify_error 或 DB 不可用 → 基础设施失败
if verify.reason == "verify_error":
return "infrastructure"
# 默认:业务失败
return "business"
def _handle_business_failure(
self, task_id: str, agent_id: str, verify: VerifyResult,
action_type: str, context_data: dict, assignee: str,
db_path: Path,
) -> None:
"""业务失败 → 在关联 PR/Issue 上创建 comment @原始 assignee"""
repo = context_data.get("repo", "")
pr_number = context_data.get("pr_number") or context_data.get("issue_number", "")
if repo and pr_number:
comment_body = (
f"@{assignee or agent_id} 工具链任务执行失败\n\n"
f"任务 ID: {task_id}\n"
f"失败原因: {verify.reason}\n"
f"证据: {verify.evidence}\n\n"
f"请检查黑板任务并处理。"
)
success = self._create_gitea_comment(repo, pr_number, comment_body)
if success:
logger.info("Toolchain %s: business failure → Gitea comment on %s#%s",
task_id, repo, pr_number)
return
# Gitea API failed → escalate to system failure
logger.warning(
"Toolchain %s: Gitea comment failed, escalating to system failure",
task_id)
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
else:
# 没有 PR/Issue 关联 → fallback 到系统失败
logger.warning(
"Toolchain %s: no PR/Issue context for business failure, "
"escalating to system failure", task_id)
self._handle_system_failure(
task_id, agent_id, verify, action_type, context_data, db_path)
def _handle_system_failure(
self, task_id: str, agent_id: str, verify: VerifyResult,
action_type: str, context_data: dict, db_path: Path,
) -> None:
"""系统失败 → 创建 Gitea Issue @pangtong-fujunshi"""
repo = context_data.get("repo", "sanguo/sanguo_moziplus_v2")
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
body = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {action_type or '未知'}\n"
f"失败原因: {verify.reason}\n"
f"证据: {verify.evidence}\n\n"
f"@pangtong-fujunshi 请检查黑板任务并手动处理。"
) )
# 尝试在 Gitea 创建 Issue
created = self._create_gitea_issue(repo, title, body, ["pangtong-fujunshi"])
if created:
logger.info("Toolchain %s: system failure → Gitea Issue created on %s",
task_id, repo)
else:
# Gitea API 不可用 → 基础设施失败
logger.error(
"Toolchain %s: Gitea API unavailable, escalating to infrastructure failure",
task_id)
self._handle_infrastructure_failure(
task_id, agent_id, verify, db_path)
def _handle_infrastructure_failure(
self, task_id: str, agent_id: str,
verify: VerifyResult, db_path: Path,
) -> None:
"""基础设施失败 → 直接在 _toolchain DB 创建 task @jiangwei-infra(防递归)"""
try:
from datetime import datetime
new_task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": "infrastructure_failure",
"action_type": "infrastructure_failure",
"steps": [
"检查 Gitea 服务状态(http://192.168.2.154:3000)",
"检查网络连通性",
"恢复后提交 action report",
],
"context": {"original_task_id": task_id, "verify_reason": verify.reason},
"from": "system",
"source": "toolchain_handler_on_failure",
}, ensure_ascii=False)
conn = get_connection(db_path)
conn.execute(
"INSERT INTO tasks (id, title, description, assignee, assigned_by, "
"must_haves, task_type, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
new_task_id,
f"[基础设施] Gitea API 不可用 - {task_id}",
f"Gitea API 不可用,原任务 {task_id} 无法通过正常路径处理。\n"
f"请检查 Gitea 服务状态和网络连通性。",
"jiangwei-infra",
"system",
must_hives,
"toolchain",
"pending",
)
)
conn.commit()
conn.close()
logger.info(
"Toolchain %s: infrastructure failure → task %s created for jiangwei-infra",
task_id, new_task_id)
except Exception as e:
logger.error(
"Toolchain %s: failed to create infrastructure_failure task: %s",
task_id, e)
# -----------------------------------------------------------------------
# Gitea API 辅助
# -----------------------------------------------------------------------
def _create_gitea_comment(
self, repo: str, pr_number: int, body: str,
) -> bool:
"""在 PR/Issue 上创建 comment。返回是否成功。"""
if not _GITEA_TOKEN:
return False
payload = json.dumps({"body": body}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
f"{_GITEA_BASE}/repos/{repo}/issues/{pr_number}/comments",
data=payload,
headers={
"Authorization": f"token {_GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
urllib.request.urlopen(req, timeout=5)
return True
except Exception as e:
logger.warning("Gitea comment failed on %s#%s: %s", repo, pr_number, e)
return False
def _create_gitea_issue(
self, repo: str, title: str, body: str,
assignees: list = None,
) -> bool:
"""创建 Gitea Issue。返回是否成功。"""
if not _GITEA_TOKEN:
return False
data = {"title": title, "body": body}
if assignees:
data["assignees"] = assignees
payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
f"{_GITEA_BASE}/repos/{repo}/issues",
data=payload,
headers={
"Authorization": f"token {_GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
urllib.request.urlopen(req, timeout=5)
return True
except Exception as e:
logger.warning("Gitea create issue failed on %s: %s", repo, e)
return False
# -----------------------------------------------------------------------
# 兼容:保留旧方法签名(但不再被 on_failure 调用)
# -----------------------------------------------------------------------
def _build_gitea_links(self, event_type: str, event_data: dict) -> str: def _build_gitea_links(self, event_type: str, event_data: dict) -> str:
"""根据事件类型构建 Gitea 链接。""" """根据事件类型构建 Gitea 链接。"""
links = [] links = []
@@ -519,4 +215,63 @@ class ToolchainHandler(BaseTaskHandler):
if "branch" in event_data and "commit" not in event_data: if "branch" in event_data and "commit" not in event_data:
links.append(f"分支: {event_data['branch']}") links.append(f"分支: {event_data['branch']}")
return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)" return "\n".join(links) if links else "无法提取链接请检查黑板任务详情"
def _notify_via_mail_api(
self,
task_id: str,
reason: str,
evidence: str,
event_type: str,
event_data: Dict,
) -> None:
"""通过 Mail API 发送丰富的失败通知给主公。"""
# 构建行动指引
action_hint = "请检查黑板任务并手动处理。"
et_lower = event_type.lower()
if "ci" in et_lower or "deploy" in et_lower:
action_hint = "建议创建任务派给 jiangwei-infra 检查 CI/部署问题。"
elif "review" in et_lower:
action_hint = "建议查看 PR review 状态,必要时通知相关开发者。"
elif "issue" in et_lower:
action_hint = "建议创建任务派给对应开发者处理 Issue。"
# 构建事件详情
event_details = ""
if event_data:
event_details = "\n".join(
f" - {k}: {v}" for k, v in event_data.items()
)
# 构建 Gitea 链接
gitea_links = self._build_gitea_links(event_type, event_data)
title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
text = (
f"任务 {task_id} 验证失败\n\n"
f"事件类型: {event_type or '未知'}\n"
f"事件详情:\n{event_details or ' (无)'}\n\n"
f"失败原因: {reason}\n"
f"证据: {evidence}\n\n"
f"{gitea_links}\n\n"
f"行动指引: {action_hint}"
)
payload = json.dumps({
"from": "daemon",
"to": "pangtong-fujunshi",
"title": title,
"text": text,
"type": "inform",
}, ensure_ascii=False).encode("utf-8")
try:
req = urllib.request.Request(
"http://localhost:8083/api/mail",
data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
logger.info("Toolchain %s: sent failure notification via Mail API", task_id)
except Exception as e:
logger.warning("Toolchain %s: failed to notify via Mail API: %s", task_id, e)
-525
View File
@@ -1,525 +0,0 @@
"""Unit tests for §17 ToolchainHandler 强约束实现."""
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from src.daemon.prompt_composer import PromptContext, PromptComposer
from src.daemon.toolchain_handler import (
ToolchainHandler,
ToolchainContextSection,
ToolchainApiSection,
ToolchainConstraintsSection,
_ACTION_HINTS,
)
from src.daemon.base_task_handler import VerifyResult
from src.blackboard.db import init_db, get_connection
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_db():
"""Create a temporary _toolchain DB for testing."""
with tempfile.TemporaryDirectory() as d:
db_path = Path(d) / "blackboard.db"
init_db(db_path)
yield db_path
@pytest.fixture
def handler():
return ToolchainHandler()
def _insert_task(db_path, task_id, must_haves_json, status="working"):
"""Insert a task into DB for testing."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO tasks (id, title, description, assignee, assigned_by, "
"must_haves, task_type, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(task_id, "test", "test desc", "zhangfei-dev", "system",
must_haves_json, "toolchain", status)
)
conn.commit()
conn.close()
def _insert_comment(db_path, task_id, author, body, comment_type="general"):
"""Insert a comment into DB."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)",
(task_id, author, comment_type, body)
)
conn.commit()
conn.close()
def _insert_output(db_path, task_id, content="test output"):
"""Insert an output into DB."""
conn = get_connection(db_path)
conn.execute(
"INSERT INTO outputs (task_id, agent, output_type, title, summary) "
"VALUES (?, ?, ?, ?, ?)",
(task_id, "zhangfei-dev", "document", "test", content)
)
conn.commit()
conn.close()
# ---------------------------------------------------------------------------
# Step 1a: PromptContext new fields
# ---------------------------------------------------------------------------
class TestPromptContextFields:
def test_action_type_default(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
assert ctx.action_type == ""
def test_action_steps_default(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
assert ctx.action_steps == []
def test_action_type_set(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
action_type="review_result",
)
assert ctx.action_type == "review_result"
def test_action_steps_set(self):
steps = ["step 1", "step 2"]
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
action_steps=steps,
)
assert ctx.action_steps == steps
# ---------------------------------------------------------------------------
# Step 2a: ToolchainContextSection steps rendering + action_hint
# ---------------------------------------------------------------------------
class TestToolchainContextSection:
def test_renders_steps(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="review_result",
event_data={"pr_number": "42", "repo": "sanguo/test"},
action_type="review_result",
action_steps=["合并 PR", "提交 action report"],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "必须执行的步骤" in result
assert "1. 合并 PR" in result
assert "2. 提交 action report" in result
def test_renders_action_hint(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="ci_failure",
action_type="ci_failure",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "CI 失败" in result
assert "需要你修复" in result
def test_renders_default_hint_for_unknown_action_type(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="unknown",
action_type="unknown_type",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "需要你执行动作的事件" in result
def test_no_steps_no_steps_section(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
event_type="review_merged",
action_type="review_merged",
action_steps=[],
)
section = ToolchainContextSection()
result = section.render(ctx)
assert "必须执行的步骤" not in result
# ---------------------------------------------------------------------------
# Step 2b: ToolchainApiSection action_report guidance
# ---------------------------------------------------------------------------
class TestToolchainApiSection:
def test_has_action_report_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "action_report" in result
assert "comment_type" in result
assert "tc-123" in result
def test_no_manual_done_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
# Should NOT contain the old "标记为 done" instruction
assert "标记为 **done**" not in result
assert '"status": "done"' not in result
def test_has_outputs_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "outputs" in result
def test_has_gitea_collaboration_instruction(self):
ctx = PromptContext(
task_id="tc-123", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="zhangfei-dev",
)
section = ToolchainApiSection()
result = section.render(ctx)
assert "Gitea" in result
assert "Mail API" in result
# ---------------------------------------------------------------------------
# Step 2c: ToolchainConstraintsSection Red Flags
# ---------------------------------------------------------------------------
class TestToolchainConstraintsSection:
def test_has_red_flags_table(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "Red Flags" in result
assert "" in result
def test_has_all_5_constraints(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "必须按步骤执行" in result
assert "必须提交 action report" in result
assert "不要执行任何状态转换命令" in result
assert "不需要回复" in result
assert "所有协作通过 Gitea 完成" in result
def test_has_strong_language(self):
ctx = PromptContext(
task_id="t1", title="test", description="d",
must_haves="", project_id="_toolchain", agent_id="a1",
)
section = ToolchainConstraintsSection()
result = section.render(ctx)
assert "强制要求" in result
assert "不是建议" in result
# ---------------------------------------------------------------------------
# Step 2d: verify_completion tests
# ---------------------------------------------------------------------------
class TestVerifyCompletion:
def test_action_report_passes(self, handler, tmp_db):
"""action_report comment → pass"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t1", must_haves)
_insert_comment(tmp_db, "t1", "zhangfei-dev",
"已修复 CI", comment_type="action_report")
result = handler.verify_completion("t1", tmp_db)
assert result.passed is True
assert result.reason == "has_action_report"
def test_no_action_report_fallback_output(self, handler, tmp_db):
"""No action_report but has output → pass (fallback)"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t2", must_haves)
_insert_output(tmp_db, "t2", "review result content")
result = handler.verify_completion("t2", tmp_db)
assert result.passed is True
assert result.reason == "has_output"
def test_no_action_report_fallback_comment(self, handler, tmp_db):
"""No action_report but has substantial comment → pass (fallback)"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t3", must_haves)
_insert_comment(tmp_db, "t3", "zhangfei-dev",
"This is a sufficiently long comment about the task.")
result = handler.verify_completion("t3", tmp_db)
assert result.passed is True
assert result.reason == "has_comment"
def test_nothing_passes(self, handler, tmp_db):
"""No action_report, no output, no comment → fail"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t4", must_haves)
result = handler.verify_completion("t4", tmp_db)
assert result.passed is False
assert result.reason == "no_action"
def test_short_comment_fails(self, handler, tmp_db):
"""Comment < 20 chars → fail"""
must_haves = json.dumps({"action_type": "review_result"})
_insert_task(tmp_db, "t5", must_haves)
_insert_comment(tmp_db, "t5", "zhangfei-dev", "ok")
result = handler.verify_completion("t5", tmp_db)
assert result.passed is False
def test_review_merged_auto_passes(self, handler, tmp_db):
"""review_merged → always pass"""
must_haves = json.dumps({"action_type": "review_merged"})
_insert_task(tmp_db, "t6", must_haves)
result = handler.verify_completion("t6", tmp_db)
assert result.passed is True
assert result.reason == "merged_passthrough"
def test_infrastructure_failure_auto_passes(self, handler, tmp_db):
"""infrastructure_failure → always pass (anti-recursion)"""
must_haves = json.dumps({"action_type": "infrastructure_failure"})
_insert_task(tmp_db, "t7", must_haves)
result = handler.verify_completion("t7", tmp_db)
assert result.passed is True
assert result.reason == "infrastructure_passthrough"
# ---------------------------------------------------------------------------
# Step 3a: _send_toolchain_task tests
# ---------------------------------------------------------------------------
class TestSendToolchainTask:
def test_creates_task_in_toolchain_db(self):
"""_send_toolchain_task creates a task in _toolchain DB."""
from src.api.toolchain_routes import _send_toolchain_task, _toolchain_db_path
with patch("src.api.toolchain_routes.get_data_root") as mock_root:
with tempfile.TemporaryDirectory() as d:
mock_root.return_value = Path(d)
task_id = _send_toolchain_task(
to_agent="zhangfei-dev",
title="Test Task",
description="Test description",
event_type="ci_failure",
action_type="ci_failure",
steps=["Fix test", "Submit report"],
context_data={"pr_number": 42},
)
assert task_id.startswith("tc-")
# Verify task was written to _toolchain DB
db_path = _toolchain_db_path()
conn = get_connection(db_path)
row = conn.execute(
"SELECT * FROM tasks WHERE id=?", (task_id,)
).fetchone()
assert row is not None
assert row["task_type"] == "toolchain"
assert row["assignee"] == "zhangfei-dev"
# Verify must_haves JSON
meta = json.loads(row["must_haves"])
assert meta["event_type"] == "ci_failure"
assert meta["action_type"] == "ci_failure"
assert meta["steps"] == ["Fix test", "Submit report"]
assert meta["context"]["pr_number"] == 42
conn.close()
def test_unknown_agent_returns_empty(self):
"""_send_toolchain_task with unknown agent returns empty string."""
from src.api.toolchain_routes import _send_toolchain_task
task_id = _send_toolchain_task(
to_agent="unknown-agent",
title="Test",
description="desc",
event_type="test",
action_type="test",
steps=[],
)
assert task_id == ""
# ---------------------------------------------------------------------------
# Step 2e: on_failure three-way routing tests
# ---------------------------------------------------------------------------
class TestOnFailureRouting:
def test_business_failure_creates_gitea_comment(self, handler, tmp_db):
"""Business failure → Gitea PR comment @task assignee (not must_hives field)"""
# S4: must_hives does NOT contain assignee — production data doesn't have it
must_haves = json.dumps({
"action_type": "review_result",
"context": {"repo": "sanguo/test", "pr_number": 42},
"from": "system",
})
# assignee is set on the tasks table row (as production code writes it)
_insert_task(tmp_db, "t-fail", must_haves)
with patch.object(handler, "_create_gitea_comment") as mock_comment:
mock_comment.return_value = True
verify = VerifyResult(False, "no_action", "no action_report")
handler.on_failure("t-fail", "zhangfei-dev", tmp_db, verify)
mock_comment.assert_called_once()
call_args = mock_comment.call_args
assert call_args[0][0] == "sanguo/test"
assert call_args[0][1] == 42
# M2: comment body should @ the task's assignee from tasks table
comment_body = call_args[0][2]
assert "@zhangfei-dev" in comment_body
def test_infrastructure_failure_creates_task(self, handler, tmp_db):
"""Infrastructure failure → direct DB task for jiangwei-infra (no reverse dep)"""
must_haves = json.dumps({
"action_type": "review_result",
"context": {"repo": "sanguo/test", "pr_number": 42},
})
_insert_task(tmp_db, "t-infra", must_haves)
with patch.object(handler, "_create_gitea_comment") as mock_comment:
mock_comment.return_value = False # Gitea API down
with patch.object(handler, "_create_gitea_issue") as mock_issue:
mock_issue.return_value = False # Gitea API still down
verify = VerifyResult(False, "no_action", "no action_report")
handler.on_failure("t-infra", "zhangfei-dev", tmp_db, verify)
# S3: should directly INSERT into DB, not call _send_toolchain_task
# Verify a new task was created in DB for jiangwei-infra
conn = get_connection(tmp_db)
rows = conn.execute(
"SELECT * FROM tasks WHERE assignee=?",
("jiangwei-infra",)
).fetchall()
conn.close()
assert len(rows) >= 1, "No infrastructure_failure task created"
infra_task = rows[0]
assert infra_task["task_type"] == "toolchain"
meta = json.loads(infra_task["must_haves"])
assert meta["action_type"] == "infrastructure_failure"
# ---------------------------------------------------------------------------
# Regression: _mail path unaffected
# ---------------------------------------------------------------------------
class TestMailRegression:
def test_send_mail_still_exists(self):
"""_send_mail function is preserved."""
from src.api.toolchain_routes import _send_mail
assert callable(_send_mail)
def test_send_mail_not_called_by_handlers(self):
"""No toolchain handler calls _send_mail."""
import inspect
from src.api import toolchain_routes
# Get source of handler functions
source = inspect.getsource(toolchain_routes)
# _send_mail should appear only in its own definition, not in handler bodies
lines = source.split("\n")
in_handler = False
handler_send_mail_calls = []
for i, line in enumerate(lines):
if line.strip().startswith("async def _handle_") or line.strip().startswith("async def _send_mention_mails"):
in_handler = True
elif line.strip().startswith("async def ") or line.strip().startswith("def _"):
if not line.strip().startswith("async def _handle_") and not line.strip().startswith("async def _send_mention_mails"):
in_handler = False
if in_handler and "_send_mail(" in line and not line.strip().startswith("#"):
handler_send_mail_calls.append((i, line.strip()))
assert len(handler_send_mail_calls) == 0, \
f"_send_mail still called in handlers: {handler_send_mail_calls}"
# ---------------------------------------------------------------------------
# Integration: full prompt build
# ---------------------------------------------------------------------------
class TestFullPromptBuild:
def test_prompt_contains_all_sections(self, handler):
"""Full prompt has context, API, and constraints sections."""
ctx = PromptContext(
task_id="tc-test",
title="CI 失败修复",
description="Fix CI failure",
must_haves=json.dumps({
"event_type": "ci_failure",
"action_type": "ci_failure",
"steps": ["Fix test", "Push", "Submit report"],
"context": {"pr_number": 42},
}),
project_id="_toolchain",
agent_id="zhangfei-dev",
event_type="ci_failure",
event_data={"pr_number": "42", "repo": "sanguo/test"},
action_type="ci_failure",
action_steps=["Fix test", "Push", "Submit report"],
)
prompt = handler.build_prompt(ctx)
# Must have action hint
assert "CI 失败" in prompt
assert "需要你修复" in prompt
# Must have steps
assert "必须执行的步骤" in prompt
assert "1. Fix test" in prompt
# Must have API section with action_report
assert "action_report" in prompt
assert "tc-test" in prompt
# Must have constraints with Red Flags
assert "Red Flags" in prompt
assert "强制要求" in prompt