Files
sanguo_moziplus_v2/tests/unit/test_spawner.py
T
2026-06-05 11:03:30 +08:00

428 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F9 Agent Spawner 单元测试 — classify/session 管理"""
import asyncio
import pytest
from pathlib import Path
from src.blackboard.operations import Blackboard
from src.daemon.spawner import AgentSpawner, AgentBusyError
# Apply the ``unit`` marker to every test collected from this module.
pytestmark = pytest.mark.unit
@pytest.fixture
def db_path(tmp_path):
    """Path of a per-test ``blackboard.db`` file inside pytest's ``tmp_path``."""
    return tmp_path.joinpath("blackboard.db")
@pytest.fixture
def bb(db_path):
    """Blackboard constructed on the per-test database path."""
    board = Blackboard(db_path)
    return board
@pytest.fixture
def spawner(db_path):
    """AgentSpawner constructed with ``dry_run=True`` on the per-test database path."""
    dry_run_spawner = AgentSpawner(db_path=db_path, dry_run=True)
    return dry_run_spawner
# ---------------------------------------------------------------------------
# T1: spawn succeeds (dry_run)
# ---------------------------------------------------------------------------
class TestSpawnSuccess:
    """Spawning through a dry-run spawner returns ids and registers sessions."""

    def test_dry_run_spawn(self, spawner):
        """A dry-run full-agent spawn returns an id whose session records the agent."""
        pending = spawner.spawn_full_agent("test-agent", "do something", task_id="t1")
        session_id = asyncio.run(pending)

        assert session_id
        assert spawner.get_session(session_id) is not None
        assert spawner.get_session(session_id)["agent_id"] == "test-agent"

    def test_session_registered(self, spawner):
        """After a spawn, at least one session shows up in ``active_sessions``."""
        asyncio.run(spawner.spawn_full_agent("agent-1", "task", task_id="t1"))

        assert len(spawner.active_sessions) >= 1

    def test_spawn_subagent_dry_run(self, spawner):
        """A dry-run subagent spawn returns a truthy session id."""
        pending = spawner.spawn_subagent("do task", task_id="t1")
        session_id = asyncio.run(pending)

        assert session_id

    def test_multiple_spawns(self, spawner):
        """Three consecutive spawns yield three distinct session ids."""
        ids = [
            asyncio.run(spawner.spawn_full_agent(f"agent-{i}", f"task {i}"))
            for i in range(3)
        ]

        # Every session_id must be unique.
        assert len(set(ids)) == 3
# ---------------------------------------------------------------------------
# T4: session 清理
# ---------------------------------------------------------------------------
class TestSessionCleanup:
    """Session bookkeeping after cleanup or completion."""

    def test_cleanup_removes_session(self, spawner):
        """``cleanup_session`` removes a previously spawned session."""
        pending = spawner.spawn_full_agent("a", "m")
        sid = asyncio.run(pending)
        assert spawner.get_session(sid) is not None

        spawner.cleanup_session(sid)

        assert spawner.get_session(sid) is None

    def test_cleanup_nonexistent(self, spawner):
        """Cleaning up an unknown session id must not raise."""
        spawner.cleanup_session("nonexistent-id")  # no error

    def test_active_sessions_excludes_completed(self, spawner):
        """``active_sessions`` leaves out a session whose status is ``completed``."""
        sid = asyncio.run(spawner.spawn_full_agent("a", "m"))

        # Flip the status directly on the stored session record.
        spawner.get_session(sid)["status"] = "completed"

        assert sid not in spawner.active_sessions
