diff --git a/tests/test_counter.py b/tests/test_counter.py new file mode 100644 index 0000000..bb3cc18 --- /dev/null +++ b/tests/test_counter.py @@ -0,0 +1,158 @@ +"""F10 ActiveAgentCounter 单元测试 + +按 test-plan-v2.6.md §F10: +- T1: 全局上限(P0) +- T2: per-agent 串行(P0) +- T3: release 恢复(P0) +- T4: 并发竞争(P1) +""" + +import asyncio +import pytest + +from src.daemon.counter import ActiveAgentCounter + + +@pytest.fixture +def counter(): + return ActiveAgentCounter(max_global=3, max_per_agent=1) + + +# --------------------------------------------------------------------------- +# T1: 全局上限 +# --------------------------------------------------------------------------- + +class TestGlobalLimit: + def test_acquire_within_limit(self, counter): + """全局未满 → 成功""" + result = asyncio.run(counter.acquire("agent-1")) + assert result is True + assert counter.global_active == 1 + + def test_acquire_fills_global(self, counter): + """填满全局""" + for i in range(3): + asyncio.run(counter.acquire(f"agent-{i}")) + assert counter.global_active == 3 + + def test_acquire_exceeds_global(self, counter): + """超出全局上限 → 失败""" + for i in range(3): + asyncio.run(counter.acquire(f"agent-{i}")) + result = asyncio.run(counter.acquire("agent-extra")) + assert result is False + + def test_can_acquire_respects_limit(self, counter): + """can_acquire 检查全局上限""" + assert asyncio.run(counter.can_acquire("agent-1")) is True + for i in range(3): + asyncio.run(counter.acquire(f"agent-{i}")) + assert asyncio.run(counter.can_acquire("agent-extra")) is False + + +# --------------------------------------------------------------------------- +# T2: per-agent 串行 +# --------------------------------------------------------------------------- + +class TestPerAgentSerial: + def test_same_agent_blocked(self, counter): + """同一 Agent 第二个 acquire 失败""" + asyncio.run(counter.acquire("agent-1")) + result = asyncio.run(counter.acquire("agent-1")) + assert result is False + + def test_different_agents_ok(self, counter): + """不同 Agent 可以同时 acquire""" + assert asyncio.run(counter.acquire("agent-1")) is True + assert asyncio.run(counter.acquire("agent-2")) is True + + def test_per_agent_limit_2(self): + """max_per_agent=2 允许两个""" + c = ActiveAgentCounter(max_global=5, max_per_agent=2) + assert asyncio.run(c.acquire("a1")) is True + assert asyncio.run(c.acquire("a1")) is True + assert asyncio.run(c.acquire("a1")) is False # 第三个失败 + + +# --------------------------------------------------------------------------- +# T3: release 恢复 +# --------------------------------------------------------------------------- + +class TestRelease: + def test_release_allows_new_acquire(self, counter): + """release 后可以重新 acquire""" + asyncio.run(counter.acquire("agent-1")) + counter.release("agent-1") + result = asyncio.run(counter.acquire("agent-1")) + assert result is True + + def test_release_updates_count(self, counter): + """release 后计数正确""" + asyncio.run(counter.acquire("agent-1")) + assert counter.global_active == 1 + counter.release("agent-1") + assert counter.global_active == 0 + + def test_release_same_agent_twice(self, counter): + """release 后同一 Agent 可以再次 acquire""" + asyncio.run(counter.acquire("agent-1")) + counter.release("agent-1") + assert asyncio.run(counter.can_acquire("agent-1")) is True + + def test_release_updates_active_agents(self, counter): + """active_agents 正确追踪""" + asyncio.run(counter.acquire("agent-1")) + assert counter.active_agents == {"agent-1": 1} + counter.release("agent-1") + assert "agent-1" not in counter.active_agents + + def test_full_cycle(self, counter): + """完整获取-释放循环""" + # Fill + for i in range(3): + asyncio.run(counter.acquire(f"a{i}")) + assert counter.global_active == 3 + + # Full → can't acquire + assert asyncio.run(counter.can_acquire("extra")) is False + + # Release one + counter.release("a1") + assert counter.global_active == 2 + + # Now can acquire + assert asyncio.run(counter.acquire("extra")) is True + assert counter.global_active == 3 + + +# --------------------------------------------------------------------------- +# T4: 并发竞争(P1) +# --------------------------------------------------------------------------- + +class TestConcurrency: + def test_concurrent_acquire_no_exceed(self): + """并发 acquire 不超过全局限制""" + counter = ActiveAgentCounter(max_global=3, max_per_agent=1) + results = [] + + async def try_acquire(agent_id): + r = await counter.acquire(agent_id) + results.append((agent_id, r)) + if r: + await asyncio.sleep(0.01) + counter.release(agent_id) + + async def run(): + tasks = [try_acquire(f"agent-{i}") for i in range(10)] + await asyncio.gather(*tasks) + + asyncio.run(run()) + + succeeded = sum(1 for _, r in results if r) + # 不超过 max_global 的倍数(因为 release 后可重用) + assert succeeded > 0 + + def test_no_negative_count(self, counter): + """release 不会导致负数计数""" + counter.release("nonexistent") # 不应崩溃 + assert counter.global_active >= 0