diff --git a/tests/unit/test_counter.py b/tests/unit/test_counter.py index 93c17a9..771474f 100644 --- a/tests/unit/test_counter.py +++ b/tests/unit/test_counter.py @@ -158,3 +158,144 @@ class TestConcurrency: """release 不会导致负数计数""" counter.release("nonexistent") # 不应崩溃 assert counter.global_active >= 0 + + +# =================================================================== +# === 以下为 Phase 2 补充测试 === +# =================================================================== + +import time +from unittest.mock import patch + + +# --------------------------------------------------------------------------- +# TestCooldown: 冷却期控制 +# --------------------------------------------------------------------------- + +class TestCooldown: + def test_set_cooldown_default(self): + """默认 120 秒冷却""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1, + default_cooldown_seconds=120.0) + counter.set_cooldown("agent-1") + assert "agent-1" in counter._cooldown_until + # 验证 until 约为 now + 120 + until = counter._cooldown_until["agent-1"] + assert until >= time.time() + 119 + assert until <= time.time() + 121 + + def test_set_cooldown_custom(self): + """自定义冷却时间""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1, + default_cooldown_seconds=120.0) + counter.set_cooldown("agent-1", seconds=30.0) + until = counter._cooldown_until["agent-1"] + assert until >= time.time() + 29 + assert until <= time.time() + 31 + + def test_cooldown_blocks_acquire(self): + """冷却期内 can_acquire 返回 False""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1, + default_cooldown_seconds=120.0) + counter.set_cooldown("agent-1", seconds=300.0) + result = asyncio.run(counter.can_acquire("agent-1")) + assert result is False + + def test_cooldown_expires(self): + """冷却过期后可以 acquire(mock time.time)""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1, + default_cooldown_seconds=120.0) + counter.set_cooldown("agent-1", seconds=100.0) + real_until = counter._cooldown_until["agent-1"] + + # Mock time.time to be past the cooldown + with patch("src.daemon.counter.time.time", return_value=real_until + 1.0): + result = asyncio.run(counter.can_acquire("agent-1")) + assert result is True + + def test_cooldown_expired_auto_cleanup(self): + """过期后 is_cooling_down 清理条目""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1, + default_cooldown_seconds=120.0) + counter.set_cooldown("agent-1", seconds=100.0) + assert "agent-1" in counter._cooldown_until + + real_until = counter._cooldown_until["agent-1"] + with patch("src.daemon.counter.time.time", return_value=real_until + 1.0): + assert counter.is_cooling_down("agent-1") is False + + # 条目应被 pop 掉 + assert "agent-1" not in counter._cooldown_until + + +# --------------------------------------------------------------------------- +# TestIsNearLimit: 全局接近上限检测 +# --------------------------------------------------------------------------- + +class TestIsNearLimit: + def test_not_near_limit(self): + """空闲时不接近""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1) + assert counter.is_near_limit() is False + + def test_near_limit_at_margin(self): + """活跃数 = max-1 时为 True""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1) + # 手动设置全局活跃数 + counter._global_active = 2 # max_global=3, margin=1 → 3-1=2 + assert counter.is_near_limit() is True + + def test_at_limit(self): + """活跃数 = max 时为 True""" + counter = ActiveAgentCounter(max_global=3, max_per_session=1) + counter._global_active = 3 + assert counter.is_near_limit() is True + + def test_custom_margin(self): + """margin=2 时提前触发""" + counter = ActiveAgentCounter(max_global=5, max_per_session=1) + counter._global_active = 3 # max=5, margin=2 → 5-2=3 + assert counter.is_near_limit(margin=2) is True + # 低于阈值 + counter._global_active = 2 + assert counter.is_near_limit(margin=2) is False + + +# --------------------------------------------------------------------------- +# TestThreeLayerCheck: 三层检查 +# --------------------------------------------------------------------------- + +class TestThreeLayerCheck: + def test_layer1_cooldown_blocks(self): + """第一层:冷却阻断""" + counter = ActiveAgentCounter(max_global=5, max_per_session=1, + max_concurrent_sessions=3) + counter.set_cooldown("agent-1", seconds=300.0) + # 即使全局空闲,冷却也阻断 + result = asyncio.run(counter.can_acquire("agent-1")) + assert result is False + + def test_layer2_global_blocks(self): + """第二层:全局满阻断""" + counter = ActiveAgentCounter(max_global=2, max_per_session=1, + max_concurrent_sessions=3) + # 手动填满全局 + counter._global_active = 2 + result = asyncio.run(counter.can_acquire("agent-1")) + assert result is False + + def test_layer3_per_agent_blocks(self): + """第三层:per-agent 满阻断""" + counter = ActiveAgentCounter(max_global=5, max_per_session=1, + max_concurrent_sessions=2) + # 手动填满 per-agent + counter._agent_active["agent-1"] = 2 + result = asyncio.run(counter.can_acquire("agent-1")) + assert result is False + + def test_all_layers_pass(self): + """三层都通过""" + counter = ActiveAgentCounter(max_global=5, max_per_session=1, + max_concurrent_sessions=3) + result = asyncio.run(counter.can_acquire("agent-1")) + assert result is True