"""Unit tests for the F9 Agent Spawner.

Organised after test-plan-v2.6.md §F9 Spawner:

- T1: successful spawn (P0)
- T2: timeout handling (P0)
- T3: spawn failure (P0)
- T4: session cleanup (P1)

Added in v2.8 (#07 Acquire-First + Compact Hanging + AgentBusyError):

- E11: Spawner Acquire-First Phase 0-4 (6 tests)
- E13: Compact Hanging does not mark the task failed (3 tests)
- E14: AgentBusyError classification (3 tests)
"""

import asyncio
from pathlib import Path

import pytest

from src.blackboard.operations import Blackboard
from src.daemon.spawner import AgentBusyError, AgentSpawner


@pytest.fixture
def db_path(tmp_path):
    """Blackboard database path inside pytest's per-test temporary directory."""
    return tmp_path / "blackboard.db"


@pytest.fixture
def bb(db_path):
    """Blackboard bound to the per-test database file."""
    return Blackboard(db_path)


@pytest.fixture
def spawner(db_path):
    """Spawner in dry_run mode, which does not actually spawn an agent."""
    return AgentSpawner(dry_run=True, db_path=db_path)


@pytest.fixture
def real_spawner(db_path):
    """Spawner that really launches agents, with agent_timeout=2.0 (presumably seconds)."""
    return AgentSpawner(agent_timeout=2.0, dry_run=False, db_path=db_path)


# ---------------------------------------------------------------------------
# T1: successful spawn
# ---------------------------------------------------------------------------


class TestSpawnSuccess:
    """T1: spawning succeeds and the resulting session is tracked."""

    def test_dry_run_spawn(self, spawner):
        """dry_run mode does not actually spawn, yet still registers a session."""
        sid = asyncio.run(
            spawner.spawn_full_agent("test-agent", "do something", task_id="t1")
        )
        assert sid

        registered = spawner.get_session(sid)
        assert registered is not None
        assert registered["agent_id"] == "test-agent"

    def test_session_registered(self, spawner):
        """After a spawn, at least one session is reported as active."""
        asyncio.run(spawner.spawn_full_agent("agent-1", "task", task_id="t1"))
        assert len(spawner.active_sessions) >= 1

    def test_spawn_subagent_dry_run(self, spawner):
        """A dry_run subagent spawn still yields a session id."""
        sid = asyncio.run(spawner.spawn_subagent("do task", task_id="t1"))
        assert sid

    def test_multiple_spawns(self, spawner):
        """Repeated spawns each get an independent session."""
        sids = [
            asyncio.run(spawner.spawn_full_agent(f"agent-{n}", f"task {n}"))
            for n in range(3)
        ]
        assert len(set(sids)) == 3  # every session_id is unique


# ---------------------------------------------------------------------------
# T2: timeout handling
# ---------------------------------------------------------------------------
class TestTimeout:
    """T2: an agent that outlives agent_timeout is stopped."""

    def test_timeout_kills_process(self, tmp_path):
        """After the timeout elapses, the agent process is killed.

        Spawning and waiting run inside ONE event loop. ``asyncio.run`` closes
        its loop and cancels any pending tasks when it returns. If the spawner
        enforces the timeout from a background task, a second, separate
        ``asyncio.run`` for the wait would never let that task fire.
        """
        db_path = tmp_path / "blackboard.db"
        Blackboard(db_path)  # init the blackboard database
        spawner = AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=0.5)

        async def spawn_then_wait():
            sid = await spawner.spawn_full_agent(
                "test-agent",
                "sleep 10",  # passed as --message; the actual agent may ignore it
                task_id=None,  # no task, to avoid DB writes for a non-existent task
            )
            await asyncio.sleep(1.0)  # wait past agent_timeout=0.5
            return sid

        session_id = asyncio.run(spawn_then_wait())

        session = spawner.get_session(session_id)
        if session:
            # NOTE(review): "running" is still accepted here, which keeps this
            # check permissive; tighten once the expected final status is fixed.
            assert session["status"] in ("timed_out", "running", "completed")


# ---------------------------------------------------------------------------
# T3: spawn failure
# ---------------------------------------------------------------------------


class TestSpawnFailure:
    """T3: a failing spawn is handled by the spawner."""

    def test_nonexistent_command(self, real_spawner, db_path, bb):
        """Command does not exist -> spawn_failed."""
        from src.blackboard.models import Task

        bb.create_task(Task(id="t1", title="T", status="pending", assigned_by="d"))
        # The spawner tries to run "openclaw", which may not exist in the test
        # environment. This test is about error handling, not the command
        # itself, so an exception escaping the spawn is deliberately tolerated.
        try:
            asyncio.run(real_spawner.spawn_full_agent("test", "msg", task_id="t1"))
        except Exception:
            pass  # expected: the command may fail


# ---------------------------------------------------------------------------
# T4: session cleanup
# ---------------------------------------------------------------------------


