Files
sanguo_moziplus_v2/tests/test_counter.py
T
2026-05-17 06:02:00 +08:00

159 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F10 ActiveAgentCounter 单元测试
按 test-plan-v2.6.md §F10
- T1: 全局上限(P0
- T2: per-agent 串行(P0
- T3: release 恢复(P0
- T4: 并发竞争(P1
"""
import asyncio
import pytest
from src.daemon.counter import ActiveAgentCounter
@pytest.fixture
def counter():
return ActiveAgentCounter(max_global=3, max_per_agent=1)
# ---------------------------------------------------------------------------
# T1: 全局上限
# ---------------------------------------------------------------------------
class TestGlobalLimit:
def test_acquire_within_limit(self, counter):
"""全局未满 → 成功"""
result = asyncio.run(counter.acquire("agent-1"))
assert result is True
assert counter.global_active == 1
def test_acquire_fills_global(self, counter):
"""填满全局"""
for i in range(3):
asyncio.run(counter.acquire(f"agent-{i}"))
assert counter.global_active == 3
def test_acquire_exceeds_global(self, counter):
"""超出全局上限 → 失败"""
for i in range(3):
asyncio.run(counter.acquire(f"agent-{i}"))
result = asyncio.run(counter.acquire("agent-extra"))
assert result is False
def test_can_acquire_respects_limit(self, counter):
"""can_acquire 检查全局上限"""
assert asyncio.run(counter.can_acquire("agent-1")) is True
for i in range(3):
asyncio.run(counter.acquire(f"agent-{i}"))
assert asyncio.run(counter.can_acquire("agent-extra")) is False
# ---------------------------------------------------------------------------
# T2: per-agent 串行
# ---------------------------------------------------------------------------
class TestPerAgentSerial:
def test_same_agent_blocked(self, counter):
"""同一 Agent 第二个 acquire 失败"""
asyncio.run(counter.acquire("agent-1"))
result = asyncio.run(counter.acquire("agent-1"))
assert result is False
def test_different_agents_ok(self, counter):
"""不同 Agent 可以同时 acquire"""
assert asyncio.run(counter.acquire("agent-1")) is True
assert asyncio.run(counter.acquire("agent-2")) is True
def test_per_agent_limit_2(self):
"""max_per_agent=2 允许两个"""
c = ActiveAgentCounter(max_global=5, max_per_agent=2)
assert asyncio.run(c.acquire("a1")) is True
assert asyncio.run(c.acquire("a1")) is True
assert asyncio.run(c.acquire("a1")) is False # 第三个失败
# ---------------------------------------------------------------------------
# T3: release 恢复
# ---------------------------------------------------------------------------
class TestRelease:
def test_release_allows_new_acquire(self, counter):
"""release 后可以重新 acquire"""
asyncio.run(counter.acquire("agent-1"))
counter.release("agent-1")
result = asyncio.run(counter.acquire("agent-1"))
assert result is True
def test_release_updates_count(self, counter):
"""release 后计数正确"""
asyncio.run(counter.acquire("agent-1"))
assert counter.global_active == 1
counter.release("agent-1")
assert counter.global_active == 0
def test_release_same_agent_twice(self, counter):
"""release 后同一 Agent 可以再次 acquire"""
asyncio.run(counter.acquire("agent-1"))
counter.release("agent-1")
assert asyncio.run(counter.can_acquire("agent-1")) is True
def test_release_updates_active_agents(self, counter):
"""active_agents 正确追踪"""
asyncio.run(counter.acquire("agent-1"))
assert counter.active_agents == {"agent-1": 1}
counter.release("agent-1")
assert "agent-1" not in counter.active_agents
def test_full_cycle(self, counter):
"""完整获取-释放循环"""
# Fill
for i in range(3):
asyncio.run(counter.acquire(f"a{i}"))
assert counter.global_active == 3
# Full → can't acquire
assert asyncio.run(counter.can_acquire("extra")) is False
# Release one
counter.release("a1")
assert counter.global_active == 2
# Now can acquire
assert asyncio.run(counter.acquire("extra")) is True
assert counter.global_active == 3
# ---------------------------------------------------------------------------
# T4: 并发竞争(P1
# ---------------------------------------------------------------------------
class TestConcurrency:
def test_concurrent_acquire_no_exceed(self):
"""并发 acquire 不超过全局限制"""
counter = ActiveAgentCounter(max_global=3, max_per_agent=1)
results = []
async def try_acquire(agent_id):
r = await counter.acquire(agent_id)
results.append((agent_id, r))
if r:
await asyncio.sleep(0.01)
counter.release(agent_id)
async def run():
tasks = [try_acquire(f"agent-{i}") for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(run())
succeeded = sum(1 for _, r in results if r)
# 不超过 max_global 的倍数(因为 release 后可重用)
assert succeeded > 0
def test_no_negative_count(self, counter):
"""release 不会导致负数计数"""
counter.release("nonexistent") # 不应崩溃
assert counter.global_active >= 0