From 9880091f52cd85f2d5e0b467aaa9846e23648d6d Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 7 Jun 2026 11:56:54 +0800 Subject: [PATCH] auto-sync: 2026-06-07 11:56:54 --- src/api/toolchain_routes.py | 416 ++++++++++++++++++++++++++++++++++++ 1 file changed, 416 insertions(+) create mode 100644 src/api/toolchain_routes.py diff --git a/src/api/toolchain_routes.py b/src/api/toolchain_routes.py new file mode 100644 index 0000000..b22d1fa --- /dev/null +++ b/src/api/toolchain_routes.py @@ -0,0 +1,416 @@ +"""API 路由 — 工具链事件中枢(Toolchain Event Hub) + +接收 Gitea Webhook,翻译成 Mail 通知推送给 Agent。 + +端点: POST /webhook/gitea +支持事件: pull_request, pull_request_review, issues, issue_comment +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import os +import time +from datetime import datetime +from pathlib import PurePath +from typing import Any, Dict, List, Optional, Set, Tuple + +import httpx +from fastapi import APIRouter, Header, Request, Response + +from src.blackboard.db import init_db +from src.blackboard.models import Task +from src.blackboard.operations import Blackboard +from src.daemon.toolchain_templates import render_template +from src.utils import get_data_root + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["toolchain"]) + +# --------------------------------------------------------------------------- +# 幂等检查:内存 set,保留最近 7 天 +# --------------------------------------------------------------------------- + +_delivery_cache: Set[str] = set() +_delivery_timestamps: List[Tuple[float, str]] = [] +_TTL_SECONDS = 7 * 24 * 3600 + + +def _is_duplicate(event: str, delivery: str) -> bool: + """检查 Webhook 是否重复投递,自动清理过期条目。""" + now = time.time() + # 清理过期条目 + while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS: + _, key = _delivery_timestamps.pop(0) + _delivery_cache.discard(key) + + key = f"{event}-{delivery}" + if key in _delivery_cache: + return True + _delivery_cache.add(key) + _delivery_timestamps.append((now, key)) + return False + + +# --------------------------------------------------------------------------- +# 签名验证 +# --------------------------------------------------------------------------- + +_WEBHOOK_SECRET: Optional[str] = os.environ.get("GITEA_WEBHOOK_SECRET") + + +def _verify_signature(body: bytes, signature: Optional[str]) -> bool: + """验证 HMAC-SHA256 签名。secret 为空时跳过验签。""" + if not _WEBHOOK_SECRET: + return True + if not signature: + return False + expected = hmac.new( + _WEBHOOK_SECRET.encode(), body, hashlib.sha256 + ).hexdigest() + return hmac.compare_digest(expected, signature) + + +# --------------------------------------------------------------------------- +# Gitea API 调用 +# --------------------------------------------------------------------------- + +_GITEA_TOKEN: str = os.environ.get( + "GITEA_TOKEN", "a6d596b826f4bfeaf983ef4d25ac25dab95bbc4e" +) +_GITEA_BASE = "http://192.168.2.154:3000/api/v1" + + +async def _fetch_pr_files(repo: str, pr_number: int) -> List[str]: + """通过 Gitea API 获取 PR changed files 列表。 + + Args: + repo: 仓库路径(如 "sanguo/sanguo_moziplus_v2") + pr_number: PR 编号 + + Returns: + 文件路径列表 + """ + url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files" + headers = {"Authorization": f"token {_GITEA_TOKEN}"} + try: + async with httpx.AsyncClient(timeout=5.0) as client: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + files: List[Dict[str, Any]] = resp.json() + return [f.get("filename", "") for f in files] + except Exception: + logger.warning("Failed to fetch PR files: %s/pulls/%d", repo, pr_number, exc_info=True) + return [] + + +# --------------------------------------------------------------------------- +# 风险级别判定 +# --------------------------------------------------------------------------- + +_HIGH_PATTERNS = [ + "**/spawner*", "**/ticker*", "**/dispatcher*", + "**/router*", "**/guardrails*", "**/strategy*", "**/risk*", +] + + +def _calc_risk_level(changed_files: List[str]) -> str: + """根据改动文件列表判定风险级别。""" + for filepath in changed_files: + for pattern in _HIGH_PATTERNS: + if PurePath(filepath).match(pattern): + return "high" + return "standard" + + +# --------------------------------------------------------------------------- +# Mail 创建 +# --------------------------------------------------------------------------- + +MAIL_PROJECT_ID = "_mail" + + +def _mail_db_path() -> "Path": + """获取 Mail 数据库路径,确保目录存在。""" + from pathlib import Path as P + root = get_data_root() + db = root / MAIL_PROJECT_ID / "blackboard.db" + db.parent.mkdir(parents=True, exist_ok=True) + init_db(db) + return db + + +def _send_mail( + to_agent: str, + title: str, + description: str, + source: str = "webhook", +) -> str: + """创建 Mail Task 并写入数据库。 + + Args: + to_agent: 收件人 Agent ID + title: 邮件标题 + description: 邮件正文 + source: 来源标识 + + Returns: + 创建的 Mail ID + + Raises: + Exception: 数据库写入失败 + """ + mail_id = f"mail-{int(datetime.now().timestamp() * 1000)}" + notify_meta = { + "type": "inform", + "performative": "inform", + "is_read": False, + "conversation_id": f"conv-{mail_id}", + "from": "system", + "source": source, + } + task = Task( + id=mail_id, + title=title, + description=description, + assignee=to_agent, + assigned_by="system", + must_haves=json.dumps(notify_meta, ensure_ascii=False), + task_type="mail", + status="pending", + ) + bb = Blackboard(_mail_db_path()) + bb.create_task(task) + logger.info("Mail sent: %s → %s [%s]", title[:40], to_agent, mail_id) + return mail_id + + +# --------------------------------------------------------------------------- +# 辅助:从 payload 提取仓库全名 +# --------------------------------------------------------------------------- + + +def _repo_fullname(payload: Dict[str, Any]) -> str: + """从 Webhook payload 提取仓库全名(owner/repo)。""" + repo = payload.get("repository") or {} + return repo.get("full_name", "") + + +# --------------------------------------------------------------------------- +# 事件处理函数 +# --------------------------------------------------------------------------- + + +async def _handle_pull_request(payload: Dict[str, Any]) -> None: + """处理 pull_request 事件:opened → 通知 simayi-challenger。""" + action = payload.get("action", "") + if action != "opened": + return + + pr = payload.get("pull_request") or {} + repo = _repo_fullname(payload) + pr_number = pr.get("number", 0) + pr_title = pr.get("title", "") + pr_author = pr.get("user", {}).get("login", "unknown") + branch = pr.get("head", {}).get("ref", "unknown") + + # 获取改动文件列表 + changed_files = await _fetch_pr_files(repo, pr_number) + risk_level = _calc_risk_level(changed_files) + file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无法获取文件列表)" + + text = render_template("review_request", { + "repo": repo, + "pr_number": str(pr_number), + "pr_title": pr_title, + "pr_author": pr_author, + "branch": branch, + "risk_level": risk_level, + "file_list": file_list, + }) + + title = f"Review 请求: {pr_title} ({repo}#{pr_number})" + _send_mail("simayi-challenger", title, text) + + +async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: + """处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。""" + review = payload.get("review") or {} + state = review.get("state", "") + + # 只通知 APPROVED 和 REJECTED,跳过 COMMENTED + if state == "COMMENTED": + return + + pr = payload.get("pull_request") or {} + repo = _repo_fullname(payload) + pr_number = pr.get("number", 0) + pr_title = pr.get("title", "") + pr_author = pr.get("user", {}).get("login", "unknown") + reviewer = review.get("user", {}).get("login", "unknown") + review_body = review.get("body", "(无评论)") + + result_map = {"APPROVED": "通过 ✓", "REJECTED": "驳回 ✗", "PENDING": "待定"} + result = result_map.get(state, state) + + text = render_template("review_result", { + "repo": repo, + "pr_number": str(pr_number), + "pr_title": pr_title, + "reviewer": reviewer, + "result": result, + "review_body": review_body, + }) + + title = f"Review {result}: {pr_title} ({repo}#{pr_number})" + _send_mail(pr_author, title, text) + + +async def _handle_issues(payload: Dict[str, Any]) -> None: + """处理 issues 事件:assigned → 通知被指派人;opened+部署失败 → 通知运维。""" + action = payload.get("action", "") + issue = payload.get("issue") or {} + repo = _repo_fullname(payload) + issue_number = issue.get("number", 0) + issue_title = issue.get("title", "") + + if action == "assigned": + assignee = "" + assignees = issue.get("assignees") or [] + if assignees: + assignee = assignees[-1].get("login", "") + if not assignee: + logger.debug("Issue assigned but no assignee found, skipping") + return + + labels_list = [lbl.get("name", "") for lbl in (issue.get("labels") or [])] + labels = ", ".join(labels_list) if labels_list else "(无标签)" + issue_body = issue.get("body", "(无描述)") + brief = issue_title[:20].replace(" ", "-").lower() + + text = render_template("issue_assigned", { + "repo": repo, + "issue_number": str(issue_number), + "issue_title": issue_title, + "labels": labels, + "issue_body": issue_body or "(无描述)", + "brief": brief, + }) + + title = f"Issue 指派: {issue_title} ({repo}#{issue_number})" + _send_mail(assignee, title, text) + + elif action == "opened" and "部署失败" in issue_title: + commit_sha = issue.get("body", "")[:40] if issue.get("body") else "" + + text = render_template("deploy_failure", { + "repo": repo, + "commit_sha": commit_sha or "(未知)", + }) + + title = f"部署失败: {repo}" + for agent_id in ("jiangwei-infra", "pangtong-fujunshi"): + _send_mail(agent_id, title, text) + + +async def _handle_issue_comment(payload: Dict[str, Any]) -> None: + """处理 issue_comment 事件:CI 失败关键词 → 通知 PR 作者。""" + comment = payload.get("comment") or {} + body = comment.get("body", "") + + # 检查是否包含 CI 失败关键词 + if "[CI]" not in body and "CI 失败" not in body: + return + + issue = payload.get("issue") or {} + repo = _repo_fullname(payload) + issue_number = issue.get("number", 0) + + # 尝试从关联 PR 获取信息 + pr_author = issue.get("user", {}).get("login", "unknown") + branch = "(未知)" + + # 提取错误摘要(取 comment body 前 500 字符) + error_summary = body[:500] if body else "(无错误信息)" + + text = render_template("ci_failure", { + "repo": repo, + "pr_number": str(issue_number), + "branch": branch, + "error_summary": error_summary, + }) + + title = f"CI 失败: {repo}#{issue_number}" + _send_mail(pr_author, title, text) + + +# --------------------------------------------------------------------------- +# 事件分发 +# --------------------------------------------------------------------------- + +_EVENT_HANDLERS: Dict[str, Any] = { + "pull_request": _handle_pull_request, + "pull_request_review": _handle_pull_request_review, + "issues": _handle_issues, + "issue_comment": _handle_issue_comment, +} + + +# --------------------------------------------------------------------------- +# Webhook 端点 +# --------------------------------------------------------------------------- + + +@router.post("/webhook/gitea") +async def gitea_webhook( + request: Request, + x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"), + x_gitea_delivery: Optional[str] = Header(None, alias="X-Gitea-Delivery"), + x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"), +) -> Response: + """Gitea Webhook 接收端点。 + + 处理流程:签名验证 → 幂等检查 → 事件分发 → Mail 推送。 + + 返回策略: + - payload 解析失败 / 未知事件 / 幂等重复 → 200(不触发重试) + - Mail 创建失败 → 500(触发 Gitea 重试) + """ + body = await request.body() + + # 1. 签名验证 + if not _verify_signature(body, x_gitea_signature): + logger.warning("Webhook signature verification failed") + return Response(status_code=200, content="signature verification failed") + + # 2. 幂等检查 + if x_gitea_event and x_gitea_delivery: + if _is_duplicate(x_gitea_event, x_gitea_delivery): + logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) + return Response(status_code=200, content="duplicate") + + # 3. 解析 payload + try: + payload = await request.json() + except Exception: + logger.warning("Failed to parse webhook payload") + return Response(status_code=200, content="invalid payload") + + # 4. 查找 handler + handler = _EVENT_HANDLERS.get(x_gitea_event or "") + if not handler: + logger.debug("Unhandled event type: %s", x_gitea_event) + return Response(status_code=200, content=f"unhandled event: {x_gitea_event}") + + # 5. 执行 handler + try: + await handler(payload) + except Exception: + logger.exception("Mail creation failed for %s event", x_gitea_event) + return Response(status_code=500, content="mail creation failed") + + return Response(status_code=200, content="ok")