f74ae30d41
改动 1: ci_failure steps 增加分支指引(代码问题自己修/基础设施问题提Issue给姜维) 改动 2: deploy_failure steps 同上分支指引(2处定义都改) 改动 3: issue_assigned handler 按 type/infrastructure label 分流 - infrastructure label → infrastructure_failure event_type(运维排查 steps) - 其他 → 原有编码 steps 改动 4: ToolchainApiSection 新增「需要创建 Issue 时」API 指引段落 改动 5: Red Flags 新增「不是我代码的问题」条目 测试: 8 个新测试,463 passed
1280 lines
48 KiB
Python
1280 lines
48 KiB
Python
"""API routes — Toolchain Event Hub.

Receives Gitea webhooks and translates them into Mail notifications that are
pushed to Agents.

Endpoint: POST /webhook/gitea
Supported events: pull_request, pull_request_review, issues, issue_comment
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path, PurePath
|
||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||
|
||
import httpx
|
||
from fastapi import APIRouter, Header, Request, Response
|
||
|
||
from src.blackboard.db import init_db
|
||
from src.blackboard.models import Task
|
||
from src.blackboard.operations import Blackboard
|
||
from src.config.agents import AGENT_IDS
|
||
from src.api.mention_utils import (
|
||
extract_mentions,
|
||
should_suppress_mention,
|
||
infer_intent,
|
||
_build_response_guidance,
|
||
GITEA_API_BASE,
|
||
)
|
||
from src.daemon.toolchain_templates import render_template
|
||
from src.utils import get_data_root
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# FastAPI router for this module's endpoints; tagged "toolchain".
router = APIRouter(tags=["toolchain"])
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 幂等检查:内存 set,保留最近 7 天
|
||
# ---------------------------------------------------------------------------
|
||
# 使用内存 set 而非 SQLite(设计文档原计划 SQLite,简化实现:daemon 重启不频繁,
|
||
# 重启后丢失可接受,Webhook 重试窗口内不会重复)
|
||
|
||
# Dedup keys already seen (delivery-UUID keys and review content keys).
_delivery_cache: Set[str] = set()
# (timestamp, key) pairs in insertion order; used to expire keys from
# _delivery_cache once they are older than _TTL_SECONDS.
_delivery_timestamps: List[Tuple[float, str]] = []
# Retention window for dedup keys: 7 days, expressed in seconds.
_TTL_SECONDS = 7 * 24 * 3600
# Created lazily by _get_idempotency_lock(); stays None until first requested.
_idempotency_lock: Optional[asyncio.Lock] = None
|
||
|
||
|
||
def _get_idempotency_lock() -> asyncio.Lock:
    """Return the shared idempotency lock, creating it on first use.

    The lock is built lazily instead of at import time so that it is not
    constructed before an event loop exists (relevant on Python 3.9).
    """
    global _idempotency_lock
    lock = _idempotency_lock
    if lock is None:
        lock = asyncio.Lock()
        _idempotency_lock = lock
    return lock
|
||
|
||
|
||
def _is_duplicate(event: str, delivery: str,
                  payload: Optional[Dict[str, Any]] = None) -> bool:
    """Report whether a webhook was already seen, recording it if it was not.

    Expired entries are purged on every call. Two dedup strategies are used:

    1. Delivery-UUID dedup (standard idempotency).
    2. Payload-content dedup for review events, to cope with Gitea v1.23.4
       where webhookNotifier and actionsNotifier deliver the same review
       under two different UUIDs.

    Args:
        event: Webhook event name.
        delivery: Delivery UUID supplied by the sender.
        payload: Parsed webhook body; only consulted when ``event`` contains
            "review".

    Returns:
        True if the delivery (or an equivalent review) was already recorded,
        False otherwise (in which case it is recorded now).
    """
    now = time.time()

    # Purge expired entries. Timestamps are appended in call order, so the
    # oldest entry is always at the front of the list.
    while _delivery_timestamps and (
            now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
        _, expired_key = _delivery_timestamps.pop(0)
        _delivery_cache.discard(expired_key)

    # Strategy 1: delivery UUID.
    key = f"{event}-{delivery}"
    if key in _delivery_cache:
        return True

    # Strategy 2: review content (same PR + same sender + same review body).
    if payload and "review" in event:

        def _section(name: str) -> Dict[str, Any]:
            # A key may be present with a JSON null (or a non-object) value;
            # treat that like a missing section instead of crashing.
            value = payload.get(name)
            return value if isinstance(value, dict) else {}

        pr_num = _section("pull_request").get("number")
        sender = _section("sender").get("login")
        review = _section("review")
        # webhookNotifier uses review.body, actionsNotifier uses
        # review.content; read both so either format yields the same key.
        content = review.get("body") or review.get("content") or ""
        content_hash = hashlib.sha256(str(content).encode()).hexdigest()[:16]
        review_id = review.get("id", "")
        content_key = (
            f"content:{event}:{pr_num}:{sender}:{review_id}:{content_hash}")
        if content_key in _delivery_cache:
            logger.info(
                "Content-based duplicate detected: %s PR#%s by %s",
                event,
                pr_num,
                sender)
            return True
        _delivery_cache.add(content_key)
        _delivery_timestamps.append((now, content_key))

    _delivery_cache.add(key)
    _delivery_timestamps.append((now, key))
    return False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 签名验证
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Shared secret for webhook signature verification; read once at import time.
_WEBHOOK_SECRET: Optional[str] = os.environ.get("GITEA_WEBHOOK_SECRET")


def _verify_signature(body: bytes, signature: Optional[str]) -> bool:
    """Verify the HMAC-SHA256 signature of a webhook body.

    Args:
        body: Raw request body.
        signature: Hex digest supplied by the sender, or None if absent.

    Returns:
        True when the signature matches, or when no secret is configured
        (verification is skipped in that case); False otherwise.
    """
    if not _WEBHOOK_SECRET:
        # NOTE(review): with no secret configured every request is accepted.
        return True
    if not signature:
        return False
    expected = hmac.new(
        _WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header
    # into an unhandled error instead of a plain rejection.
    return hmac.compare_digest(
        expected.encode(), signature.encode("utf-8", "replace"))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Gitea API 调用
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Gitea API token, read once at import time; empty string when GITEA_TOKEN is unset.
_GITEA_TOKEN: str = os.environ.get("GITEA_TOKEN", "")
# Base URL of the Gitea REST API (v1).
# NOTE(review): hard-coded host address — presumably deployment-specific;
# consider making it configurable.
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
|
||
|
||
|
||
async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]:
    """Fetch the names of the files changed by a pull request, with retries.

    The Gitea endpoint is paginated, so pages are followed while the server
    reports ``X-HasMore: true``; otherwise only the first page of a large PR
    would be seen. The whole listing is attempted up to three times.

    Args:
        repo: Repository full name (``owner/repo``).
        pr_number: Pull request number.

    Returns:
        ``(filenames, error)`` — ``error`` is an empty string on success.
    """
    if not _GITEA_TOKEN:
        return [], "GITEA_TOKEN 未配置"

    url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files"
    headers = {"Authorization": f"token {_GITEA_TOKEN}"}
    # Hard stop so a server that always claims "more" cannot loop forever.
    max_pages = 100
    last_error = ""
    for attempt in range(3):
        try:
            filenames: List[str] = []
            async with httpx.AsyncClient(timeout=5.0) as client:
                for page in range(1, max_pages + 1):
                    resp = await client.get(
                        url, headers=headers, params={"page": page})
                    resp.raise_for_status()
                    batch: List[Dict[str, Any]] = resp.json()
                    filenames.extend(f.get("filename", "") for f in batch)
                    has_more = str(resp.headers.get("X-HasMore", "")).lower()
                    if not batch or has_more != "true":
                        break
            return filenames, ""
        except Exception as e:
            # Best effort: any failure (network, HTTP status, bad JSON) is
            # retried, and the last message is reported to the caller.
            last_error = str(e)
            if attempt < 2:
                await asyncio.sleep(0.5 * (attempt + 1))
                logger.warning(
                    "Retry %d/3 fetching PR files: %s/pulls/%d",
                    attempt + 1,
                    repo,
                    pr_number)
    logger.warning(
        "Failed to fetch PR files after 3 retries: %s/pulls/%d - %s",
        repo,
        pr_number,
        last_error)
    return [], f"获取文件列表失败(重试3次): {last_error}"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 风险级别判定
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Glob patterns for files whose modification marks a PR as high risk.
_HIGH_PATTERNS = [
    "**/spawner*", "**/ticker*", "**/dispatcher*",
    "**/router*", "**/guardrails*", "**/strategy*", "**/risk*",
]


def _calc_risk_level(changed_files: List[str]) -> str:
    """Classify a change set as "high" or "standard" risk.

    Args:
        changed_files: Paths of the files touched by the PR.

    Returns:
        "high" if any path matches one of ``_HIGH_PATTERNS``, else "standard".
    """
    for filepath in changed_files:
        path = PurePath(filepath)
        for pattern in _HIGH_PATTERNS:
            # PurePath.match() treats "**" like a single "*", so "**/name*"
            # only matches paths with at least one parent directory. Also
            # try the bare file-name pattern so that root-level files
            # (e.g. "router.py") are not misclassified as standard.
            basename_pattern = pattern[3:] if pattern.startswith("**/") else pattern
            if path.match(pattern) or path.match(basename_pattern):
                return "high"
    return "standard"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Mail 创建
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
# Directory name under the data root that holds the Mail blackboard database.
MAIL_PROJECT_ID = "_mail"
# Directory name under the data root that holds the Toolchain blackboard database.
TOOLCHAIN_PROJECT_ID = "_toolchain"
|
||
|
||
|
||
def _mail_db_path() -> Path:
    """Return the path of the Mail blackboard database.

    The parent directory is created if needed and ``init_db`` is called on
    the path before it is returned.
    """
    project_dir = get_data_root() / MAIL_PROJECT_ID
    project_dir.mkdir(parents=True, exist_ok=True)
    db_file = project_dir / "blackboard.db"
    init_db(db_file)
    return db_file
|
||
|
||
|
||
def _toolchain_db_path() -> Path:
    """Return the path of the Toolchain blackboard database.

    The parent directory is created if needed and ``init_db`` is called on
    the path before it is returned.
    """
    project_dir = get_data_root() / TOOLCHAIN_PROJECT_ID
    project_dir.mkdir(parents=True, exist_ok=True)
    db_file = project_dir / "blackboard.db"
    init_db(db_file)
    return db_file
|
||
|
||
|
||
def _send_toolchain_task(
|
||
to_agent: str,
|
||
title: str,
|
||
description: str,
|
||
event_type: str,
|
||
action_type: str,
|
||
steps: list,
|
||
context_data: dict | None = None,
|
||
source: str = "webhook",
|
||
) -> str:
|
||
"""创建 Toolchain Task 并写入 _toolchain DB。
|
||
|
||
Args:
|
||
to_agent: 收件人 Agent ID
|
||
title: 任务标题
|
||
description: 任务描述(模板渲染后的事件信息)
|
||
event_type: 事件类型(review_result / ci_failure / ...)
|
||
action_type: 动作分类(用于步骤选择和日志统计)
|
||
steps: 结构化编号步骤列表
|
||
context_data: 事件上下文数据(PR 号、仓库名等)
|
||
source: 来源标识
|
||
|
||
Returns:
|
||
创建的 Task ID
|
||
"""
|
||
if to_agent not in AGENT_IDS:
|
||
logger.warning("Unknown agent: %s, skipping toolchain task", to_agent)
|
||
return ""
|
||
|
||
task_id = f"tc-{int(datetime.now().timestamp() * 1000)}"
|
||
must_hives = json.dumps({
|
||
"event_type": event_type,
|
||
"action_type": action_type,
|
||
"steps": steps,
|
||
"context": context_data or {},
|
||
"from": "system",
|
||
"source": source,
|
||
}, ensure_ascii=False)
|
||
|
||
task = Task(
|
||
id=task_id,
|
||
title=title,
|
||
description=description,
|
||
assignee=to_agent,
|
||
assigned_by="system",
|
||
must_haves=must_hives,
|
||
task_type="toolchain",
|
||
status="pending",
|
||
)
|
||
bb = Blackboard(_toolchain_db_path())
|
||
bb.create_task(task)
|
||
logger.info(
|
||
"Toolchain task sent: %s → %s [%s] action_type=%s",
|
||
title[:40], to_agent, task_id, action_type,
|
||
)
|
||
return task_id
|
||
|
||
|
||
# Millisecond timestamp embedded in the most recently issued mail ID.
_last_mail_ms = 0


def _next_mail_id() -> str:
    """Return a ``mail-<ms>`` ID that is unique within this process."""
    global _last_mail_ms
    now_ms = int(datetime.now().timestamp() * 1000)
    # Two mails created within the same millisecond would otherwise share an
    # ID; bump past the previous value to keep IDs strictly increasing.
    _last_mail_ms = max(now_ms, _last_mail_ms + 1)
    return f"mail-{_last_mail_ms}"


def _send_mail(
    to_agent: str,
    title: str,
    description: str,
    source: str = "webhook",
) -> str:
    """Create a Mail task and write it to the Mail database.

    Args:
        to_agent: Recipient Agent ID.
        title: Mail subject.
        description: Mail body.
        source: Origin marker stored in the mail metadata.

    Returns:
        The ID of the created mail, or an empty string when ``to_agent`` is
        not a known Agent (nothing is written in that case).

    Raises:
        Exception: Errors from the database write are not caught here.
    """
    if to_agent not in AGENT_IDS:
        logger.warning("Unknown agent: %s, skipping mail", to_agent)
        return ""

    mail_id = _next_mail_id()
    notify_meta = {
        "type": "inform",
        "performative": "inform",
        "is_read": False,
        "conversation_id": f"conv-{mail_id}",
        "from": "system",
        "source": source,
    }
    task = Task(
        id=mail_id,
        title=title,
        description=description,
        assignee=to_agent,
        assigned_by="system",
        must_haves=json.dumps(notify_meta, ensure_ascii=False),
        task_type="mail",
        status="pending",
    )
    bb = Blackboard(_mail_db_path())
    bb.create_task(task)
    logger.info("Mail sent: %s → %s [%s]", title[:40], to_agent, mail_id)
    return mail_id
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 辅助:从 payload 提取仓库全名
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _repo_fullname(payload: Dict[str, Any]) -> str:
    """Extract the repository full name (``owner/repo``) from a webhook payload.

    Returns an empty string when the ``repository`` object or its
    ``full_name`` is missing, null, or of an unexpected type, so callers
    always receive a ``str``.
    """
    repo = payload.get("repository")
    if not isinstance(repo, dict):
        return ""
    full_name = repo.get("full_name")
    return full_name if isinstance(full_name, str) else ""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# @mention 通用发送函数
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def _send_mention_mails(
    mentions: list[str],
    auto_targets: list[str],
    source_type: str,
    mention_type: str,
    source_url: str,
    commenter: str,
    content: str,
    repo: str,
    issue_number: int,
    is_pr: bool,
) -> None:
    """Send an @mention toolchain task to every mentioned Agent.

    Agents for which ``should_suppress_mention`` returns true (already
    covered by ``auto_targets``) are skipped to avoid double notification.
    The intent is inferred from ``content`` and selects the response guidance
    embedded in the task description.

    Args:
        mentions: Agent IDs mentioned in the content.
        auto_targets: Agents already notified by the automatic flow.
        source_type: Kind of source object, used in the title and template.
        mention_type: Mention label passed to the template.
        source_url: URL of the source object.
        commenter: Login of the user who wrote the content.
        content: Text containing the mentions; truncated to 500 characters
            in the task.
        repo: Repository full name (``owner/repo``).
        issue_number: Issue or PR number.
        is_pr: Whether the source is a pull request (selects the detail API
            path).
    """
    recipients = []
    for agent_id in mentions:
        if should_suppress_mention(agent_id, auto_targets):
            logger.info(
                "Mention suppressed for %s (already notified by auto flow)",
                agent_id)
            continue
        recipients.append(agent_id)
    if not recipients:
        return

    # Everything below is identical for all recipients, so build it once.
    # Comments live under the issues API for both issues and pull requests.
    detail_api = f"pulls/{issue_number}" if is_pr else f"issues/{issue_number}"
    comments_api = f"issues/{issue_number}/comments"
    number_str = f"#{issue_number}" if issue_number else ""

    intent = infer_intent(content)
    intent_labels = {"help": "求助", "notify": "通知关注",
                     "collaborate": "协作请求", "assign": "分配子任务"}
    # Fall back to the raw intent rather than raising KeyError if
    # infer_intent ever returns a value without a label.
    intent_hint = intent_labels.get(intent, str(intent))

    guidance = _build_response_guidance(
        intent=intent,
        gitea_api=GITEA_API_BASE,
        repo=repo,
        issue_number=issue_number,
        commenter=commenter,
    )

    text = render_template("mention", {
        "mention_type": mention_type,
        "source_type": source_type,
        "source_url": source_url,
        "commenter": commenter,
        "intent_hint": intent_hint,
        "content_snippet": content[:500],
        "gitea_api": GITEA_API_BASE,
        "repo": repo,
        "source_detail_api_path": detail_api,
        "source_comments_api_path": comments_api,
        "response_guidance": guidance,
    })
    title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})"

    for agent_id in recipients:
        _send_toolchain_task(
            to_agent=agent_id,
            title=title,
            description=text,
            event_type="mention",
            action_type="mention",
            steps=[
                "按上方 mention 模板中的 response_guidance 执行",
                "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)",
            ],
            context_data={
                "source_type": source_type,
                "source_url": source_url,
                "commenter": commenter,
                "content_snippet": content[:500],
                "repo": repo,
                "issue_number": issue_number,
            },
        )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 事件处理函数
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
    """Dispatch a pull_request event to the handler for its action.

    Handled actions are "opened", "closed" and "synchronize"; any other
    action is ignored.
    """
    action = payload.get("action", "")
    if action == "opened":
        await _handle_pr_opened(payload)
        return
    if action == "closed":
        await _handle_pr_closed(payload)
        return
    if action == "synchronize":
        await _handle_pr_synchronize(payload)
        return
|
||
|
||
|
||
async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
    """Handle a newly opened PR: request a review from simayi-challenger.

    Fetches the changed-file list, derives a risk level from it, sends a
    review_request toolchain task, and then notifies any Agents mentioned in
    the PR body.
    """
    pr = payload.get("pull_request")
    if not pr or not isinstance(pr, dict):
        logger.warning(
            "pull_request event missing pull_request field, skipping")
        return
    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # Nested objects may be present but null; `or {}` keeps the lookups safe.
    pr_user = pr.get("user") or {}
    pr_author = pr_user.get("login", "unknown")
    branch = (pr.get("head") or {}).get("ref", "unknown")

    # Changed files drive both the risk level and the list shown to the reviewer.
    changed_files, fetch_error = await _fetch_pr_files(repo, pr_number)
    # NOTE(review): when the fetch fails the list is empty, so the risk level
    # is computed as "standard" even though the real risk is unknown.
    risk_level = _calc_risk_level(changed_files)
    if fetch_error:
        file_list = f"⚠️ {fetch_error}"
    elif changed_files:
        file_list = "\n".join(f"- {f}" for f in changed_files)
    else:
        file_list = "(无文件变更)"

    text = render_template("review_request", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "branch": branch,
        "risk_level": risk_level,
        "file_list": file_list,
    })

    title = f"Review 请求: {pr_title} ({repo}#{pr_number})"
    _send_toolchain_task(
        to_agent="simayi-challenger",
        title=title,
        description=text,
        event_type="review_request",
        action_type="review_request",
        steps=[
            f"读取 PR diff(Gitea API: GET /repos/{repo}/pulls/{pr_number}.diff)",
            "按审查清单审查(参考 code-review Skill)",
            f"提交 Review(Gitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES",
            "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)",
        ],
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "pr_author": pr_author,
            "branch": branch,
            "risk_level": risk_level,
        },
    )

    # S3: notify Agents mentioned in the PR body.
    pr_body = pr.get("body", "") or ""
    sender = pr_user.get("login", "")
    mentions = extract_mentions(pr_body, sender)
    if mentions:
        # simayi-challenger was already notified by the review request above.
        auto_targets = ["simayi-challenger"]
        await _send_mention_mails(
            mentions=mentions,
            auto_targets=auto_targets,
            source_type="PR",
            mention_type="PR @mention",
            source_url=pr.get("html_url", ""),
            commenter=sender,
            content=pr_body,
            repo=repo,
            issue_number=pr_number,
            is_pr=True,
        )
|
||
|
||
|
||
async def _send_review_mentions(
    review_body: str,
    reviewer: str,
    pr_author: str,
    pr: dict,
    repo: str,
    pr_number: int,
) -> None:
    """Notify Agents mentioned in a review body.

    Shared by the COMMENTED and non-COMMENTED review paths. The PR author is
    passed as the auto target because the review notification already
    reaches them.
    """
    mentioned = extract_mentions(review_body, reviewer)
    if not mentioned:
        return
    await _send_mention_mails(
        mentions=mentioned,
        auto_targets=[pr_author],
        source_type="Review",
        mention_type="Review @mention",
        source_url=pr.get("html_url", ""),
        commenter=reviewer,
        content=review_body,
        repo=repo,
        issue_number=pr_number,
        is_pr=True,
    )
|
||
|
||
|
||
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
    """Handle a pull_request_review event by notifying the PR author.

    COMMENTED reviews produce a review_comment task; APPROVED and
    REQUEST_CHANGES reviews produce a review_result task. Any other state is
    ignored. In both handled cases, Agents mentioned in the review body are
    notified afterwards.

    Two payload formats are supported:
    - repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES"
    - org webhook (Gitea v1.23.4): review.type =
      "pull_request_review_approved" / "pull_request_review_rejected"
    """
    review = payload.get("review")
    if not review or not isinstance(review, dict):
        logger.warning(
            "pull_request_review event missing review field, skipping")
        return
    pr = payload.get("pull_request")
    if not pr or not isinstance(pr, dict):
        logger.warning(
            "pull_request_review event missing pull_request field, skipping")
        return

    # Normalise the state across both payload formats.
    state = review.get("state", "")
    if not state:
        # Org webhook format, e.g. review.type = "pull_request_review_approved".
        type_map = {
            "pull_request_review_approved": "APPROVED",
            "pull_request_review_rejected": "REQUEST_CHANGES",
            "pull_request_review_comment": "COMMENTED",
        }
        state = type_map.get(review.get("type", ""), "")

    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # Nested objects may be present but null; `or {}` keeps the lookups safe.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    # Org webhook reviews carry no user, so fall back to the event sender.
    reviewer = (
        (review.get("user") or {}).get("login", "")
        or (payload.get("sender") or {}).get("login", "unknown")
    )
    # Use the placeholder whenever both fields are empty, not only when the
    # "content" key is absent.
    review_body = review.get("body") or review.get("content") or "(无评论)"

    if state == "COMMENTED":
        text = render_template("review_comment", {
            "repo": repo,
            "pr_number": str(pr_number),
            "pr_title": pr_title,
            "reviewer": reviewer,
            "comment_body": review_body,
        })

        title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
        _send_toolchain_task(
            to_agent=pr_author,
            title=title,
            description=text,
            event_type="review_comment",
            action_type="review_comment",
            steps=[
                f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments)",
                "根据评论内容响应(修改代码或在 PR 上回复 comment)",
                "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)",
            ],
            context_data={
                "pr_number": pr_number,
                "repo": repo,
                "pr_title": pr_title,
                "reviewer": reviewer,
                "comment_body": review_body,
            },
        )

        # S5: review-body @mention notifications (COMMENTED path).
        await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
        return

    result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
    if state not in result_map:
        return
    result = result_map[state]

    text = render_template("review_result", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "reviewer": reviewer,
        "result": result,
        "review_body": review_body,
    })

    title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
    if state == "APPROVED":
        tc_steps = [
            f"合并 PR(Gitea API: POST /repos/{repo}/pulls/{pr_number}/merge)",
            "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)",
        ]
    else:  # REQUEST_CHANGES
        tc_steps = [
            "按审查意见逐条修改代码",
            "push 到原分支 → CI 自动跑",
            "CI 通过后等重新 Review",
            "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)",
        ]
    _send_toolchain_task(
        to_agent=pr_author,
        title=title,
        description=text,
        event_type="review_result",
        action_type="review_result",
        steps=tc_steps,
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "result": result,
            "reviewer": reviewer,
            "review_body": review_body,
        },
    )

    # S5: review-body @mention notifications (non-COMMENTED path).
    await _send_review_mentions(review_body, reviewer, pr_author, pr, repo, pr_number)
|
||
|
||
|
||
async def _fetch_latest_reviewer(repo: str, pr_number: int) -> str:
    """Look up who submitted the most recent non-pending review of a PR.

    Args:
        repo: Repository full name (``owner/repo``).
        pr_number: Pull request number.

    Returns:
        The reviewer's login, or an empty string when no token is
        configured, the request fails, or no submitted review exists.
    """
    if not _GITEA_TOKEN:
        return ""

    url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/reviews"
    headers = {"Authorization": f"token {_GITEA_TOKEN}"}
    # The Gitea REST API reports comment reviews as "COMMENT"; "COMMENTED"
    # is kept as well for backward compatibility.
    submitted_states = ("APPROVED", "REQUEST_CHANGES", "COMMENT", "COMMENTED")

    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(url, headers=headers)
            resp.raise_for_status()
            reviews = resp.json()

        # Walk from the end of the list to find the last submitted review.
        for review in reversed(reviews):
            if review.get("state", "") in submitted_states:
                # `user` may be null; avoid aborting the lookup on it.
                return (review.get("user") or {}).get("login", "") or ""
    except Exception as e:
        # Best effort: the caller falls back to a default reviewer.
        logger.warning("Failed to fetch reviews for %s#%d: %s", repo, pr_number, e)

    return ""
|
||
|
||
|
||
async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
    """Handle a PR update (new push): ask the reviewer to review again.

    The submitter of the most recent review is notified. When no review
    history can be found, the default reviewer simayi-challenger is notified
    instead.
    """
    pr = payload.get("pull_request")
    if not pr or not isinstance(pr, dict):
        return

    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # Nested objects may be present but null; `or {}` keeps the lookups safe.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    head_sha = (pr.get("head") or {}).get("sha", "unknown")
    # Only slice real strings; a null sha would otherwise raise TypeError.
    new_sha = head_sha[:12] if isinstance(head_sha, str) else "unknown"

    reviewer = await _fetch_latest_reviewer(repo, pr_number)
    if not reviewer:
        # No usable review history: fall back to the default reviewer.
        reviewer = "simayi-challenger"
        logger.info("No review history for PR #%s, using default reviewer %s", pr_number, reviewer)

    text = render_template("review_updated", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "new_sha": new_sha,
        "reviewer": reviewer,
    })

    title = f"PR 更新: {pr_title} ({repo}#{pr_number})"
    _send_toolchain_task(
        to_agent=reviewer,
        title=title,
        description=text,
        event_type="review_updated",
        action_type="review_updated",
        steps=[
            f"读取 PR diff(Gitea API: GET /repos/{repo}/pulls/{pr_number}.diff)",
            "重点检查上次 Review 意见的修改部分",
            f"提交 Review(Gitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)",
            "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)",
        ],
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "pr_author": pr_author,
            "new_sha": new_sha,
            "reviewer": reviewer,
        },
    )
|
||
|
||
|
||
def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
    """Send a deploy_failure toolchain task to the deployment contacts.

    The same task (rendered "deploy_failure" template followed by the failure
    reason) is sent to jiangwei-infra and pangtong-fujunshi.
    """
    rendered = render_template("deploy_failure", {
        "repo": repo,
        "commit_sha": f"PR #{pr_number}",
    })
    description = f"{rendered}\n\n失败原因: {reason}"
    title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})"

    recipients = ("jiangwei-infra", "pangtong-fujunshi")
    for recipient in recipients:
        # Build fresh step/context objects for every task.
        triage_steps = [
            "检查 deploy 日志",
            "根据 deploy 日志判断失败原因类型:\n a. 代码/配置问题(rsync 路径错、依赖缺失、启动失败)→ 修复 → 重新部署\n b. 基础设施问题(Gitea 不可用、网络不通、磁盘满、SSH 故障)→ 在该仓库创建 Issue 指派 jiangwei-infra(见下方「需要创建 Issue 时」),label 必须包含 type/infrastructure",
            "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)— 报告中说明判断的原因类型和执行的操作",
        ]
        failure_context = {
            "repo": repo,
            "pr_number": pr_number,
            "pr_title": pr_title,
            "reason": reason,
        }
        _send_toolchain_task(
            to_agent=recipient,
            title=title,
            description=description,
            event_type="deploy_failure",
            action_type="deploy_failure",
            steps=triage_steps,
            context_data=failure_context,
        )
|
||
|
||
|
||
async def _run_deploy_subprocess(
    args: List[str],
    timeout: float,
    cwd: Optional[str] = None,
) -> Tuple[int, str]:
    """Run a deploy command; return (exit code, stderr text); kill it on timeout."""
    proc = await asyncio.create_subprocess_exec(
        *args,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        _, raw_err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); without an explicit kill the
        # child process would keep running after the deploy was reported as
        # timed out.
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        raise
    # Tool output is not guaranteed to be valid UTF-8; never fail on decoding.
    return proc.returncode, raw_err.decode(errors="replace")


async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
    """Handle a closed PR: if it was merged, notify the author and auto-deploy.

    After the review_merged notification, the repository is deployed when it
    is listed in ``config/deploy-targets.yaml``: ``git pull`` in the dev
    directory, ``rsync`` to the install directory, then the configured
    ``post_deploy`` commands if the PR touched runtime files. rsync and
    post_deploy failures, as well as timeouts, are reported through
    ``_send_deploy_failure_task``.
    """
    pr = payload.get("pull_request")
    if not pr or not isinstance(pr, dict):
        return

    # Only merged PRs are of interest.
    if not pr.get("merged", False):
        return

    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # Nested objects may be present but null; `or {}` keeps the lookups safe.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    # merged_by may be missing (or null) in the payload; fall back to sender.
    merged_by = (
        (pr.get("merged_by") or {}).get("login", "")
        or (payload.get("sender") or {}).get("login", "unknown")
    )

    text = render_template("review_merged", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "merged_by": merged_by,
    })

    title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
    _send_toolchain_task(
        to_agent=pr_author,
        title=title,
        description=text,
        event_type="review_merged",
        action_type="review_merged",
        steps=[],  # pure notification, no steps
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "pr_author": pr_author,
            "merged_by": merged_by,
        },
    )

    # Auto-deploy: git pull + rsync + post_deploy when needed.
    # NOTE(review): the PR's base branch is not checked, so a merge into any
    # branch triggers a deploy of origin/main — confirm this is intended.
    try:
        import yaml

        config_path = Path(__file__).parent.parent.parent / "config" / "deploy-targets.yaml"
        if not config_path.exists():
            return

        with open(config_path, "r", encoding="utf-8") as f:
            deploy_config = yaml.safe_load(f) or {}

        targets = deploy_config.get("targets", {})
        target = targets.get(repo)
        if not target:
            return  # repository is not a deploy target

        dev_dir = os.path.expanduser(target["dev_dir"])
        install_dir = os.path.expanduser(target.get("install_dir", target["dev_dir"]))
        rsync_excludes = target.get("rsync_exclude", [])

        # Step 1: git pull in the dev directory.
        pull_rc, pull_err = await _run_deploy_subprocess(
            ["git", "pull", "origin", "main"], timeout=30, cwd=dev_dir)
        if pull_rc != 0:
            # NOTE(review): unlike rsync/post_deploy failures, no
            # deploy_failure task is sent here — confirm that is deliberate.
            logger.warning("Auto-deploy: git pull failed for %s: %s", repo, pull_err)
            return

        logger.info("Auto-deploy: git pull success for %s", repo)

        # Step 2: rsync to the install directory.
        rsync_args = ["rsync", "-a"]
        for exc in rsync_excludes:
            rsync_args.extend(["--exclude", exc])
        rsync_args.extend([f"{dev_dir}/", f"{install_dir}/"])

        rsync_rc, rsync_err = await _run_deploy_subprocess(rsync_args, timeout=60)
        if rsync_rc != 0:
            logger.error("Auto-deploy: rsync failed: %s", rsync_err)
            _send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {rsync_err}")
            return

        # Step 3: decide whether post_deploy has to run.
        file_list, fetch_error = await _fetch_pr_files(repo, pr_number)
        if fetch_error:
            # Without the file list we cannot tell whether runtime files
            # changed; run post_deploy rather than silently skipping it.
            logger.warning(
                "Auto-deploy: cannot determine changed files for %s (%s), running post_deploy",
                repo, fetch_error)
        needs_restart = bool(fetch_error) or any(
            f.startswith(("src/", "templates/", "frontend/")) or f.endswith(".py")
            for f in file_list
        )

        if needs_restart:
            post_deploy_cmds = target.get("post_deploy", [])
            pm2_name = target.get("pm2_name", "")
            for cmd in post_deploy_cmds:
                logger.info("Auto-deploy: executing post_deploy: %s", cmd)

                # M2: detect whether this command restarts the current
                # process. A PM2-managed process has PM2_HOME set, and the
                # command must restart the configured pm2_name.
                self_restart = bool(
                    pm2_name
                    and os.environ.get("PM2_HOME")
                    and "pm2 restart" in cmd
                    and re.search(rf'pm2\s+restart\s+{re.escape(pm2_name)}', cmd)
                )

                if self_restart:
                    # M1: delay with asyncio.sleep instead of nohup so the
                    # child's output and exit status can still be observed.
                    logger.info("Auto-deploy: self-restart detected, deferring 2s: %s", cmd)
                    await asyncio.sleep(2)
                    deploy_proc = await asyncio.create_subprocess_exec(
                        "sh", "-c", cmd,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    # The restart kills this process, so communicate() may
                    # never finish. The child is deliberately not killed on
                    # timeout here: it must be left to complete the restart.
                    try:
                        _, raw_err = await asyncio.wait_for(
                            deploy_proc.communicate(), timeout=10)
                    except (asyncio.TimeoutError, ProcessLookupError):
                        logger.info("Auto-deploy: process killed by self-restart (expected)")
                        break
                    deploy_rc = deploy_proc.returncode
                    deploy_err = raw_err.decode(errors="replace")
                else:
                    deploy_rc, deploy_err = await _run_deploy_subprocess(
                        ["sh", "-c", cmd], timeout=30)

                if deploy_rc != 0:
                    logger.error("Auto-deploy: post_deploy failed: %s", deploy_err)
                    _send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {deploy_err}")
                    break
            else:
                logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5]))
        else:
            logger.info("Auto-deploy: docs-only change for %s, skip post_deploy", repo)

    except asyncio.TimeoutError:
        logger.error("Auto-deploy: timeout for %s", repo)
        _send_deploy_failure_task(repo, pr_number, pr_title, "部署超时")
    except Exception as e:
        # Deployment is best effort and must not fail the webhook handler;
        # log with traceback so the cause is diagnosable.
        logger.exception("Auto-deploy: unexpected error: %s", e)
|
||
|
||
|
||
async def _handle_issues(payload: Dict[str, Any]) -> None:
    """Handle Gitea ``issues`` webhook events.

    Behaviour by ``action``:

    * ``assigned`` -- send a toolchain task to the last entry of the issue's
      assignee list (falling back to the single ``assignee`` field).  If any
      label name contains ``infrastructure`` the task is dispatched with
      event type ``infrastructure_failure`` and troubleshooting steps;
      otherwise it is a regular ``issue_assigned`` coding task.
    * ``opened`` -- if the title contains "部署失败", send a
      ``deploy_failure`` task to the two ops agents; independently, forward
      any @mentions found in the issue body.

    Any other action is ignored.

    Fields that may arrive as JSON ``null`` (``title``, ``body``, label
    ``name``, ``sender``) are coerced to empty values so that a sparse
    payload is skipped or handled instead of raising.

    Args:
        payload: Decoded webhook JSON body.
    """
    action = payload.get("action", "")
    issue = payload.get("issue")
    if not issue or not isinstance(issue, dict):
        logger.warning("issues event missing issue field, skipping")
        return

    repo = _repo_fullname(payload)
    issue_number = issue.get("number", 0)
    # ``dict.get(key, default)`` still returns None for an explicit null,
    # so use ``or`` to normalise both "missing" and "null" to a string.
    issue_title = issue.get("title") or ""
    raw_body = issue.get("body") or ""

    if action == "assigned":
        assignees = issue.get("assignees") or []
        if not assignees:
            single = issue.get("assignee")
            if single and isinstance(single, dict):
                assignees = [single]

        # The task goes to the last listed assignee.
        last = assignees[-1] if assignees else None
        assignee = last.get("login", "") if isinstance(last, dict) else ""
        if not assignee:
            logger.debug("Issue assigned but no assignee found, skipping")
            return

        labels_list = [
            lbl.get("name") or ""
            for lbl in (issue.get("labels") or [])
            if isinstance(lbl, dict)
        ]
        labels = ", ".join(labels_list) if labels_list else "(无标签)"
        issue_body = raw_body or "(无描述)"
        brief = issue_title[:20].replace(" ", "-").lower()

        text = render_template("issue_assigned", {
            "repo": repo,
            "issue_number": str(issue_number),
            "issue_title": issue_title,
            "labels": labels,
            "issue_body": issue_body,
            "brief": brief,
        })

        # Both task flavours carry the same context and final report step.
        context_data = {
            "issue_number": issue_number,
            "repo": repo,
            "issue_title": issue_title,
            "labels": labels,
            "issue_body": issue_body,
            "brief": brief,
        }
        report_step = "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)"

        # Route by label: infrastructure issues get ops troubleshooting
        # steps, everything else gets the regular coding workflow.
        if any("infrastructure" in name.lower() for name in labels_list):
            event_type = "infrastructure_failure"
            title = f"基础设施 Issue: {issue_title} ({repo}#{issue_number})"
            steps = [
                "根据 Issue body 中的错误来源和日志片段排查问题",
                "修复基础设施问题(如修复 CI runner 环境、恢复网络、重启服务等)",
                "修复后在 Issue 上 comment 说明修复方式和结果",
                report_step,
            ]
        else:
            event_type = "issue_assigned"
            title = f"Issue 指派: {issue_title} ({repo}#{issue_number})"
            steps = [
                f"创建分支 fix/{issue_number}-{brief}",
                "编码 + 写 UT",
                "push → 等 CI",
                f"CI 通过后创建 PR(Gitea API: POST /repos/{repo}/pulls)",
                "等 Review",
                report_step,
            ]

        _send_toolchain_task(
            to_agent=assignee,
            title=title,
            description=text,
            event_type=event_type,
            action_type=event_type,
            steps=steps,
            context_data=context_data,
        )

    elif action == "opened":
        if "部署失败" in issue_title:
            # Pull the 40-hex-char commit hash out of the issue body
            # (format written by the Gitea deploy workflow).
            sha_match = re.search(r'[0-9a-f]{40}', raw_body)
            commit_sha = sha_match.group(0) if sha_match else "(未知)"

            text = render_template("deploy_failure", {
                "repo": repo,
                "commit_sha": commit_sha,
            })

            title = f"部署失败: {repo}"
            for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
                # Fresh steps/context objects per recipient so the two
                # tasks never share mutable state.
                _send_toolchain_task(
                    to_agent=agent_id,
                    title=title,
                    description=text,
                    event_type="deploy_failure",
                    action_type="deploy_failure",
                    steps=[
                        "检查 deploy 日志",
                        "根据 deploy 日志判断失败原因类型:\n a. 代码/配置问题(rsync 路径错、依赖缺失、启动失败)→ 修复 → 重新部署\n b. 基础设施问题(Gitea 不可用、网络不通、磁盘满、SSH 故障)→ 在该仓库创建 Issue 指派 jiangwei-infra(见下方「需要创建 Issue 时」),label 必须包含 type/infrastructure",
                        "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)— 报告中说明判断的原因类型和执行的操作",
                    ],
                    context_data={
                        "repo": repo,
                        "commit_sha": commit_sha,
                    },
                )

        # @mentions in the issue body are checked for every opened issue.
        sender = (payload.get("sender") or {}).get("login", "")
        mentions = extract_mentions(raw_body, sender)
        if mentions:
            # Assignees are passed as auto_targets (the original comment
            # notes they are already notified by the automatic flow).
            assignees = issue.get("assignees") or []
            if not assignees:
                single = issue.get("assignee")
                if single and isinstance(single, dict):
                    assignees = [single]
            auto_targets = [
                a.get("login", "") for a in assignees if isinstance(a, dict)
            ]
            await _send_mention_mails(
                mentions=mentions,
                auto_targets=auto_targets,
                source_type="Issue",
                mention_type="Issue @mention",
                source_url=issue.get("html_url", ""),
                commenter=sender,
                content=raw_body,
                repo=repo,
                issue_number=issue_number,
                is_pr=False,
            )
|
||
|
||
|
||
async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
    """Handle Gitea ``issue_comment`` webhook events.

    Only ``created`` comments are processed.  Two independent paths may both
    fire for the same comment:

    1. CI failure -- when the comment body contains ``[CI]`` or "CI 失败" and
       the issue/PR is not closed, a ``ci_failure`` task is sent to the
       issue/PR author.
    2. @mention -- every user mentioned in the body is notified; the CI
       recipient (if path 1 fired) is passed as an auto target.

    Fields that may arrive as JSON ``null`` (comment ``body``, ``user``) are
    coerced to empty values so a sparse payload cannot raise.

    Args:
        payload: Decoded webhook JSON body.
    """
    comment = payload.get("comment")
    if not comment or not isinstance(comment, dict):
        logger.warning("issue_comment event missing comment field, skipping")
        return
    # ``dict.get(key, default)`` returns None for an explicit null; normalise.
    body = comment.get("body") or ""
    sender = (comment.get("user") or {}).get("login", "")

    issue = payload.get("issue")
    if not issue or not isinstance(issue, dict):
        logger.warning("issue_comment event missing issue field, skipping")
        return

    if payload.get("action", "") != "created":
        return

    issue_user = issue.get("user") or {}
    # Evaluated once and shared by both paths below.
    is_ci_failure = (
        ("[CI]" in body or "CI 失败" in body)
        and issue.get("state") != "closed"
    )

    # === Path 1: CI failure notification ===
    if is_ci_failure:
        repo = _repo_fullname(payload)
        issue_number = issue.get("number", 0)

        # The issue/PR author is the recipient of the CI failure task.
        pr_author = issue_user.get("login", "unknown")
        branch_match = re.search(r"分支:\s*(\S+)", body)
        branch = branch_match.group(1) if branch_match else "(未知)"

        # Error summary: first 500 characters of the comment body.
        error_summary = body[:500] if body else "(无错误信息)"

        text = render_template("ci_failure", {
            "repo": repo,
            "pr_number": str(issue_number),
            "branch": branch,
            "error_summary": error_summary,
        })

        title = f"CI 失败: {repo}#{issue_number}"
        _send_toolchain_task(
            to_agent=pr_author,
            title=title,
            description=text,
            event_type="ci_failure",
            action_type="ci_failure",
            steps=[
                "查看完整 CI 日志(PR 页面或 Gitea Actions 页面)",
                "根据 CI 日志判断失败原因类型:\n a. 代码问题(lint/test 失败)→ 修复失败的测试 → push 到原分支 → CI 自动重跑\n b. 基础设施问题(runner 环境/Python/venv/Gitea/网络故障)→ 在该仓库创建 Issue 指派 jiangwei-infra(见下方「需要创建 Issue 时」),label 必须包含 type/infrastructure",
                "提交 action report(POST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/comments,comment_type=action_report)— 报告中说明判断的原因类型和执行的操作",
            ],
            context_data={
                "pr_number": issue_number,
                "repo": repo,
                "branch": branch,
                "error_summary": error_summary,
            },
        )
        # Deliberately no return: fall through to the @mention check.

    # === Path 2: @mention notification (independent of path 1) ===
    mentions = extract_mentions(body, sender)
    if mentions:
        # In Gitea a PR is a special kind of issue; the presence of the
        # ``pull_request`` key distinguishes the two.
        is_pr = issue.get("pull_request") is not None
        source_type = "PR" if is_pr else "Issue"
        mention_type = "PR @mention" if is_pr else "Issue @mention"

        issue_number = issue.get("number", 0)
        repo = _repo_fullname(payload)

        # The CI failure recipient was already notified by path 1.
        auto_targets: list[str] = []
        if is_ci_failure:
            auto_targets.append(issue_user.get("login", ""))

        await _send_mention_mails(
            mentions=mentions,
            auto_targets=auto_targets,
            source_type=source_type,
            mention_type=mention_type,
            source_url=issue.get("html_url", ""),
            commenter=sender,
            content=body,
            repo=repo,
            issue_number=issue_number,
            is_pr=is_pr,
        )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 事件分发
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Dispatch table: X-Gitea-Event header value -> coroutine handling that event.
_EVENT_HANDLERS: Dict[str, Any] = {
    "pull_request": _handle_pull_request,
    # Gitea: a push to a PR branch is delivered as its own event type.
    "pull_request_sync": _handle_pr_synchronize,
    # Every review-related event name shares one handler.
    **dict.fromkeys(
        (
            "pull_request_review",
            "pull_request_review_approved",
            "pull_request_review_rejected",
            "pull_request_review_comment",
            # Gitea: review comments are delivered as their own event type.
            "pull_request_comment",
            # Review sub-events actually emitted by Gitea v1.23.4
            # (without the "_review_" middle segment).
            "pull_request_approved",
            "pull_request_rejected",
        ),
        _handle_pull_request_review,
    ),
    "issues": _handle_issues,
    "issue_comment": _handle_issue_comment,
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Webhook 端点
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@router.post("/webhook/gitea")
async def gitea_webhook(
    request: Request,
    x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"),
    x_gitea_delivery: Optional[str] = Header(None, alias="X-Gitea-Delivery"),
    x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"),
) -> Response:
    """Gitea webhook receiving endpoint.

    Pipeline: signature verification -> payload parsing -> idempotency
    check -> handler lookup -> handler execution.

    Response policy:
      - signature mismatch -> 403
      - payload unparseable or not a JSON object / unknown event /
        duplicate delivery -> 200 (so the sender does not retry)
      - handler raised -> 500 (so the sender retries)
      - otherwise -> 200 "ok"

    Args:
        request: Incoming HTTP request; its raw body is used for signature
            verification and its JSON body is the event payload.
        x_gitea_event: Value of the ``X-Gitea-Event`` header.
        x_gitea_delivery: Value of the ``X-Gitea-Delivery`` header.
        x_gitea_signature: Value of the ``X-Gitea-Signature`` header.

    Returns:
        A plain ``Response`` whose status code follows the policy above.
    """
    body = await request.body()

    # 1. Signature verification against the raw request body.
    if not _verify_signature(body, x_gitea_signature):
        logger.warning("Webhook signature verification failed")
        return Response(status_code=403,
                        content="signature verification failed")

    # 2. Parse the payload first; the idempotency check needs it for
    #    content-based de-duplication.
    try:
        payload = await request.json()
    except Exception:
        logger.warning("Failed to parse webhook payload")
        return Response(status_code=200, content="invalid payload")
    # Valid JSON that is not an object (e.g. a list or string) would make
    # the ``payload.get`` calls below raise outside any try block; treat it
    # like any other invalid payload.
    if not isinstance(payload, dict):
        logger.warning("Webhook payload is not a JSON object, ignoring")
        return Response(status_code=200, content="invalid payload")

    # 3. Idempotency check (only possible when both headers are present).
    if x_gitea_event and x_gitea_delivery:
        async with _get_idempotency_lock():
            if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
                logger.debug(
                    "Duplicate webhook: %s/%s",
                    x_gitea_event,
                    x_gitea_delivery)
                return Response(status_code=200, content="duplicate")

    # 4. Look up the handler for this event type.
    action = payload.get("action", "")
    logger.info("[WEBHOOK] event=%s action=%s delivery=%s", x_gitea_event, action, x_gitea_delivery)
    handler = _EVENT_HANDLERS.get(x_gitea_event or "")
    if not handler:
        logger.info("[WEBHOOK] Unhandled event type: %s", x_gitea_event)
        return Response(status_code=200,
                        content=f"unhandled event: {x_gitea_event}")

    # 5. Run the handler; any failure is reported as 500.
    try:
        await handler(payload)
    except Exception:
        logger.exception("Mail creation failed for %s event", x_gitea_event)
        return Response(status_code=500, content="mail creation failed")

    return Response(status_code=200, content="ok")
|