# ---------------------------------------------------------------------------
# E11: Spawner Acquire-First Phase 0-4 (added in v2.8 #07.1)
# ---------------------------------------------------------------------------
class TestAcquireFirst:
    """E11: Phase 0-4 tests written for the #07.1 Acquire-First refactor."""

    def test_phase0_revive_before_acquire(self, spawner):
        """E11.1 Phase 0: timeout/failed state -> revive -> acquire succeeds."""
        session_id = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent", "do something",
                task_id="t1", use_main_session=True,
            )
        )
        assert session_id

    def test_phase0_stuck_detection(self, spawner):
        """E11.2 Phase 0: status=running with a dead lock PID -> revive."""
        call_count = [0]

        def mock_check(agent_id):
            # First check reports a running session whose lock owner is dead;
            # every later check reports an idle session.
            call_count[0] += 1
            if call_count[0] == 1:
                return {"status": "running", "lock_pid_alive": False}
            return {"status": "idle", "lock_pid_alive": False}

        spawner._check_session_state = mock_check
        revive_called = [False]

        def mock_revive(agent_id):
            revive_called[0] = True
            return True

        spawner._revive_session = mock_revive
        session_id = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent", "do something",
                task_id="t1", use_main_session=True,
            )
        )
        assert revive_called[0], "Phase 0 should have called _revive_session for stuck session"
        # The spawn must also complete after the revive, as in the Phase 2.5 test.
        assert session_id

    def test_phase1_counter_acquire_exclusive(self, spawner):
        """E11.3 Phase 1: counter acquire is mutually exclusive per agent."""
        from src.daemon.counter import ActiveAgentCounter

        # NOTE(review): every other test in this module passes ``max_per_agent``;
        # confirm ``max_concurrent_sessions`` is an accepted keyword here.
        counter = ActiveAgentCounter(max_global=5, max_concurrent_sessions=1)
        spawner.counter = counter
        session_id = asyncio.run(
            spawner.spawn_full_agent("test-agent", "task", task_id="t1")
        )
        assert session_id

        # A second spawn for the same agent is expected to be rejected.
        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent("test-agent", "task2", task_id="t2")
            )
        # Checked after the ``with`` block so the assertion is actually reached.
        assert "counter" in exc_info.value.reason or "blocked" in exc_info.value.reason

    def test_phase2_session_check_under_lock(self, spawner):
        """E11.4 Phase 2: the session check runs under the counter lock."""
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter
        spawner._check_session_state = lambda agent_id: {
            "status": "idle",
            "lock_pid_alive": True,
            "lock_expired": False,
        }
        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )
        assert "session_locked" in exc_info.value.reason
        # The rejected spawn must not leave a counter slot held.
        assert counter.global_active == 0

    def test_phase2_multiple_blockers(self, spawner):
        """E11.5 Phase 2: multiple blockers are collected side by side."""
        spawner._check_session_state = lambda agent_id: {
            "status": "idle",
            "lock_pid_alive": True,
            "lock_expired": False,
            "recent_compact": True,
        }
        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )
        assert exc_info.value.detail is not None
        blockers = exc_info.value.detail.get("blockers", [])
        # Each blocker entry carries its reason at index 0.
        blocker_reasons = [b[0] for b in blockers]
        assert "session_locked" in blocker_reasons
        assert "session_compacting" in blocker_reasons

    def test_phase3_on_checks_passed_exception_rollback(self, spawner):
        """E11.6 Phase 3: on_checks_passed raising -> counter is released."""
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        def bad_callback():
            raise RuntimeError("callback failed")

        with pytest.raises(RuntimeError, match="callback failed"):
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1",
                    on_checks_passed=bad_callback,
                )
            )
        # The callback failure must not leave a counter slot held.
        assert counter.global_active == 0
# ---------------------------------------------------------------------------
# E13: Compact hanging is not marked failed (added in v2.8 #07.3 ACT-2)
# ---------------------------------------------------------------------------
class TestCompactHanging:
    """E13: after compact_hanging the task is not marked failed.

    Only the counter is released, so the task stays ``working``.
    """

    def test_compact_hanging_releases_counter(self, tmp_path):
        """E13.1: compact over limit -> compact_hanging -> release counter -> task stays working.

        The database file is created under pytest's per-test ``tmp_path`` so the
        test is isolated from other runs and needs no manual cleanup.
        """
        from src.daemon.counter import ActiveAgentCounter
        from src.blackboard.models import Task as TaskModel

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        db_path = tmp_path / "test_compact_hanging.db"
        bb = Blackboard(db_path)
        bb.create_task(TaskModel(id="t1", title="T", status="working", assigned_by="d",
                                 current_agent="test-agent"))
        spawner = AgentSpawner(db_path=db_path, dry_run=True)
        spawner.counter = counter
        outcomes = []

        async def mock_on_complete(aid, outcome):
            outcomes.append((aid, outcome))

        sid = asyncio.run(spawner.spawn_full_agent(
            "test-agent", "task", task_id="t1",
            on_complete=mock_on_complete,
        ))
        # NOTE(review): the release is issued by the test itself and ``outcomes``
        # is never asserted on — confirm this covers the intended hanging path.
        counter.release("test-agent", sid)
        assert counter.global_active == 0

    def test_retry_agent_busy_releases_counter(self, spawner):
        """E13.3: _do_retry hits AgentBusyError -> release counter -> task stays working."""
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter
        spawner._check_session_state = lambda agent_id: {
            "status": "running",
            "lock_pid_alive": True,
            "lock_expired": False,
        }
        with pytest.raises(AgentBusyError):
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )
        # The rejected spawn must not leave a counter slot held.
        assert counter.global_active == 0
