Files
sanguo_moziplus_v2/src/daemon/toolchain_handler.py
T
cfdaily f74ae30d41
CI / lint (pull_request) Successful in 10s
CI / test (pull_request) Successful in 27s
CI / frontend (pull_request) Successful in 13s
CI / notify-on-failure (pull_request) Successful in 0s
[moz] impl(§17): CI/部署失败 steps 分支指引 + 基础设施 Issue 转交流程
改动 1: ci_failure steps 增加分支指引(代码问题自己修/基础设施问题提Issue给姜维)
改动 2: deploy_failure steps 同上分支指引(2处定义都改)
改动 3: issue_assigned handler 按 type/infrastructure label 分流
  - infrastructure label → infrastructure_failure event_type(运维排查 steps)
  - 其他 → 原有编码 steps
改动 4: ToolchainApiSection 新增「需要创建 Issue 时」API 指引段落
改动 5: Red Flags 新增「不是我代码的问题」条目
测试: 8 个新测试,463 passed
2026-06-19 13:18:11 +08:00

563 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""toolchain_handler.py - 工具链事件 handler。
处理 Gitea Webhook 事件(CI 失败、Review 请求、Issue 指派等)。
L2 引擎层强约束:输入(结构化步骤)+ 执行(Red Flags)+ 输出(action_report 验证)。
"""
from __future__ import annotations
import json
import logging
import os
import urllib.request
from pathlib import Path
from typing import Dict, List
from src.daemon.base_task_handler import BaseTaskHandler, VerifyResult
from src.daemon.prompt_composer import PromptComposer, PromptContext, GiteaConventionSection, WikiGuideSection, DeliveryChecklistSection
from src.daemon.toolchain_templates import render_template, _TEMPLATE_MAP
from src.blackboard.db import get_connection
logger = logging.getLogger("moziplus-v2.handler.toolchain")

# ---------------------------------------------------------------------------
# Gitea API configuration
# ---------------------------------------------------------------------------

# Base URL of the Gitea REST API. Overridable through GITEA_API_BASE so the
# handler is not tied to a single LAN address; the default is the previous
# hard-coded value. A trailing slash is stripped because every caller appends
# "/repos/..." to it.
_GITEA_BASE = os.environ.get(
    "GITEA_API_BASE", "http://192.168.2.154:3000/api/v1"
).rstrip("/")
# Gitea API token. Empty when unset; the Gitea helpers of ToolchainHandler then
# return False without calling the API.
_GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")

# action_type -> opening hint line of the toolchain prompt
# (looked up by ToolchainContextSection.render).
_ACTION_HINTS: Dict[str, str] = {
    "review_result": "你收到一个 Review 结果通知,这是一个需要你执行动作的事件(不是纯通知)。",
    "review_request": "你收到一个 Review 请求,这是一个需要你审查并提交 Review 的事件。",
    "review_updated": "你收到一个 PR 更新通知,这是一个需要你重新审查修改部分的事件。",
    "review_comment": "你收到一个 Review 评论,这是一个需要你查看并响应的事件。",
    "ci_failure": "你收到一个 CI 失败通知,这是一个需要你修复失败测试的事件。",
    "issue_assigned": "你收到一个 Issue 指派,这是一个需要你编码实现的事件。",
    "deploy_failure": "你收到一个部署失败通知,这是一个需要你排查并修复的事件。",
    "mention": "你收到一个 @mention 通知,这是一个需要你按指引响应的事件。",
    "review_merged": "你收到一个 PR 合并通知。这是一条纯通知,阅读即可。",
    "infrastructure_failure": "你收到一个基础设施问题报告,请排查并修复。",
}
# ---------------------------------------------------------------------------
# Toolchain PromptSections
# ---------------------------------------------------------------------------
class ToolchainContextSection:
    """Event section of the toolchain prompt (priority=10).

    Renders, in order: the action hint selected by ``context.action_type``,
    the event block (from a template when ``event_type`` has one in
    ``_TEMPLATE_MAP``, otherwise a generic key/value list), and the numbered
    mandatory steps taken from ``context.action_steps``.
    """

    name: str = "toolchain_context"
    priority: int = 10

    # Chinese display labels per event type; unknown types fall back to the
    # raw event_type string (or '未知' when it is empty).
    EVENT_LABELS_ZH: Dict[str, str] = {
        "review_request": "Review 请求",
        "review_result": "Review 结果",
        "review_merged": "PR 合并",
        "review_comment": "Review 评论",
        "review_updated": "Review 更新",
        "ci_failure": "CI 失败",
        "deploy_failure": "部署失败",
        "issue_assigned": "Issue 指派",
        "mention": "@提及",
        # infrastructure_failure also has an entry in _ACTION_HINTS and is the
        # event_type of tasks created by ToolchainHandler; label it as well so
        # the header does not show the raw identifier.
        "infrastructure_failure": "基础设施问题",
    }

    def render(self, context: PromptContext) -> str:
        """Return the event section text for *context*."""
        event_type = context.event_type
        event_data: Dict = context.event_data or {}

        event_label = self.EVENT_LABELS_ZH.get(event_type, event_type or '未知')
        to_agent = context.agent_id or ''
        from_agent = 'system'

        # Part 1: event information (template engine when available).
        if event_type in _TEMPLATE_MAP:
            variables = {k: str(v) for k, v in event_data.items()}
            event_text = render_template(event_type, variables)
            # Prepend the event label and from/to lines to the template output.
            header = f"- **事件类型**: {event_label}\n- **来源**: {from_agent}\n- **指派**: {to_agent}\n"
            event_text = header + "\n" + event_text
        else:
            lines = ["## 工具链事件", ""]
            lines.append(f"- **事件类型**: {event_label}")
            lines.append(f"- **来源**: {from_agent}")
            lines.append(f"- **指派**: {to_agent}")
            if event_data:
                lines.append("- **事件详情**:")
                for key, value in event_data.items():
                    lines.append(f" - {key}: {value}")
            lines.append("")
            event_text = "\n".join(lines)

        # Part 2: numbered mandatory steps rendered from action_steps.
        steps: List[str] = context.action_steps or []
        if steps:
            step_lines = ["", "### 必须执行的步骤", ""]
            step_lines.extend(f"{i}. {step}" for i, step in enumerate(steps, 1))
            steps_text = "\n".join(step_lines)
        else:
            steps_text = ""

        # Part 3: action hint chosen by action_type, with a generic fallback.
        action_hint = _ACTION_HINTS.get(
            context.action_type,
            "你收到一个工具链事件,这是一个需要你执行动作的事件。",
        )
        return f"{action_hint}\n\n{event_text}{steps_text}"

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the toolchain prompt."""
        return True
