auto-sync: 2026-05-26 20:28:32

This commit is contained in:
cfdaily
2026-05-26 20:28:33 +08:00
parent 5e9b2bbbec
commit cab0b1e1b3
+69 -30
View File
@@ -1,8 +1,12 @@
"""ActiveAgentCounter — 并发控制 + 冷却机制
全局上限 + per-agent 串行,asyncio Semaphore 实现。
v2.1per (agent, session) 粒度。三层控制:
- per session key: max_per_session(同 session 不能并发 spawn
- per agent: max_concurrent_sessions(同 agent 最多 N 个不同 session
- global: max_global(全局总并发上限)
asyncio Semaphore 实现。
延迟创建 Semaphore(兼容 Python 3.9 无 event loop 时的构造)。
v2.7.2:新增 cooldown 机制(429/API 错误后冷却期)。
"""
from __future__ import annotations
@@ -16,36 +20,51 @@ logger = logging.getLogger("moziplus-v2.counter")
class ActiveAgentCounter:
"""异步并发计数器 + 冷却机制"""
"""异步并发计数器 + 冷却机制v2.1: per session 粒度)"""
def __init__(self, max_global: int = 5, max_per_agent: int = 1,
default_cooldown_seconds: float = 120.0):
def __init__(self, max_global: int = 5, max_per_session: int = 1,
max_concurrent_sessions: int = 3,
default_cooldown_seconds: float = 120.0,
# 向后兼容旧配置
max_per_agent: Optional[int] = None):
self._max_global = max_global
self._max_per_agent = max_per_agent
self._max_per_session = max_per_session
self._max_concurrent_sessions = max_concurrent_sessions
self._default_cooldown_seconds = default_cooldown_seconds
# 如果调用方传了旧的 max_per_agent,映射到 max_per_session
if max_per_agent is not None and max_per_session == 1:
self._max_per_session = max_per_agent
self._global_sem: Optional[asyncio.Semaphore] = None
self._per_agent: Dict[str, asyncio.Semaphore] = {}
self._active: Dict[str, int] = {}
self._per_key: Dict[str, asyncio.Semaphore] = {}
# key → 引用计数(同一个 key 理论上只有 0 或 1)
self._active_keys: Dict[str, int] = {}
# agent_id → 活跃 session 数
self._agent_active: Dict[str, int] = {}
self._global_active: int = 0
# v2.7.2冷却机制
self._cooldown_until: Dict[str, float] = {} # agent_id → 冷却到期时间戳
# 冷却机制per agent
self._cooldown_until: Dict[str, float] = {}
@staticmethod
def _make_key(agent_id: str, session_id: str) -> str:
return f"{agent_id}:{session_id}"
def _get_global_sem(self) -> asyncio.Semaphore:
if self._global_sem is None:
self._global_sem = asyncio.Semaphore(self._max_global)
return self._global_sem
def _get_agent_sem(self, agent_id: str) -> asyncio.Semaphore:
if agent_id not in self._per_agent:
self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent)
return self._per_agent[agent_id]
def _get_key_sem(self, key: str) -> asyncio.Semaphore:
if key not in self._per_key:
self._per_key[key] = asyncio.Semaphore(self._max_per_session)
return self._per_key[key]
def is_cooling_down(self, agent_id: str) -> bool:
"""检查 agent 是否在冷却期"""
until = self._cooldown_until.get(agent_id)
if until and time.time() < until:
return True
# 冷却期已过,清理
self._cooldown_until.pop(agent_id, None)
return False
@@ -56,37 +75,56 @@ class ActiveAgentCounter:
logger.info("Cooldown set for %s: %.0fs (until %.0f)",
agent_id, cd, self._cooldown_until[agent_id])
async def can_acquire(self, agent_id: str) -> bool:
async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool:
"""三层检查:cooldown → global → per agent → per session key"""
if self.is_cooling_down(agent_id):
return False
if self._global_active >= self._max_global:
return False
active = self._active.get(agent_id, 0)
if active >= self._max_per_agent:
if self._agent_active.get(agent_id, 0) >= self._max_concurrent_sessions:
return False
key = self._make_key(agent_id, session_id)
if self._active_keys.get(key, 0) >= self._max_per_session:
return False
return True
async def acquire(self, agent_id: str) -> bool:
if not await self.can_acquire(agent_id):
async def acquire(self, agent_id: str, session_id: str = "main") -> bool:
"""占用 per-session key + per-agent 计数 + global semaphore"""
if not await self.can_acquire(agent_id, session_id):
return False
key = self._make_key(agent_id, session_id)
await self._get_global_sem().acquire()
await self._get_agent_sem(agent_id).acquire()
await self._get_key_sem(key).acquire()
self._global_active += 1
self._active[agent_id] = self._active.get(agent_id, 0) + 1
self._active_keys[key] = self._active_keys.get(key, 0) + 1
self._agent_active[agent_id] = self._agent_active.get(agent_id, 0) + 1
return True
def release(self, agent_id: str) -> None:
if agent_id in self._per_agent:
self._per_agent[agent_id].release()
def release(self, agent_id: str, session_id: str = "main") -> None:
"""释放 per-session key + per-agent 计数 + global semaphore"""
key = self._make_key(agent_id, session_id)
if key in self._per_key:
self._per_key[key].release()
# 如果 key 不再活跃,清理 semaphore
if self._active_keys.get(key, 0) <= 1:
del self._per_key[key]
if self._global_sem:
self._global_sem.release()
self._global_active = max(0, self._global_active - 1)
if agent_id in self._active:
self._active[agent_id] = max(0, self._active[agent_id] - 1)
if self._active[agent_id] == 0:
del self._active[agent_id]
if key in self._active_keys:
self._active_keys[key] = max(0, self._active_keys[key] - 1)
if self._active_keys[key] == 0:
del self._active_keys[key]
if agent_id in self._agent_active:
self._agent_active[agent_id] = max(0, self._agent_active[agent_id] - 1)
if self._agent_active[agent_id] == 0:
del self._agent_active[agent_id]
@property
def global_active(self) -> int:
@@ -98,7 +136,8 @@ class ActiveAgentCounter:
@property
def active_agents(self) -> Dict[str, int]:
return dict(self._active)
"""返回 per agent 的活跃 session 数"""
return dict(self._agent_active)
def is_near_limit(self, margin: int = 1) -> bool:
"""全局活跃数是否接近上限"""