auto-sync: 2026-06-08 08:51:12
Deploy / ci (push) Waiting to run
Deploy / deploy (push) Blocked by required conditions
Deploy / notify-deploy-failure (push) Blocked by required conditions

This commit is contained in:
cfdaily
2026-06-08 08:51:12 +08:00
parent 8bd364055c
commit 83fb270e2a
2 changed files with 35 additions and 34 deletions
+33 -28
View File
@@ -8,6 +8,7 @@
from __future__ import annotations
import asyncio
import hashlib
import hmac
import json
@@ -25,6 +26,7 @@ from fastapi import APIRouter, Header, Request, Response
from src.blackboard.db import init_db
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.config.agents import AGENT_IDS
from src.daemon.toolchain_templates import render_template
from src.utils import get_data_root
@@ -41,6 +43,7 @@ router = APIRouter(tags=["toolchain"])
_delivery_cache: Set[str] = set()
_delivery_timestamps: List[Tuple[float, str]] = []
_TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
def _is_duplicate(event: str, delivery: str) -> bool:
@@ -86,31 +89,32 @@ _GITEA_TOKEN: str = os.environ.get("GITEA_TOKEN", "")
_GITEA_BASE = "http://192.168.2.154:3000/api/v1"
async def _fetch_pr_files(repo: str, pr_number: int) -> List[str]:
"""通过 Gitea API 获取 PR changed files 列表
Args:
repo: 仓库路径(如 "sanguo/sanguo_moziplus_v2"
pr_number: PR 编号
async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]:
"""获取 PR 文件列表,含重试机制
Returns:
文件路径列表
(文件列表, 错误信息) — 成功时错误信息为空字符串
"""
if not _GITEA_TOKEN:
logger.warning("GITEA_TOKEN not set, cannot fetch PR files")
return []
return [], "GITEA_TOKEN 未配置"
url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files"
headers = {"Authorization": f"token {_GITEA_TOKEN}"}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
files: List[Dict[str, Any]] = resp.json()
return [f.get("filename", "") for f in files]
except Exception:
logger.warning("Failed to fetch PR files: %s/pulls/%d", repo, pr_number, exc_info=True)
return []
last_error = ""
for attempt in range(3):
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
files: List[Dict[str, Any]] = resp.json()
return [f.get("filename", "") for f in files], ""
except Exception as e:
last_error = str(e)
if attempt < 2:
await asyncio.sleep(0.5 * (attempt + 1))
logger.warning("Retry %d/3 fetching PR files: %s/pulls/%d", attempt + 1, repo, pr_number)
logger.warning("Failed to fetch PR files after 3 retries: %s/pulls/%d - %s", repo, pr_number, last_error)
return [], f"获取文件列表失败(重试3次): {last_error}"
# ---------------------------------------------------------------------------
@@ -136,10 +140,7 @@ def _calc_risk_level(changed_files: List[str]) -> str:
# Mail 创建
# ---------------------------------------------------------------------------
KNOWN_AGENTS = {
"pangtong-fujunshi", "simayi-challenger", "zhangfei-dev",
"guanyu-dev", "zhaoyun-data", "jiangwei-infra",
}
MAIL_PROJECT_ID = "_mail"
@@ -173,7 +174,7 @@ def _send_mail(
Raises:
Exception: 数据库写入失败
"""
if to_agent not in KNOWN_AGENTS:
if to_agent not in AGENT_IDS:
logger.warning("Unknown agent: %s, skipping mail", to_agent)
return ""
@@ -235,9 +236,12 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None:
branch = pr.get("head", {}).get("ref", "unknown")
# 获取改动文件列表
changed_files = await _fetch_pr_files(repo, pr_number)
changed_files, fetch_error = await _fetch_pr_files(repo, pr_number)
risk_level = _calc_risk_level(changed_files)
file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无法获取文件列表)"
if fetch_error:
file_list = f"⚠️ {fetch_error}"
else:
file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无文件变更)"
text = render_template("review_request", {
"repo": repo,
@@ -431,9 +435,10 @@ async def gitea_webhook(
# 2. 幂等检查
if x_gitea_event and x_gitea_delivery:
if _is_duplicate(x_gitea_event, x_gitea_delivery):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
async with _idempotency_lock:
if _is_duplicate(x_gitea_event, x_gitea_delivery):
logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery)
return Response(status_code=200, content="duplicate")
# 3. 解析 payload
try:
+2 -6
View File
@@ -10,14 +10,10 @@ from typing import Optional
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.config.agents import AGENT_IDS
logger = logging.getLogger(__name__)
# 有效 Agent ID 集合(用于校验通知目标)
_VALID_AGENT_IDS = frozenset({
"pangtong-fujunshi", "simayi-challenger", "zhangfei-dev",
"guanyu-dev", "zhaoyun-data", "jiangwei-infra",
})
# 邮件通知正文模板(统一模板,包含所有可能的失败原因和建议)
_NOTIFY_TEMPLATE = """你的邮件投递失败了。
@@ -76,7 +72,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str,
# 发件人不是有效 Agent(如 system)→ 通知庞统代处理,不触发广播
target_agent = from_agent
if from_agent not in _VALID_AGENT_IDS:
if from_agent not in AGENT_IDS:
logger.warning("Mail %s: sender '%s' is not a valid agent, routing failure notice to pangtong-fujunshi",
original_mail_id, from_agent)
target_agent = "pangtong-fujunshi"