"""ActiveAgentCounter — concurrency control.

Global cap + per-agent serialization, implemented with asyncio Semaphores.

Semaphores are created lazily (to stay compatible with constructing the
counter on Python 3.9 when no event loop is available).
"""

from __future__ import annotations

import asyncio
import logging
from typing import Dict, Optional

logger = logging.getLogger("moziplus-v2.counter")


class ActiveAgentCounter:
    """Asynchronous concurrency counter.

    Tracks how many slots are held globally and per agent, and refuses new
    acquisitions once either ``max_global`` or ``max_per_agent`` is reached.
    Plain integer counters are used for the admission check; asyncio
    semaphores mirror the same limits.
    """

    def __init__(self, max_global: int = 5, max_per_agent: int = 1):
        """Store the limits; no semaphore is created here.

        Args:
            max_global: Maximum number of slots held across all agents.
            max_per_agent: Maximum number of slots held by a single agent.
        """
        self._max_global = max_global
        self._max_per_agent = max_per_agent
        # Created on first use by _get_global_sem().
        self._global_sem: Optional[asyncio.Semaphore] = None
        # agent_id -> semaphore, created on first use by _get_agent_sem().
        self._per_agent: Dict[str, asyncio.Semaphore] = {}
        # agent_id -> number of slots currently held (entries removed at 0).
        self._active: Dict[str, int] = {}
        self._global_active: int = 0

    def _get_global_sem(self) -> asyncio.Semaphore:
        """Return the global semaphore, creating it on first call."""
        if self._global_sem is None:
            self._global_sem = asyncio.Semaphore(self._max_global)
        return self._global_sem

    def _get_agent_sem(self, agent_id: str) -> asyncio.Semaphore:
        """Return the semaphore for *agent_id*, creating it on first call."""
        if agent_id not in self._per_agent:
            self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent)
        return self._per_agent[agent_id]

    async def can_acquire(self, agent_id: str) -> bool:
        """Return True if both the global and the per-agent limit have room.

        Only the integer counters are consulted; nothing is reserved.
        """
        if self._global_active >= self._max_global:
            return False
        return self._active.get(agent_id, 0) < self._max_per_agent

    async def acquire(self, agent_id: str) -> bool:
        """Try to take one slot for *agent_id*.

        Returns:
            False when ``can_acquire`` reports no capacity (nothing is taken);
            True once both semaphores are held and the counters are updated.
        """
        if not await self.can_acquire(agent_id):
            return False

        global_sem = self._get_global_sem()
        await global_sem.acquire()
        try:
            await self._get_agent_sem(agent_id).acquire()
        except BaseException:
            # Cancelled or failed after taking the global permit: give it
            # back so the global capacity is not lost.
            global_sem.release()
            raise

        self._global_active += 1
        self._active[agent_id] = self._active.get(agent_id, 0) + 1
        return True

    def release(self, agent_id: str) -> None:
        """Give back one slot previously acquired for *agent_id*.

        A release for an agent that holds no slot (unknown agent or double
        release) is ignored with a warning. asyncio.Semaphore is unbounded,
        so releasing without a matching acquire would raise the effective
        limits and desynchronize the counters from the semaphores.
        """
        held = self._active.get(agent_id, 0)
        if held <= 0:
            logger.warning(
                "release() called for agent %r which holds no slot; ignored",
                agent_id,
            )
            return

        agent_sem = self._per_agent.get(agent_id)
        if agent_sem is not None:
            agent_sem.release()
        if self._global_sem is not None:
            self._global_sem.release()

        self._global_active = max(0, self._global_active - 1)
        if held == 1:
            del self._active[agent_id]
        else:
            self._active[agent_id] = held - 1

    @property
    def global_active(self) -> int:
        """Number of slots currently held across all agents."""
        return self._global_active

    @property
    def active_agents(self) -> Dict[str, int]:
        """Snapshot copy of the agent_id -> held-slot-count mapping."""
        return dict(self._active)