From 79107e47b54c9251ce469f97d460a6b92115c448 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 06:00:34 +0800 Subject: [PATCH] auto-sync: 2026-05-17 06:00:34 --- tests/test_dispatcher.py | 246 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 tests/test_dispatcher.py diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py new file mode 100644 index 0000000..9c6be7b --- /dev/null +++ b/tests/test_dispatcher.py @@ -0,0 +1,246 @@ +"""F9 Agent 调度器单元测试 + +按 test-plan-v2.6.md §F9: +- T1: 三级决策树(P0) +- T2: 调度不阻塞(P0) +- T3: 队列满拒绝(P0) +- T4: 任务优先级排序(P1) +""" + +import asyncio +import json +import pytest +from pathlib import Path +from typing import Any, Dict, Optional +from unittest.mock import AsyncMock, MagicMock + +from src.blackboard.models import Task +from src.daemon.dispatcher import Dispatcher, DispatchLevel + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def dispatcher(): + return Dispatcher( + registered_agents=["zhangfei-dev", "guanyu-dev", "simayi-challenger"], + ) + + +@pytest.fixture +def task_pending(): + return Task( + id="t1", title="Write code", status="pending", + assigned_by="daemon", task_type="coding", + assignee="zhangfei-dev", + ) + + +@pytest.fixture +def task_no_assignee(): + return Task( + id="t2", title="Generic task", status="pending", + assigned_by="daemon", + ) + + +@pytest.fixture +def task_unknown_agent(): + return Task( + id="t3", title="Unknown agent task", status="pending", + assigned_by="daemon", assignee="nonexistent-agent", + ) + + +# --------------------------------------------------------------------------- +# T1: 三级决策树 +# --------------------------------------------------------------------------- + +class TestDecisionTree: + def test_local_action(self, dispatcher, task_pending): + """L1 本地执行""" + decision = dispatcher.decide(task_pending, "L1_guardrail") + assert decision["level"] == DispatchLevel.LOCAL + assert decision["agent_id"] == "daemon" + + def test_local_format_check(self, dispatcher, task_pending): + """format_check → 本地""" + decision = dispatcher.decide(task_pending, "format_check") + assert decision["level"] == DispatchLevel.LOCAL + + def test_local_file_exists(self, dispatcher, task_pending): + """file_exists_check → 本地""" + decision = dispatcher.decide(task_pending, "file_exists_check") + assert decision["level"] == DispatchLevel.LOCAL + + def test_registered_agent(self, dispatcher, task_pending): + """注册 Agent → Full Agent""" + decision = dispatcher.decide(task_pending) + assert decision["level"] == DispatchLevel.FULL_AGENT + assert decision["agent_id"] == "zhangfei-dev" + + def test_adjudication_new_session(self, dispatcher, task_pending): + """adjudication → new_session=True""" + decision = dispatcher.decide(task_pending, "adjudication") + assert decision["level"] == DispatchLevel.FULL_AGENT + assert decision["new_session"] is True + + def test_no_assignee_subagent(self, dispatcher, task_no_assignee): + """无 assignee → Subagent""" + decision = dispatcher.decide(task_no_assignee) + assert decision["level"] == DispatchLevel.SUB_AGENT + + def test_unknown_agent_escalate(self, dispatcher, task_unknown_agent): + """未注册 Agent → 升级庞统""" + decision = dispatcher.decide(task_unknown_agent) + assert decision["level"] == DispatchLevel.ESCALATE + assert decision["agent_id"] == "pangtong-fujunshi" + assert decision["new_session"] is True + + def test_different_registered_agents(self, dispatcher): + """不同注册 Agent 正确路由""" + for agent_id in ["zhangfei-dev", "guanyu-dev", "simayi-challenger"]: + task = Task(id="t", title="T", status="pending", + assigned_by="d", assignee=agent_id) + decision = dispatcher.decide(task) + assert decision["level"] == DispatchLevel.FULL_AGENT + assert decision["agent_id"] == agent_id + + def test_unknown_action_with_registered_agent(self, dispatcher, task_pending): + """未知 action_type 但有注册 assignee → Full Agent""" + decision = dispatcher.decide(task_pending, "some_unknown_action") + assert decision["level"] == DispatchLevel.FULL_AGENT + + +# --------------------------------------------------------------------------- +# T2: 调度执行(带 mock) +# --------------------------------------------------------------------------- + +class TestDispatch: + def test_dispatch_local(self, dispatcher, task_pending): + """本地调度不需要 spawner""" + result = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail")) + assert result["status"] == "dispatched" + assert result["level"] == "local" + + def test_dispatch_full_agent(self, dispatcher, task_pending): + """Full Agent 调度""" + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(return_value="session-123") + dispatcher.spawner = mock_spawner + + result = asyncio.run(dispatcher.dispatch(task_pending)) + assert result["status"] == "dispatched" + assert result["session_id"] == "session-123" + assert result["level"] == "full_agent" + + def test_dispatch_no_spawner_returns_error(self, dispatcher, task_pending): + """无 spawner → error""" + result = asyncio.run(dispatcher.dispatch(task_pending)) + assert result["status"] == "error" + assert "No spawner" in result["reason"] + + def test_dispatch_subagent(self, dispatcher, task_no_assignee): + """Subagent 调度""" + mock_spawner = MagicMock() + mock_spawner.spawn_subagent = AsyncMock(return_value="sub-123") + dispatcher.spawner = mock_spawner + + result = asyncio.run(dispatcher.dispatch(task_no_assignee)) + assert result["status"] == "dispatched" + assert result["level"] == "sub_agent" + + def test_dispatch_escalate(self, dispatcher, task_unknown_agent): + """升级调度""" + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(return_value="esc-123") + dispatcher.spawner = mock_spawner + + result = asyncio.run(dispatcher.dispatch(task_unknown_agent)) + assert result["status"] == "dispatched" + assert result["level"] == "escalate" + + def test_dispatch_spawn_failure(self, dispatcher, task_pending): + """spawn 失败 → error""" + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(side_effect=RuntimeError("spawn failed")) + dispatcher.spawner = mock_spawner + + result = asyncio.run(dispatcher.dispatch(task_pending)) + assert result["status"] == "error" + + +# --------------------------------------------------------------------------- +# T3: 队列满拒绝 +# --------------------------------------------------------------------------- + +class TestConcurrencyControl: + def test_counter_busy_skips(self, dispatcher, task_pending): + """Agent 忙 → skip""" + mock_counter = MagicMock() + mock_counter.can_acquire = AsyncMock(return_value=False) + dispatcher.counter = mock_counter + + result = asyncio.run(dispatcher.dispatch(task_pending)) + assert result["status"] == "skipped" + assert "busy" in result["reason"].lower() + + def test_counter_releases_on_error(self, dispatcher, task_pending): + """spawn 失败后释放 counter""" + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(side_effect=RuntimeError("fail")) + mock_counter = MagicMock() + mock_counter.can_acquire = AsyncMock(return_value=True) + mock_counter.acquire = AsyncMock() + mock_counter.release = MagicMock() + dispatcher.spawner = mock_spawner + dispatcher.counter = mock_counter + + result = asyncio.run(dispatcher.dispatch(task_pending)) + assert result["status"] == "error" + mock_counter.release.assert_called_once_with("zhangfei-dev") + + def test_local_not_blocked_by_counter(self, dispatcher, task_pending): + """本地执行不受 counter 限制""" + mock_counter = MagicMock() + mock_counter.can_acquire = AsyncMock(return_value=False) + dispatcher.counter = mock_counter + + result = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail")) + assert result["status"] == "dispatched" + + +# --------------------------------------------------------------------------- +# T4: 批量决策 +# --------------------------------------------------------------------------- + +class TestBatchDecision: + def test_dispatch_pending(self, dispatcher): + """批量决策返回所有任务的决策""" + tasks = [ + Task(id="t1", title="T1", status="pending", + assigned_by="d", assignee="zhangfei-dev"), + Task(id="t2", title="T2", status="pending", + assigned_by="d", assignee=None), + Task(id="t3", title="T3", status="pending", + assigned_by="d", assignee="unknown"), + ] + results = dispatcher.dispatch_pending(tasks) + assert len(results) == 3 + assert results[0]["level"] == DispatchLevel.FULL_AGENT + assert results[1]["level"] == DispatchLevel.SUB_AGENT + assert results[2]["level"] == DispatchLevel.ESCALATE + + def test_message_building(self, dispatcher): + """构建给 Agent 的消息""" + task = Task( + id="t1", title="Build Feature", status="pending", + assigned_by="daemon", task_type="coding", + description="Implement X", must_haves="Tests, Docs", + ) + msg = dispatcher._build_message(task, "execute") + assert "Build Feature" in msg + assert "Implement X" in msg + assert "execute" in msg