Files
sanguo_moziplus_v2/src/api/toolchain_routes.py
T
cfdaily 6e6b52fe3b
CI / lint (pull_request) Successful in 12s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 0s
fix: asyncio.Lock 懒加载防 event loop 关闭后 import 失败
2026-06-14 00:09:27 +08:00

1255 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""API routes — Toolchain Event Hub.
Receives Gitea webhooks and translates them into notification tasks pushed to agents.
Endpoint: POST /webhook/gitea
Supported events: pull_request, pull_request_review, issues, issue_comment
"""
from __future__ import annotations
import asyncio
import hashlib
import hmac
import json
import logging
import os
import re
import time
from datetime import datetime
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Set, Tuple
import httpx
from fastapi import APIRouter, Header, Request, Response
from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.config.agents import AGENT_IDS
from src.api.mention_utils import (
extract_mentions,
should_suppress_mention,
infer_intent,
_build_response_guidance,
GITEA_API_BASE,
)
from src.daemon.toolchain_templates import render_template
from src.utils import get_data_root
# Module-level logger and the FastAPI router that carries the webhook endpoint.
logger = logging.getLogger(__name__)
router = APIRouter(tags=["toolchain"])
# ---------------------------------------------------------------------------
# Idempotency check: in-memory set, entries retained for the last 7 days
# ---------------------------------------------------------------------------
# An in-memory set is used instead of SQLite (the design document originally
# planned SQLite; this is a simplification): daemon restarts are infrequent,
# losing the cache on restart is acceptable, and duplicates are not expected
# to recur within the webhook retry window.
# Keys that have already been seen (written and read by _is_duplicate).
_delivery_cache: Set[str] = set()
# (insertion time, key) pairs in insertion order; expired from the front.
_delivery_timestamps: List[Tuple[float, str]] = []
# Retention period for cached keys, in seconds (7 days).
_TTL_SECONDS = 7 * 24 * 3600
# Created on first use by _get_idempotency_lock().
_idempotency_lock: Optional[asyncio.Lock] = None
def _get_idempotency_lock() -> asyncio.Lock:
    """Return the shared idempotency lock, creating it on first use.

    The lock is built lazily instead of at import time, so no
    ``asyncio.Lock`` is constructed while the module is being imported
    (the original note cites a missing event loop on Python 3.9).
    """
    global _idempotency_lock
    lock = _idempotency_lock
    if lock is None:
        lock = asyncio.Lock()
        _idempotency_lock = lock
    return lock
def _is_duplicate(event: str, delivery: str,
                  payload: Optional[Dict[str, Any]] = None) -> bool:
    """Report whether a webhook was already seen, recording it if not.

    Two de-duplication strategies are applied:
    1. Delivery-UUID key (standard idempotency).
    2. Payload-content key for events whose name contains "review", to cope
       with Gitea v1.23.4 emitting the same review twice with different
       UUIDs (webhookNotifier + actionsNotifier).

    Expired entries (older than ``_TTL_SECONDS``) are purged on every call.

    Args:
        event: Value of the X-Gitea-Event header.
        delivery: Value of the X-Gitea-Delivery header.
        payload: Parsed webhook payload; only used for content de-duplication.

    Returns:
        True when the delivery or its content was seen before, else False
        (in which case the keys are stored).
    """
    now = time.time()

    # Purge expired entries; timestamps are appended in order, so the oldest
    # entry is always at the front.
    while _delivery_timestamps and (
            now - _delivery_timestamps[0][0]) > _TTL_SECONDS:
        _, expired_key = _delivery_timestamps.pop(0)
        _delivery_cache.discard(expired_key)

    # Delivery-UUID de-duplication.
    key = f"{event}-{delivery}"
    if key in _delivery_cache:
        return True

    # Content de-duplication (same PR + same user + same review content).
    # NOTE(review): the content key embeds the event name and is only built
    # when "review" occurs in it — confirm both notifier variants arrive
    # under the same event name.
    if isinstance(payload, dict) and payload and "review" in event:

        def _section(name: str) -> Dict[str, Any]:
            # A field that is null or not an object is treated as empty
            # instead of raising AttributeError on ``.get``.
            value = payload.get(name)
            return value if isinstance(value, dict) else {}

        review = _section("review")
        pr_num = _section("pull_request").get("number")
        sender = _section("sender").get("login")
        # webhookNotifier uses review.body, actionsNotifier uses
        # review.content; read both (body first) so both formats yield the
        # same key.
        content = review.get("body") or review.get("content") or ""
        content_hash = hashlib.sha256(
            str(content).encode()).hexdigest()[:16]
        review_id = review.get("id", "")
        content_key = (
            f"content:{event}:{pr_num}:{sender}:{review_id}:{content_hash}"
        )
        if content_key in _delivery_cache:
            logger.info(
                "Content-based duplicate detected: %s PR#%s by %s",
                event,
                pr_num,
                sender)
            return True
        _delivery_cache.add(content_key)
        _delivery_timestamps.append((now, content_key))

    _delivery_cache.add(key)
    _delivery_timestamps.append((now, key))
    return False
# ---------------------------------------------------------------------------
# 签名验证
# ---------------------------------------------------------------------------
# Shared secret for webhook signatures, read once at import time from the
# GITEA_WEBHOOK_SECRET environment variable (None when unset).
_WEBHOOK_SECRET: Optional[str] = os.environ.get("GITEA_WEBHOOK_SECRET")
def _verify_signature(body: bytes, signature: Optional[str]) -> bool:
    """Verify the HMAC-SHA256 signature of a webhook body.

    Verification is skipped (returns True) when no secret is configured.

    Args:
        body: Raw request body bytes.
        signature: Hex digest taken from the signature header, if any.

    Returns:
        True when the signature matches or no secret is configured,
        False when the signature is missing or does not match.
    """
    if not _WEBHOOK_SECRET:
        return True
    if not signature:
        return False
    expected = hmac.new(
        _WEBHOOK_SECRET.encode(), body, hashlib.sha256
    ).hexdigest()
    # Compare as bytes: hmac.compare_digest raises TypeError for str
    # arguments containing non-ASCII characters, which would turn a bogus
    # signature header into an unhandled error instead of a rejection.
    return hmac.compare_digest(
        expected.encode(), signature.encode("utf-8", "replace"))
# ---------------------------------------------------------------------------
# Gitea API 调用
# ---------------------------------------------------------------------------
# Gitea API token, read once at import time from GITEA_TOKEN ("" when unset).
_GITEA_TOKEN: str = os.environ.get("GITEA_TOKEN", "")
# Base URL used by this module's own Gitea API calls.
# NOTE(review): hard-coded address, while GITEA_API_BASE is also imported
# from mention_utils — confirm whether the two are meant to be the same.
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]:
    """Fetch the list of files changed by a PR, retrying on failure.

    Up to three attempts are made, sleeping 0.5s and then 1.0s between them.

    Args:
        repo: Repository full name (owner/repo).
        pr_number: Pull request number.

    Returns:
        ``(filenames, error)`` — ``error`` is the empty string on success;
        on failure ``filenames`` is empty and ``error`` describes the cause.
    """
    if not _GITEA_TOKEN:
        return [], "GITEA_TOKEN 未配置"

    endpoint = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files"
    auth_headers = {"Authorization": f"token {_GITEA_TOKEN}"}
    max_attempts = 3
    failure = ""

    for attempt_no in range(1, max_attempts + 1):
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(endpoint, headers=auth_headers)
                response.raise_for_status()
                entries: List[Dict[str, Any]] = response.json()
                names = [entry.get("filename", "") for entry in entries]
                return names, ""
        except Exception as exc:
            failure = str(exc)
            # Back off (and log) only when another attempt will follow.
            if attempt_no < max_attempts:
                await asyncio.sleep(0.5 * attempt_no)
                logger.warning(
                    "Retry %d/3 fetching PR files: %s/pulls/%d",
                    attempt_no,
                    repo,
                    pr_number)

    logger.warning(
        "Failed to fetch PR files after 3 retries: %s/pulls/%d - %s",
        repo,
        pr_number,
        failure)
    return [], f"获取文件列表失败(重试3次): {failure}"
# ---------------------------------------------------------------------------
# 风险级别判定
# ---------------------------------------------------------------------------
# Glob patterns naming high-risk files: one changed file matching any of
# them makes the whole change "high" risk.
_HIGH_PATTERNS = [
    "**/spawner*", "**/ticker*", "**/dispatcher*",
    "**/router*", "**/guardrails*", "**/strategy*", "**/risk*",
]


def _calc_risk_level(changed_files: List[str]) -> str:
    """Classify a change as "high" or "standard" risk from its file list.

    Args:
        changed_files: Repository-relative paths of the changed files.

    Returns:
        "high" if any file matches a pattern in ``_HIGH_PATTERNS``
        (at any depth, including the repository root), else "standard".
    """
    for filepath in changed_files:
        path = PurePath(filepath)
        for pattern in _HIGH_PATTERNS:
            if path.match(pattern):
                return "high"
            # PurePath.match treats "**" like a single-segment "*", so
            # "**/name*" requires at least one parent directory and never
            # matches a root-level file. Also try the pattern without its
            # "**/" prefix, which matches on the final path component.
            if pattern.startswith("**/") and path.match(pattern[3:]):
                return "high"
    return "standard"
# ---------------------------------------------------------------------------
# Mail 创建
# ---------------------------------------------------------------------------
# Directory names under the data root that hold the Mail and Toolchain
# blackboard databases (see _mail_db_path / _toolchain_db_path).
MAIL_PROJECT_ID = "_mail"
TOOLCHAIN_PROJECT_ID = "_toolchain"
def _mail_db_path() -> Path:
    """Return the Mail database path, creating its directory if needed.

    ``init_db`` is invoked on the path before it is returned.
    """
    db_file = get_data_root() / MAIL_PROJECT_ID / "blackboard.db"
    db_file.parent.mkdir(parents=True, exist_ok=True)
    init_db(db_file)
    return db_file
def _toolchain_db_path() -> Path:
    """Return the Toolchain database path, creating its directory if needed.

    ``init_db`` is invoked on the path before it is returned.
    """
    db_file = get_data_root() / TOOLCHAIN_PROJECT_ID / "blackboard.db"
    db_file.parent.mkdir(parents=True, exist_ok=True)
    init_db(db_file)
    return db_file
def _send_toolchain_task(
    to_agent: str,
    title: str,
    description: str,
    event_type: str,
    action_type: str,
    steps: list,
    context_data: dict | None = None,
    source: str = "webhook",
) -> str:
    """Create a Toolchain task and write it to the _toolchain database.

    Args:
        to_agent: Recipient agent ID; must be one of ``AGENT_IDS``.
        title: Task title.
        description: Task description (the rendered event template).
        event_type: Event type (review_result / ci_failure / ...).
        action_type: Action category stored alongside the event type.
        steps: Structured, ordered list of steps for the agent.
        context_data: Event context (PR number, repository name, ...).
        source: Origin marker stored with the task.

    Returns:
        The ID of the created task, or "" when ``to_agent`` is unknown.
    """
    if to_agent not in AGENT_IDS:
        logger.warning("Unknown agent: %s, skipping toolchain task", to_agent)
        return ""

    # A millisecond timestamp alone collides when several tasks are created
    # back-to-back (e.g. one event fanned out to multiple recipients), so a
    # short random suffix is appended to keep IDs unique.
    millis = int(datetime.now().timestamp() * 1000)
    task_id = f"tc-{millis}-{os.urandom(3).hex()}"

    must_haves = json.dumps({
        "event_type": event_type,
        "action_type": action_type,
        "steps": steps,
        "context": context_data or {},
        "from": "system",
        "source": source,
    }, ensure_ascii=False)

    task = Task(
        id=task_id,
        title=title,
        description=description,
        assignee=to_agent,
        assigned_by="system",
        must_haves=must_haves,
        task_type="toolchain",
        status="pending",
    )
    bb = Blackboard(_toolchain_db_path())
    bb.create_task(task)
    logger.info(
        "Toolchain task sent: %s%s [%s] action_type=%s",
        title[:40], to_agent, task_id, action_type,
    )
    return task_id
def _send_mail(
    to_agent: str,
    title: str,
    description: str,
    source: str = "webhook",
) -> str:
    """Create a Mail task and write it to the Mail database.

    Args:
        to_agent: Recipient agent ID; must be one of ``AGENT_IDS``.
        title: Mail title.
        description: Mail body.
        source: Origin marker stored with the mail.

    Returns:
        The ID of the created mail, or "" when ``to_agent`` is unknown.

    Raises:
        Exception: Propagated from the database write on failure.
    """
    if to_agent not in AGENT_IDS:
        logger.warning("Unknown agent: %s, skipping mail", to_agent)
        return ""

    # A millisecond timestamp alone collides when several mails are created
    # back-to-back, so a short random suffix keeps IDs unique.
    millis = int(datetime.now().timestamp() * 1000)
    mail_id = f"mail-{millis}-{os.urandom(3).hex()}"

    notify_meta = {
        "type": "inform",
        "performative": "inform",
        "is_read": False,
        "conversation_id": f"conv-{mail_id}",
        "from": "system",
        "source": source,
    }
    task = Task(
        id=mail_id,
        title=title,
        description=description,
        assignee=to_agent,
        assigned_by="system",
        must_haves=json.dumps(notify_meta, ensure_ascii=False),
        task_type="mail",
        status="pending",
    )
    bb = Blackboard(_mail_db_path())
    bb.create_task(task)
    logger.info("Mail sent: %s%s [%s]", title[:40], to_agent, mail_id)
    return mail_id
# ---------------------------------------------------------------------------
# 辅助:从 payload 提取仓库全名
# ---------------------------------------------------------------------------
def _repo_fullname(payload: Dict[str, Any]) -> str:
    """Return the repository full name (owner/repo) from a webhook payload.

    Yields "" when the payload has no (or an empty) ``repository`` entry
    or that entry has no ``full_name``.
    """
    repository = payload.get("repository")
    if not repository:
        return ""
    return repository.get("full_name", "")
# ---------------------------------------------------------------------------
# @mention 通用发送函数
# ---------------------------------------------------------------------------
async def _send_mention_mails(
    mentions: list[str],
    auto_targets: list[str],
    source_type: str,
    mention_type: str,
    source_url: str,
    commenter: str,
    content: str,
    repo: str,
    issue_number: int,
    is_pr: bool,
) -> None:
    """Send an @mention toolchain task to every mentioned agent.

    Agents for which ``should_suppress_mention`` is true (already covered by
    ``auto_targets``) are skipped to avoid double notification. The intent
    is inferred from the content and selects the response guidance.

    Args:
        mentions: Agent IDs mentioned in the content.
        auto_targets: Agents already notified by the automatic flow.
        source_type: Kind of source, e.g. "PR", "Issue", "Review".
        mention_type: Label passed to the mention template.
        source_url: Web URL of the source item.
        commenter: Login of the author of the content.
        content: Text containing the mentions.
        repo: Repository full name (owner/repo).
        issue_number: Issue or PR number.
        is_pr: Whether the source is a pull request.
    """
    recipients = []
    for agent_id in mentions:
        if should_suppress_mention(agent_id, auto_targets):
            logger.info(
                "Mention suppressed for %s (already notified by auto flow)",
                agent_id)
            continue
        recipients.append(agent_id)
    if not recipients:
        return

    # API paths for the source item; comments live under issues/ for both
    # PRs and issues.
    detail_api = (
        f"pulls/{issue_number}" if is_pr else f"issues/{issue_number}")
    comments_api = f"issues/{issue_number}/comments"

    # Everything below is identical for all recipients, so it is computed
    # once rather than once per agent.
    number_str = f"#{issue_number}" if issue_number else ""
    intent = infer_intent(content)
    intent_labels = {"help": "求助", "notify": "通知关注",
                     "collaborate": "协作请求", "assign": "分配子任务"}
    # Fall back to the raw intent name instead of raising KeyError when
    # infer_intent returns a label this table does not know.
    intent_hint = intent_labels.get(intent, str(intent))

    guidance = _build_response_guidance(
        intent=intent,
        gitea_api=GITEA_API_BASE,
        repo=repo,
        issue_number=issue_number,
        commenter=commenter,
    )
    text = render_template("mention", {
        "mention_type": mention_type,
        "source_type": source_type,
        "source_url": source_url,
        "commenter": commenter,
        "intent_hint": intent_hint,
        "content_snippet": content[:500],
        "gitea_api": GITEA_API_BASE,
        "repo": repo,
        "source_detail_api_path": detail_api,
        "source_comments_api_path": comments_api,
        "response_guidance": guidance,
    })
    title = f"@mention ({intent_hint}): {source_type} {number_str} ({repo})"

    for agent_id in recipients:
        _send_toolchain_task(
            to_agent=agent_id,
            title=title,
            description=text,
            event_type="mention",
            action_type="mention",
            steps=[
                "按上方 mention 模板中的 response_guidance 执行",
                "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
            ],
            context_data={
                "source_type": source_type,
                "source_url": source_url,
                "commenter": commenter,
                "content_snippet": content[:500],
                "repo": repo,
                "issue_number": issue_number,
            },
        )
# ---------------------------------------------------------------------------
# 事件处理函数
# ---------------------------------------------------------------------------
async def _handle_pull_request(payload: Dict[str, Any]) -> None:
    """Dispatch a pull_request event by its ``action``.

    opened → review request, closed → merge handling, synchronize →
    re-review notification. Any other action is ignored.
    """
    by_action = {
        "opened": _handle_pr_opened,
        "closed": _handle_pr_closed,
        "synchronize": _handle_pr_synchronize,
    }
    action = payload.get("action", "")
    # Only string actions can name a handler; anything else is ignored.
    handler = by_action.get(action) if isinstance(action, str) else None
    if handler is not None:
        await handler(payload)
async def _handle_pr_opened(payload: Dict[str, Any]) -> None:
    """Handle a newly opened PR: send a review request to simayi-challenger.

    The task carries the changed-file list and a derived risk level. Any
    @mentions in the PR body are forwarded afterwards, with
    simayi-challenger listed as already notified.
    """
    pull = payload.get("pull_request")
    if not (pull and isinstance(pull, dict)):
        logger.warning(
            "pull_request event missing pull_request field, skipping")
        return

    repo = _repo_fullname(payload)
    pr_number = pull.get("number", 0)
    pr_title = pull.get("title", "")
    pr_author = pull.get("user", {}).get("login", "unknown")
    branch = pull.get("head", {}).get("ref", "unknown")

    # Changed files drive both the risk level and the rendered file list.
    changed_files, fetch_error = await _fetch_pr_files(repo, pr_number)
    risk_level = _calc_risk_level(changed_files)
    if fetch_error:
        file_list = f"⚠️ {fetch_error}"
    elif changed_files:
        file_list = "\n".join(f"- {name}" for name in changed_files)
    else:
        file_list = "(无文件变更)"

    body_text = render_template("review_request", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "branch": branch,
        "risk_level": risk_level,
        "file_list": file_list,
    })
    review_steps = [
        f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
        "按审查清单审查(参考 code-review Skill",
        f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews)— APPROVE 或 REQUEST_CHANGES",
        "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
    ]
    _send_toolchain_task(
        to_agent="simayi-challenger",
        title=f"Review 请求: {pr_title} ({repo}#{pr_number})",
        description=body_text,
        event_type="review_request",
        action_type="review_request",
        steps=review_steps,
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "pr_author": pr_author,
            "branch": branch,
            "risk_level": risk_level,
        },
    )

    # S3: @mention notifications from the PR body.
    pr_body = pull.get("body", "") or ""
    sender = pull.get("user", {}).get("login", "")
    mentioned = extract_mentions(pr_body, sender)
    if not mentioned:
        return
    await _send_mention_mails(
        mentions=mentioned,
        # The automatic flow above already notified simayi-challenger.
        auto_targets=["simayi-challenger"],
        source_type="PR",
        mention_type="PR @mention",
        source_url=pull.get("html_url", ""),
        commenter=sender,
        content=pr_body,
        repo=repo,
        issue_number=pr_number,
        is_pr=True,
    )
async def _send_review_mentions(
    review_body: str,
    reviewer: str,
    pr_author: str,
    pr: dict,
    repo: str,
    pr_number: int,
) -> None:
    """Forward @mentions found in a review body.

    Shared by the COMMENTED and non-COMMENTED review paths. The PR author
    is passed as already notified.
    """
    mentioned = extract_mentions(review_body, reviewer)
    if not mentioned:
        return
    await _send_mention_mails(
        mentions=mentioned,
        auto_targets=[pr_author],
        source_type="Review",
        mention_type="Review @mention",
        source_url=pr.get("html_url", ""),
        commenter=reviewer,
        content=review_body,
        repo=repo,
        issue_number=pr_number,
        is_pr=True,
    )
async def _handle_pull_request_review(payload: Dict[str, Any]) -> None:
    """Handle a pull_request_review event by notifying the PR author.

    COMMENTED reviews produce a ``review_comment`` task; APPROVED and
    REQUEST_CHANGES produce a ``review_result`` task; other states are
    ignored. @mentions in the review body are forwarded in both cases.

    Two payload formats are supported:
    - repo webhook: review.state = "APPROVED" / "REQUEST_CHANGES"
    - org webhook (Gitea v1.23.4): review.type =
      "pull_request_review_approved" / "pull_request_review_rejected"
    """
    review = payload.get("review")
    if not review or not isinstance(review, dict):
        logger.warning(
            "pull_request_review event missing review field, skipping")
        return
    pr = payload.get("pull_request")
    if not pr or not isinstance(pr, dict):
        logger.warning(
            "pull_request_review event missing pull_request field, skipping")
        return

    # Derive the state from whichever payload format is present.
    state = review.get("state", "")
    if not state:
        # org webhook format: review.type = "pull_request_review_approved"
        type_map = {
            "pull_request_review_approved": "APPROVED",
            "pull_request_review_rejected": "REQUEST_CHANGES",
            "pull_request_review_comment": "COMMENTED",
        }
        state = type_map.get(review.get("type", ""), "")

    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # ``or {}`` guards against fields that are present but null.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    # The org-webhook review carries no user; fall back to the sender.
    reviewer = (
        (review.get("user") or {}).get("login", "")
        or (payload.get("sender") or {}).get("login", "unknown")
    )
    # Use the placeholder whenever both body and content are empty, not
    # only when the ``content`` key is absent.
    review_body = review.get("body") or review.get("content") or "(无评论)"

    report_step = "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report"

    if state == "COMMENTED":
        # Review comment → notify the PR author.
        text = render_template("review_comment", {
            "repo": repo,
            "pr_number": str(pr_number),
            "pr_title": pr_title,
            "reviewer": reviewer,
            "comment_body": review_body,
        })
        title = f"Review 评论: {pr_title} ({repo}#{pr_number})"
        _send_toolchain_task(
            to_agent=pr_author,
            title=title,
            description=text,
            event_type="review_comment",
            action_type="review_comment",
            steps=[
                f"查看评论(Gitea API: GET /repos/{repo}/issues/{pr_number}/comments",
                "根据评论内容响应(修改代码或在 PR 上回复 comment)",
                report_step,
            ],
            context_data={
                "pr_number": pr_number,
                "repo": repo,
                "pr_title": pr_title,
                "reviewer": reviewer,
                "comment_body": review_body,
            },
        )
        # S5: @mention notifications from the review body (COMMENTED path).
        await _send_review_mentions(
            review_body, reviewer, pr_author, pr, repo, pr_number)
        return

    result_map = {"APPROVED": "通过 ✓", "REQUEST_CHANGES": "驳回 ✗"}
    if state not in result_map:
        return
    result = result_map[state]

    text = render_template("review_result", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "reviewer": reviewer,
        "result": result,
        "review_body": review_body,
    })
    title = f"Review {result}: {pr_title} ({repo}#{pr_number})"
    if state == "APPROVED":
        tc_steps = [
            f"合并 PRGitea API: POST /repos/{repo}/pulls/{pr_number}/merge",
            report_step,
        ]
    else:  # REQUEST_CHANGES
        tc_steps = [
            "按审查意见逐条修改代码",
            "push 到原分支 → CI 自动跑",
            "CI 通过后等重新 Review",
            report_step,
        ]
    _send_toolchain_task(
        to_agent=pr_author,
        title=title,
        description=text,
        event_type="review_result",
        action_type="review_result",
        steps=tc_steps,
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "result": result,
            "reviewer": reviewer,
            "review_body": review_body,
        },
    )
    # S5: @mention notifications from the review body (non-COMMENTED path).
    await _send_review_mentions(
        review_body, reviewer, pr_author, pr, repo, pr_number)
async def _fetch_latest_reviewer(repo: str, pr_number: int) -> str:
    """Look up who submitted the most recent non-PENDING review of a PR.

    Returns:
        The reviewer's login, or "" when no token is configured, no such
        review exists, or the request fails (failures are logged).
    """
    if not _GITEA_TOKEN:
        return ""

    endpoint = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/reviews"
    auth_headers = {"Authorization": f"token {_GITEA_TOKEN}"}
    submitted_states = ("APPROVED", "REQUEST_CHANGES", "COMMENTED")
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(endpoint, headers=auth_headers)
            response.raise_for_status()
            all_reviews = response.json()
            # Walk from the newest entry backwards and stop at the first
            # review that has actually been submitted.
            for entry in reversed(all_reviews):
                if entry.get("state", "") not in submitted_states:
                    continue
                return entry.get("user", {}).get("login", "")
    except Exception as exc:
        logger.warning("Failed to fetch reviews for %s#%d: %s", repo, pr_number, exc)
    return ""
async def _handle_pr_synchronize(payload: Dict[str, Any]) -> None:
    """Handle a PR update (new push): ask the reviewer to review again.

    The submitter of the most recent review is the notification target;
    when no review history is found the default reviewer
    simayi-challenger is used instead.
    """
    pull = payload.get("pull_request")
    if not (pull and isinstance(pull, dict)):
        return

    repo = _repo_fullname(payload)
    pr_number = pull.get("number", 0)
    pr_title = pull.get("title", "")
    pr_author = pull.get("user", {}).get("login", "unknown")
    # Abbreviated head commit hash (first 12 characters).
    new_sha = pull.get("head", {}).get("sha", "unknown")[:12]

    reviewer = await _fetch_latest_reviewer(repo, pr_number)
    if not reviewer:
        # No existing review history: fall back to the default reviewer.
        reviewer = "simayi-challenger"
        logger.info("No review history for PR #%s, using default reviewer %s", pr_number, reviewer)

    rendered = render_template("review_updated", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "new_sha": new_sha,
        "reviewer": reviewer,
    })
    rereview_steps = [
        f"读取 PR diffGitea API: GET /repos/{repo}/pulls/{pr_number}.diff",
        "重点检查上次 Review 意见的修改部分",
        f"提交 ReviewGitea API: POST /repos/{repo}/pulls/{pr_number}/reviews",
        "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
    ]
    _send_toolchain_task(
        to_agent=reviewer,
        title=f"PR 更新: {pr_title} ({repo}#{pr_number})",
        description=rendered,
        event_type="review_updated",
        action_type="review_updated",
        steps=rereview_steps,
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "pr_author": pr_author,
            "new_sha": new_sha,
            "reviewer": reviewer,
        },
    )
def _send_deploy_failure_task(repo: str, pr_number: int, pr_title: str, reason: str) -> None:
    """Notify the ops agents that an automatic deployment failed.

    One ``deploy_failure`` toolchain task is created for each of
    jiangwei-infra and pangtong-fujunshi, with the failure reason appended
    to the rendered template.
    """
    rendered = render_template("deploy_failure", {
        "repo": repo,
        "commit_sha": f"PR #{pr_number}",
    })
    description = f"{rendered}\n\n失败原因: {reason}"
    title = f"部署失败: {repo} (auto-deploy, PR #{pr_number})"
    recipients = ("jiangwei-infra", "pangtong-fujunshi")

    for recipient in recipients:
        _send_toolchain_task(
            to_agent=recipient,
            title=title,
            description=description,
            event_type="deploy_failure",
            action_type="deploy_failure",
            steps=[
                "检查 deploy 日志",
                "排查失败原因",
                "修复并重新部署",
                "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
            ],
            context_data={
                "repo": repo,
                "pr_number": pr_number,
                "pr_title": pr_title,
                "reason": reason,
            },
        )
async def _run_deploy_step(
    args: List[str],
    timeout: float,
    cwd: Optional[str] = None,
) -> Tuple[Optional[int], str]:
    """Run one deploy subprocess and return ``(returncode, stderr_text)``.

    On timeout the child is killed and reaped before ``asyncio.TimeoutError``
    is re-raised, so a hung command is not left running.
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        _, raw_err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process exited on its own in the meantime
        await proc.wait()
        raise
    # Tool output is not guaranteed to be valid UTF-8.
    return proc.returncode, raw_err.decode(errors="replace")


