fix: asyncio.Lock 懒加载防 event loop 关闭后 import 失败
CI / lint (pull_request) Successful in 12s
CI / test (pull_request) Failing after 6s
CI / notify-on-failure (pull_request) Successful in 0s

This commit is contained in:
cfdaily
2026-06-14 00:09:27 +08:00
parent 3bca794902
commit 6e6b52fe3b
+10 -2
View File
@@ -50,7 +50,15 @@ router = APIRouter(tags=["toolchain"])
_delivery_cache: Set[str] = set()
_delivery_timestamps: List[Tuple[float, str]] = []
_TTL_SECONDS = 7 * 24 * 3600
_idempotency_lock = asyncio.Lock()
_idempotency_lock: Optional[asyncio.Lock] = None
def _get_idempotency_lock() -> asyncio.Lock:
"""懒加载 asyncio.Lock,避免模块级创建时 event loop 不存在(Python 3.9)。"""
global _idempotency_lock
if _idempotency_lock is None:
_idempotency_lock = asyncio.Lock()
return _idempotency_lock
def _is_duplicate(event: str, delivery: str,
@@ -1219,7 +1227,7 @@ async def gitea_webhook(
# 2. 幂等检查(需要在 payload 解析后,以支持内容去重)
if x_gitea_event and x_gitea_delivery:
async with _idempotency_lock:
async with _get_idempotency_lock():
if _is_duplicate(x_gitea_event, x_gitea_delivery, payload):
logger.debug(
"Duplicate webhook: %s/%s",