auto-sync: 2026-05-17 06:02:00

This commit is contained in:
cfdaily
2026-05-17 06:02:00 +08:00
parent 0cb78fd332
commit d5d14af430
+158
View File
@@ -0,0 +1,158 @@
"""F10 ActiveAgentCounter 单元测试
按 test-plan-v2.6.md §F10
- T1: 全局上限(P0
- T2: per-agent 串行(P0
- T3: release 恢复(P0
- T4: 并发竞争(P1
"""
import asyncio
import pytest
from src.daemon.counter import ActiveAgentCounter
@pytest.fixture
def counter():
return ActiveAgentCounter(max_global=3, max_per_agent=1)
# ---------------------------------------------------------------------------
# T1: 全局上限
# ---------------------------------------------------------------------------
class TestGlobalLimit:
def test_acquire_within_limit(self, counter):
"""全局未满 → 成功"""
result = asyncio.run(counter.acquire("agent-1"))
assert result is True
assert counter.global_active == 1
def test_acquire_fills_global(self, counter):
"""填满全局"""
for i in range(3):
asyncio.run(counter.acquire(f"agent-{i}"))
assert counter.global_active == 3
def test_acquire_exceeds_global(self, counter):
"""超出全局上限 → 失败"""
for i in range(3):
asyncio.run(counter.acquire(f"agent-{i}"))
result = asyncio.run(counter.acquire("agent-extra"))
assert result is False
def test_can_acquire_respects_limit(self, counter):
"""can_acquire 检查全局上限"""
assert asyncio.run(counter.can_acquire("agent-1")) is True
for i in range(3):
asyncio.run(counter.acquire(f"agent-{i}"))
assert asyncio.run(counter.can_acquire("agent-extra")) is False
# ---------------------------------------------------------------------------
# T2: per-agent 串行
# ---------------------------------------------------------------------------
class TestPerAgentSerial:
def test_same_agent_blocked(self, counter):
"""同一 Agent 第二个 acquire 失败"""
asyncio.run(counter.acquire("agent-1"))
result = asyncio.run(counter.acquire("agent-1"))
assert result is False
def test_different_agents_ok(self, counter):
"""不同 Agent 可以同时 acquire"""
assert asyncio.run(counter.acquire("agent-1")) is True
assert asyncio.run(counter.acquire("agent-2")) is True
def test_per_agent_limit_2(self):
"""max_per_agent=2 允许两个"""
c = ActiveAgentCounter(max_global=5, max_per_agent=2)
assert asyncio.run(c.acquire("a1")) is True
assert asyncio.run(c.acquire("a1")) is True
assert asyncio.run(c.acquire("a1")) is False # 第三个失败
# ---------------------------------------------------------------------------
# T3: release 恢复
# ---------------------------------------------------------------------------
class TestRelease:
def test_release_allows_new_acquire(self, counter):
"""release 后可以重新 acquire"""
asyncio.run(counter.acquire("agent-1"))
counter.release("agent-1")
result = asyncio.run(counter.acquire("agent-1"))
assert result is True
def test_release_updates_count(self, counter):
"""release 后计数正确"""
asyncio.run(counter.acquire("agent-1"))
assert counter.global_active == 1
counter.release("agent-1")
assert counter.global_active == 0
def test_release_same_agent_twice(self, counter):
"""release 后同一 Agent 可以再次 acquire"""
asyncio.run(counter.acquire("agent-1"))
counter.release("agent-1")
assert asyncio.run(counter.can_acquire("agent-1")) is True
def test_release_updates_active_agents(self, counter):
"""active_agents 正确追踪"""
asyncio.run(counter.acquire("agent-1"))
assert counter.active_agents == {"agent-1": 1}
counter.release("agent-1")
assert "agent-1" not in counter.active_agents
def test_full_cycle(self, counter):
"""完整获取-释放循环"""
# Fill
for i in range(3):
asyncio.run(counter.acquire(f"a{i}"))
assert counter.global_active == 3
# Full → can't acquire
assert asyncio.run(counter.can_acquire("extra")) is False
# Release one
counter.release("a1")
assert counter.global_active == 2
# Now can acquire
assert asyncio.run(counter.acquire("extra")) is True
assert counter.global_active == 3
# ---------------------------------------------------------------------------
# T4: 并发竞争(P1
# ---------------------------------------------------------------------------
class TestConcurrency:
def test_concurrent_acquire_no_exceed(self):
"""并发 acquire 不超过全局限制"""
counter = ActiveAgentCounter(max_global=3, max_per_agent=1)
results = []
async def try_acquire(agent_id):
r = await counter.acquire(agent_id)
results.append((agent_id, r))
if r:
await asyncio.sleep(0.01)
counter.release(agent_id)
async def run():
tasks = [try_acquire(f"agent-{i}") for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(run())
succeeded = sum(1 for _, r in results if r)
# 不超过 max_global 的倍数(因为 release 后可重用)
assert succeeded > 0
def test_no_negative_count(self, counter):
"""release 不会导致负数计数"""
counter.release("nonexistent") # 不应崩溃
assert counter.global_active >= 0