diff --git a/src/api/toolchain_routes.py b/src/api/toolchain_routes.py index 89d9c1a..f8e96da 100644 --- a/src/api/toolchain_routes.py +++ b/src/api/toolchain_routes.py @@ -8,6 +8,7 @@ from __future__ import annotations +import asyncio import hashlib import hmac import json @@ -25,6 +26,7 @@ from fastapi import APIRouter, Header, Request, Response from src.blackboard.db import init_db from src.blackboard.models import Task from src.blackboard.operations import Blackboard +from src.config.agents import AGENT_IDS from src.daemon.toolchain_templates import render_template from src.utils import get_data_root @@ -41,6 +43,7 @@ router = APIRouter(tags=["toolchain"]) _delivery_cache: Set[str] = set() _delivery_timestamps: List[Tuple[float, str]] = [] _TTL_SECONDS = 7 * 24 * 3600 +_idempotency_lock = asyncio.Lock() def _is_duplicate(event: str, delivery: str) -> bool: @@ -86,31 +89,32 @@ _GITEA_TOKEN: str = os.environ.get("GITEA_TOKEN", "") _GITEA_BASE = "http://192.168.2.154:3000/api/v1" -async def _fetch_pr_files(repo: str, pr_number: int) -> List[str]: - """通过 Gitea API 获取 PR changed files 列表。 - - Args: - repo: 仓库路径(如 "sanguo/sanguo_moziplus_v2") - pr_number: PR 编号 +async def _fetch_pr_files(repo: str, pr_number: int) -> Tuple[List[str], str]: + """获取 PR 文件列表,含重试机制。 Returns: - 文件路径列表 + (文件列表, 错误信息) — 成功时错误信息为空字符串 """ if not _GITEA_TOKEN: - logger.warning("GITEA_TOKEN not set, cannot fetch PR files") - return [] + return [], "GITEA_TOKEN 未配置" url = f"{_GITEA_BASE}/repos/{repo}/pulls/{pr_number}/files" headers = {"Authorization": f"token {_GITEA_TOKEN}"} - try: - async with httpx.AsyncClient(timeout=5.0) as client: - resp = await client.get(url, headers=headers) - resp.raise_for_status() - files: List[Dict[str, Any]] = resp.json() - return [f.get("filename", "") for f in files] - except Exception: - logger.warning("Failed to fetch PR files: %s/pulls/%d", repo, pr_number, exc_info=True) - return [] + last_error = "" + for attempt in range(3): + try: + async with httpx.AsyncClient(timeout=5.0) as client: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + files: List[Dict[str, Any]] = resp.json() + return [f.get("filename", "") for f in files], "" + except Exception as e: + last_error = str(e) + if attempt < 2: + await asyncio.sleep(0.5 * (attempt + 1)) + logger.warning("Retry %d/3 fetching PR files: %s/pulls/%d", attempt + 1, repo, pr_number) + logger.warning("Failed to fetch PR files after 3 retries: %s/pulls/%d - %s", repo, pr_number, last_error) + return [], f"获取文件列表失败(重试3次): {last_error}" # --------------------------------------------------------------------------- @@ -136,10 +140,7 @@ def _calc_risk_level(changed_files: List[str]) -> str: # Mail 创建 # --------------------------------------------------------------------------- -KNOWN_AGENTS = { - "pangtong-fujunshi", "simayi-challenger", "zhangfei-dev", - "guanyu-dev", "zhaoyun-data", "jiangwei-infra", -} + MAIL_PROJECT_ID = "_mail" @@ -173,7 +174,7 @@ def _send_mail( Raises: Exception: 数据库写入失败 """ - if to_agent not in KNOWN_AGENTS: + if to_agent not in AGENT_IDS: logger.warning("Unknown agent: %s, skipping mail", to_agent) return "" @@ -235,9 +236,12 @@ async def _handle_pull_request(payload: Dict[str, Any]) -> None: branch = pr.get("head", {}).get("ref", "unknown") # 获取改动文件列表 - changed_files = await _fetch_pr_files(repo, pr_number) + changed_files, fetch_error = await _fetch_pr_files(repo, pr_number) risk_level = _calc_risk_level(changed_files) - file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无法获取文件列表)" + if fetch_error: + file_list = f"⚠️ {fetch_error}" + else: + file_list = "\n".join(f"- {f}" for f in changed_files) if changed_files else "(无文件变更)" text = render_template("review_request", { "repo": repo, @@ -431,9 +435,10 @@ async def gitea_webhook( # 2. 幂等检查 if x_gitea_event and x_gitea_delivery: - if _is_duplicate(x_gitea_event, x_gitea_delivery): - logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) - return Response(status_code=200, content="duplicate") + async with _idempotency_lock: + if _is_duplicate(x_gitea_event, x_gitea_delivery): + logger.debug("Duplicate webhook: %s/%s", x_gitea_event, x_gitea_delivery) + return Response(status_code=200, content="duplicate") # 3. 解析 payload try: diff --git a/src/daemon/mail_notify.py b/src/daemon/mail_notify.py index 90b9fc7..020415e 100644 --- a/src/daemon/mail_notify.py +++ b/src/daemon/mail_notify.py @@ -10,14 +10,10 @@ from typing import Optional from src.blackboard.models import Task from src.blackboard.operations import Blackboard +from src.config.agents import AGENT_IDS logger = logging.getLogger(__name__) -# 有效 Agent ID 集合(用于校验通知目标) -_VALID_AGENT_IDS = frozenset({ - "pangtong-fujunshi", "simayi-challenger", "zhangfei-dev", - "guanyu-dev", "zhaoyun-data", "jiangwei-infra", -}) # 邮件通知正文模板(统一模板,包含所有可能的失败原因和建议) _NOTIFY_TEMPLATE = """你的邮件投递失败了。 @@ -76,7 +72,7 @@ def notify_mail_failed(db_path: Path, original_mail_id: str, # 发件人不是有效 Agent(如 system)→ 通知庞统代处理,不触发广播 target_agent = from_agent - if from_agent not in _VALID_AGENT_IDS: + if from_agent not in AGENT_IDS: logger.warning("Mail %s: sender '%s' is not a valid agent, routing failure notice to pangtong-fujunshi", original_mail_id, from_agent) target_agent = "pangtong-fujunshi"