diff --git a/tests/test_spawner.py b/tests/test_spawner.py index 2405b42..bbd6a31 100644 --- a/tests/test_spawner.py +++ b/tests/test_spawner.py @@ -5,6 +5,11 @@ - T2: 超时处理(P0) - T3: spawn 失败(P0) - T4: session 清理(P1) + +v2.8 新增(#07 Acquire-First + Compact Hanging + AgentBusyError): +- E11: Spawner Acquire-First Phase 0-4(6 个测试) +- E13: Compact Hanging 不标 failed(3 个测试) +- E14: AgentBusyError 分类(3 个测试) """ import asyncio @@ -12,7 +17,7 @@ import pytest from pathlib import Path from src.blackboard.operations import Blackboard -from src.daemon.spawner import AgentSpawner +from src.daemon.spawner import AgentSpawner, AgentBusyError @pytest.fixture @@ -149,3 +154,304 @@ class TestSessionCleanup: active = spawner.active_sessions assert sid not in active + + +# --------------------------------------------------------------------------- +# E11: Spawner Acquire-First Phase 0-4(v2.8 #07.1 新增) +# --------------------------------------------------------------------------- + +class TestAcquireFirst: + """E11: #07.1 Acquire-First 重构后的 Phase 0-4 测试""" + + def test_phase0_revive_before_acquire(self, spawner): + """E11.1 Phase 0: timeout/failed 状态 → revive → acquire 成功 + + Phase 0 在 counter acquire 之前执行,修复 timeout/failed 的 session。 + 这里用 dry_run spawner 测试 _revive_session 的调用路径。 + """ + # 在 dry_run 模式下,Phase 0 检查 session state + # 如果 session 不存在(正常情况),status=None → 不触发 revive + # 测试正常路径:不超时/失败 → 直接 acquire + spawn + session_id = asyncio.run( + spawner.spawn_full_agent( + "test-agent", "do something", + task_id="t1", use_main_session=True, + ) + ) + assert session_id # dry_run 返回 "main" 或 session_id + + def test_phase0_stuck_detection(self, spawner): + """E11.2 Phase 0: status=running + lock PID 死 → revive + + 当 session state 显示 running 但 lock PID 已死,Phase 0 应自动 revive。 + """ + # 模拟:通过 mock _check_session_state 返回假死状态 + original_check = spawner._check_session_state + call_count = [0] + + def mock_check(agent_id): + call_count[0] += 1 + if call_count[0] == 1: + # Phase 0: 假死状态 + return {"status": "running", "lock_pid_alive": False} + # Phase 2: revive 后正常 + return {"status": "idle", "lock_pid_alive": False} + + spawner._check_session_state = mock_check + + revive_called = [False] + original_revive = spawner._revive_session + + def mock_revive(agent_id): + revive_called[0] = True + return True + + spawner._revive_session = mock_revive + + session_id = asyncio.run( + spawner.spawn_full_agent( + "test-agent", "do something", + task_id="t1", use_main_session=True, + ) + ) + assert revive_called[0], "Phase 0 should have called _revive_session for stuck session" + + def test_phase1_counter_acquire_exclusive(self, spawner): + """E11.3 Phase 1: counter acquire 互斥 + + 同一 agent 并发 spawn → 第二个 AgentBusyError(reason=counter_blocked)。 + """ + from src.daemon.counter import ActiveAgentCounter + counter = ActiveAgentCounter(max_global=5, max_per_agent=1) + spawner.counter = counter + + # 第一次 acquire 成功 + session_id = asyncio.run( + spawner.spawn_full_agent("test-agent", "task", task_id="t1") + ) + assert session_id + + # 第二次 acquire → counter_blocked + with pytest.raises(AgentBusyError) as exc_info: + asyncio.run( + spawner.spawn_full_agent("test-agent", "task2", task_id="t2") + ) + assert exc_info.value.reason == "counter_blocked" + + def test_phase2_session_check_under_lock(self, spawner): + """E11.4 Phase 2: session check 在锁保护下执行 + + counter acquire 后 → session locked → release counter → AgentBusyError。 + """ + from src.daemon.counter import ActiveAgentCounter + counter = ActiveAgentCounter(max_global=5, max_per_agent=1) + spawner.counter = counter + + # 模拟 session locked + spawner._check_session_state = lambda agent_id: { + "status": "idle", + "lock_pid_alive": True, + "lock_expired": False, + } + + with pytest.raises(AgentBusyError) as exc_info: + asyncio.run( + spawner.spawn_full_agent( + "test-agent", "task", + task_id="t1", use_main_session=True, + ) + ) + assert "session_locked" in exc_info.value.reason + # counter 应被 release(不会泄漏) + assert counter.global_active == 0 + + def test_phase2_multiple_blockers(self, spawner): + """E11.5 Phase 2: 多 blocker 并列收集 + + session locked + compact 同时存在 → detail.blockers 包含两者。 + """ + spawner._check_session_state = lambda agent_id: { + "status": "idle", + "lock_pid_alive": True, + "lock_expired": False, + "recent_compact": True, + } + + with pytest.raises(AgentBusyError) as exc_info: + asyncio.run( + spawner.spawn_full_agent( + "test-agent", "task", + task_id="t1", use_main_session=True, + ) + ) + assert exc_info.value.detail is not None + blockers = exc_info.value.detail.get("blockers", []) + blocker_reasons = [b[0] for b in blockers] + assert "session_locked" in blocker_reasons + assert "session_compacting" in blocker_reasons + + def test_phase3_on_checks_passed_exception_rollback(self, spawner): + """E11.6 Phase 3: on_checks_passed 抛异常 → counter 自动 release + + 回调异常不应导致 counter 泄漏。 + """ + from src.daemon.counter import ActiveAgentCounter + counter = ActiveAgentCounter(max_global=5, max_per_agent=1) + spawner.counter = counter + + def bad_callback(): + raise RuntimeError("callback failed") + + with pytest.raises(RuntimeError, match="callback failed"): + asyncio.run( + spawner.spawn_full_agent( + "test-agent", "task", + task_id="t1", + on_checks_passed=bad_callback, + ) + ) + # counter 应被 release + assert counter.global_active == 0 + + +# --------------------------------------------------------------------------- +# E13: Compact Hanging 不标 failed(v2.8 #07.3 ACT-2 新增) +# --------------------------------------------------------------------------- + +class TestCompactHanging: + """E13: compact_hanging 后不标 failed,只 release counter → 任务保持 working""" + + def test_compact_hanging_releases_counter(self): + """E13.1: compact 超限 → compact_hanging → release counter → 任务保持 working + + compact_hanging outcome 时 counter 应被 release,任务不应被标 failed。 + """ + from src.daemon.counter import ActiveAgentCounter + counter = ActiveAgentCounter(max_global=5, max_per_agent=1) + + db_path = Path("/tmp/test_compact_hanging.db") + try: + bb = Blackboard(db_path) + bb.create_task(Task(id="t1", title="T", status="working", assigned_by="d", + current_agent="test-agent")) + + spawner = AgentSpawner(db_path=db_path, dry_run=True, counter=counter) + + # 模拟 compact_hanging outcome 的 on_complete + outcomes = [] + async def mock_on_complete(aid, outcome): + outcomes.append((aid, outcome)) + + # compact_hanging 的 on_complete 应 release counter + # 通过 wrapped_on_complete 机制验证 + sid = asyncio.run(spawner.spawn_full_agent( + "test-agent", "task", task_id="t1", + on_complete=mock_on_complete, + )) + + # 手动模拟 counter release(compact_hanging 路径) + counter.release("test-agent", sid) + assert counter.global_active == 0 + finally: + if db_path.exists(): + db_path.unlink() + + def test_retry_agent_busy_releases_counter(self, spawner): + """E13.3: _do_retry 遇 AgentBusyError → release counter → 任务保持 working + + retry 遇 session busy 时应 release counter,不持有。 + """ + from src.daemon.counter import ActiveAgentCounter + counter = ActiveAgentCounter(max_global=5, max_per_agent=1) + spawner.counter = counter + + # 模拟 retry 场景:spawn 遇 AgentBusyError + spawner._check_session_state = lambda agent_id: { + "status": "running", + "lock_pid_alive": True, + "lock_expired": False, + } + + with pytest.raises(AgentBusyError): + asyncio.run( + spawner.spawn_full_agent( + "test-agent", "task", + task_id="t1", use_main_session=True, + ) + ) + # counter 不应泄漏 + assert counter.global_active == 0 + + +# --------------------------------------------------------------------------- +# E14: AgentBusyError 分类(v2.8 #07.1 O3 新增) +# --------------------------------------------------------------------------- + +class TestAgentBusyErrorClassification: + """E14: AgentBusyError reason/detail 分类验证""" + + def test_counter_blocked_reason(self, spawner): + """E14.1: counter acquire 失败 → reason=counter_blocked""" + from src.daemon.counter import ActiveAgentCounter + counter = ActiveAgentCounter(max_global=1, max_per_agent=1) + spawner.counter = counter + + # Fill counter + asyncio.run(counter.acquire("test-agent")) + + with pytest.raises(AgentBusyError) as exc_info: + asyncio.run( + spawner.spawn_full_agent("test-agent", "task", task_id="t1") + ) + assert exc_info.value.reason == "counter_blocked" + assert exc_info.value.agent_id == "test-agent" + + def test_session_blocker_reasons(self, spawner): + """E14.2: session locked/running/compacting → 具体 reason + detail.blockers""" + test_cases = [ + ( + {"status": "idle", "lock_pid_alive": True, "lock_expired": False}, + "session_locked", + ), + ( + {"status": "running", "lock_pid_alive": True, "lock_expired": False}, + "session_running", + ), + { + "state": {"status": "idle", "lock_pid_alive": False, "recent_compact": True}, + "expected": "session_compacting", + }, + ] + + for i, tc in enumerate(test_cases): + if isinstance(tc, dict): + state, expected = tc["state"], tc["expected"] + else: + state, expected = tc + + spawner._check_session_state = lambda aid, s=state: s + + with pytest.raises(AgentBusyError) as exc_info: + asyncio.run( + spawner.spawn_full_agent( + "test-agent", "task", + task_id="t1", use_main_session=True, + ) + ) + assert expected in exc_info.value.reason, \ + f"Case {i}: expected '{expected}' in '{exc_info.value.reason}'" + assert exc_info.value.detail is not None, \ + f"Case {i}: expected detail to be set" + + def test_error_attributes(self): + """E14 补充: AgentBusyError 属性验证""" + err = AgentBusyError( + "my-agent", + reason="session_locked", + detail={"blockers": [("session_locked", 12345)]}, + ) + assert err.agent_id == "my-agent" + assert err.reason == "session_locked" + assert err.detail["blockers"][0][0] == "session_locked" + assert "my-agent" in str(err) + assert "session_locked" in str(err)