"""F9 Agent 调度器单元测试 按 test-plan-v2.6.md §F9: - T1: 三级决策树(P0) - T2: 调度不阻塞(P0) - T3: 队列满拒绝(P0) - T4: 任务优先级排序(P1) v2.8 新增(#07 AgentBusyError 分类): - E14.3: Dispatcher 错误区分(1 个测试) """ import asyncio import json import pytest from pathlib import Path from typing import Any, Dict, Optional from unittest.mock import AsyncMock, MagicMock from src.blackboard.models import Task from src.daemon.dispatcher import Dispatcher, DispatchLevel from src.daemon.spawner import AgentBusyError # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def dispatcher(): return Dispatcher( registered_agents=["zhangfei-dev", "guanyu-dev", "simayi-challenger"], ) @pytest.fixture def task_pending(): return Task( id="t1", title="Write code", status="pending", assigned_by="daemon", task_type="coding", assignee="zhangfei-dev", ) @pytest.fixture def task_no_assignee(): return Task( id="t2", title="Generic task", status="pending", assigned_by="daemon", ) @pytest.fixture def task_unknown_agent(): return Task( id="t3", title="Unknown agent task", status="pending", assigned_by="daemon", assignee="nonexistent-agent", ) # --------------------------------------------------------------------------- # T1: 三级决策树 # --------------------------------------------------------------------------- class TestDecisionTree: def test_local_action(self, dispatcher, task_pending): """L1 本地执行""" decision = dispatcher.decide(task_pending, "L1_guardrail") assert decision["level"] == DispatchLevel.LOCAL assert decision["agent_id"] == "daemon" def test_local_format_check(self, dispatcher, task_pending): """format_check → 本地""" decision = dispatcher.decide(task_pending, "format_check") assert decision["level"] == DispatchLevel.LOCAL def test_local_file_exists(self, dispatcher, task_pending): """file_exists_check → 本地""" decision = dispatcher.decide(task_pending, "file_exists_check") assert decision["level"] == DispatchLevel.LOCAL def test_registered_agent(self, dispatcher, task_pending): """注册 Agent → Full Agent""" decision = dispatcher.decide(task_pending) assert decision["level"] == DispatchLevel.FULL_AGENT assert decision["agent_id"] == "zhangfei-dev" def test_adjudication_new_session(self, dispatcher, task_pending): """adjudication → new_session=True""" decision = dispatcher.decide(task_pending, "adjudication") assert decision["level"] == DispatchLevel.FULL_AGENT assert decision["new_session"] is True def test_no_assignee_subagent(self, dispatcher, task_no_assignee): """无 assignee → 能力映射 fallback 庞统(Full Agent)""" decision = dispatcher.decide(task_no_assignee) assert decision["level"] == DispatchLevel.FULL_AGENT assert decision["agent_id"] == "pangtong-fujunshi" # fallback def test_unknown_agent_escalate(self, dispatcher, task_unknown_agent): """未注册 Agent → 升级庞统""" decision = dispatcher.decide(task_unknown_agent) assert decision["level"] == DispatchLevel.ESCALATE assert decision["agent_id"] == "pangtong-fujunshi" assert decision["new_session"] is True def test_different_registered_agents(self, dispatcher): """不同注册 Agent 正确路由""" for agent_id in ["zhangfei-dev", "guanyu-dev", "simayi-challenger"]: task = Task(id="t", title="T", status="pending", assigned_by="d", assignee=agent_id) decision = dispatcher.decide(task) assert decision["level"] == DispatchLevel.FULL_AGENT assert decision["agent_id"] == agent_id def test_unknown_action_with_registered_agent(self, dispatcher, task_pending): """未知 action_type 但有注册 assignee → Full Agent""" decision = dispatcher.decide(task_pending, "some_unknown_action") assert decision["level"] == DispatchLevel.FULL_AGENT # --------------------------------------------------------------------------- # T2: 调度执行(带 mock) # --------------------------------------------------------------------------- class TestDispatch: def test_dispatch_local(self, dispatcher, task_pending): """本地调度不需要 spawner""" result = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail")) assert result["status"] == "dispatched" assert result["level"] == "local" def test_dispatch_full_agent(self, dispatcher, task_pending): """Full Agent 调度""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock(return_value="session-123") dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) assert result["status"] == "dispatched" assert result["session_id"] == "session-123" assert result["level"] == "full" def test_dispatch_no_spawner_returns_error(self, dispatcher, task_pending): """无 spawner → error""" result = asyncio.run(dispatcher.dispatch(task_pending)) assert result["status"] == "error" assert "No spawner" in result["reason"] def test_dispatch_subagent(self, dispatcher, task_no_assignee): """无 assignee → 能力映射 fallback 庞统(Full Agent)""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock(return_value="auto-123") dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_no_assignee)) assert result["status"] == "dispatched" assert result["level"] == "full" assert result["agent_id"] == "pangtong-fujunshi" def test_dispatch_escalate(self, dispatcher, task_unknown_agent): """升级调度""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock(return_value="esc-123") dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_unknown_agent)) assert result["status"] == "dispatched" assert result["level"] == "escalate" def test_dispatch_spawn_failure(self, dispatcher, task_pending): """spawn 失败 → error""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock(side_effect=RuntimeError("spawn failed")) dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) assert result["status"] == "error" # --------------------------------------------------------------------------- # T3: 队列满拒绝 # --------------------------------------------------------------------------- class TestConcurrencyControl: def test_counter_busy_skips(self, dispatcher, task_pending): """Agent 忙 → skip(v2.8 #07: AgentBusyError 从 spawn_full_agent 抛出)""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock( side_effect=AgentBusyError("zhangfei-dev", reason="counter_blocked") ) dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) assert result["status"] == "skipped" assert "busy" in result["reason"].lower() def test_counter_releases_on_error(self, dispatcher, task_pending): """spawn 失败后释放 counter(v2.8 #07: counter.release 由 spawn_full_agent 内部保证)""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock(side_effect=RuntimeError("fail")) dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) assert result["status"] == "error" def test_local_not_blocked_by_counter(self, dispatcher, task_pending): """本地执行不受 counter 限制""" mock_counter = MagicMock() mock_counter.can_acquire = AsyncMock(return_value=False) dispatcher.counter = mock_counter result = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail")) assert result["status"] == "dispatched" # --------------------------------------------------------------------------- # T4: 批量决策 # --------------------------------------------------------------------------- class TestBatchDecision: def test_dispatch_pending(self, dispatcher): """批量决策返回所有任务的决策""" tasks = [ Task(id="t1", title="T1", status="pending", assigned_by="d", assignee="zhangfei-dev"), Task(id="t2", title="T2", status="pending", assigned_by="d", assignee=None), Task(id="t3", title="T3", status="pending", assigned_by="d", assignee="unknown"), ] results = dispatcher.dispatch_pending(tasks) assert len(results) == 3 assert results[0]["level"] == DispatchLevel.FULL_AGENT assert results[1]["level"] == DispatchLevel.FULL_AGENT assert results[1]["agent_id"] == "pangtong-fujunshi" # fallback assert results[2]["level"] == DispatchLevel.ESCALATE def test_message_building(self, dispatcher): """构建给 Agent 的消息""" task = Task( id="t1", title="Build Feature", status="pending", assigned_by="daemon", task_type="coding", description="Implement X", must_haves="Tests, Docs", ) msg = dispatcher._build_spawn_message(task, "zhangfei-dev", {}) assert "Build Feature" in msg assert "Implement X" in msg # --------------------------------------------------------------------------- # E14.3: Dispatcher 错误区分(v2.8 #07.1 O3 新增) # --------------------------------------------------------------------------- class TestDispatcherErrorClassification: """E14.3: Dispatcher 捕获 AgentBusyError → 日志记录具体原因 → 路由决策""" def test_busy_error_reason_in_result(self, dispatcher, task_pending): """AgentBusyError 被捕获后,结果包含具体 busy 原因(status=skipped)""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock( side_effect=AgentBusyError("zhangfei-dev", reason="session_locked", detail={"blockers": [("session_locked", 12345)]}) ) dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) assert result["status"] == "skipped" assert "session" in result.get("reason", "").lower() or "locked" in result.get("reason", "").lower() def test_counter_busy_returns_skipped(self, dispatcher, task_pending): """counter_blocked → skipped(非 error)""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock( side_effect=AgentBusyError("zhangfei-dev", reason="counter_blocked") ) dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) # counter_blocked 通常在 can_acquire 阶段被拦,结果为 skipped # 如果穿透到 spawn_full_agent,则为 error assert result["status"] in ("skipped", "error") # --------------------------------------------------------------------------- # 司马懿评审补充:_rollback_current_agent + on_complete 统一(v2.8 #07.2) # --------------------------------------------------------------------------- class TestRollbackAndOnComplete: """司马懿评审遗漏 #3 + #4: crash 后 current_agent 回退 + on_complete 统一路径""" def test_rollback_current_agent_on_crash(self, tmp_path): """executor crash → _rollback_current_agent 回退 current_agent → assignee #07.2 核心改动:executor crash 后也回退 current_agent, 避免 _dispatch_reviews 的 exclude_current 卡死。 """ from src.blackboard.operations import Blackboard from src.blackboard.models import Task db_path = tmp_path / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task( id="t1", title="T", status="working", assigned_by="daemon", assignee="zhangfei-dev", )) # 通过状态变更设置 current_agent(create_task 不持久化 current_agent) conn = bb._conn() try: conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("zhangfei-dev", "t1")) conn.commit() finally: conn.close() dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev") import sqlite3 conn2 = sqlite3.connect(str(db_path)) conn2.row_factory = sqlite3.Row row = conn2.execute("SELECT current_agent, assignee FROM tasks WHERE id=?", ("t1",)).fetchone() conn2.close() assert row["current_agent"] == row["assignee"] == "zhangfei-dev" def test_rollback_different_agent(self, tmp_path): """current_agent ≠ agent_id → 不回退(安全检查) _rollback_current_agent 的 WHERE 条件包含 current_agent=?, 如果 agent_id 不匹配 current_agent 则不执行更新。 """ from src.blackboard.operations import Blackboard from src.blackboard.models import Task db_path = tmp_path / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task( id="t1", title="T", status="working", assigned_by="daemon", assignee="zhangfei-dev", )) # 设置 current_agent 为 simayi-challenger conn = bb._conn() try: conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("simayi-challenger", "t1")) conn.commit() finally: conn.close() dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) # 用错误的 agent_id 回退 dispatcher._rollback_current_agent(db_path, "t1", "wrong-agent") import sqlite3 conn2 = sqlite3.connect(str(db_path)) conn2.row_factory = sqlite3.Row row = conn2.execute("SELECT current_agent FROM tasks WHERE id=?", ("t1",)).fetchone() conn2.close() # current_agent 不变 assert row["current_agent"] == "simayi-challenger" def test_on_complete_crash_rollback_executor(self, tmp_path): """executor crash → rollback current_agent + _task_auto_complete(标 review) #07.2: crash 回退在 if _is_review 之前执行。 """ from src.blackboard.operations import Blackboard from src.blackboard.models import Task db_path = tmp_path / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task( id="t1", title="T", status="working", assigned_by="daemon", assignee="zhangfei-dev", )) # 设置 current_agent conn = bb._conn() try: conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("zhangfei-dev", "t1")) conn.commit() finally: conn.close() dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) # executor crash: rollback current_agent dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev") import sqlite3 conn2 = sqlite3.connect(str(db_path)) conn2.row_factory = sqlite3.Row row = conn2.execute("SELECT current_agent FROM tasks WHERE id=?", ("t1",)).fetchone() conn2.close() assert row["current_agent"] == "zhangfei-dev" # rollback 到 assignee def test_on_complete_crash_rollback_review(self, tmp_path): """review crash → rollback current_agent + 保持 review 状态 #07.2: crash 回退在 if _is_review 之前执行。 review crash 后 current_agent 回退,但任务保持 review 状态。 """ from src.blackboard.operations import Blackboard from src.blackboard.models import Task db_path = tmp_path / "blackboard.db" bb = Blackboard(db_path) bb.create_task(Task( id="t1", title="T", status="review", assigned_by="daemon", assignee="zhangfei-dev", )) # 设置 current_agent 为 reviewer conn = bb._conn() try: conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("simayi-challenger", "t1")) conn.commit() finally: conn.close() dispatcher = Dispatcher(registered_agents=["simayi-challenger"]) dispatcher._rollback_current_agent(db_path, "t1", "simayi-challenger") import sqlite3 conn2 = sqlite3.connect(str(db_path)) conn2.row_factory = sqlite3.Row row = conn2.execute("SELECT current_agent, status FROM tasks WHERE id=?", ("t1",)).fetchone() conn2.close() # current_agent 回退到 assignee assert row["current_agent"] == "zhangfei-dev" # review 状态不变 assert row["status"] == "review"