# ---------------------------------------------------------------------------
# E14: AgentBusyError 分类(v2.8 #07.1 O3 新增)
# ---------------------------------------------------------------------------
class TestAgentBusyErrorClassification:
    """E14: checks on the ``reason`` / ``detail`` carried by AgentBusyError."""

    def test_counter_blocked_reason(self, spawner):
        """E14.1: counter acquire fails -> reason == counter_blocked."""
        from src.daemon.counter import ActiveAgentCounter

        spawner.counter = ActiveAgentCounter(max_global=1, max_per_agent=1)
        # Occupy the only slot before attempting the spawn.
        asyncio.run(spawner.counter.acquire("test-agent"))

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent("test-agent", "task", task_id="t1")
            )

        raised = exc_info.value
        assert raised.reason == "counter_blocked"
        assert raised.agent_id == "test-agent"

    def test_session_blocker_reasons(self, spawner):
        """E14.2: locked/running/compacting session -> specific reason plus detail."""
        scenarios = [
            (
                {"status": "idle", "lock_pid_alive": True, "lock_expired": False},
                "session_locked",
            ),
            (
                {"status": "running", "lock_pid_alive": True, "lock_expired": False},
                "session_locked",
            ),
            (
                {"status": "idle", "lock_pid_alive": False, "recent_compact": True},
                "session_compacting",
            ),
        ]
        for i, (state, expected) in enumerate(scenarios):
            # Bind the state as a default so each lambda keeps its own scenario.
            spawner._check_session_state = lambda aid, s=state: s
            with pytest.raises(AgentBusyError) as exc_info:
                asyncio.run(
                    spawner.spawn_full_agent(
                        "test-agent", "task",
                        task_id="t1", use_main_session=True,
                    )
                )
            assert expected in exc_info.value.reason, (
                f"Case {i}: expected '{expected}' in '{exc_info.value.reason}'"
            )
            assert exc_info.value.detail is not None, (
                f"Case {i}: expected detail to be set"
            )

    def test_session_running_in_blockers(self, spawner):
        """session_running is listed among the blockers while session_locked is the reason."""
        spawner._check_session_state = lambda aid: {
            "status": "running", "lock_pid_alive": True, "lock_expired": False,
        }

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )

        assert "session_locked" in exc_info.value.reason
        # Each blocker entry carries its reason at index 0.
        blocker_reasons = [
            entry[0] for entry in exc_info.value.detail.get("blockers", [])
        ]
        assert "session_running" in blocker_reasons
        assert "session_locked" in blocker_reasons

    def test_error_attributes(self):
        """E14 supplement: attributes and string form of AgentBusyError."""
        detail = {"blockers": [("session_locked", 12345)]}
        err = AgentBusyError("my-agent", reason="session_locked", detail=detail)

        assert err.agent_id == "my-agent"
        assert err.reason == "session_locked"
        assert err.detail["blockers"][0][0] == "session_locked"
        assert "my-agent" in str(err)
        assert "session_locked" in str(err)
# ---------------------------------------------------------------------------
# Sima Yi review follow-ups: Phase 2.5 + session_stuck (v2.8 #07.1 v1.1 fallback)
# ---------------------------------------------------------------------------
class TestPhase25AndStuck:
    """Sima Yi review gap #1 plus the missing session_stuck coverage."""

    def test_phase25_stuck_fallback(self, spawner):
        """Phase 2.5: Phase 2 sees a stuck session -> revive -> spawn succeeds."""
        checks_seen = 0

        def mock_check(agent_id):
            # Only the second check reports a running session with a dead lock
            # PID; the first and all later checks report idle.
            nonlocal checks_seen
            checks_seen += 1
            status = "running" if checks_seen == 2 else "idle"
            return {"status": status, "lock_pid_alive": False}

        revived = {"called": False}

        def mock_revive(agent_id):
            revived["called"] = True
            return True

        spawner._check_session_state = mock_check
        spawner._revive_session = mock_revive

        session_id = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent", "task",
                task_id="t1", use_main_session=True,
            )
        )

        assert revived["called"], "Phase 2.5 should have revived stuck session"
        assert session_id

    def test_session_stuck_after_failed_revive(self, spawner):
        """Phase 2.5: the session still reads as stuck after revive -> session_stuck."""
        checks_seen = 0

        def mock_check_v2(agent_id):
            # The first check reports idle; every check after that keeps
            # reporting a running session with a dead lock PID.
            nonlocal checks_seen
            checks_seen += 1
            status = "idle" if checks_seen <= 1 else "running"
            return {"status": status, "lock_pid_alive": False}

        spawner._check_session_state = mock_check_v2
        spawner._revive_session = lambda agent_id: True

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )

        assert "stuck" in exc_info.value.reason