class TestSessionCleanup:
    """T4: session bookkeeping is cleaned up correctly."""

    def test_cleanup_removes_session(self, spawner):
        """cleanup_session removes the session."""
        sid = asyncio.run(spawner.spawn_full_agent("a", "m"))
        assert spawner.get_session(sid) is not None
        spawner.cleanup_session(sid)
        assert spawner.get_session(sid) is None

    def test_cleanup_nonexistent(self, spawner):
        """Cleaning up an unknown session does not raise."""
        spawner.cleanup_session("nonexistent-id")  # must not raise

    def test_active_sessions_excludes_completed(self, spawner):
        """active_sessions excludes completed sessions."""
        sid = asyncio.run(spawner.spawn_full_agent("a", "m"))
        # Assumes get_session() returns the spawner's own session record (not
        # a copy), so mutating it is visible to active_sessions.
        session = spawner.get_session(sid)
        session["status"] = "completed"
        assert sid not in spawner.active_sessions


# ---------------------------------------------------------------------------
# E11: Spawner Acquire-First Phase 0-4 (added in v2.8 #07.1)
# ---------------------------------------------------------------------------


class TestAcquireFirst:
    """E11: Phase 0-4 tests after the #07.1 Acquire-First refactor."""

    def test_phase0_revive_before_acquire(self, spawner):
        """E11.1 Phase 0: timeout/failed state -> revive -> acquire succeeds.

        Phase 0 runs before the counter acquire and repairs sessions that are
        in the timeout/failed state. This uses the dry_run spawner to exercise
        the call path around _revive_session.
        """
        # In dry_run mode Phase 0 checks the session state. If no session
        # exists (the normal case), status=None and no revive is triggered.
        # This covers the normal path: not timed out / failed -> acquire + spawn.
        session_id = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent",
                "do something",
                task_id="t1",
                use_main_session=True,
            )
        )
        assert session_id  # dry_run returns "main" or a session_id

    def test_phase0_stuck_detection(self, spawner):
        """E11.2 Phase 0: status=running with a dead lock PID -> revive.

        When the session state says running but the lock PID is dead, Phase 0
        should revive the session automatically.
        """
        check_calls = 0

        def fake_check(agent_id):
            nonlocal check_calls
            check_calls += 1
            if check_calls == 1:
                # Phase 0: looks stuck (running, but the lock holder is dead).
                return {"status": "running", "lock_pid_alive": False}
            # Phase 2: healthy again after the revive.
            return {"status": "idle", "lock_pid_alive": False}

        revive_calls = []

        def fake_revive(agent_id):
            revive_calls.append(agent_id)
            return True

        spawner._check_session_state = fake_check
        spawner._revive_session = fake_revive

        asyncio.run(
            spawner.spawn_full_agent(
                "test-agent",
                "do something",
                task_id="t1",
                use_main_session=True,
            )
        )
        assert revive_calls, "Phase 0 should have called _revive_session for stuck session"

    def test_phase1_counter_acquire_exclusive(self, spawner):
        """E11.3 Phase 1: counter acquire is mutually exclusive.

        Concurrent spawns of the same agent -> the second raises AgentBusyError
        (reason mentions the counter). max_concurrent_sessions=1 ensures the
        second acquire for the same agent fails.
        """
        from src.daemon.counter import ActiveAgentCounter

        # NOTE(review): the other tests configure the counter with
        # max_per_agent=1; confirm max_concurrent_sessions is the intended knob.
        counter = ActiveAgentCounter(max_global=5, max_concurrent_sessions=1)
        spawner.counter = counter

        # First acquire succeeds.
        session_id = asyncio.run(
            spawner.spawn_full_agent("test-agent", "task", task_id="t1")
        )
        assert session_id

        # Second acquire -> counter blocked (the agent already has an active session).
        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(spawner.spawn_full_agent("test-agent", "task2", task_id="t2"))
        reason = exc_info.value.reason
        assert "counter" in reason or "blocked" in reason

    def test_phase2_session_check_under_lock(self, spawner):
        """E11.4 Phase 2: the session check runs while the counter is held.

        counter acquired -> session locked -> counter released -> AgentBusyError.
        """
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        # Simulate a session whose lock is held by a live process.
        spawner._check_session_state = lambda agent_id: {
            "status": "idle",
            "lock_pid_alive": True,
            "lock_expired": False,
        }

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent",
                    "task",
                    task_id="t1",
                    use_main_session=True,
                )
            )
        assert "session_locked" in exc_info.value.reason
        # The counter must have been released (no leak).
        assert counter.global_active == 0

    def test_phase2_multiple_blockers(self, spawner):
        """E11.5 Phase 2: all blockers are collected side by side.

        session locked + compact at the same time -> detail.blockers holds both.
        """
        spawner._check_session_state = lambda agent_id: {
            "status": "idle",
            "lock_pid_alive": True,
            "lock_expired": False,
            "recent_compact": True,
        }

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent",
                    "task",
                    task_id="t1",
                    use_main_session=True,
                )
            )
        assert exc_info.value.detail is not None
        blockers = exc_info.value.detail.get("blockers", [])
        blocker_reasons = [b[0] for b in blockers]
        assert "session_locked" in blocker_reasons
        assert "session_compacting" in blocker_reasons

    def test_phase3_on_checks_passed_exception_rollback(self, spawner):
        """E11.6 Phase 3: on_checks_passed raises -> counter released automatically.

        An exception from the callback must not leak the counter.
        """
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        def bad_callback():
            raise RuntimeError("callback failed")

        with pytest.raises(RuntimeError, match="callback failed"):
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent",
                    "task",
                    task_id="t1",
                    on_checks_passed=bad_callback,
                )
            )
        # The counter must have been released.
        assert counter.global_active == 0


