Files
sanguo_moziplus_v2/tests/unit/test_counter.py
T
2026-06-05 13:29:00 +08:00

302 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F10 ActiveAgentCounter 单元测试
按 test-plan-v2.6.md §F10
- T1: 全局上限(P0)
- T2: per-agent 串行(P0)
- T3: release 恢复(P0)
- T4: 并发竞争(P1)
"""
import asyncio
import pytest
pytestmark = pytest.mark.unit
from src.daemon.counter import ActiveAgentCounter
@pytest.fixture
def counter():
    """Return a new ``ActiveAgentCounter(max_global=3, max_per_agent=1)``."""
    # NOTE(review): the Phase 2 tests further down build the counter with
    # ``max_per_session`` / ``max_concurrent_sessions`` instead of
    # ``max_per_agent`` — confirm the constructor accepts both spellings.
    limits = {"max_global": 3, "max_per_agent": 1}
    return ActiveAgentCounter(**limits)
# ---------------------------------------------------------------------------
# T1: 全局上限
# ---------------------------------------------------------------------------
class TestGlobalLimit:
    """T1: global-limit checks, run against the ``max_global=3`` fixture."""

    @staticmethod
    def _occupy(counter, slots=3):
        """Acquire once for each of ``agent-0`` .. ``agent-{slots-1}``."""
        for idx in range(slots):
            asyncio.run(counter.acquire(f"agent-{idx}"))

    def test_acquire_within_limit(self, counter):
        """With the global pool not full, acquire succeeds and is counted."""
        granted = asyncio.run(counter.acquire("agent-1"))
        assert granted is True
        assert counter.global_active == 1

    def test_acquire_fills_global(self, counter):
        """Three acquires by distinct agents bring ``global_active`` to 3."""
        self._occupy(counter)
        assert counter.global_active == 3

    def test_acquire_exceeds_global(self, counter):
        """Once three agents hold slots, a fourth acquire returns False."""
        self._occupy(counter)
        overflow = asyncio.run(counter.acquire("agent-extra"))
        assert overflow is False

    def test_can_acquire_respects_limit(self, counter):
        """``can_acquire`` is True while empty and False once the pool is full."""
        before = asyncio.run(counter.can_acquire("agent-1"))
        assert before is True
        self._occupy(counter)
        after = asyncio.run(counter.can_acquire("agent-extra"))
        assert after is False
# ---------------------------------------------------------------------------
# T2: per-agent 串行
# ---------------------------------------------------------------------------
class TestPerAgentSerial:
    """T2: per-agent limit checks."""

    def test_same_agent_blocked(self, counter):
        """A second acquire by the same agent is refused (fixture limit is 1)."""
        agent_id = "agent-1"
        asyncio.run(counter.acquire(agent_id))
        second_attempt = asyncio.run(counter.acquire(agent_id))
        assert second_attempt is False

    def test_different_agents_ok(self, counter):
        """Two distinct agents can each acquire."""
        first = asyncio.run(counter.acquire("agent-1"))
        assert first is True
        second = asyncio.run(counter.acquire("agent-2"))
        assert second is True

    def test_per_agent_limit_2(self):
        """With ``max_per_agent=2`` the first two acquires pass, the third fails."""
        c = ActiveAgentCounter(max_global=5, max_per_agent=2)
        # Expected outcome of each successive acquire by the same agent.
        for expected in (True, True, False):
            assert asyncio.run(c.acquire("a1")) is expected
# ---------------------------------------------------------------------------
# T3: release 恢复
# ---------------------------------------------------------------------------
class TestRelease:
    """T3: ``release`` frees capacity and keeps the bookkeeping consistent."""

    def test_release_allows_new_acquire(self, counter):
        """After release, the same agent can acquire again."""
        agent_id = "agent-1"
        asyncio.run(counter.acquire(agent_id))
        counter.release(agent_id)
        reacquired = asyncio.run(counter.acquire(agent_id))
        assert reacquired is True

    def test_release_updates_count(self, counter):
        """``global_active`` goes 1 -> 0 across an acquire/release pair."""
        agent_id = "agent-1"
        asyncio.run(counter.acquire(agent_id))
        assert counter.global_active == 1
        counter.release(agent_id)
        assert counter.global_active == 0

    def test_release_same_agent_twice(self, counter):
        """After release, ``can_acquire`` reports True for the same agent."""
        agent_id = "agent-1"
        asyncio.run(counter.acquire(agent_id))
        counter.release(agent_id)
        allowed = asyncio.run(counter.can_acquire(agent_id))
        assert allowed is True

    def test_release_updates_active_agents(self, counter):
        """``active_agents`` lists the agent while held and drops it on release."""
        asyncio.run(counter.acquire("agent-1"))
        assert counter.active_agents == {"agent-1": 1}
        counter.release("agent-1")
        assert "agent-1" not in counter.active_agents

    def test_full_cycle(self, counter):
        """Full acquire/release cycle: fill, get refused, free one, acquire again."""
        # Fill all three global slots.
        for idx in range(3):
            asyncio.run(counter.acquire(f"a{idx}"))
        assert counter.global_active == 3

        # Pool is full, so a newcomer is refused.
        refused = asyncio.run(counter.can_acquire("extra"))
        assert refused is False

        # Free one slot.
        counter.release("a1")
        assert counter.global_active == 2

        # The newcomer now gets in and the pool is full again.
        admitted = asyncio.run(counter.acquire("extra"))
        assert admitted is True
        assert counter.global_active == 3
# ---------------------------------------------------------------------------
# T4: 并发竞争(P1)
# ---------------------------------------------------------------------------
class TestConcurrency:
    """T4 (P1): concurrent acquisition and release edge cases."""

    def test_concurrent_acquire_no_exceed(self):
        """Concurrent acquires never push ``global_active`` above ``max_global``.

        Ten coroutines, each with its own agent id, race for three global
        slots. Every successful holder samples ``global_active`` right after
        acquiring; the highest sample must stay within the limit.
        """
        max_global = 3
        counter = ActiveAgentCounter(max_global=max_global, max_per_agent=1)
        results = []
        observed_active = []

        async def try_acquire(agent_id):
            granted = await counter.acquire(agent_id)
            results.append((agent_id, granted))
            if granted:
                # Sample the live count while this coroutine holds a slot.
                observed_active.append(counter.global_active)
                try:
                    await asyncio.sleep(0.01)
                finally:
                    # Always give the slot back, even if the sleep is cancelled.
                    counter.release(agent_id)

        async def run():
            await asyncio.gather(*(try_acquire(f"agent-{i}") for i in range(10)))

        asyncio.run(run())

        succeeded = sum(1 for _, granted in results if granted)
        # Every coroutine must have reported an outcome.
        assert len(results) == 10
        # Released slots may be reused, so only a lower bound on successes
        # is timing-independent.
        assert succeeded > 0
        # The property the test is named for: the limit is never exceeded.
        assert max(observed_active) <= max_global
        # All holders released, so nothing may remain active.
        assert counter.global_active == 0

    def test_no_negative_count(self, counter):
        """Releasing an agent that never acquired leaves the count at zero."""
        counter.release("nonexistent")  # must not raise
        assert counter.global_active == 0
# ===================================================================
# === 以下为 Phase 2 补充测试 ===
# ===================================================================
import time
from unittest.mock import patch
# ---------------------------------------------------------------------------
# TestCooldown: 冷却期控制
# ---------------------------------------------------------------------------
class TestCooldown:
    """Cooldown control: setting, blocking, expiry and cleanup."""

    @staticmethod
    def _new_counter():
        """Build the counter configuration shared by every cooldown test."""
        return ActiveAgentCounter(max_global=3, max_per_session=1,
                                  default_cooldown_seconds=120.0)

    def test_set_cooldown_default(self):
        """Without an explicit duration the deadline is about now + 120 s."""
        counter = self._new_counter()
        counter.set_cooldown("agent-1")
        assert "agent-1" in counter._cooldown_until
        # Allow one second of slack on either side of the 120 s default.
        deadline = counter._cooldown_until["agent-1"]
        assert time.time() + 119 <= deadline
        assert time.time() + 121 >= deadline

    def test_set_cooldown_custom(self):
        """An explicit ``seconds=30.0`` yields a deadline of about now + 30 s."""
        counter = self._new_counter()
        counter.set_cooldown("agent-1", seconds=30.0)
        deadline = counter._cooldown_until["agent-1"]
        assert time.time() + 29 <= deadline
        assert time.time() + 31 >= deadline

    def test_cooldown_blocks_acquire(self):
        """``can_acquire`` returns False while the agent is cooling down."""
        counter = self._new_counter()
        counter.set_cooldown("agent-1", seconds=300.0)
        allowed = asyncio.run(counter.can_acquire("agent-1"))
        assert allowed is False

    def test_cooldown_expires(self):
        """Once the deadline has passed (``time.time`` patched), acquire is allowed."""
        counter = self._new_counter()
        counter.set_cooldown("agent-1", seconds=100.0)
        deadline = counter._cooldown_until["agent-1"]
        # Pin the patched clock to one second past the cooldown deadline.
        after_deadline = deadline + 1.0
        with patch("src.daemon.counter.time.time", return_value=after_deadline):
            allowed = asyncio.run(counter.can_acquire("agent-1"))
        assert allowed is True

    def test_cooldown_expired_auto_cleanup(self):
        """``is_cooling_down`` removes the entry once the deadline has passed."""
        counter = self._new_counter()
        counter.set_cooldown("agent-1", seconds=100.0)
        assert "agent-1" in counter._cooldown_until
        after_deadline = counter._cooldown_until["agent-1"] + 1.0
        with patch("src.daemon.counter.time.time", return_value=after_deadline):
            assert counter.is_cooling_down("agent-1") is False
        # The expired entry should have been popped.
        assert "agent-1" not in counter._cooldown_until
# ---------------------------------------------------------------------------
# TestIsNearLimit: 全局接近上限检测
# ---------------------------------------------------------------------------
class TestIsNearLimit:
    """``is_near_limit``: detects when the global count approaches the cap."""

    def test_not_near_limit(self):
        """A freshly built counter is not near its limit."""
        tracker = ActiveAgentCounter(max_global=3, max_per_session=1)
        near = tracker.is_near_limit()
        assert near is False

    def test_near_limit_at_margin(self):
        """An active count of max - 1 reports True."""
        tracker = ActiveAgentCounter(max_global=3, max_per_session=1)
        # Set the global active count directly: max_global=3, so 3 - 1 = 2.
        tracker._global_active = 2
        near = tracker.is_near_limit()
        assert near is True

    def test_at_limit(self):
        """An active count equal to max reports True."""
        tracker = ActiveAgentCounter(max_global=3, max_per_session=1)
        tracker._global_active = 3
        near = tracker.is_near_limit()
        assert near is True

    def test_custom_margin(self):
        """With ``margin=2`` and max=5: 3 active is near, 2 active is not."""
        tracker = ActiveAgentCounter(max_global=5, max_per_session=1)
        # (active count, expected result); the threshold is 5 - 2 = 3.
        for active, expected in ((3, True), (2, False)):
            tracker._global_active = active
            assert tracker.is_near_limit(margin=2) is expected
# ---------------------------------------------------------------------------
# TestThreeLayerCheck: 三层检查
# ---------------------------------------------------------------------------
class TestThreeLayerCheck:
    """Three-layer ``can_acquire`` check: cooldown, global cap, per-agent cap."""

    def test_layer1_cooldown_blocks(self):
        """Layer 1: an agent in cooldown is refused."""
        tracker = ActiveAgentCounter(max_global=5, max_per_session=1,
                                     max_concurrent_sessions=3)
        tracker.set_cooldown("agent-1", seconds=300.0)
        # No global slots are taken here; only the cooldown is set.
        allowed = asyncio.run(tracker.can_acquire("agent-1"))
        assert allowed is False

    def test_layer2_global_blocks(self):
        """Layer 2: a full global pool refuses the agent."""
        tracker = ActiveAgentCounter(max_global=2, max_per_session=1,
                                     max_concurrent_sessions=3)
        # Fill the global pool directly through the private attribute.
        tracker._global_active = 2
        allowed = asyncio.run(tracker.can_acquire("agent-1"))
        assert allowed is False

    def test_layer3_per_agent_blocks(self):
        """Layer 3: an agent already at its own cap is refused."""
        tracker = ActiveAgentCounter(max_global=5, max_per_session=1,
                                     max_concurrent_sessions=2)
        # Fill the per-agent count directly through the private attribute.
        tracker._agent_active["agent-1"] = 2
        allowed = asyncio.run(tracker.can_acquire("agent-1"))
        assert allowed is False

    def test_all_layers_pass(self):
        """With nothing set on a fresh counter, ``can_acquire`` returns True."""
        tracker = ActiveAgentCounter(max_global=5, max_per_session=1,
                                     max_concurrent_sessions=3)
        allowed = asyncio.run(tracker.can_acquire("agent-1"))
        assert allowed is True