auto-sync: 2026-05-17 06:02:23
This commit is contained in:
+17
-15
@@ -1,6 +1,7 @@
|
||||
"""ActiveAgentCounter — 并发控制
|
||||
|
||||
全局上限 + per-agent 串行,asyncio Semaphore 实现。
|
||||
延迟创建 Semaphore(兼容 Python 3.9 无 event loop 时的构造)。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -18,13 +19,22 @@ class ActiveAgentCounter:
|
||||
def __init__(self, max_global: int = 5, max_per_agent: int = 1):
|
||||
self._max_global = max_global
|
||||
self._max_per_agent = max_per_agent
|
||||
self._global_sem = asyncio.Semaphore(max_global)
|
||||
self._global_sem: Optional[asyncio.Semaphore] = None
|
||||
self._per_agent: Dict[str, asyncio.Semaphore] = {}
|
||||
self._active: Dict[str, int] = {} # agent_id → count
|
||||
self._active: Dict[str, int] = {}
|
||||
self._global_active: int = 0
|
||||
|
||||
def _get_global_sem(self) -> asyncio.Semaphore:
|
||||
if self._global_sem is None:
|
||||
self._global_sem = asyncio.Semaphore(self._max_global)
|
||||
return self._global_sem
|
||||
|
||||
def _get_agent_sem(self, agent_id: str) -> asyncio.Semaphore:
|
||||
if agent_id not in self._per_agent:
|
||||
self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent)
|
||||
return self._per_agent[agent_id]
|
||||
|
||||
async def can_acquire(self, agent_id: str) -> bool:
|
||||
"""检查是否可以获取(不实际获取)"""
|
||||
if self._global_active >= self._max_global:
|
||||
return False
|
||||
active = self._active.get(agent_id, 0)
|
||||
@@ -33,29 +43,21 @@ class ActiveAgentCounter:
|
||||
return True
|
||||
|
||||
async def acquire(self, agent_id: str) -> bool:
|
||||
"""获取一个 slot
|
||||
|
||||
Returns:
|
||||
True if acquired, False if would block
|
||||
"""
|
||||
if not await self.can_acquire(agent_id):
|
||||
return False
|
||||
|
||||
await self._global_sem.acquire()
|
||||
|
||||
if agent_id not in self._per_agent:
|
||||
self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent)
|
||||
await self._per_agent[agent_id].acquire()
|
||||
await self._get_global_sem().acquire()
|
||||
await self._get_agent_sem(agent_id).acquire()
|
||||
|
||||
self._global_active += 1
|
||||
self._active[agent_id] = self._active.get(agent_id, 0) + 1
|
||||
return True
|
||||
|
||||
def release(self, agent_id: str) -> None:
|
||||
"""释放一个 slot"""
|
||||
if agent_id in self._per_agent:
|
||||
self._per_agent[agent_id].release()
|
||||
self._global_sem.release()
|
||||
if self._global_sem:
|
||||
self._global_sem.release()
|
||||
self._global_active = max(0, self._global_active - 1)
|
||||
if agent_id in self._active:
|
||||
self._active[agent_id] = max(0, self._active[agent_id] - 1)
|
||||
|
||||
Reference in New Issue
Block a user