# ---------------------------------------------------------------------------
# E13: Compact Hanging does not mark the task failed (added in v2.8 #07.3 ACT-2)
# ---------------------------------------------------------------------------


class TestCompactHanging:
    """E13: after compact_hanging only the counter is released; the task stays working."""

    def test_compact_hanging_releases_counter(self, tmp_path):
        """E13.1: compact over limit -> compact_hanging -> counter released -> task stays working.

        On a compact_hanging outcome the counter should be released and the
        task should not be marked failed.
        """
        from src.blackboard.models import Task as TaskModel
        from src.daemon.counter import ActiveAgentCounter

        # Per-test file under tmp_path rather than a fixed /tmp path. It is
        # portable and cannot collide with parallel runs. A stale database left
        # by an aborted run also cannot interfere with create_task(id="t1").
        db_path = tmp_path / "test_compact_hanging.db"
        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)

        bb = Blackboard(db_path)
        bb.create_task(
            TaskModel(
                id="t1",
                title="T",
                status="working",
                assigned_by="d",
                current_agent="test-agent",
            )
        )

        spawner = AgentSpawner(db_path=db_path, dry_run=True)
        spawner.counter = counter

        # Stand-in on_complete for the compact_hanging outcome.
        outcomes = []

        async def mock_on_complete(aid, outcome):
            outcomes.append((aid, outcome))

        sid = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent",
                "task",
                task_id="t1",
                on_complete=mock_on_complete,
            )
        )

        # Release the counter by hand, as the compact_hanging path does.
        counter.release("test-agent", sid)
        assert counter.global_active == 0

    def test_retry_agent_busy_releases_counter(self, spawner):
        """E13.3: _do_retry hits AgentBusyError -> counter released -> task stays working.

        When a retry finds the session busy it must release the counter rather
        than keep holding it.
        """
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        # Simulate the retry scenario: the spawn runs into AgentBusyError.
        spawner._check_session_state = lambda agent_id: {
            "status": "running",
            "lock_pid_alive": True,
            "lock_expired": False,
        }

        with pytest.raises(AgentBusyError):
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent",
                    "task",
                    task_id="t1",
                    use_main_session=True,
                )
            )
        # The counter must not leak.
        assert counter.global_active == 0


# ---------------------------------------------------------------------------
# E14: AgentBusyError classification (added in v2.8 #07.1 O3)
# ---------------------------------------------------------------------------


class TestAgentBusyErrorClassification:
    """E14: verify AgentBusyError reason/detail classification."""

    def test_counter_blocked_reason(self, spawner):
        """E14.1: counter acquire fails -> reason=counter_blocked."""
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=1, max_per_agent=1)
        spawner.counter = counter

        # Fill the counter so the spawn cannot acquire a slot.
        asyncio.run(counter.acquire("test-agent"))

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(spawner.spawn_full_agent("test-agent", "task", task_id="t1"))
        assert exc_info.value.reason == "counter_blocked"
        assert exc_info.value.agent_id == "test-agent"

    @pytest.mark.parametrize(
        ("state", "expected"),
        [
            (
                {"status": "idle", "lock_pid_alive": True, "lock_expired": False},
                "session_locked",
            ),
            (
                {"status": "idle", "lock_pid_alive": False, "recent_compact": True},
                "session_compacting",
            ),
        ],
        ids=["locked", "compacting"],
    )
    def test_session_blocker_reasons(self, spawner, state, expected):
        """E14.2: session locked/compacting -> specific reason + detail.blockers.

        Parametrized so every case gets a fresh spawner fixture, which stops
        leftover state from one case leaking into the next.
        """
        spawner._check_session_state = lambda agent_id: state

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent",
                    "task",
                    task_id="t1",
                    use_main_session=True,
                )
            )
        assert expected in exc_info.value.reason, \
            f"expected '{expected}' in '{exc_info.value.reason}'"
        assert exc_info.value.detail is not None, "expected detail to be set"

    def test_error_attributes(self):
        """E14 extra: AgentBusyError exposes its attributes and formats them."""
        err = AgentBusyError(
            "my-agent",
            reason="session_locked",
            detail={"blockers": [("session_locked", 12345)]},
        )
        assert err.agent_id == "my-agent"
        assert err.reason == "session_locked"
        assert err.detail["blockers"][0][0] == "session_locked"
        assert "my-agent" in str(err)
        assert "session_locked" in str(err)