From cab0b1e1b3c3f4b1ee48e11795ff4a8de5dee3ce Mon Sep 17 00:00:00 2001 From: cfdaily Date: Tue, 26 May 2026 20:28:33 +0800 Subject: [PATCH] auto-sync: 2026-05-26 20:28:32 --- src/daemon/counter.py | 99 ++++++++++++++++++++++++++++++------------- 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/src/daemon/counter.py b/src/daemon/counter.py index 21238a2..b70c209 100644 --- a/src/daemon/counter.py +++ b/src/daemon/counter.py @@ -1,8 +1,12 @@ """ActiveAgentCounter — 并发控制 + 冷却机制 -全局上限 + per-agent 串行,asyncio Semaphore 实现。 +v2.1:per (agent, session) 粒度。三层控制: + - per session key: max_per_session(同 session 不能并发 spawn) + - per agent: max_concurrent_sessions(同 agent 最多 N 个不同 session) + - global: max_global(全局总并发上限) + +asyncio Semaphore 实现。 延迟创建 Semaphore(兼容 Python 3.9 无 event loop 时的构造)。 -v2.7.2:新增 cooldown 机制(429/API 错误后冷却期)。 """ from __future__ import annotations @@ -16,36 +20,51 @@ logger = logging.getLogger("moziplus-v2.counter") class ActiveAgentCounter: - """异步并发计数器 + 冷却机制""" + """异步并发计数器 + 冷却机制(v2.1: per session 粒度)""" - def __init__(self, max_global: int = 5, max_per_agent: int = 1, - default_cooldown_seconds: float = 120.0): + def __init__(self, max_global: int = 5, max_per_session: int = 1, + max_concurrent_sessions: int = 3, + default_cooldown_seconds: float = 120.0, + # 向后兼容旧配置 + max_per_agent: Optional[int] = None): self._max_global = max_global - self._max_per_agent = max_per_agent + self._max_per_session = max_per_session + self._max_concurrent_sessions = max_concurrent_sessions self._default_cooldown_seconds = default_cooldown_seconds + + # 如果调用方传了旧的 max_per_agent,映射到 max_per_session + if max_per_agent is not None and max_per_session == 1: + self._max_per_session = max_per_agent + self._global_sem: Optional[asyncio.Semaphore] = None - self._per_agent: Dict[str, asyncio.Semaphore] = {} - self._active: Dict[str, int] = {} + self._per_key: Dict[str, asyncio.Semaphore] = {} + # key → 引用计数(同一个 key 理论上只有 0 或 1) + self._active_keys: Dict[str, int] = {} + # agent_id → 活跃 session 数 + self._agent_active: Dict[str, int] = {} self._global_active: int = 0 - # v2.7.2:冷却机制 - self._cooldown_until: Dict[str, float] = {} # agent_id → 冷却到期时间戳 + # 冷却机制(per agent) + self._cooldown_until: Dict[str, float] = {} + + @staticmethod + def _make_key(agent_id: str, session_id: str) -> str: + return f"{agent_id}:{session_id}" def _get_global_sem(self) -> asyncio.Semaphore: if self._global_sem is None: self._global_sem = asyncio.Semaphore(self._max_global) return self._global_sem - def _get_agent_sem(self, agent_id: str) -> asyncio.Semaphore: - if agent_id not in self._per_agent: - self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent) - return self._per_agent[agent_id] + def _get_key_sem(self, key: str) -> asyncio.Semaphore: + if key not in self._per_key: + self._per_key[key] = asyncio.Semaphore(self._max_per_session) + return self._per_key[key] def is_cooling_down(self, agent_id: str) -> bool: """检查 agent 是否在冷却期""" until = self._cooldown_until.get(agent_id) if until and time.time() < until: return True - # 冷却期已过,清理 self._cooldown_until.pop(agent_id, None) return False @@ -56,37 +75,56 @@ class ActiveAgentCounter: logger.info("Cooldown set for %s: %.0fs (until %.0f)", agent_id, cd, self._cooldown_until[agent_id]) - async def can_acquire(self, agent_id: str) -> bool: + async def can_acquire(self, agent_id: str, session_id: str = "main") -> bool: + """三层检查:cooldown → global → per agent → per session key""" if self.is_cooling_down(agent_id): return False if self._global_active >= self._max_global: return False - active = self._active.get(agent_id, 0) - if active >= self._max_per_agent: + if self._agent_active.get(agent_id, 0) >= self._max_concurrent_sessions: + return False + key = self._make_key(agent_id, session_id) + if self._active_keys.get(key, 0) >= self._max_per_session: return False return True - async def acquire(self, agent_id: str) -> bool: - if not await self.can_acquire(agent_id): + async def acquire(self, agent_id: str, session_id: str = "main") -> bool: + """占用 per-session key + per-agent 计数 + global semaphore""" + if not await self.can_acquire(agent_id, session_id): return False + key = self._make_key(agent_id, session_id) await self._get_global_sem().acquire() - await self._get_agent_sem(agent_id).acquire() + await self._get_key_sem(key).acquire() self._global_active += 1 - self._active[agent_id] = self._active.get(agent_id, 0) + 1 + self._active_keys[key] = self._active_keys.get(key, 0) + 1 + self._agent_active[agent_id] = self._agent_active.get(agent_id, 0) + 1 return True - def release(self, agent_id: str) -> None: - if agent_id in self._per_agent: - self._per_agent[agent_id].release() + def release(self, agent_id: str, session_id: str = "main") -> None: + """释放 per-session key + per-agent 计数 + global semaphore""" + key = self._make_key(agent_id, session_id) + + if key in self._per_key: + self._per_key[key].release() + # 如果 key 不再活跃,清理 semaphore + if self._active_keys.get(key, 0) <= 1: + del self._per_key[key] + if self._global_sem: self._global_sem.release() self._global_active = max(0, self._global_active - 1) - if agent_id in self._active: - self._active[agent_id] = max(0, self._active[agent_id] - 1) - if self._active[agent_id] == 0: - del self._active[agent_id] + + if key in self._active_keys: + self._active_keys[key] = max(0, self._active_keys[key] - 1) + if self._active_keys[key] == 0: + del self._active_keys[key] + + if agent_id in self._agent_active: + self._agent_active[agent_id] = max(0, self._agent_active[agent_id] - 1) + if self._agent_active[agent_id] == 0: + del self._agent_active[agent_id] @property def global_active(self) -> int: @@ -98,7 +136,8 @@ class ActiveAgentCounter: @property def active_agents(self) -> Dict[str, int]: - return dict(self._active) + """返回 per agent 的活跃 session 数""" + return dict(self._agent_active) def is_near_limit(self, margin: int = 1) -> bool: """全局活跃数是否接近上限"""