feat: §17 ToolchainHandler 强约束实现(Step 1-4)
CI / lint (pull_request) Failing after 7s
CI / test (pull_request) Has been skipped
CI / notify-on-failure (pull_request) Successful in 1s

Step 1: 基础设施
- prompt_composer.py: PromptContext 新增 action_type + action_steps 字段
- spawner.py: handler 路径提取 action_type/action_steps 传入 PromptContext
- db.py: comments CHECK 约束加入 action_report

Step 2: ToolchainHandler 强化
- ToolchainContextSection: 加 steps 渲染 + action_hint(按 action_type)
- ToolchainApiSection: 改为 action_report 提交指引 + Gitea 协作指引
- ToolchainConstraintsSection: 5 条强约束 + Red Flags 防self-rationalization
- verify_completion: action_report → output → comment 三层 fallback
  - review_merged 始终通过(纯通知)
  - infrastructure_failure 始终通过(防递归)
  - 修复 LENGTH(content) → LENGTH(body) bug
- on_failure 三分路: 业务→Gitea PR comment / 系统→Gitea Issue / 基础设施→toolchain task

Step 3: toolchain_routes 改造
- 新增 _toolchain_db_path() + _send_toolchain_task()
- 所有 8 个 handler 改为 _send_toolchain_task
- _send_mail 保留但不再被 toolchain handler 调用
- _send_deploy_failure_mail → _send_deploy_failure_task