async def _auto_deploy_merged_pr(repo: str, pr_number: int, pr_title: str) -> None:
    """Auto-deploy after a merge: git pull, rsync, then post_deploy if needed.

    Does nothing when config/deploy-targets.yaml is missing or has no entry
    for ``repo``. Failures are logged; rsync/post_deploy failures and
    timeouts additionally raise a deploy-failure task.
    """
    try:
        import yaml

        # Load the deployment configuration.
        config_path = Path(__file__).parent.parent.parent / "config" / "deploy-targets.yaml"
        if not config_path.exists():
            return
        with open(config_path, "r", encoding="utf-8") as f:
            deploy_config = yaml.safe_load(f) or {}
        target = deploy_config.get("targets", {}).get(repo)
        if not target:
            return  # repository is not a deploy target

        dev_dir = os.path.expanduser(target["dev_dir"])
        install_dir = os.path.expanduser(target.get("install_dir", target["dev_dir"]))
        rsync_excludes = target.get("rsync_exclude") or []

        # Step 1: git pull in the dev directory.
        code, err = await _run_deploy_step(
            ["git", "pull", "origin", "main"], timeout=30, cwd=dev_dir)
        if code != 0:
            logger.warning("Auto-deploy: git pull failed for %s: %s", repo, err)
            return
        logger.info("Auto-deploy: git pull success for %s", repo)

        # Step 2: rsync to the install directory.
        rsync_args = ["rsync", "-a"]
        for exc in rsync_excludes:
            rsync_args.extend(["--exclude", exc])
        rsync_args.extend([f"{dev_dir}/", f"{install_dir}/"])
        code, err = await _run_deploy_step(rsync_args, timeout=60)
        if code != 0:
            logger.error("Auto-deploy: rsync failed: %s", err)
            _send_deploy_failure_task(repo, pr_number, pr_title, f"rsync 失败: {err}")
            return

        # Step 3: decide whether post_deploy has to run.
        file_list, fetch_error = await _fetch_pr_files(repo, pr_number)
        if fetch_error:
            # Without the file list we cannot prove the change is docs-only;
            # run post_deploy rather than leave stale code running.
            logger.warning(
                "Auto-deploy: changed files unknown for %s#%s (%s), running post_deploy",
                repo, pr_number, fetch_error)
            needs_restart = True
        else:
            needs_restart = any(
                f.startswith(("src/", "templates/", "frontend/")) or f.endswith(".py")
                for f in file_list
            )
        if not needs_restart:
            logger.info("Auto-deploy: docs-only change for %s, skip post_deploy", repo)
            return

        pm2_name = target.get("pm2_name", "")
        for cmd in target.get("post_deploy", []):
            logger.info("Auto-deploy: executing post_deploy: %s", cmd)
            # M2: the command restarts this very process only when we run
            # under pm2 (PM2_HOME set) and it names our own pm2 process.
            self_restart = bool(
                pm2_name
                and os.environ.get("PM2_HOME")
                and "pm2 restart" in cmd
                and re.search(rf'pm2\s+restart\s+{re.escape(pm2_name)}', cmd)
            )
            if self_restart:
                # M1: delay with asyncio.sleep rather than nohup, keeping the
                # child's output and error detection available.
                logger.info("Auto-deploy: self-restart detected, deferring 2s: %s", cmd)
                await asyncio.sleep(2)
                deploy_proc = await asyncio.create_subprocess_exec(
                    "sh", "-c", cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                # The restart is expected to kill this process, so
                # communicate() may never complete; the child is
                # deliberately not killed on timeout here.
                try:
                    _, raw_err = await asyncio.wait_for(
                        deploy_proc.communicate(), timeout=10)
                except (asyncio.TimeoutError, ProcessLookupError):
                    logger.info("Auto-deploy: process killed by self-restart (expected)")
                    break
                code, err = deploy_proc.returncode, raw_err.decode(errors="replace")
            else:
                code, err = await _run_deploy_step(["sh", "-c", cmd], timeout=30)
            if code != 0:
                logger.error("Auto-deploy: post_deploy failed: %s", err)
                _send_deploy_failure_task(repo, pr_number, pr_title, f"post_deploy 失败 ({cmd}): {err}")
                break
        else:
            logger.info("Auto-deploy: all post_deploy commands succeeded (files: %s)", ", ".join(file_list[:5]))
    except asyncio.TimeoutError:
        logger.error("Auto-deploy: timeout for %s", repo)
        _send_deploy_failure_task(repo, pr_number, pr_title, "部署超时")
    except Exception as e:
        # Best effort: a deploy problem must not fail the webhook itself.
        logger.exception("Auto-deploy: unexpected error: %s", e)


async def _handle_pr_closed(payload: Dict[str, Any]) -> None:
    """Handle a closed PR: if merged, notify the author and auto-deploy.

    PRs closed without merging are ignored.
    """
    pr = payload.get("pull_request")
    if not pr or not isinstance(pr, dict):
        return
    # Only merged PRs are of interest.
    if not pr.get("merged", False):
        return

    repo = _repo_fullname(payload)
    pr_number = pr.get("number", 0)
    pr_title = pr.get("title", "")
    # ``or {}`` guards against fields that are present but null.
    pr_author = (pr.get("user") or {}).get("login", "unknown")
    # merged_by may be missing from the payload; fall back to the sender.
    merged_by = (
        (pr.get("merged_by") or {}).get("login", "")
        or (payload.get("sender") or {}).get("login", "unknown")
    )

    text = render_template("review_merged", {
        "repo": repo,
        "pr_number": str(pr_number),
        "pr_title": pr_title,
        "pr_author": pr_author,
        "merged_by": merged_by,
    })
    title = f"PR 已合并: {pr_title} ({repo}#{pr_number})"
    _send_toolchain_task(
        to_agent=pr_author,
        title=title,
        description=text,
        event_type="review_merged",
        action_type="review_merged",
        steps=[],  # pure notification, no steps
        context_data={
            "pr_number": pr_number,
            "repo": repo,
            "pr_title": pr_title,
            "pr_author": pr_author,
            "merged_by": merged_by,
        },
    )

    # Automatic deployment: git pull + rsync + post_deploy when required.
    await _auto_deploy_merged_pr(repo, pr_number, pr_title)
def _issue_assignees(issue: Dict[str, Any]) -> List[Any]:
    """Return the issue's assignee entries, falling back to ``assignee``."""
    assignees = issue.get("assignees") or []
    if not assignees:
        single = issue.get("assignee")
        if single and isinstance(single, dict):
            assignees = [single]
    return assignees


async def _handle_issues(payload: Dict[str, Any]) -> None:
    """Handle an issues event.

    assigned → notify the assignee; opened with "部署失败" in the title →
    notify the ops agents; opened → also forward @mentions in the body.
    """
    action = payload.get("action", "")
    issue = payload.get("issue")
    if not issue or not isinstance(issue, dict):
        logger.warning("issues event missing issue field, skipping")
        return

    repo = _repo_fullname(payload)
    issue_number = issue.get("number", 0)
    # Title and body may be null in the payload; normalise to "" so the
    # substring, slicing and regex operations below cannot raise TypeError.
    issue_title = issue.get("title") or ""
    raw_body = issue.get("body") or ""
    report_step = "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report"

    if action == "assigned":
        assignees = _issue_assignees(issue)
        # The last entry of the list is the one notified.
        assignee = assignees[-1].get("login", "") if assignees else ""
        if not assignee:
            logger.debug("Issue assigned but no assignee found, skipping")
            return

        labels_list = [lbl.get("name", "")
                       for lbl in (issue.get("labels") or [])]
        labels = ", ".join(labels_list) if labels_list else "(无标签)"
        issue_body = raw_body or "(无描述)"
        # Short slug derived from the title, used in the branch name.
        brief = issue_title[:20].replace(" ", "-").lower()

        text = render_template("issue_assigned", {
            "repo": repo,
            "issue_number": str(issue_number),
            "issue_title": issue_title,
            "labels": labels,
            "issue_body": issue_body,
            "brief": brief,
        })
        title = f"Issue 指派: {issue_title} ({repo}#{issue_number})"
        _send_toolchain_task(
            to_agent=assignee,
            title=title,
            description=text,
            event_type="issue_assigned",
            action_type="issue_assigned",
            steps=[
                f"创建分支 fix/{issue_number}-{brief}",
                "编码 + 写 UT",
                "push → 等 CI",
                f"CI 通过后创建 PRGitea API: POST /repos/{repo}/pulls",
                "等 Review",
                report_step,
            ],
            context_data={
                "issue_number": issue_number,
                "repo": repo,
                "issue_title": issue_title,
                "labels": labels,
                "issue_body": issue_body,
                "brief": brief,
            },
        )
    elif action == "opened":
        if "部署失败" in issue_title:
            # Extract the commit hash from the issue body (a 40-hex-digit
            # run, as written by the deploy workflow).
            sha_match = re.search(r'[0-9a-f]{40}', raw_body)
            commit_sha = sha_match.group(0) if sha_match else "(未知)"
            text = render_template("deploy_failure", {
                "repo": repo,
                "commit_sha": commit_sha,
            })
            title = f"部署失败: {repo}"
            for agent_id in ("jiangwei-infra", "pangtong-fujunshi"):
                _send_toolchain_task(
                    to_agent=agent_id,
                    title=title,
                    description=text,
                    event_type="deploy_failure",
                    action_type="deploy_failure",
                    steps=[
                        "检查 deploy 日志",
                        "排查失败原因",
                        "修复并重新部署",
                        report_step,
                    ],
                    context_data={
                        "repo": repo,
                        "commit_sha": commit_sha,
                    },
                )

        # @mentions in the issue body (checked when the issue is opened).
        sender = (payload.get("sender") or {}).get("login", "")
        mentions = extract_mentions(raw_body, sender)
        if mentions:
            # Assignees are passed as already notified by the automatic flow.
            auto_targets = [
                a.get("login", "")
                for a in _issue_assignees(issue)
                if isinstance(a, dict)
            ]
            await _send_mention_mails(
                mentions=mentions,
                auto_targets=auto_targets,
                source_type="Issue",
                mention_type="Issue @mention",
                source_url=issue.get("html_url", ""),
                commenter=sender,
                content=raw_body,
                repo=repo,
                issue_number=issue_number,
                is_pr=False,
            )
async def _handle_issue_comment(payload: Dict[str, Any]) -> None:
    """Handle an issue_comment event (only the "created" action).

    Two independent paths may both fire for the same comment:
    a CI-failure keyword notifies the issue/PR author, and @mentions
    notify the mentioned agents.
    """
    comment = payload.get("comment")
    if not (comment and isinstance(comment, dict)):
        logger.warning("issue_comment event missing comment field, skipping")
        return
    body = comment.get("body", "")
    sender = comment.get("user", {}).get("login", "")

    issue = payload.get("issue")
    if not (issue and isinstance(issue, dict)):
        logger.warning("issue_comment event missing issue field, skipping")
        return
    if payload.get("action", "") != "created":
        return

    # Evaluated once and reused by both paths below.
    ci_failed = (
        ("[CI]" in body or "CI 失败" in body)
        and issue.get("state") != "closed"
    )

    # === Path 1: CI failure notification ===
    if ci_failed:
        repo = _repo_fullname(payload)
        issue_number = issue.get("number", 0)
        # The author of the issue/PR the comment belongs to is notified.
        pr_author = issue.get("user", {}).get("login", "unknown")
        found_branch = re.search(r"分支:\s*(\S+)", body)
        branch = found_branch.group(1) if found_branch else "(未知)"
        # Error summary: the first 500 characters of the comment body.
        error_summary = body[:500] if body else "(无错误信息)"

        rendered = render_template("ci_failure", {
            "repo": repo,
            "pr_number": str(issue_number),
            "branch": branch,
            "error_summary": error_summary,
        })
        _send_toolchain_task(
            to_agent=pr_author,
            title=f"CI 失败: {repo}#{issue_number}",
            description=rendered,
            event_type="ci_failure",
            action_type="ci_failure",
            steps=[
                "查看完整 CI 日志(PR 页面或 Gitea Actions 页面)",
                "修复失败的测试",
                "push → CI 自动重跑",
                "提交 action reportPOST http://localhost:8083/api/projects/_toolchain/tasks/<task_id>/commentscomment_type=action_report",
            ],
            context_data={
                "pr_number": issue_number,
                "repo": repo,
                "branch": branch,
                "error_summary": error_summary,
            },
        )
        # No return here: the @mention path still runs.

    # === Path 2: @mention notification (independent of the CI path) ===
    mentioned = extract_mentions(body, sender)
    if not mentioned:
        return

    # A PR is an issue that carries a "pull_request" entry.
    is_pr = issue.get("pull_request") is not None
    if is_pr:
        source_type, mention_type = "PR", "PR @mention"
    else:
        source_type, mention_type = "Issue", "Issue @mention"
    issue_number = issue.get("number", 0)
    repo = _repo_fullname(payload)

    # The author was already notified when the CI path fired.
    auto_targets: list[str] = []
    if ci_failed:
        auto_targets.append(issue.get("user", {}).get("login", ""))

    await _send_mention_mails(
        mentions=mentioned,
        auto_targets=auto_targets,
        source_type=source_type,
        mention_type=mention_type,
        source_url=issue.get("html_url", ""),
        commenter=sender,
        content=body,
        repo=repo,
        issue_number=issue_number,
        is_pr=is_pr,
    )
# ---------------------------------------------------------------------------
# 事件分发
# ---------------------------------------------------------------------------
# Maps the X-Gitea-Event header value to the coroutine that handles it.
# Events not listed here are acknowledged by the endpoint without processing.
_EVENT_HANDLERS: Dict[str, Any] = {
    "pull_request": _handle_pull_request,
    "pull_request_sync": _handle_pr_synchronize,  # Gitea: a push to a PR branch is a separate event type
    "pull_request_review": _handle_pull_request_review,
    "pull_request_review_approved": _handle_pull_request_review,
    "pull_request_review_rejected": _handle_pull_request_review,
    "pull_request_review_comment": _handle_pull_request_review,
    "pull_request_comment": _handle_pull_request_review,  # Gitea: a review comment is a separate event type
    # Review sub-events actually emitted by Gitea v1.23.4 (no "_review_" segment)
    "pull_request_approved": _handle_pull_request_review,
    "pull_request_rejected": _handle_pull_request_review,
    "issues": _handle_issues,
    "issue_comment": _handle_issue_comment,
}
# ---------------------------------------------------------------------------
# Webhook 端点
# ---------------------------------------------------------------------------
@router.post("/webhook/gitea")
async def gitea_webhook(
    request: Request,
    x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"),
    x_gitea_delivery: Optional[str] = Header(None, alias="X-Gitea-Delivery"),
    x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"),
) -> Response:
    """Gitea webhook receiving endpoint.

    Flow: signature verification → payload parsing → idempotency check →
    event dispatch.

    Response policy:
    - signature mismatch → 403
    - unparsable / non-object payload, unknown event, duplicate → 200
      (so that no retry is triggered)
    - handler failure → 500
    """
    body = await request.body()

    # 1. Signature verification.
    if not _verify_signature(body, x_gitea_signature):
        logger.warning("Webhook signature verification failed")
        return Response(status_code=403,
                        content="signature verification failed")

    # 2. Parse the payload (before the idempotency check, which inspects
    #    the content for de-duplication).
    try:
        payload = await request.json()
    except Exception:
        logger.warning("Failed to parse webhook payload")
        return Response(status_code=200, content="invalid payload")
    # Valid JSON that is not an object (list, string, number) would raise
    # AttributeError on the ``.get`` calls below; treat it as invalid.
    if not isinstance(payload, dict):
        logger.warning("Webhook payload is not a JSON object")
        return Response(status_code=200, content="invalid payload")

    # 3. Idempotency check.
    # NOTE(review): the delivery is recorded as seen before the handler
    # runs, so a redelivery after a 500 below would be answered as
    # "duplicate" — confirm this is the intended retry behaviour.
    if x_gitea_event and x_gitea_delivery:
        async with _get_idempotency_lock():
            if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
                logger.debug(
                    "Duplicate webhook: %s/%s",
                    x_gitea_event,
                    x_gitea_delivery)
                return Response(status_code=200, content="duplicate")

    # 4. Look up the handler.
    action = payload.get("action", "")
    logger.info("[WEBHOOK] event=%s action=%s delivery=%s", x_gitea_event, action, x_gitea_delivery)
    handler = _EVENT_HANDLERS.get(x_gitea_event or "")
    if not handler:
        logger.info("[WEBHOOK] Unhandled event type: %s", x_gitea_event)
        return Response(status_code=200,
                        content=f"unhandled event: {x_gitea_event}")

    # 5. Run the handler.
    try:
        await handler(payload)
    except Exception:
        logger.exception("Mail creation failed for %s event", x_gitea_event)
        return Response(status_code=500, content="mail creation failed")
    return Response(status_code=200, content="ok")