class ToolchainApiSection:
    """API instruction section of the toolchain prompt (priority=40).

    Tells the agent how to submit the mandatory action report and outputs to
    the blackboard API, and how to comment on / create Gitea Issues when it
    needs another role.
    """

    name: str = "toolchain_api"
    priority: int = 40

    API_HOST = "localhost:8083"

    def render(self, context: PromptContext) -> str:
        """Return the API instruction text for *context*."""
        pid = context.project_id
        tid = context.task_id
        author = context.agent_id

        # Shared fragments reused by several curl examples.
        task_url = f"http://{self.API_HOST}/api/projects/{pid}/tasks/{tid}"
        gitea_issues = f"{_GITEA_BASE}/repos/{{repo}}/issues"
        json_header = ' -H "Content-Type: application/json" \\'
        auth_header = ' -H "Authorization: token <your-token>" \\'
        fence_open = "```bash"
        fence_close = "```"

        intro = [
            "## API 操作指令",
            "",
            f"项目 ID: `{pid}`",
            f"任务 ID: `{tid}`",
            "",
        ]
        report = [
            "### 完成后必须提交 action report",
            "",
            "执行完所有步骤后,必须提交 action report:",
            fence_open,
            f'curl -s -X POST "{task_url}/comments" \\',
            json_header,
            f' -d \'{{"author": "{author}", "comment_type": "action_report", "body": "简要描述你执行了什么操作及结果"}}\'',
            fence_close,
            "",
            "⚠️ 不提交 action report 的任务会被标记为 failed。",
            "",
        ]
        outputs = [
            "### 提交产出",
            "",
            "如有产出(如 review 结果、修复方案),提交到任务 outputs:",
            fence_open,
            f'curl -s -X POST "{task_url}/outputs" \\',
            json_header,
            ' -d \'{"content": "<你的产出内容>", "type": "text"}\'',
            fence_close,
            "",
        ]
        support = [
            "### 需要其他角色支持时",
            "",
            "如果在执行过程中需要其他角色协助(如缺数据、需要审批等),在关联的 PR/Issue 上创建 comment @对方:",
            fence_open,
            f'curl -s -X POST "{gitea_issues}/{{pr_number}}/comments" \\',
            auth_header,
            json_header,
            ' -d \'{"body": "@{agent-id} 需要你的支持:{描述问题}"}\'',
            fence_close,
            "",
            "⚠️ 不要使用 Mail API(飞鸽传书)。所有协作通过 Gitea 留痕。",
            "",
        ]
        new_issue = [
            "### 需要创建 Issue 时",
            "",
            "如果步骤中要求创建 Issue 指派他人(如 jiangwei-infra):",
            fence_open,
            f'curl -s -X POST "{gitea_issues}" \\',
            auth_header,
            json_header,
            ' -d \'{"title": "[moz] infra: 简述问题", "body": "## 问题描述\\n\\n<简要描述问题现象>\\n\\n## 错误来源\\n\\n- 仓库: <repo>\\n- PR/Commit: <链接>\\n- CI/Deploy run: <Gitea Actions 页面链接>\\n\\n## 日志关键片段\\n\\n```<错误日志摘要>```\\n\\n## 判断依据\\n\\n<为什么判断为基础设施问题>", "assignees": ["jiangwei-infra"], "labels": [<label_id>]}\'',
            fence_close,
            "",
            "⚠️ Issue body 必须包含错误来源链接(PR/Commit + CI run),让排查者能直接看到全貌。",
            "⚠️ label 数字 ID 先 GET /repos/{repo}/labels 查询 type/infrastructure 对应的 ID。",
            "",
        ]
        return "\n".join(intro + report + outputs + support + new_issue)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the toolchain prompt."""
        return True
class ToolchainConstraintsSection:
    """Hard constraints + Red Flags section of the toolchain prompt (priority=50).

    Static text: it does not depend on the prompt context.
    """

    name: str = "toolchain_constraints"
    priority: int = 50

    def render(self, context: PromptContext) -> str:
        """Return the constraints / Red Flags text (context is unused)."""
        lines = [
            "## 硬约束(必须遵守)",
            "",
            "⚠️ 以下是强制要求,不是建议或参考。违反任何一条都会导致任务失败。",
            "",
            "### 1. 必须按步骤执行",
            '- 检查上方“必须执行的步骤”列表',
            '- 逐条执行每个步骤,不可跳过',
            '- 不要只读不做——这不是纯通知',
            "",
            "### 2. 必须提交 action report",
            '- 执行完所有步骤后,必须提交 action report',
            # Restored the missing parentheses around comment_type=...
            "- 提交方式:POST comment(comment_type='action_report')",
            '- 报告内容:简要描述你执行了什么操作、结果如何',
            '- ⚠️ 不提交 action report 的任务会被标记为 failed',
            "",
            "### 3. 不要执行任何状态转换命令",
            '- 不要手动标 working/done/review/failed,系统会自动处理',
            "",
            "### 4. 不需要回复",
            '- action report 就是你的完成凭证',
            '- 不要发送 Mail(飞鸽传书),你的所有操作在 toolchain 流程内完成',
            "",
            "### 5. 所有协作通过 Gitea 完成",
            '- 如果遇到问题需要其他角色支持,在关联的 PR/Issue 上创建 comment @对方',
            '- 不要使用 Mail API(飞鸽传书)发送消息',
            '- 你的所有操作都在 toolchain 流程内,通过 Gitea 留痕',
            "",
            "### Red Flags(如果脑海中出现以下想法,说明你错了)",
            "",
            '| Agent 想法 | Red Flag 驳回 |',
            '|------------|--------------|',
            '| “这个通知看看就行了” | ❌ 错!这是 action 指令,必须执行步骤列表中的每一项 |',
            '| “我不需要做任何事” | ❌ 错!检查“必须执行的步骤”列表,每一步都要执行 |',
            '| “先放着等会处理” | ❌ 错!立即执行,不要推迟 |',
            '| “我已经知道了” | ❌ 知道不等于执行。执行步骤 + 提交 action report 才算完成 |',
            '| “步骤太多了,选几个做就行” | ❌ 错!必须逐条执行,不可跳过 |',
            '| “这个步骤不适用于当前情况” | ❌ 如果确实不适用,在 action report 中说明原因,但其他步骤必须执行 |',
            # Restored the missing opening parenthesis before "body 含...".
            '| “CI/部署失败不是我代码的问题,我什么也不用做” | ❌ 错!即使是基础设施问题,你也必须创建 Issue 指派 jiangwei-infra(body 含错误来源链接 + 日志 + 判断依据),并在 action report 中说明。不能只报告“不是我的问题”就完事 |',
            "",
        ]
        return "\n".join(lines)

    def should_include(self, context: PromptContext) -> bool:
        """This section is always part of the toolchain prompt."""
        return True
# ---------------------------------------------------------------------------
# ToolchainHandler
# ---------------------------------------------------------------------------
class ToolchainHandler(BaseTaskHandler):
    """Toolchain event handler.

    Builds the agent prompt from the toolchain PromptSections, verifies that
    the agent left an ``action_report`` comment (with output / comment
    fallbacks) and, on failure, routes the problem down one of three paths:

    * business       -> comment on the related PR/Issue @ the assignee
    * system         -> Gitea Issue assigned to pangtong-fujunshi
    * infrastructure -> blackboard task assigned to jiangwei-infra
    """

    task_type = "toolchain"
    virtual_project = "_toolchain"
    display_name = "工具链事件"

    def target_success_status(self) -> str:
        """Target task status on success."""
        return "done"

    def pre_spawn(self, task_id: str, db_path: Path) -> bool:
        """auto_working: mark the task pending -> working before spawning."""
        return self._auto_mark_working(task_id, db_path)

    def get_sections(self) -> list:
        """Return the PromptSection instances that make up the prompt."""
        return [
            ToolchainContextSection(),
            ToolchainApiSection(),
            ToolchainConstraintsSection(),
            GiteaConventionSection(),
            WikiGuideSection(),
            DeliveryChecklistSection(),
        ]

    def build_prompt(self, context: PromptContext) -> str:
        """Compose all sections into the final prompt via PromptComposer."""
        composer = PromptComposer()
        composer.add_many(self.get_sections())
        return composer.compose(context)

    # -----------------------------------------------------------------------
    # Task metadata helpers
    # -----------------------------------------------------------------------

    @staticmethod
    def _parse_meta(raw) -> dict:
        """Decode a ``must_haves`` JSON value into a dict.

        Returns ``{}`` for empty, malformed or non-object values so callers
        can always use ``.get`` on the result.
        """
        if not raw:
            return {}
        try:
            meta = json.loads(raw)
        except (TypeError, ValueError):  # JSONDecodeError is a ValueError
            return {}
        return meta if isinstance(meta, dict) else {}

    @staticmethod
    def _gitea_web_base() -> str:
        """Gitea web root derived from ``_GITEA_BASE`` (``/api/v1`` stripped)."""
        base = _GITEA_BASE.rstrip("/")
        suffix = "/api/v1"
        return base[:-len(suffix)] if base.endswith(suffix) else base

    # -----------------------------------------------------------------------
    # Verification
    # -----------------------------------------------------------------------

    def verify_completion(self, task_id: str, db_path: Path) -> VerifyResult:
        """Check for an action report, with passthroughs and fallbacks.

        Order: infrastructure_failure / review_merged passthrough, then an
        ``action_report`` comment, then any output, then any non-system
        comment with a body of at least 20 characters. Any exception yields
        ``VerifyResult(False, "verify_error", ...)``.
        """
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT must_haves FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                # meta is always a dict, even when must_haves is empty/invalid.
                meta = self._parse_meta(row["must_haves"] if row else None)
                action_type = meta.get("action_type")

                # infrastructure_failure always passes: failing it would make
                # on_failure create yet another infrastructure task (recursion).
                if action_type == "infrastructure_failure":
                    return VerifyResult(True, "infrastructure_passthrough",
                                        "infrastructure_failure auto-pass")
                # review_merged is a pure notification.
                if action_type == "review_merged":
                    return VerifyResult(True, "merged_passthrough",
                                        "review_merged auto-pass")

                # 1. Preferred: an explicit action_report comment.
                report_row = conn.execute(
                    "SELECT id FROM comments WHERE task_id=? "
                    "AND comment_type='action_report' LIMIT 1",
                    (task_id,)
                ).fetchone()
                if report_row:
                    return VerifyResult(True, "has_action_report", "action_report found")

                # 2. Fallback (backward compatibility): any task output.
                output_count = conn.execute(
                    "SELECT COUNT(*) FROM outputs WHERE task_id=?", (task_id,)
                ).fetchone()[0]
                if output_count > 0:
                    return VerifyResult(True, "has_output", f"output_count={output_count}")

                # 3. Fallback (backward compatibility): a substantive comment.
                comment_count = conn.execute(
                    "SELECT COUNT(*) FROM comments WHERE task_id=? "
                    "AND author != 'system' AND LENGTH(body) >= 20",
                    (task_id,)
                ).fetchone()[0]
                if comment_count > 0:
                    return VerifyResult(True, "has_comment", f"comment_count={comment_count}")

                return VerifyResult(False, "no_action",
                                    "no action_report, no output, no valid comment")
            finally:
                conn.close()
        except Exception as e:
            logger.error("Toolchain %s: verify error: %s", task_id, e)
            return VerifyResult(False, "verify_error", str(e))

    # -----------------------------------------------------------------------
    # Failure routing
    # -----------------------------------------------------------------------

    def on_failure(self, task_id: str, agent_id: str,
                   db_path: Path, verify: VerifyResult) -> None:
        """Mark the task failed, then route to business/system/infrastructure."""
        self._mark_task_status(db_path, task_id, "failed")
        logger.info("Toolchain %s: verify failed (%s), marked failed",
                    task_id, verify.reason)

        meta, assignee = self._load_failure_context(task_id, agent_id, db_path)
        action_type = meta.get("action_type", "")
        context_data = meta.get("context")
        if not isinstance(context_data, dict):
            # e.g. missing or JSON null: the handlers below call .get on it.
            context_data = {}

        route = self._classify_failure(verify)
        if route == "business":
            self._handle_business_failure(
                task_id, agent_id, verify, action_type, context_data, assignee, db_path)
        elif route == "system":
            self._handle_system_failure(
                task_id, agent_id, verify, action_type, context_data, db_path)
        else:  # infrastructure
            self._handle_infrastructure_failure(
                task_id, agent_id, verify, db_path)

    def _load_failure_context(
        self, task_id: str, agent_id: str, db_path: Path,
    ) -> tuple:
        """Return ``(meta, assignee)`` for *task_id*; best effort.

        ``meta`` is the decoded ``must_haves`` dict (``{}`` on any problem);
        ``assignee`` falls back to *agent_id*. DB errors are logged, not raised.
        """
        raw_meta = None
        row_assignee = None
        try:
            conn = get_connection(db_path)
            try:
                row = conn.execute(
                    "SELECT must_haves, assignee FROM tasks WHERE id=?", (task_id,)
                ).fetchone()
                if row:
                    raw_meta = row["must_haves"]
                    row_assignee = row["assignee"]
            finally:
                conn.close()
        except Exception as e:
            logger.warning("Toolchain %s: failed to load task context: %s",
                           task_id, e)
        return self._parse_meta(raw_meta), (row_assignee or agent_id)

    def _classify_failure(self, verify: VerifyResult) -> str:
        """Classify a failure as ``business`` or ``infrastructure``.

        ``system`` is never returned directly; it is reached by escalation
        from the business path.
        """
        # verify_error (e.g. DB unavailable) -> infrastructure failure.
        if verify.reason == "verify_error":
            return "infrastructure"
        # Default: business failure.
        return "business"

    def _handle_business_failure(
        self, task_id: str, agent_id: str, verify: VerifyResult,
        action_type: str, context_data: dict, assignee: str,
        db_path: Path,
    ) -> None:
        """Business failure: comment on the related PR/Issue @ the assignee.

        Escalates to the system path when there is no PR/Issue context or the
        Gitea comment cannot be created.
        """
        repo = context_data.get("repo", "")
        pr_number = context_data.get("pr_number") or context_data.get("issue_number", "")
        if not (repo and pr_number):
            logger.warning(
                "Toolchain %s: no PR/Issue context for business failure, "
                "escalating to system failure", task_id)
            self._handle_system_failure(
                task_id, agent_id, verify, action_type, context_data, db_path)
            return

        comment_body = (
            f"@{assignee or agent_id} 工具链任务执行失败\n\n"
            f"任务 ID: {task_id}\n"
            f"失败原因: {verify.reason}\n"
            f"证据: {verify.evidence}\n\n"
            f"请检查黑板任务并处理。"
        )
        if self._create_gitea_comment(repo, pr_number, comment_body):
            logger.info("Toolchain %s: business failure → Gitea comment on %s#%s",
                        task_id, repo, pr_number)
            return

        logger.warning(
            "Toolchain %s: Gitea comment failed, escalating to system failure",
            task_id)
        self._handle_system_failure(
            task_id, agent_id, verify, action_type, context_data, db_path)

    def _handle_system_failure(
        self, task_id: str, agent_id: str, verify: VerifyResult,
        action_type: str, context_data: dict, db_path: Path,
    ) -> None:
        """System failure: open a Gitea Issue assigned to pangtong-fujunshi.

        Escalates to the infrastructure path when the Issue cannot be created.
        """
        # `or` (not a .get default) so an empty "repo" also falls back; an
        # empty repo would hit /repos//issues and raise a false infra alarm.
        repo = context_data.get("repo") or "sanguo/sanguo_moziplus_v2"
        title = f"[toolchain-handler] 工具链事件处理失败: {task_id}"
        body = (
            f"任务 {task_id} 验证失败\n\n"
            f"事件类型: {action_type or '未知'}\n"
            f"失败原因: {verify.reason}\n"
            f"证据: {verify.evidence}\n\n"
            f"@pangtong-fujunshi 请检查黑板任务并手动处理。"
        )
        if self._create_gitea_issue(repo, title, body, ["pangtong-fujunshi"]):
            logger.info("Toolchain %s: system failure → Gitea Issue created on %s",
                        task_id, repo)
            return

        # Gitea API unavailable -> infrastructure failure.
        logger.error(
            "Toolchain %s: Gitea API unavailable, escalating to infrastructure failure",
            task_id)
        self._handle_infrastructure_failure(
            task_id, agent_id, verify, db_path)

    def _handle_infrastructure_failure(
        self, task_id: str, agent_id: str,
        verify: VerifyResult, db_path: Path,
    ) -> None:
        """Infrastructure failure: insert a blackboard task for jiangwei-infra.

        Written directly to the DB (no Gitea). The new task has action_type
        ``infrastructure_failure``, which verify_completion auto-passes, so it
        cannot trigger another infrastructure task. Errors are logged only.
        """
        try:
            from datetime import datetime

            new_task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
            must_haves = json.dumps({
                "event_type": "infrastructure_failure",
                "action_type": "infrastructure_failure",
                "steps": [
                    f"检查 Gitea 服务状态({self._gitea_web_base()})",
                    "检查网络连通性",
                    "恢复后提交 action report",
                ],
                "context": {"original_task_id": task_id, "verify_reason": verify.reason},
                "from": "system",
                "source": "toolchain_handler_on_failure",
            }, ensure_ascii=False)
            conn = get_connection(db_path)
            try:
                conn.execute(
                    "INSERT INTO tasks (id, title, description, assignee, assigned_by, "
                    "must_haves, task_type, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        new_task_id,
                        f"[基础设施] Gitea API 不可用 - {task_id}",
                        f"Gitea API 不可用,原任务 {task_id} 无法通过正常路径处理。\n"
                        f"请检查 Gitea 服务状态和网络连通性。",
                        "jiangwei-infra",
                        "system",
                        must_haves,
                        "toolchain",
                        "pending",
                    )
                )
                conn.commit()
            finally:
                # Close even when the INSERT/commit fails.
                conn.close()
            logger.info(
                "Toolchain %s: infrastructure failure → task %s created for jiangwei-infra",
                task_id, new_task_id)
        except Exception as e:
            logger.error(
                "Toolchain %s: failed to create infrastructure_failure task: %s",
                task_id, e)

    # -----------------------------------------------------------------------
    # Gitea API helpers
    # -----------------------------------------------------------------------

    def _post_gitea_json(self, path: str, data: dict) -> None:
        """POST *data* as JSON to ``_GITEA_BASE + path``; raises on failure."""
        req = urllib.request.Request(
            f"{_GITEA_BASE}{path}",
            data=json.dumps(data, ensure_ascii=False).encode("utf-8"),
            headers={
                "Authorization": f"token {_GITEA_TOKEN}",
                "Content-Type": "application/json",
            },
            method="POST",
        )
        # Use the response as a context manager so the socket is released.
        with urllib.request.urlopen(req, timeout=5):
            pass

    def _create_gitea_comment(
        self, repo: str, pr_number: int, body: str,
    ) -> bool:
        """Create a comment on a PR/Issue. Returns whether it succeeded."""
        if not _GITEA_TOKEN:
            return False
        try:
            self._post_gitea_json(
                f"/repos/{repo}/issues/{pr_number}/comments", {"body": body})
        except Exception as e:
            logger.warning("Gitea comment failed on %s#%s: %s", repo, pr_number, e)
            return False
        return True

    def _create_gitea_issue(
        self, repo: str, title: str, body: str,
        assignees: list | None = None,
    ) -> bool:
        """Create a Gitea Issue. Returns whether it succeeded."""
        if not _GITEA_TOKEN:
            return False
        data = {"title": title, "body": body}
        if assignees:
            data["assignees"] = assignees
        try:
            self._post_gitea_json(f"/repos/{repo}/issues", data)
        except Exception as e:
            logger.warning("Gitea create issue failed on %s: %s", repo, e)
            return False
        return True

    # -----------------------------------------------------------------------
    # Compatibility: old method kept for its signature (no longer called by
    # on_failure).
    # -----------------------------------------------------------------------

    def _build_gitea_links(self, event_type: str, event_data: dict) -> str:
        """Build Gitea web links (PR / Issue / commit / branch) from event data."""
        links = []
        repo = event_data.get("repo", "")
        base_url = self._gitea_web_base()
        if "pr_number" in event_data:
            links.append(f"PR: {base_url}/{repo}/pulls/{event_data['pr_number']}")
        if "issue_number" in event_data:
            links.append(f"Issue: {base_url}/{repo}/issues/{event_data['issue_number']}")
        if "commit" in event_data:
            links.append(f"Commit: {base_url}/{repo}/commit/{event_data['commit']}")
        if "branch" in event_data and "commit" not in event_data:
            links.append(f"分支: {event_data['branch']}")
        return "\n".join(links) if links else "(无法提取链接,请检查黑板任务详情)"