Step 4: 测试
- 29 个单元测试全部通过
- 全量 456 passed, 3 skipped, 0 failures
This commit is contained in:
cfdaily
2026-06-13 23:36:44 +08:00
parent 90f4e3284c
commit c89863a288
6 changed files with 1140 additions and 125 deletions
+274 -15
View File
@@ -189,6 +189,7 @@ def _calc_risk_level(changed_files: List[str]) -> str:
MAIL_PROJECT_ID = "_mail"
TOOLCHAIN_PROJECT_ID = "_toolchain"
def _mail_db_path() -> Path:
@@ -200,6 +201,73 @@ def _mail_db_path() -> Path:
return db
def _toolchain_db_path() -> Path:
"""获取 Toolchain 数据库路径,确保目录和表存在。"""
root = get_data_root()
db = root / TOOLCHAIN_PROJECT_ID / "blackboard.db"
db.parent.mkdir(parents=True, exist_ok=True)
init_db(db)
return db
def _send_toolchain_task(
to_agent: str,
title: str,
description: str,
event_type: str,
action_type: str,
steps: list,
context_data: dict | None = None,
source: str = "webhook",
) -> str:
"""创建 Toolchain Task 并写入 _toolchain DB。
Args:
to_agent: 收件人 Agent ID
title: 任务标题
description: 任务描述(模板渲染后的事件信息)
event_type: 事件类型(review_result / ci_failure / ...
action_type: 动作分类(用于步骤选择和日志统计)
steps: 结构化编号步骤列表
context_data: 事件上下文数据(PR 号、仓库名等)
source: 来源标识
Returns:
创建的 Task ID
"""
if to_agent not in AGENT_IDS:
logger.warning("Unknown agent: %s, skipping toolchain task", to_agent)
return ""
task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
must_hives = json.dumps({
"event_type": event_type,
"action_type": action_type,
"steps": steps,
"context": context_data or {},
"from": "system",
"source": source,
}, ensure_ascii=False)
task = Task(
id=task_id,
title=title,
description=description,
assignee=to_agent,
assigned_by="system",
must_haves=must_hives,
task_type="toolchain",
status="pending",
)
bb = Blackboard(_toolchain_db_path())
bb.create_task(task)
logger.info(
"Toolchain task sent: %s%s [%s] action_type=%s",
title[:40], to_agent, task_id, action_type,
)
return task_id
def _send_mail(
to_agent: str,
title: str,
@@ -327,7 +395,25 @@ async def _send_mention_mails(
})
title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})"
_send_mail(agent_id, title, text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=text,
event_type="mention",
action_type="mention",
steps=[
"按上方 mention 模板中的 response_guidance 执行",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"source_type": source_type,
"source_url": source_url,
"commenter": commenter,
"content_snippet": content[:500],
"repo": repo,
"issue_number": issue_number,
},
)
# ---------------------------------------------------------------------------
@@ -379,7 +465,27 @@ async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
})
title = f"Review 请求: {pr_title} ({repo}#{pr_number})"
_send_mail("simayi-challenger", title, text)
_send_toolchain_task(
to_agent="simayi-challenger",
title=title,
description=text,
event_type="review_request",
action_type="review_request",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"按审查清单审查(参考 code-review Skill",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"branch": branch,
"risk_level": risk_level,
},
)
# S3: PR body @mention 通知
pr_body = pr.get("body", "") or ""
@@ -488,7 +594,25 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
})
title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_comment",
action_type="review_comment",
steps=[
f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments",
"根据评论内容响应(修改代码或在 PR 上回复 comment)",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"reviewer": reviewer,
"comment_body": review_body,
},
)
# S5: Review body @mention 通知(COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -510,7 +634,34 @@ async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
})
title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
if state == "APPROVED":
tc_steps = [
f"合并 PRGitea API: POST /repos/{repo}/pulls/{pr_number}/merge",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
else: # REQUEST_CHANGES
tc_steps = [
"按审查意见逐条修改代码",
"push 到原分支 → CI 自动跑",
"CI 通过后等重新 Review",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
]
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_result",
action_type="review_result",
steps=tc_steps,
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"result": result,
"reviewer": reviewer,
"review_body": review_body,
},
)
# S5: Review body @mention 通知(非 COMMENTED 路径)
await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
@@ -579,11 +730,31 @@ async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
})
title = f"PR 更新: {pr_title} ({repo}#{pr_number})"
_send_mail(reviewer, title, text)
_send_toolchain_task(
to_agent=reviewer,
title=title,
description=text,
event_type="review_updated",
action_type="review_updated",
steps=[
f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
"重点检查上次 Review 意见的修改部分",
f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"new_sha": new_sha,
"reviewer": reviewer,
},
)
def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,复用 deploy_failure 模板"""
def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
"""CD 部署失败通知,走 ToolchainHandler。"""
text = render_template("deploy_failure", {
"repo": repo,
"commit_sha": f"PR #{pr_number}",
@@ -591,7 +762,25 @@ def _send_deploy_failure_mail(repo: str, pr_number: int, pr_title: str, reason:
title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})"
full_text = f"{text}\n\n失败原因: {reason}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_mail(agent_id, title, full_text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=full_text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"pr_number": pr_number,
"pr_title": pr_title,
"reason": reason,
},
)
async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
@@ -623,7 +812,21 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
})
title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="review_merged",
action_type="review_merged",
steps=[], # 纯通知,无步骤
context_data={
"pr_number": pr_number,
"repo": repo,
"pr_title": pr_title,
"pr_author": pr_author,
"merged_by": merged_by,
},
)
# 自动部署:git pull + rsync + 按需 post_deploy
try:
@@ -676,7 +879,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if rsync_proc.returncode != 0:
logger.error("Auto-deploy: rsync failed: %s", rsync_err.decode())
_send_deploy_failure_mail(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
_send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {rsync_err.decode()}")
return
# Step 3: 判断是否需要执行 post_deploy
@@ -731,7 +934,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
if deploy_proc.returncode != 0:
logger.error("Auto-deploy: post_deploy failed: %s", deploy_err.decode())
_send_deploy_failure_mail(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
_send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err.decode()}")
break
else:
logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5]))
@@ -740,7 +943,7 @@ async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
except asyncio.TimeoutError:
logger.error("Auto-deploy: timeout for %s", repo)
_send_deploy_failure_mail(repo, pr_number, pr_title, "部署超时")
_send_deploy_failure_task(repo, pr_number, pr_title, "部署超时")
except Exception as e:
logger.error("Auto-deploy: unexpected error: %s", e)
@@ -787,7 +990,29 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
})
title = f"Issue 指派: {issue_title} ({repo}#{issue_number})"
_send_mail(assignee, title, text)
_send_toolchain_task(
to_agent=assignee,
title=title,
description=text,
event_type="issue_assigned",
action_type="issue_assigned",
steps=[
f"创建分支 fix/{issue_number}-{brief}",
"编码 + 写 UT",
"push → 等 CI",
f"CI 通过后创建 PRGitea API: POST /repos/{repo}/pulls",
"等 Review",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"issue_number": issue_number,
"repo": repo,
"issue_title": issue_title,
"labels": labels,
"issue_body": issue_body or "(无描述)",
"brief": brief,
},
)
elif action == "opened":
if "部署失败" in issue_title:
@@ -802,7 +1027,23 @@ async def _handle_issues(payload: Dict[str, Any]) -> None:
title = f"部署失败: {repo}"
for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
_send_mail(agent_id, title, text)
_send_toolchain_task(
to_agent=agent_id,
title=title,
description=text,
event_type="deploy_failure",
action_type="deploy_failure",
steps=[
"检查 deploy 日志",
"排查失败原因",
"修复并重新部署",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"repo": repo,
"commit_sha": commit_sha or "(未知)",
},
)
# Issue body @mentionopened 时检查)
issue_body = issue.get("body", "") or ""
@@ -869,7 +1110,25 @@ async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
})
title = f"CI 失败: {repo}#{issue_number}"
_send_mail(pr_author, title, text)
_send_toolchain_task(
to_agent=pr_author,
title=title,
description=text,
event_type="ci_failure",
action_type="ci_failure",
steps=[
"查看完整 CI 日志(PR 页面或 Gitea Actions 页面)",
"修复失败的测试",
"push → CI 自动重跑",
f"提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
],
context_data={
"pr_number": issue_number,
"repo": repo,
"branch": branch,
"error_summary": error_summary,
},
)
# CI 处理完不 return,继续检查 @mention
# === 路径 2:@mention 通知(新增,独立路径) ===