"""API 路由 — 工具链事件中枢(Toolchain Event Hub) 接收 Gitea Webhook,翻译成 Mail 通知推送给 Agent。 端点: POST /webhook/gitea 支持事件: pull_request, pull_request_review, issues, issue_comment """ from __future__ import annotations import hashlib import hmac import json import logging import os import re import time from datetime import datetime from pathlib import Path, PurePath from typing import Any, Dict, List, Optional, Set, Tuple import httpx from fastapi import APIRouter, Header, Request, Response from src.blackboard.db import init_db from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.daemon.toolchain_templates import render_template from src.utils import get_data_root logger = logging.getLogger(__name__) router = APIRouter(tags=["toolchain"]) # --------------------------------------------------------------------------- # 幂等检查:内存 set,保留最近 7 天 # --------------------------------------------------------------------------- # 使用内存 set 而非 SQLite(设计文档原计划 SQLite,简化实现:daemon 重启不频繁, # 重启后丢失可接受,Webhook 重试窗口内不会重复) _delivery_cache: Set[str] = set() _delivery_timestamps: List[Tuple[float, str]] = [] _TTL_SECONDS = 7 * 24 * 3600 def _is_duplicate(event: str, delivery: str) -> bool: """检查 Webhook 是否重复投递,自动清理过期条目。""" now = time.time() # 清理过期条目 while _delivery_timestamps and (now - _delivery_timestamps[0][0]) > _TTL_SECONDS: _, key = _delivery_timestamps.pop(0) _delivery_cache.discard(key) key = f"{event}-{delivery}" if key in _delivery_cache: return True _delivery_cache.add(key) _delivery_timestamps.append((now, key)) return False # --------------------------------------------------------------------------- # 签名验证 # --------------------------------------------------------------------------- _WEBHOOK_SECRET: Optional[str] = os.environ.get("GITEA_WEBHOOK_SECRET") def _verify_signature(body: bytes, signature: Optional[str]) -> bool: """验证 HMAC-SHA256 签名。secret 为空时跳过验签。""" if not _WEBHOOK_SECRET: return True if not signature: return False expected = hmac.new( _WEBHOOK_SECRET.encode(), body, hashlib.sha256 ).hexdigest() return hmac.compare_digest(expected, signature) # --------------------------------------------------------------------------- # Gitea API 调用 # --------------------------------------------------------------------------- _GITEA_TOKEN: str = os.environ.get("GITEA_TOKEN", "") _GITEA_BASE = "http://192.168.2.154:3000/api/v1" async def _fetch_pr_files(repo: str, pr_number: int) -> List[str]: """通过 Gitea API 获取 PR changed files 列表。 Args: repo: 仓库路径(如 "sanguo/sanguo_moziplus_v2") pr_number: PR 编号 Returns: 文件路径列表 """ if not _GITEA_TOKEN: logger.warning("GITEA_TOKEN not set, cannot fetch PR files") return [] url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files" headers = {"Authorization": f"token {_GITEA_TOKEN}"} try: async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get(url, headers=headers) resp.raise_for_status() files: List[Dict[str, Any]] = resp.json() return [f.get("filename", "") for f in files] except Exception: logger.warning("Failed to fetch PR files: %s/pulls/%d", repo, pr_number, exc_info=True) return [] # --------------------------------------------------------------------------- # 风险级别判定 # --------------------------------------------------------------------------- _HIGH_PATTERNS = [ "**/spawner*", "**/ticker*", "**/dispatcher*", "**/router*", "**/guardrails*", "**/strategy*", "**/risk*", ] def _calc_risk_level(changed_files: List[str]) -> str: """根据改动文件列表判定风险级别。""" for filepath in changed_files: for pattern in _HIGH_PATTERNS: if PurePath(filepath).match(pattern): return "high" return "standard" # --------------------------------------------------------------------------- # Mail 创建 # --------------------------------------------------------------------------- KNOWN_AGENTS = { "pangtong-fujunshi", "simayi-challenger", "zhangfei-dev", "guanyu-dev", "zhaoyun-data", "jiangwei-infra", } MAIL_PROJECT_ID = "_mail" def _mail_db_path() -> Path: """获取 Mail 数据库路径,确保目录存在。""" root = get_data_root() db = root / MAIL_PROJECT_ID / "blackboard.db" db.parent.mkdir(parents=True, exist_ok=True) init_db(db) return db def _send_mail( to_agent: str, title: str, description: str, source: str = "webhook", ) -> str: """创建 Mail Task 并写入数据库。 Args: to_agent: 收件人 Agent ID title: 邮件标题 description: 邮件正文 source: 来源标识 Returns: 创建的 Mail ID Raises: Exception: 数据库写入失败 """ if to_agent not in KNOWN_AGENTS: logger.warning("Unknown agent: %s, skipping mail", to_agent) return "" mail_id = f"mail-{int(datetime.now().timestamp() * 1000)}" notify_meta = { "type": "inform", "performative": "inform", "is_read": False, "conversation_id": f"conv-{mail_id}", "from": "system", "source": source, } task = Task( id=mail_id, title=title, description=description, assignee=to_agent, assigned_by="system", must_haves=json.dumps(notify_meta, ensure_ascii=False), task_type="mail", status="pending", ) bb = Blackboard(_mail_db_path()) bb.create_task(task) logger.info("Mail sent: %s → %s [%s]", title[:40], to_agent, mail_id) return mail_id # --------------------------------------------------------------------------- # 辅助:从 payload 提取仓库全名 # --------------------------------------------------------------------------- def _repo_fullname(payload: Dict[str, Any]) -> str: """从 Webhook payload 提取仓库全名(owner/repo)。""" repo = payload.get("repository") or {} return repo.get("full_name", "") # --------------------------------------------------------------------------- # 事件处理函数 # --------------------------------------------------------------------------- async def _handle_pull_request(payload: Dict[str, Any]) -> None: """处理 pull_request 事件:opened → 通知 simayi-challenger。""" action = payload.get("action", "") if action != "opened": return pr = payload.get("pull_request") or {} repo = _repo_fullname(payload) pr_number = pr.get("number", 0) pr_title = pr.get("title", "") pr_author = pr.get("user", {}).get("login", "unknown") branch = pr.get("head", {}).get("ref", "unknown") # 获取改动文件列表 changed_files = await _fetch_pr_files(repo, pr_number) risk_level = _calc_risk_level(changed_files) file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无法获取文件列表)" text = render_template("review_request", { "repo": repo, "pr_number": str(pr_number), "pr_title": pr_title, "pr_author": pr_author, "branch": branch, "risk_level": risk_level, "file_list": file_list, }) title = f"Review 请求: {pr_title} ({repo}#{pr_number})" _send_mail("simayi-challenger", title, text) async def _handle_pull_request_review(payload: Dict[str, Any]) -> None: """处理 pull_request_review 事件:非 COMMENTED → 通知 PR 作者。""" review = payload.get("review") or {} state = review.get("state", "") # 只通知 APPROVED 和 REJECTED,跳过 COMMENTED if state == "COMMENTED": return pr = payload.get("pull_request") or {} repo = _repo_fullname(payload) pr_number = pr.get("number", 0) pr_title = pr.get("title", "") pr_author = pr.get("user", {}).get("login", "unknown") reviewer = review.get("user", {}).get("login", "unknown") review_body = review.get("body", "(无评论)") result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"} if state not in result_map: return result = result_map[state] text = render_template("review_result", { "repo": repo, "pr_number": str(pr_number), "pr_title": pr_title, "reviewer": reviewer, "result": result, "review_body": review_body, }) title = f"Review {result}: {pr_title} ({repo}#{pr_number})" _send_mail(pr_author, title, text) async def _handle_issues(payload: Dict[str, Any]) -> None: """处理 issues 事件:assigned → 通知被指派人;opened+部署失败 → 通知运维。""" action = payload.get("action", "") issue = payload.get("issue") or {} repo = _repo_fullname(payload) issue_number = issue.get("number", 0) issue_title = issue.get("title", "") if action == "assigned": assignee = "" assignees = issue.get("assignees") or [] if assignees: assignee = assignees[-1].get("login", "") if not assignee: logger.debug("Issue assigned but no assignee found, skipping") return labels_list = [lbl.get("name", "") for lbl in (issue.get("labels") or [])] labels = ", ".join(labels_list) if labels_list else "(无标签)" issue_body = issue.get("body", "(无描述)") brief = issue_title[:20].replace(" ", "-").lower() text = render_template("issue_assigned", { "repo": repo, "issue_number": str(issue_number), "issue_title": issue_title, "labels": labels, "issue_body": issue_body or "(无描述)", "brief": brief, }) title = f"Issue 指派: {issue_title} ({repo}#{issue_number})" _send_mail(assignee, title, text) elif action == "opened" and "部署失败" in issue_title: # 从 Issue body 提取 commit hash(Gitea deploy workflow 格式) sha_match = re.search(r'[0-9a-f]{40}', issue.get("body", "")) commit_sha = sha_match.group(0) if sha_match else "(未知)" text = render_template("deploy_failure", { "repo": repo, "commit_sha": commit_sha or "(未知)", }) title = f"部署失败: {repo}" for agent_id in ("jiangwei-infra", "pangtong-fujunshi"): _send_mail(agent_id, title, text) async def _handle_issue_comment(payload: Dict[str, Any]) -> None: """处理 issue_comment 事件:CI 失败关键词 → 通知 PR 作者。""" comment = payload.get("comment") or {} body = comment.get("body", "") # 检查是否包含 CI 失败关键词 if "[CI]" not in body and "CI 失败" not in body: return issue = payload.get("issue") or {} repo = _repo_fullname(payload) issue_number = issue.get("number", 0) # 尝试从关联 PR 获取信息 pr_author = issue.get("user", {}).get("login", "unknown") branch_match = re.search(r"分支:\s*(\S+)", body) branch = branch_match.group(1) if branch_match else "(未知)" # 提取错误摘要(取 comment body 前 500 字符) error_summary = body[:500] if body else "(无错误信息)" text = render_template("ci_failure", { "repo": repo, "pr_number": str(issue_number), "branch": branch, "error_summary": error_summary, }) title = f"CI 失败: {repo}#{issue_number}" _send_mail(pr_author, title, text) # --------------------------------------------------------------------------- # 事件分发 # --------------------------------------------------------------------------- _EVENT_HANDLERS: Dict[str, Any] = { "pull_request": _handle_pull_request, "pull_request_review": _handle_pull_request_review, "issues": _handle_issues, "issue_comment": _handle_issue_comment, } # --------------------------------------------------------------------------- # Webhook 端点 # --------------------------------------------------------------------------- @router.post("/webhook/gitea") async def gitea_webhook( request: Request, x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"), x_gitea_delivery: Optional[str] = Header(None, alias="X-Gitea-Delivery"), x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"), ) -> Response: """Gitea Webhook 接收端点。 处理流程:签名验证 → 幂等检查 → 事件分发 → Mail 推送。 返回策略: - payload 解析失败 / 未知事件 / 幂等重复 → 200(不触发重试) - Mail 创建失败 → 500(触发 Gitea 重试) """ body = await request.body() # 1. 签名验证 if not _verify_signature(body, x_gitea_signature): logger.warning("Webhook signature verification failed") return Response(status_code=403, content="signature verification failed") # 2. 幂等检查 if x_gitea_event and x_gitea_delivery: if _is_duplicate(x_gitea_event, x_gitea_delivery): logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) return Response(status_code=200, content="duplicate") # 3. 解析 payload try: payload = await request.json() except Exception: logger.warning("Failed to parse webhook payload") return Response(status_code=200, content="invalid payload") # 4. 查找 handler handler = _EVENT_HANDLERS.get(x_gitea_event or "") if not handler: logger.debug("Unhandled event type: %s", x_gitea_event) return Response(status_code=200, content=f"unhandled event: {x_gitea_event}") # 5. 执行 handler try: await handler(payload) except Exception: logger.exception("Mail creation failed for %s event", x_gitea_event) return Response(status_code=500, content="mail creation failed") return Response(status_code=200, content="ok")