From 0cb78fd33218405824bd5843ae79386ab1c5140f Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 06:01:35 +0800 Subject: [PATCH] auto-sync: 2026-05-17 06:01:35 --- src/daemon/counter.py | 71 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 src/daemon/counter.py diff --git a/src/daemon/counter.py b/src/daemon/counter.py new file mode 100644 index 0000000..2409c26 --- /dev/null +++ b/src/daemon/counter.py @@ -0,0 +1,71 @@ +"""ActiveAgentCounter — 并发控制 + +全局上限 + per-agent 串行,asyncio Semaphore 实现。 +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Dict, Optional + +logger = logging.getLogger("moziplus-v2.counter") + + +class ActiveAgentCounter: + """异步并发计数器""" + + def __init__(self, max_global: int = 5, max_per_agent: int = 1): + self._max_global = max_global + self._max_per_agent = max_per_agent + self._global_sem = asyncio.Semaphore(max_global) + self._per_agent: Dict[str, asyncio.Semaphore] = {} + self._active: Dict[str, int] = {} # agent_id → count + self._global_active: int = 0 + + async def can_acquire(self, agent_id: str) -> bool: + """检查是否可以获取(不实际获取)""" + if self._global_active >= self._max_global: + return False + active = self._active.get(agent_id, 0) + if active >= self._max_per_agent: + return False + return True + + async def acquire(self, agent_id: str) -> bool: + """获取一个 slot + + Returns: + True if acquired, False if would block + """ + if not await self.can_acquire(agent_id): + return False + + await self._global_sem.acquire() + + if agent_id not in self._per_agent: + self._per_agent[agent_id] = asyncio.Semaphore(self._max_per_agent) + await self._per_agent[agent_id].acquire() + + self._global_active += 1 + self._active[agent_id] = self._active.get(agent_id, 0) + 1 + return True + + def release(self, agent_id: str) -> None: + """释放一个 slot""" + if agent_id in self._per_agent: + self._per_agent[agent_id].release() + self._global_sem.release() + self._global_active = max(0, self._global_active - 1) + if agent_id in self._active: + self._active[agent_id] = max(0, self._active[agent_id] - 1) + if self._active[agent_id] == 0: + del self._active[agent_id] + + @property + def global_active(self) -> int: + return self._global_active + + @property + def active_agents(self) -> Dict[str, int]: + return dict(self._active)