"""Unit tests for F10 ActiveAgentCounter.

Follows test-plan-v2.6.md, section F10:
- T1: global limit (P0)
- T2: per-agent serialization (P0)
- T3: recovery after release (P0)
- T4: concurrent contention (P1)
"""

import asyncio

import pytest

from src.daemon.counter import ActiveAgentCounter


@pytest.fixture
def counter():
    """Provide a counter allowing 3 global slots and 1 slot per agent."""
    return ActiveAgentCounter(max_global=3, max_per_agent=1)


# ---------------------------------------------------------------------------
# T1: global limit
# ---------------------------------------------------------------------------


class TestGlobalLimit:
    """T1: acquire/can_acquire honour the global slot limit."""

    def test_acquire_within_limit(self, counter):
        """Acquiring while the global pool has room succeeds."""
        result = asyncio.run(counter.acquire("agent-1"))
        assert result is True
        assert counter.global_active == 1

    def test_acquire_fills_global(self, counter):
        """Three distinct agents fill the global pool."""
        for i in range(3):
            asyncio.run(counter.acquire(f"agent-{i}"))
        assert counter.global_active == 3

    def test_acquire_exceeds_global(self, counter):
        """Acquiring beyond the global limit fails."""
        for i in range(3):
            asyncio.run(counter.acquire(f"agent-{i}"))
        result = asyncio.run(counter.acquire("agent-extra"))
        assert result is False

    def test_can_acquire_respects_limit(self, counter):
        """can_acquire reflects the global limit."""
        assert asyncio.run(counter.can_acquire("agent-1")) is True
        for i in range(3):
            asyncio.run(counter.acquire(f"agent-{i}"))
        assert asyncio.run(counter.can_acquire("agent-extra")) is False


# ---------------------------------------------------------------------------
# T2: per-agent serialization
# ---------------------------------------------------------------------------


class TestPerAgentSerial:
    """T2: each agent is limited to max_per_agent concurrent slots."""

    def test_same_agent_blocked(self, counter):
        """A second acquire for the same agent fails."""
        asyncio.run(counter.acquire("agent-1"))
        result = asyncio.run(counter.acquire("agent-1"))
        assert result is False

    def test_different_agents_ok(self, counter):
        """Different agents can hold slots at the same time."""
        assert asyncio.run(counter.acquire("agent-1")) is True
        assert asyncio.run(counter.acquire("agent-2")) is True

    def test_per_agent_limit_2(self):
        """max_per_agent=2 allows exactly two slots for one agent."""
        c = ActiveAgentCounter(max_global=5, max_per_agent=2)
        assert asyncio.run(c.acquire("a1")) is True
        assert asyncio.run(c.acquire("a1")) is True
        assert asyncio.run(c.acquire("a1")) is False  # the third one fails


# ---------------------------------------------------------------------------
# T3: recovery after release
# ---------------------------------------------------------------------------


class TestRelease:
    """T3: release frees slots and updates the bookkeeping."""

    def test_release_allows_new_acquire(self, counter):
        """After release the same agent can acquire again."""
        asyncio.run(counter.acquire("agent-1"))
        counter.release("agent-1")
        result = asyncio.run(counter.acquire("agent-1"))
        assert result is True

    def test_release_updates_count(self, counter):
        """The global count drops after release."""
        asyncio.run(counter.acquire("agent-1"))
        assert counter.global_active == 1
        counter.release("agent-1")
        assert counter.global_active == 0

    def test_release_same_agent_twice(self, counter):
        """After release, can_acquire reports the same agent as eligible."""
        asyncio.run(counter.acquire("agent-1"))
        counter.release("agent-1")
        assert asyncio.run(counter.can_acquire("agent-1")) is True

    def test_release_updates_active_agents(self, counter):
        """active_agents tracks held slots and drops released agents."""
        asyncio.run(counter.acquire("agent-1"))
        assert counter.active_agents == {"agent-1": 1}
        counter.release("agent-1")
        assert "agent-1" not in counter.active_agents

    def test_full_cycle(self, counter):
        """Full acquire/release cycle."""
        # Fill
        for i in range(3):
            asyncio.run(counter.acquire(f"a{i}"))
        assert counter.global_active == 3

        # Full -> can't acquire
        assert asyncio.run(counter.can_acquire("extra")) is False

        # Release one
        counter.release("a1")
        assert counter.global_active == 2

        # Now can acquire
        assert asyncio.run(counter.acquire("extra")) is True
        assert counter.global_active == 3


# ---------------------------------------------------------------------------
# T4: concurrent contention (P1)
# ---------------------------------------------------------------------------


class TestConcurrency:
    """T4: behaviour under concurrent acquire/release."""

    def test_concurrent_acquire_no_exceed(self):
        """Concurrent acquires never push the active count past the limit."""
        max_global = 3
        total_tasks = 10
        counter = ActiveAgentCounter(max_global=max_global, max_per_agent=1)
        results = []
        # Samples of global_active taken while a slot is held.
        observed = []

        async def try_acquire(agent_id):
            acquired = await counter.acquire(agent_id)
            results.append((agent_id, acquired))
            if not acquired:
                return
            try:
                observed.append(counter.global_active)
                # Hold the slot briefly so the other tasks contend for it.
                await asyncio.sleep(0.01)
                observed.append(counter.global_active)
            finally:
                # Always hand the slot back, even if the hold is interrupted.
                counter.release(agent_id)

        async def run():
            tasks = [try_acquire(f"agent-{i}") for i in range(total_tasks)]
            await asyncio.gather(*tasks)

        asyncio.run(run())

        succeeded = sum(1 for _, r in results if r)
        # The number of successes is not pinned to max_global: a released
        # slot may be reused by a later task. Only require some success.
        assert succeeded > 0
        assert len(results) == total_tasks
        # The actual limit check: no sample may exceed the global limit.
        assert max(observed) <= max_global
        # Every successful acquire was released, so nothing stays active.
        assert counter.global_active == 0

    def test_no_negative_count(self, counter):
        """Releasing an unknown agent does not drive the count negative."""
        counter.release("nonexistent")  # must not crash
        assert counter.global_active >= 0