Files
sanguo_moziplus_v2/src/api/toolchain_routes.py
T
cfdaily 9880091f52
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions
auto-sync: 2026-06-07 11:56:54
2026-06-07 11:56:54 +08:00

417 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""API 路由 — 工具链事件中枢(Toolchain Event Hub
接收 Gitea Webhook,翻译成 Mail 通知推送给 Agent。
端点: POST /webhook/gitea
支持事件: pull_request, pull_request_review, issues, issue_comment
"""
from __future__ import annotations

import hashlib
import hmac
import json
import logging
import os
import time
from collections import deque
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Deque, Dict, List, Optional, Set, Tuple

import httpx
from fastapi import APIRouter, Header, Request, Response

from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.daemon.toolchain_templates import render_template
from src.utils import get_data_root
logger = logging.getLogger(__name__)
router = APIRouter(tags=["toolchain"])
# ---------------------------------------------------------------------------
# 幂等检查:内存 set,保留最近 7 天
# ---------------------------------------------------------------------------
# Keys ("<event>-<delivery>") of deliveries seen within the TTL window.
_delivery_cache: Set[str] = set()
# (arrival time, key) pairs, oldest first. A deque gives O(1) eviction from the
# left, whereas list.pop(0) shifts every remaining element.
_delivery_timestamps: Deque[Tuple[float, str]] = deque()
_TTL_SECONDS = 7 * 24 * 3600


def _is_duplicate(event: str, delivery: str) -> bool:
    """Report whether a webhook delivery was already seen, recording it if not.

    Entries older than ``_TTL_SECONDS`` are evicted on every call before the
    lookup, so a delivery ID is only remembered for that window.

    Args:
        event: Value of the event header (e.g. ``"pull_request"``).
        delivery: Value of the delivery header identifying this delivery.

    Returns:
        True if the ``event``/``delivery`` pair is already in the cache,
        False if it is new (in which case it has now been recorded).
    """
    now = time.time()

    # Entries are appended in arrival order, so expired ones are always at
    # the left end of the deque.
    while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
        _, expired_key = _delivery_timestamps.popleft()
        _delivery_cache.discard(expired_key)

    key = f"{event}-{delivery}"
    if key in _delivery_cache:
        return True

    _delivery_cache.add(key)
    _delivery_timestamps.append((now, key))
    return False
# ---------------------------------------------------------------------------
# 签名验证
# ---------------------------------------------------------------------------
# Shared secret configured on the Gitea webhook; read once at import time.
_WEBHOOK_SECRET: Optional[str] = os.environ.get("GITEA_WEBHOOK_SECRET")


def _verify_signature(body: bytes, signature: Optional[str]) -> bool:
    """Verify the HMAC-SHA256 signature of a webhook body.

    Verification is skipped (returns True) when no secret is configured.

    Args:
        body: Raw request body exactly as received.
        signature: Hex digest supplied by the sender, or None if absent.

    Returns:
        True if no secret is configured or the signature matches;
        False if the signature is missing or does not match.
    """
    if not _WEBHOOK_SECRET:
        return True
    if not signature:
        return False

    expected = hmac.new(_WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters. The signature comes from a request header, so compare bytes
    # to make a malformed header a plain mismatch instead of an exception.
    return hmac.compare_digest(
        expected.encode("ascii"), signature.encode("utf-8", "replace")
    )
# ---------------------------------------------------------------------------
# Gitea API 调用
# ---------------------------------------------------------------------------
# Gitea API token, supplied via the GITEA_TOKEN environment variable.
# SECURITY: no fallback token is embedded in source. A credential committed to
# the repository is readable by anyone with repo access; any token that was
# previously hard-coded here should be revoked and replaced.
_GITEA_TOKEN: str = os.environ.get("GITEA_TOKEN", "")
# Base URL of the Gitea REST API (v1).
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
async def _fetch_pr_files(repo: str, pr_number: int) -> List[str]:
    """Fetch the paths of the files changed by a pull request via the Gitea API.

    The endpoint is paginated, so further pages are requested for as long as
    the response carries ``X-HasMore: true`` (bounded by a page cap). If the
    header is absent, exactly one request is made.

    Args:
        repo: Repository path (e.g. ``"sanguo/sanguo_moziplus_v2"``).
        pr_number: Pull request number.

    Returns:
        List of file paths; an empty list if any request fails. An entry is
        ``""`` when the API item has no ``filename`` key.
    """
    url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files"
    headers = {"Authorization": f"token {_GITEA_TOKEN}"}
    max_pages = 20  # safety bound so a misbehaving server cannot loop us forever
    filenames: List[str] = []
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            for page in range(1, max_pages + 1):
                resp = await client.get(url, headers=headers, params={"page": page})
                resp.raise_for_status()
                files: List[Dict[str, Any]] = resp.json()
                filenames.extend(f.get("filename", "") for f in files)
                has_more = resp.headers.get("X-HasMore", "").lower() == "true"
                if not files or not has_more:
                    break
    except Exception:
        # Best effort: the caller treats an empty list as "file list unavailable".
        logger.warning("Failed to fetch PR files: %s/pulls/%d", repo, pr_number, exc_info=True)
        return []
    return filenames
# ---------------------------------------------------------------------------
# 风险级别判定
# ---------------------------------------------------------------------------
# Glob patterns for files whose modification makes a PR "high" risk.
_HIGH_PATTERNS = [
    "**/spawner*", "**/ticker*", "**/dispatcher*",
    "**/router*", "**/guardrails*", "**/strategy*", "**/risk*",
]


def _calc_risk_level(changed_files: List[str]) -> str:
    """Classify a change set as ``"high"`` or ``"standard"`` risk.

    Args:
        changed_files: Paths of the files touched by the change.

    Returns:
        ``"high"`` if any path matches one of ``_HIGH_PATTERNS``,
        otherwise ``"standard"`` (including for an empty list).
    """
    for filepath in changed_files:
        path = PurePath(filepath)
        for pattern in _HIGH_PATTERNS:
            if path.match(pattern):
                return "high"
            # PurePath.match() does not treat "**" as recursive: "**/name*"
            # requires a parent directory, so a root-level file such as
            # "spawner.py" would never match. Retry without the prefix so
            # the pattern applies at any depth, including the repository root.
            if pattern.startswith("**/") and path.match(pattern[3:]):
                return "high"
    return "standard"
# ---------------------------------------------------------------------------
# Mail 创建
# ---------------------------------------------------------------------------
# Project ID (directory name under the data root) that holds the Mail blackboard.
MAIL_PROJECT_ID = "_mail"


def _mail_db_path() -> Path:
    """Return the path of the Mail database, creating its directory if needed.

    ``init_db`` is invoked on the path on every call before it is returned.

    Returns:
        ``<data root>/_mail/blackboard.db``.
    """
    # NOTE(review): assumes get_data_root() returns a pathlib.Path — confirm.
    db = get_data_root() / MAIL_PROJECT_ID / "blackboard.db"
    db.parent.mkdir(parents=True, exist_ok=True)
    init_db(db)
    return db
# Millisecond timestamp used for the most recently issued mail ID.
_last_mail_ts_ms = 0


def _next_mail_id() -> str:
    """Return a ``mail-<epoch ms>`` ID that is unique within this process."""
    global _last_mail_ts_ms
    now_ms = int(datetime.now().timestamp() * 1000)
    # Two mails created within the same millisecond (e.g. the same notice
    # sent to several agents in a loop) would otherwise share an ID; bump
    # past the last issued value so IDs stay strictly increasing.
    _last_mail_ts_ms = max(now_ms, _last_mail_ts_ms + 1)
    return f"mail-{_last_mail_ts_ms}"


def _send_mail(
    to_agent: str,
    title: str,
    description: str,
    source: str = "webhook",
) -> str:
    """Create a Mail task and write it to the Mail database.

    Args:
        to_agent: Recipient agent ID (stored as the task assignee).
        title: Mail subject.
        description: Mail body.
        source: Origin tag stored in the mail metadata.

    Returns:
        The ID of the created mail.

    Raises:
        Exception: Whatever the database layer raises on a failed write is
            propagated to the caller.
    """
    mail_id = _next_mail_id()
    notify_meta = {
        "type": "inform",
        "performative": "inform",
        "is_read": False,
        "conversation_id": f"conv-{mail_id}",
        "from": "system",
        "source": source,
    }
    task = Task(
        id=mail_id,
        title=title,
        description=description,
        assignee=to_agent,
        assigned_by="system",
        # Mail metadata is carried as a JSON string in the must_haves field.
        must_haves=json.dumps(notify_meta, ensure_ascii=False),
        task_type="mail",
        status="pending",
    )
    bb = Blackboard(_mail_db_path())
    bb.create_task(task)
    # Separator added: the title and recipient previously ran together.
    logger.info("Mail sent: %s -> %s [%s]", title[:40], to_agent, mail_id)
    return mail_id
# ---------------------------------------------------------------------------
# 辅助:从 payload 提取仓库全名
# ---------------------------------------------------------------------------
def _repo_fullname(payload: Dict[str, Any]) -> str:
    """Return the repository full name (``owner/repo``) from a webhook payload.

    Yields ``""`` when the payload has no (or an empty/null) ``repository``
    entry, or when that entry lacks ``full_name``.
    """
    repository = payload.get("repository")
    if not repository:
        return ""
    return repository.get("full_name", "")
# ---------------------------------------------------------------------------
# 事件处理函数
# ---------------------------------------------------------------------------
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
    """Handle a ``pull_request`` event.

    Only the ``opened`` action is acted on: the PR's changed files are
    fetched, a risk level is derived from them, and a review-request mail is
    sent to ``simayi-challenger``. Every other action is ignored.

    Args:
        payload: Parsed webhook JSON body.
    """
    if payload.get("action", "") != "opened":
        return

    pr = payload.get("pull_request") or {}
    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # ``or {}`` also covers keys that are present but null, which
    # ``.get(key, {})`` would pass through as None and crash on.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    branch = (pr.get("head") or {}).get("ref", "unknown")

    # An empty list means the file list could not be fetched (or the PR is
    # empty); the risk level then falls back to "standard".
    changed_files = await _fetch_pr_files(repo, pr_number)
    risk_level = _calc_risk_level(changed_files)
    file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无法获取文件列表)"

    text = render_template("review_request", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "branch": branch,
        "risk_level": risk_level,
        "file_list": file_list,
    })
    title = f"Review 请求: {pr_title} ({repo}#{pr_number})"
    _send_mail("simayi-challenger", title, text)
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
    """Handle a ``pull_request_review`` event by mailing the result to the PR author.

    Reviews whose state is ``COMMENTED`` are skipped; any other state
    produces a mail.

    Args:
        payload: Parsed webhook JSON body.
    """
    review = payload.get("review") or {}
    state = review.get("state", "")
    if state == "COMMENTED":
        return

    pr = payload.get("pull_request") or {}
    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # ``or {}`` also covers keys that are present but null.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    reviewer = (review.get("user") or {}).get("login", "unknown")
    # Use the placeholder for an empty or null body as well as a missing one;
    # a review submitted without a comment would otherwise render as blank.
    review_body = review.get("body") or "(无评论)"

    result_map = {"APPROVED": "通过 ✓", "REJECTED": "驳回 ✗", "PENDING": "待定"}
    # Unknown states are shown verbatim.
    result = result_map.get(state, state)

    text = render_template("review_result", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "reviewer": reviewer,
        "result": result,
        "review_body": review_body,
    })
    title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
    _send_mail(pr_author, title, text)
async def _handle_issues(payload: Dict[str, Any]) -> None:
    """Handle an ``issues`` event.

    * ``assigned``: mail the last entry of the issue's ``assignees`` list.
    * ``opened`` with "部署失败" in the title: mail the deploy-failure notice
      to ``jiangwei-infra`` and ``pangtong-fujunshi``.

    Any other action is ignored.

    Args:
        payload: Parsed webhook JSON body.
    """
    action = payload.get("action", "")
    issue = payload.get("issue") or {}
    repo = _repo_fullname(payload)
    issue_number = issue.get("number", 0)
    issue_title = issue.get("title", "")

    if action == "assigned":
        assignees = issue.get("assignees") or []
        recipient = assignees[-1].get("login", "") if assignees else ""
        if not recipient:
            logger.debug("Issue assigned but no assignee found, skipping")
            return

        label_names = [lbl.get("name", "") for lbl in (issue.get("labels") or [])]
        if label_names:
            labels = ", ".join(label_names)
        else:
            labels = "(无标签)"

        context = {
            "repo": repo,
            "issue_number": str(issue_number),
            "issue_title": issue_title,
            "labels": labels,
            "issue_body": issue.get("body") or "(无描述)",
            # Short slug derived from the title: first 20 chars,
            # spaces turned into dashes, lower-cased.
            "brief": issue_title[:20].replace(" ", "-").lower(),
        }
        text = render_template("issue_assigned", context)
        _send_mail(recipient, f"Issue 指派: {issue_title} ({repo}#{issue_number})", text)
        return

    if action == "opened" and "部署失败" in issue_title:
        # The first 40 characters of the issue body are used as the commit SHA.
        raw_body = issue.get("body")
        commit_sha = raw_body[:40] if raw_body else ""
        text = render_template("deploy_failure", {
            "repo": repo,
            "commit_sha": commit_sha or "(未知)",
        })
        subject = f"部署失败: {repo}"
        for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
            _send_mail(agent_id, subject, text)
async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
    """Handle an ``issue_comment`` event carrying a CI-failure marker.

    A comment is treated as a CI failure report when its body contains
    ``[CI]`` or ``CI 失败``; the issue's author is then mailed. Other
    comments are ignored.

    Args:
        payload: Parsed webhook JSON body.
    """
    comment = payload.get("comment") or {}
    # ``or ""`` keeps a null body from raising TypeError in the ``in`` tests.
    body = comment.get("body") or ""
    if "[CI]" not in body and "CI 失败" not in body:
        return

    issue = payload.get("issue") or {}
    repo = _repo_fullname(payload)
    issue_number = issue.get("number", 0)
    # The issue's author is used as the recipient; ``or {}`` also covers a
    # ``user`` key that is present but null.
    pr_author = (issue.get("user") or {}).get("login", "unknown")
    branch = "(未知)"  # the branch is not looked up here; a placeholder is sent
    # ``body`` is necessarily non-empty here (it contains a marker), so the
    # summary is simply its first 500 characters.
    error_summary = body[:500]

    text = render_template("ci_failure", {
        "repo": repo,
        "pr_number": str(issue_number),
        "branch": branch,
        "error_summary": error_summary,
    })
    title = f"CI 失败: {repo}#{issue_number}"
    _send_mail(pr_author, title, text)
# ---------------------------------------------------------------------------
# 事件分发
# ---------------------------------------------------------------------------
# Dispatch table: event name (looked up from the X-Gitea-Event header value)
# -> async handler taking the parsed payload dict. Events without an entry
# are acknowledged by the endpoint and otherwise ignored.
_EVENT_HANDLERS: Dict[str, Any] = {
    "pull_request": _handle_pull_request,
    "pull_request_review": _handle_pull_request_review,
    "issues": _handle_issues,
    "issue_comment": _handle_issue_comment,
}
# ---------------------------------------------------------------------------
# Webhook 端点
# ---------------------------------------------------------------------------
@router.post("/webhook/gitea")
async def gitea_webhook(
    request: Request,
    x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"),
    x_gitea_delivery: Optional[str] = Header(None, alias="X-Gitea-Delivery"),
    x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"),
) -> Response:
    """Receive a Gitea webhook and turn it into Mail notifications.

    Flow: signature verification -> idempotency check -> payload parsing ->
    event dispatch -> Mail delivery.

    Response policy:
        - signature mismatch, duplicate delivery, unparseable or non-object
          payload, unknown event -> 200 (so the sender does not retry)
        - handler failure (e.g. Mail creation failed) -> 500 (so the sender
          retries)

    Args:
        request: Incoming request; its raw body is used for the signature.
        x_gitea_event: Event name, used to select the handler.
        x_gitea_delivery: Delivery ID, used for the idempotency check.
        x_gitea_signature: Hex HMAC-SHA256 signature of the body.

    Returns:
        A plain-text response with the status described above.
    """
    body = await request.body()

    # 1. Signature verification.
    # NOTE(review): a failed signature is answered with 200 rather than
    # 401/403 — confirm this is intentional.
    if not _verify_signature(body, x_gitea_signature):
        logger.warning("Webhook signature verification failed")
        return Response(status_code=200, content="signature verification failed")

    # 2. Idempotency check (only possible when both headers are present).
    if x_gitea_event and x_gitea_delivery:
        if _is_duplicate(x_gitea_event, x_gitea_delivery):
            logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
            return Response(status_code=200, content="duplicate")

    # 3. Parse the payload.
    try:
        payload = await request.json()
    except Exception:
        logger.warning("Failed to parse webhook payload")
        return Response(status_code=200, content="invalid payload")
    # Handlers call ``payload.get(...)``; valid JSON that is not an object
    # would otherwise surface as a misleading "mail creation failed" 500.
    if not isinstance(payload, dict):
        logger.warning("Webhook payload is not a JSON object")
        return Response(status_code=200, content="invalid payload")

    # 4. Look up the handler.
    handler = _EVENT_HANDLERS.get(x_gitea_event or "")
    if not handler:
        logger.debug("Unhandled event type: %s", x_gitea_event)
        return Response(status_code=200, content=f"unhandled event: {x_gitea_event}")

    # 5. Run the handler.
    try:
        await handler(payload)
    except Exception:
        logger.exception("Mail creation failed for %s event", x_gitea_event)
        # The delivery was recorded as seen in step 2. Forget it again so the
        # retry that this 500 asks for is processed instead of being dropped
        # as a duplicate. The key format mirrors the one built in _is_duplicate.
        if x_gitea_event and x_gitea_delivery:
            _delivery_cache.discard(f"{x_gitea_event}-{x_gitea_delivery}")
        return Response(status_code=500, content="mail creation failed")

    return Response(status_code=200, content="ok")