Files
sanguo_moziplus_v2/tests/test_dispatcher.py
T
2026-06-01 22:43:36 +08:00

288 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F9 Agent 调度器单元测试
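Follows test-plan-v2.6.md §F9:
- T1: three-level decision tree (P0)
- T2: dispatch does not block (P0)
- T3: rejection when the queue is full (P0)
- T4: task priority ordering (P1)
Added in v2.8 (#07 AgentBusyError classification):
- E14.3: Dispatcher error classification (2 tests)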
"""
import asyncio
import json
import pytest
from pathlib import Path
from typing import Any, Dict, Optional
from unittest.mock import AsyncMock, MagicMock
from src.blackboard.models import Task
from src.daemon.dispatcher import Dispatcher, DispatchLevel
from src.daemon.spawner import AgentBusyError
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def dispatcher():
    """Provide a ``Dispatcher`` constructed with three registered agent ids."""
    known_agents = ["zhangfei-dev", "guanyu-dev", "simayi-challenger"]
    return Dispatcher(registered_agents=known_agents)
@pytest.fixture
def task_pending():
    """Provide a pending ``coding`` task assigned to ``zhangfei-dev``."""
    fields = {
        "id": "t1",
        "title": "Write code",
        "status": "pending",
        "assigned_by": "daemon",
        "task_type": "coding",
        "assignee": "zhangfei-dev",
    }
    return Task(**fields)
@pytest.fixture
def task_no_assignee():
    """Provide a pending task that is created without an ``assignee`` argument."""
    fields = {
        "id": "t2",
        "title": "Generic task",
        "status": "pending",
        "assigned_by": "daemon",
    }
    return Task(**fields)
@pytest.fixture
def task_unknown_agent():
    """Provide a pending task whose assignee is ``nonexistent-agent``.

    That id is not among the agents the ``dispatcher`` fixture registers.
    """
    fields = {
        "id": "t3",
        "title": "Unknown agent task",
        "status": "pending",
        "assigned_by": "daemon",
        "assignee": "nonexistent-agent",
    }
    return Task(**fields)
# ---------------------------------------------------------------------------
# T1: 三级决策树
# ---------------------------------------------------------------------------
class TestDecisionTree:
    """T1: the three-level decision tree, exercised via ``Dispatcher.decide``."""

    def test_local_action(self, dispatcher, task_pending):
        """``L1_guardrail`` must resolve to LOCAL with ``daemon`` as the agent."""
        outcome = dispatcher.decide(task_pending, "L1_guardrail")
        assert outcome["level"] == DispatchLevel.LOCAL
        assert outcome["agent_id"] == "daemon"

    def test_local_format_check(self, dispatcher, task_pending):
        """``format_check`` must resolve to LOCAL."""
        outcome = dispatcher.decide(task_pending, "format_check")
        assert outcome["level"] == DispatchLevel.LOCAL

    def test_local_file_exists(self, dispatcher, task_pending):
        """``file_exists_check`` must resolve to LOCAL."""
        outcome = dispatcher.decide(task_pending, "file_exists_check")
        assert outcome["level"] == DispatchLevel.LOCAL

    def test_registered_agent(self, dispatcher, task_pending):
        """A task assigned to a registered agent must resolve to FULL_AGENT."""
        outcome = dispatcher.decide(task_pending)
        assert outcome["level"] == DispatchLevel.FULL_AGENT
        assert outcome["agent_id"] == "zhangfei-dev"

    def test_adjudication_new_session(self, dispatcher, task_pending):
        """``adjudication`` must resolve to FULL_AGENT with ``new_session`` True."""
        outcome = dispatcher.decide(task_pending, "adjudication")
        assert outcome["level"] == DispatchLevel.FULL_AGENT
        assert outcome["new_session"] is True

    def test_no_assignee_subagent(self, dispatcher, task_no_assignee):
        """No assignee: expect the fallback agent ``pangtong-fujunshi`` at FULL_AGENT."""
        outcome = dispatcher.decide(task_no_assignee)
        assert outcome["level"] == DispatchLevel.FULL_AGENT
        # Fallback target when the task names no assignee.
        assert outcome["agent_id"] == "pangtong-fujunshi"

    def test_unknown_agent_escalate(self, dispatcher, task_unknown_agent):
        """Unregistered assignee: expect ESCALATE to ``pangtong-fujunshi`` in a new session."""
        outcome = dispatcher.decide(task_unknown_agent)
        assert outcome["level"] == DispatchLevel.ESCALATE
        assert outcome["agent_id"] == "pangtong-fujunshi"
        assert outcome["new_session"] is True

    def test_different_registered_agents(self, dispatcher):
        """Each registered agent id must be routed to itself at FULL_AGENT."""
        for name in ("zhangfei-dev", "guanyu-dev", "simayi-challenger"):
            candidate = Task(
                id="t",
                title="T",
                status="pending",
                assigned_by="d",
                assignee=name,
            )
            routed = dispatcher.decide(candidate)
            assert routed["level"] == DispatchLevel.FULL_AGENT
            assert routed["agent_id"] == name

    def test_unknown_action_with_registered_agent(self, dispatcher, task_pending):
        """An unrecognised action type with a registered assignee still yields FULL_AGENT."""
        outcome = dispatcher.decide(task_pending, "some_unknown_action")
        assert outcome["level"] == DispatchLevel.FULL_AGENT
# ---------------------------------------------------------------------------
# T2: dispatch execution (with mocks)
# ---------------------------------------------------------------------------
class TestDispatch:
    """T2: ``Dispatcher.dispatch`` run to completion, with the spawner mocked."""

    def test_dispatch_local(self, dispatcher, task_pending):
        """A local action dispatches even though this test attaches no spawner."""
        outcome = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail"))
        assert outcome["status"] == "dispatched"
        assert outcome["level"] == "local"

    def test_dispatch_full_agent(self, dispatcher, task_pending):
        """Full-agent dispatch reports the session id returned by the spawner."""
        dispatcher.spawner = MagicMock(
            spawn_full_agent=AsyncMock(return_value="session-123"),
        )
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "dispatched"
        assert outcome["session_id"] == "session-123"
        assert outcome["level"] == "full"

    def test_dispatch_no_spawner_returns_error(self, dispatcher, task_pending):
        """With no spawner attached, the result is an error mentioning ``No spawner``."""
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "error"
        assert "No spawner" in outcome["reason"]

    def test_dispatch_subagent(self, dispatcher, task_no_assignee):
        """No assignee: dispatch goes to the fallback ``pangtong-fujunshi`` at level ``full``."""
        dispatcher.spawner = MagicMock(
            spawn_full_agent=AsyncMock(return_value="auto-123"),
        )
        outcome = asyncio.run(dispatcher.dispatch(task_no_assignee))
        assert outcome["status"] == "dispatched"
        assert outcome["level"] == "full"
        assert outcome["agent_id"] == "pangtong-fujunshi"

    def test_dispatch_escalate(self, dispatcher, task_unknown_agent):
        """An unregistered assignee is dispatched at level ``escalate``."""
        dispatcher.spawner = MagicMock(
            spawn_full_agent=AsyncMock(return_value="esc-123"),
        )
        outcome = asyncio.run(dispatcher.dispatch(task_unknown_agent))
        assert outcome["status"] == "dispatched"
        assert outcome["level"] == "escalate"

    def test_dispatch_spawn_failure(self, dispatcher, task_pending):
        """A ``RuntimeError`` from the spawner surfaces as status ``error``."""
        failing_spawn = AsyncMock(side_effect=RuntimeError("spawn failed"))
        dispatcher.spawner = MagicMock(spawn_full_agent=failing_spawn)
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "error"
# ---------------------------------------------------------------------------
# T3: 队列满拒绝
# ---------------------------------------------------------------------------
class TestConcurrencyControl:
    """T3: how ``Dispatcher.dispatch`` interacts with a mocked concurrency counter."""

    def test_counter_busy_skips(self, dispatcher, task_pending):
        """When ``can_acquire`` returns False the task is skipped with a 'busy' reason."""
        dispatcher.counter = MagicMock(
            can_acquire=AsyncMock(return_value=False),
        )
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "skipped"
        assert "busy" in outcome["reason"].lower()

    def test_counter_releases_on_error(self, dispatcher, task_pending):
        """A failed spawn must release the counter exactly once for the assignee."""
        gate = MagicMock(
            can_acquire=AsyncMock(return_value=True),
            acquire=AsyncMock(),
            release=MagicMock(),
        )
        dispatcher.spawner = MagicMock(
            spawn_full_agent=AsyncMock(side_effect=RuntimeError("fail")),
        )
        dispatcher.counter = gate
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "error"
        gate.release.assert_called_once_with("zhangfei-dev")

    def test_local_not_blocked_by_counter(self, dispatcher, task_pending):
        """A local action is dispatched even when ``can_acquire`` returns False."""
        dispatcher.counter = MagicMock(
            can_acquire=AsyncMock(return_value=False),
        )
        outcome = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail"))
        assert outcome["status"] == "dispatched"
# ---------------------------------------------------------------------------
# T4: 批量决策
# ---------------------------------------------------------------------------
class TestBatchDecision:
    """T4: batch decisions via ``dispatch_pending`` and spawn-message building."""

    def test_dispatch_pending(self, dispatcher):
        """``dispatch_pending`` returns one decision per task, in input order."""
        # (task id, title, assignee): registered, missing, and unregistered assignee.
        specs = (
            ("t1", "T1", "zhangfei-dev"),
            ("t2", "T2", None),
            ("t3", "T3", "unknown"),
        )
        batch = [
            Task(id=task_id, title=title, status="pending",
                 assigned_by="d", assignee=owner)
            for task_id, title, owner in specs
        ]
        decisions = dispatcher.dispatch_pending(batch)
        assert len(decisions) == 3
        assert decisions[0]["level"] == DispatchLevel.FULL_AGENT
        assert decisions[1]["level"] == DispatchLevel.FULL_AGENT
        # Fallback target for the task without an assignee.
        assert decisions[1]["agent_id"] == "pangtong-fujunshi"
        assert decisions[2]["level"] == DispatchLevel.ESCALATE

    def test_message_building(self, dispatcher):
        """The spawn message must contain the task's title and description."""
        feature_task = Task(
            id="t1",
            title="Build Feature",
            status="pending",
            assigned_by="daemon",
            task_type="coding",
            description="Implement X",
            must_haves="Tests, Docs",
        )
        message = dispatcher._build_spawn_message(feature_task, "zhangfei-dev", {})
        for expected in ("Build Feature", "Implement X"):
            assert expected in message
# ---------------------------------------------------------------------------
# E14.3: Dispatcher 错误区分(v2.8 #07.1 O3 新增)
# ---------------------------------------------------------------------------
class TestDispatcherErrorClassification:
    """E14.3: how the dispatcher reports an ``AgentBusyError`` raised by the spawner."""

    def test_busy_error_reason_in_result(self, dispatcher, task_pending):
        """A spawner-raised ``AgentBusyError`` yields an error whose reason names the cause.

        The spawner mock raises ``AgentBusyError`` with ``reason="session_locked"``;
        the dispatch result must have status ``error`` and a ``reason`` text that
        mentions "busy" or "locked".
        """
        busy = AgentBusyError(
            "zhangfei-dev",
            reason="session_locked",
            detail={"blockers": [("session_locked", 12345)]},
        )
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(side_effect=busy)
        dispatcher.spawner = mock_spawner
        result = asyncio.run(dispatcher.dispatch(task_pending))
        assert result["status"] == "error"
        # Check the reason text itself. The previous version also accepted
        # `"error" in status`, which is always true after the assertion above
        # and therefore made this check impossible to fail.
        reason = str(result.get("reason") or "").lower()
        assert "busy" in reason or "locked" in reason, (
            f"expected a busy/locked reason in the dispatch result, got {result!r}"
        )

    def test_counter_busy_returns_skipped(self, dispatcher, task_pending):
        """A ``counter_blocked`` ``AgentBusyError`` ends as either ``skipped`` or ``error``.

        The test name reflects the preferred outcome (``skipped``), but the
        assertion deliberately accepts both statuses.
        """
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(
            side_effect=AgentBusyError("zhangfei-dev", reason="counter_blocked")
        )
        dispatcher.spawner = mock_spawner
        result = asyncio.run(dispatcher.dispatch(task_pending))
        # counter_blocked is normally intercepted at the can_acquire stage,
        # giving "skipped"; if it reaches spawn_full_agent (as forced here by
        # the mock) the outcome may be "error" instead.
        assert result["status"] in ("skipped", "error")