Files
sanguo_moziplus_v2/tests/test_dispatcher.py
T
2026-06-01 22:57:14 +08:00

436 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F9 Agent 调度器单元测试
按 test-plan-v2.6.md §F9
- T1: 三级决策树(P0)
- T2: 调度不阻塞(P0)
- T3: 队列满拒绝(P0)
- T4: 任务优先级排序(P1)
v2.8 新增(#07 AgentBusyError 分类):
- E14.3: Dispatcher 错误区分(1 个测试)
"""
import asyncio
import json
import pytest
from pathlib import Path
from typing import Any, Dict, Optional
from unittest.mock import AsyncMock, MagicMock
from src.blackboard.models import Task
from src.daemon.dispatcher import Dispatcher, DispatchLevel
from src.daemon.spawner import AgentBusyError
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def dispatcher():
    """Provide a Dispatcher constructed with only a list of three registered agents."""
    known_agents = ["zhangfei-dev", "guanyu-dev", "simayi-challenger"]
    return Dispatcher(registered_agents=known_agents)
@pytest.fixture
def task_pending():
    """Provide a pending ``coding`` task whose assignee is ``zhangfei-dev``."""
    fields = dict(
        id="t1",
        title="Write code",
        status="pending",
        assigned_by="daemon",
        task_type="coding",
        assignee="zhangfei-dev",
    )
    return Task(**fields)
@pytest.fixture
def task_no_assignee():
    """Provide a pending task that is created without any assignee."""
    fields = dict(
        id="t2",
        title="Generic task",
        status="pending",
        assigned_by="daemon",
    )
    return Task(**fields)
@pytest.fixture
def task_unknown_agent():
    """Provide a pending task whose assignee is not in the dispatcher fixture's agent list."""
    fields = dict(
        id="t3",
        title="Unknown agent task",
        status="pending",
        assigned_by="daemon",
        assignee="nonexistent-agent",
    )
    return Task(**fields)
# ---------------------------------------------------------------------------
# T1: 三级决策树
# ---------------------------------------------------------------------------
class TestDecisionTree:
    """T1: ``Dispatcher.decide`` across the LOCAL, FULL_AGENT and ESCALATE levels."""

    def test_local_action(self, dispatcher, task_pending):
        """An ``L1_guardrail`` action is decided as local work owned by ``daemon``."""
        outcome = dispatcher.decide(task_pending, "L1_guardrail")
        assert outcome["level"] == DispatchLevel.LOCAL
        assert outcome["agent_id"] == "daemon"

    def test_local_format_check(self, dispatcher, task_pending):
        """A ``format_check`` action is decided as local."""
        level = dispatcher.decide(task_pending, "format_check")["level"]
        assert level == DispatchLevel.LOCAL

    def test_local_file_exists(self, dispatcher, task_pending):
        """A ``file_exists_check`` action is decided as local."""
        level = dispatcher.decide(task_pending, "file_exists_check")["level"]
        assert level == DispatchLevel.LOCAL

    def test_registered_agent(self, dispatcher, task_pending):
        """A task assigned to a registered agent goes to that agent at FULL_AGENT level."""
        outcome = dispatcher.decide(task_pending)
        assert outcome["level"] == DispatchLevel.FULL_AGENT
        assert outcome["agent_id"] == "zhangfei-dev"

    def test_adjudication_new_session(self, dispatcher, task_pending):
        """An ``adjudication`` action is FULL_AGENT and requests a new session."""
        outcome = dispatcher.decide(task_pending, "adjudication")
        assert outcome["level"] == DispatchLevel.FULL_AGENT
        assert outcome["new_session"] is True

    def test_no_assignee_subagent(self, dispatcher, task_no_assignee):
        """A task without assignee falls back to ``pangtong-fujunshi`` at FULL_AGENT level."""
        outcome = dispatcher.decide(task_no_assignee)
        assert outcome["level"] == DispatchLevel.FULL_AGENT
        # Fallback agent used when no assignee is set.
        assert outcome["agent_id"] == "pangtong-fujunshi"

    def test_unknown_agent_escalate(self, dispatcher, task_unknown_agent):
        """An unregistered assignee escalates to ``pangtong-fujunshi`` in a new session."""
        outcome = dispatcher.decide(task_unknown_agent)
        assert outcome["level"] == DispatchLevel.ESCALATE
        assert outcome["agent_id"] == "pangtong-fujunshi"
        assert outcome["new_session"] is True

    def test_different_registered_agents(self, dispatcher):
        """Each registered agent is routed to itself at FULL_AGENT level."""
        for name in ("zhangfei-dev", "guanyu-dev", "simayi-challenger"):
            candidate = Task(
                id="t",
                title="T",
                status="pending",
                assigned_by="d",
                assignee=name,
            )
            outcome = dispatcher.decide(candidate)
            assert outcome["level"] == DispatchLevel.FULL_AGENT
            assert outcome["agent_id"] == name

    def test_unknown_action_with_registered_agent(self, dispatcher, task_pending):
        """An unrecognised action type with a registered assignee is still FULL_AGENT."""
        level = dispatcher.decide(task_pending, "some_unknown_action")["level"]
        assert level == DispatchLevel.FULL_AGENT
# ---------------------------------------------------------------------------
# T2: 调度执行(带 mock)
# ---------------------------------------------------------------------------
class TestDispatch:
    """T2: ``Dispatcher.dispatch`` executed end-to-end against a mocked spawner."""

    @staticmethod
    def _use_spawner(dispatcher, **spawn_behaviour):
        """Attach a mock spawner whose ``spawn_full_agent`` is an AsyncMock.

        ``spawn_behaviour`` is forwarded to ``AsyncMock`` (``return_value`` or
        ``side_effect``).
        """
        fake_spawner = MagicMock()
        fake_spawner.spawn_full_agent = AsyncMock(**spawn_behaviour)
        dispatcher.spawner = fake_spawner

    def test_dispatch_local(self, dispatcher, task_pending):
        """A local action is dispatched without any spawner being configured."""
        outcome = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail"))
        assert outcome["status"] == "dispatched"
        assert outcome["level"] == "local"

    def test_dispatch_full_agent(self, dispatcher, task_pending):
        """A full-agent dispatch reports the session id returned by the spawner."""
        self._use_spawner(dispatcher, return_value="session-123")
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "dispatched"
        assert outcome["session_id"] == "session-123"
        assert outcome["level"] == "full"

    def test_dispatch_no_spawner_returns_error(self, dispatcher, task_pending):
        """Dispatching agent work with no spawner set yields an error result."""
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "error"
        assert "No spawner" in outcome["reason"]

    def test_dispatch_subagent(self, dispatcher, task_no_assignee):
        """A task without assignee is dispatched to the fallback ``pangtong-fujunshi``."""
        self._use_spawner(dispatcher, return_value="auto-123")
        outcome = asyncio.run(dispatcher.dispatch(task_no_assignee))
        assert outcome["status"] == "dispatched"
        assert outcome["level"] == "full"
        assert outcome["agent_id"] == "pangtong-fujunshi"

    def test_dispatch_escalate(self, dispatcher, task_unknown_agent):
        """A task with an unregistered assignee is dispatched at the escalate level."""
        self._use_spawner(dispatcher, return_value="esc-123")
        outcome = asyncio.run(dispatcher.dispatch(task_unknown_agent))
        assert outcome["status"] == "dispatched"
        assert outcome["level"] == "escalate"

    def test_dispatch_spawn_failure(self, dispatcher, task_pending):
        """A RuntimeError raised by the spawner is reported as an error result."""
        self._use_spawner(dispatcher, side_effect=RuntimeError("spawn failed"))
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "error"
# ---------------------------------------------------------------------------
# T3: 队列满拒绝
# ---------------------------------------------------------------------------
class TestConcurrencyControl:
    """T3: dispatch behaviour when the target agent is busy or the spawn fails."""

    def test_counter_busy_skips(self, dispatcher, task_pending):
        """A busy agent results in a skipped dispatch.

        v2.8 #07: the AgentBusyError is raised from ``spawn_full_agent``.
        """
        busy = AgentBusyError("zhangfei-dev", reason="counter_blocked")
        dispatcher.spawner = MagicMock(spawn_full_agent=AsyncMock(side_effect=busy))
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "skipped"
        assert "busy" in outcome["reason"].lower()

    def test_counter_releases_on_error(self, dispatcher, task_pending):
        """A failing spawn surfaces as an error result.

        v2.8 #07: releasing the counter is the responsibility of
        ``spawn_full_agent`` itself, which is mocked here, so only the
        resulting status is checked.
        """
        failure = RuntimeError("fail")
        dispatcher.spawner = MagicMock(spawn_full_agent=AsyncMock(side_effect=failure))
        outcome = asyncio.run(dispatcher.dispatch(task_pending))
        assert outcome["status"] == "error"

    def test_local_not_blocked_by_counter(self, dispatcher, task_pending):
        """Local execution still dispatches when the counter refuses acquisition."""
        refusing_counter = MagicMock(can_acquire=AsyncMock(return_value=False))
        dispatcher.counter = refusing_counter
        outcome = asyncio.run(dispatcher.dispatch(task_pending, "L1_guardrail"))
        assert outcome["status"] == "dispatched"
# ---------------------------------------------------------------------------
# T4: 批量决策
# ---------------------------------------------------------------------------
class TestBatchDecision:
    """T4: batch decisions over several tasks, plus spawn-message construction."""

    def test_dispatch_pending(self, dispatcher):
        """``dispatch_pending`` returns one decision per task, in input order."""
        specs = [
            ("t1", "T1", "zhangfei-dev"),  # registered agent
            ("t2", "T2", None),            # no assignee
            ("t3", "T3", "unknown"),       # unregistered agent
        ]
        batch = [
            Task(id=task_id, title=title, status="pending",
                 assigned_by="d", assignee=assignee)
            for task_id, title, assignee in specs
        ]
        decisions = dispatcher.dispatch_pending(batch)
        assert len(decisions) == 3
        first, second, third = decisions
        assert first["level"] == DispatchLevel.FULL_AGENT
        assert second["level"] == DispatchLevel.FULL_AGENT
        # Fallback agent used when no assignee is set.
        assert second["agent_id"] == "pangtong-fujunshi"
        assert third["level"] == DispatchLevel.ESCALATE

    def test_message_building(self, dispatcher):
        """The spawn message contains the task's title and description."""
        feature_task = Task(
            id="t1", title="Build Feature", status="pending",
            assigned_by="daemon", task_type="coding",
            description="Implement X", must_haves="Tests, Docs",
        )
        message = dispatcher._build_spawn_message(feature_task, "zhangfei-dev", {})
        for fragment in ("Build Feature", "Implement X"):
            assert fragment in message
# ---------------------------------------------------------------------------
# E14.3: Dispatcher 错误区分(v2.8 #07.1 O3 新增)
# ---------------------------------------------------------------------------
class TestDispatcherErrorClassification:
    """E14.3: the dispatcher catches AgentBusyError and reports the busy reason."""

    def test_busy_error_reason_in_result(self, dispatcher, task_pending):
        """A caught AgentBusyError yields ``status == "skipped"`` and a specific reason."""
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(
            side_effect=AgentBusyError(
                "zhangfei-dev",
                reason="session_locked",
                detail={"blockers": [("session_locked", 12345)]},
            )
        )
        dispatcher.spawner = mock_spawner
        result = asyncio.run(dispatcher.dispatch(task_pending))
        assert result["status"] == "skipped"
        # Normalise once; a missing reason becomes "" and fails the check below.
        reason = result.get("reason", "").lower()
        assert "session" in reason or "locked" in reason

    def test_counter_busy_returns_skipped(self, dispatcher, task_pending):
        """``counter_blocked`` is reported as skipped, not as an error."""
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(
            side_effect=AgentBusyError("zhangfei-dev", reason="counter_blocked")
        )
        dispatcher.spawner = mock_spawner
        result = asyncio.run(dispatcher.dispatch(task_pending))
        # The mock raises AgentBusyError from spawn_full_agent itself, the same
        # arrangement TestConcurrencyControl.test_counter_busy_skips uses, so
        # the outcome is deterministic. Accepting "error" as well would let a
        # regression in busy/error classification pass unnoticed.
        assert result["status"] == "skipped"
# ---------------------------------------------------------------------------
# 司马懿评审补充:_rollback_current_agent + on_complete 统一(v2.8 #07.2)
# ---------------------------------------------------------------------------
class TestRollbackAndOnComplete:
    """Review follow-ups #3 and #4: ``current_agent`` rollback after a crash.

    Every test seeds a single task ``t1`` (assignee ``zhangfei-dev``) into a
    fresh blackboard database under ``tmp_path``, forces its ``current_agent``
    column, calls ``Dispatcher._rollback_current_agent`` and reads the row back.
    """

    @staticmethod
    def _seed_task(db_path, status, current_agent):
        """Create task ``t1`` with the given status and force its ``current_agent``.

        The column is written with a direct UPDATE because, per the note in the
        original test, ``create_task`` does not persist ``current_agent``.
        """
        from src.blackboard.operations import Blackboard

        bb = Blackboard(db_path)
        bb.create_task(Task(
            id="t1", title="T", status=status,
            assigned_by="daemon", assignee="zhangfei-dev",
        ))
        conn = bb._conn()
        try:
            conn.execute(
                "UPDATE tasks SET current_agent=? WHERE id=?",
                (current_agent, "t1"),
            )
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _read_task(db_path):
        """Return the ``current_agent``, ``assignee`` and ``status`` columns of task ``t1``.

        The connection is closed even when the query raises, so a failing test
        does not leave an open handle on the temporary database file.
        """
        import sqlite3

        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            return conn.execute(
                "SELECT current_agent, assignee, status FROM tasks WHERE id=?",
                ("t1",),
            ).fetchone()
        finally:
            conn.close()

    def test_rollback_current_agent_on_crash(self, tmp_path):
        """After an executor crash, ``current_agent`` equals the assignee.

        #07.2 core change: the rollback also runs after an executor crash;
        per the design note this avoids ``_dispatch_reviews``' exclude_current
        logic getting stuck.
        """
        db_path = tmp_path / "blackboard.db"
        self._seed_task(db_path, status="working", current_agent="zhangfei-dev")

        dispatcher = Dispatcher(registered_agents=["zhangfei-dev"])
        dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev")

        row = self._read_task(db_path)
        # NOTE(review): current_agent already equals the assignee before the
        # rollback, so this also holds if the rollback is a no-op; it mainly
        # guards against the call raising or writing a wrong value.
        assert row["current_agent"] == row["assignee"] == "zhangfei-dev"

    def test_rollback_different_agent(self, tmp_path):
        """A rollback requested for a non-matching agent leaves the row untouched.

        Safety check: per the original note, the rollback's WHERE clause
        includes ``current_agent=?``, so a mismatching ``agent_id`` updates
        nothing.
        """
        db_path = tmp_path / "blackboard.db"
        self._seed_task(db_path, status="working", current_agent="simayi-challenger")

        dispatcher = Dispatcher(registered_agents=["zhangfei-dev"])
        # Roll back on behalf of an agent that is not the current one.
        dispatcher._rollback_current_agent(db_path, "t1", "wrong-agent")

        row = self._read_task(db_path)
        assert row["current_agent"] == "simayi-challenger"

    def test_on_complete_crash_rollback_executor(self, tmp_path):
        """Executor crash path: ``current_agent`` is rolled back to the assignee.

        #07.2: per the design note the crash rollback runs before the
        ``if _is_review`` branch. Only ``_rollback_current_agent`` is exercised
        here; the follow-up auto-complete step is not.
        """
        db_path = tmp_path / "blackboard.db"
        self._seed_task(db_path, status="working", current_agent="zhangfei-dev")

        dispatcher = Dispatcher(registered_agents=["zhangfei-dev"])
        dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev")

        row = self._read_task(db_path)
        # NOTE(review): the executor is also the assignee, so the value is
        # unchanged by a correct rollback; see the review-crash test for a
        # case where the column actually changes.
        assert row["current_agent"] == "zhangfei-dev"

    def test_on_complete_crash_rollback_review(self, tmp_path):
        """Review crash path: ``current_agent`` returns to the assignee, status stays ``review``.

        #07.2: per the design note the crash rollback runs before the
        ``if _is_review`` branch.
        """
        db_path = tmp_path / "blackboard.db"
        # The reviewer currently holds the task.
        self._seed_task(db_path, status="review", current_agent="simayi-challenger")

        dispatcher = Dispatcher(registered_agents=["simayi-challenger"])
        dispatcher._rollback_current_agent(db_path, "t1", "simayi-challenger")

        row = self._read_task(db_path)
        # current_agent is handed back to the assignee ...
        assert row["current_agent"] == "zhangfei-dev"
        # ... while the task status is left as it was.
        assert row["status"] == "review"