Files
sanguo_moziplus_v2/tests/test_spawner.py
T
2026-06-01 22:42:35 +08:00

458 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F9 Agent Spawner 单元测试
Follows test-plan-v2.6.md §F9 Spawner:
- T1: spawn succeeds (P0)
- T2: timeout handling (P0)
- T3: spawn failure (P0)
- T4: session cleanup (P1)
Added in v2.8 (#07 Acquire-First + Compact Hanging + AgentBusyError):
- E11: Spawner Acquire-First Phase 0-4 (6 tests)
- E13: Compact Hanging is not marked failed (3 tests)
- E14: AgentBusyError classification (3 tests)
"""
import asyncio
import pytest
from pathlib import Path
from src.blackboard.operations import Blackboard
from src.daemon.spawner import AgentSpawner, AgentBusyError
@pytest.fixture
def db_path(tmp_path):
    """Return the per-test blackboard database path inside ``tmp_path``."""
    database_file = tmp_path / "blackboard.db"
    return database_file
@pytest.fixture
def bb(db_path):
    """Return a ``Blackboard`` constructed on the per-test database path."""
    board = Blackboard(db_path)
    return board
@pytest.fixture
def spawner(db_path):
    """Return an ``AgentSpawner`` created with ``dry_run=True`` on the per-test database."""
    dry_run_spawner = AgentSpawner(db_path=db_path, dry_run=True)
    return dry_run_spawner
@pytest.fixture
def real_spawner(db_path):
    """Return an ``AgentSpawner`` with ``dry_run=False`` and a 2-second agent timeout."""
    live_spawner = AgentSpawner(
        db_path=db_path,
        dry_run=False,
        agent_timeout=2.0,
    )
    return live_spawner
# ---------------------------------------------------------------------------
# T1: spawn 成功
# ---------------------------------------------------------------------------
class TestSpawnSuccess:
    """T1: successful spawns through the dry-run ``spawner`` fixture."""

    def test_dry_run_spawn(self, spawner):
        """A spawn returns a truthy session id whose record names the agent."""
        spawn_coro = spawner.spawn_full_agent(
            "test-agent", "do something", task_id="t1"
        )
        session_id = asyncio.run(spawn_coro)

        assert session_id
        assert spawner.get_session(session_id) is not None
        assert spawner.get_session(session_id)["agent_id"] == "test-agent"

    def test_session_registered(self, spawner):
        """After a spawn, ``active_sessions`` holds at least one entry."""
        asyncio.run(spawner.spawn_full_agent("agent-1", "task", task_id="t1"))

        registered = spawner.active_sessions
        assert len(registered) >= 1

    def test_spawn_subagent_dry_run(self, spawner):
        """``spawn_subagent`` returns a truthy session id."""
        subagent_coro = spawner.spawn_subagent("do task", task_id="t1")
        session_id = asyncio.run(subagent_coro)

        assert session_id

    def test_multiple_spawns(self, spawner):
        """Three consecutive spawns yield three distinct session ids."""
        ids = [
            asyncio.run(spawner.spawn_full_agent(f"agent-{i}", f"task {i}"))
            for i in range(3)
        ]

        # Every session id must be unique.
        assert len(set(ids)) == 3
# ---------------------------------------------------------------------------
# T2: 超时处理
# ---------------------------------------------------------------------------
class TestTimeout:
    """T2: timeout handling for a non-dry-run spawn."""

    def test_timeout_kills_process(self, tmp_path):
        """Spawn with a 0.5 s agent timeout, wait 1 s, then check the session status.

        NOTE(review): the assertion only runs when the session is still
        registered, and it also accepts "running" and "completed", so this
        test does not strictly prove that the process was killed -- confirm
        whether that leniency is intended.
        """
        database = tmp_path / "blackboard.db"
        # Constructed only for its side effect (the original comment says "init").
        Blackboard(database)
        short_timeout_spawner = AgentSpawner(
            db_path=database, dry_run=False, agent_timeout=0.5
        )

        # The message is passed through to the agent; the actual agent may
        # ignore it. No task id is given, to avoid DB writes for a task that
        # does not exist.
        spawn_coro = short_timeout_spawner.spawn_full_agent(
            "test-agent",
            "sleep 10",
            task_id=None,
        )
        session_id = asyncio.run(spawn_coro)

        # Wait past the configured timeout.
        asyncio.run(asyncio.sleep(1.0))

        record = short_timeout_spawner.get_session(session_id)
        if not record:
            return
        assert record["status"] in ("timed_out", "running", "completed")
# ---------------------------------------------------------------------------
# T3: spawn 失败
# ---------------------------------------------------------------------------
class TestSpawnFailure:
    """T3: spawn failure handling with the non-dry-run ``real_spawner`` fixture."""

    def test_nonexistent_command(self, real_spawner, bb):
        """Spawning for an existing task must not crash the test run.

        A pending task ``t1`` is created first so the spawner has a task row
        to refer to. The spawn itself may fail because the external agent
        command may be missing in the test environment; this test is about
        error handling, not about the command itself.
        """
        # Function-scope import: the module header does not import Task.
        from src.blackboard.models import Task

        bb.create_task(
            Task(id="t1", title="T", status="pending", assigned_by="d")
        )

        try:
            asyncio.run(
                real_spawner.spawn_full_agent("test", "msg", task_id="t1")
            )
        except Exception:
            # Deliberate best-effort: a failure here is expected when the
            # agent command is unavailable, so it is tolerated.
            pass
# ---------------------------------------------------------------------------
# T4: session 清理
# ---------------------------------------------------------------------------
class TestSessionCleanup:
    """T4: session bookkeeping after spawns on the dry-run ``spawner`` fixture."""

    def test_cleanup_removes_session(self, spawner):
        """``cleanup_session`` makes a previously registered session unreachable."""
        session_id = asyncio.run(spawner.spawn_full_agent("a", "m"))
        assert spawner.get_session(session_id) is not None

        spawner.cleanup_session(session_id)

        assert spawner.get_session(session_id) is None

    def test_cleanup_nonexistent(self, spawner):
        """Cleaning up an unknown session id must not raise."""
        unknown_id = "nonexistent-id"
        spawner.cleanup_session(unknown_id)

    def test_active_sessions_excludes_completed(self, spawner):
        """A session whose status is "completed" is absent from ``active_sessions``."""
        session_id = asyncio.run(spawner.spawn_full_agent("a", "m"))

        # Flip the status directly on the stored session record.
        spawner.get_session(session_id)["status"] = "completed"

        still_active = spawner.active_sessions
        assert session_id not in still_active
# ---------------------------------------------------------------------------
# E11: Spawner Acquire-First Phase 0-4 (added in v2.8 #07.1)
# ---------------------------------------------------------------------------
class TestAcquireFirst:
    """E11: Phase 0-4 tests for the #07.1 Acquire-First refactor.

    All tests drive ``spawn_full_agent`` on the dry-run ``spawner`` fixture.
    Several of them replace ``_check_session_state`` / ``_revive_session`` /
    ``counter`` on that instance; the fixture is created per test, so the
    patches are not restored.
    """

    def test_phase0_revive_before_acquire(self, spawner):
        """E11.1 Phase 0: normal path with ``use_main_session=True``.

        Expectation under test (per the original test notes): Phase 0 runs
        before the counter acquire and repairs timeout/failed sessions. This
        test only exercises the normal path, where no revive is needed and
        the spawn simply returns a session id.
        """
        session_id = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent", "do something",
                task_id="t1", use_main_session=True,
            )
        )
        # Per the original notes, dry-run returns "main" or a session id.
        assert session_id

    def test_phase0_stuck_detection(self, spawner):
        """E11.2 Phase 0: status=running with a dead lock PID triggers a revive.

        ``_check_session_state`` is replaced so that its first call reports a
        stuck session (running, lock PID not alive) and later calls report an
        idle one. ``_revive_session`` is replaced by a stub that records the
        call.
        """
        check_calls = 0

        def fake_check(agent_id):
            nonlocal check_calls
            check_calls += 1
            if check_calls == 1:
                # First call (Phase 0): session looks stuck.
                return {"status": "running", "lock_pid_alive": False}
            # Later calls (Phase 2): healthy after the revive.
            return {"status": "idle", "lock_pid_alive": False}

        revive_called = False

        def fake_revive(agent_id):
            nonlocal revive_called
            revive_called = True
            return True

        spawner._check_session_state = fake_check
        spawner._revive_session = fake_revive

        asyncio.run(
            spawner.spawn_full_agent(
                "test-agent", "do something",
                task_id="t1", use_main_session=True,
            )
        )

        assert revive_called, "Phase 0 should have called _revive_session for stuck session"

    def test_phase1_counter_acquire_exclusive(self, spawner):
        """E11.3 Phase 1: the counter acquire is exclusive per agent.

        With ``max_per_agent=1``, a second spawn for the same agent must
        raise ``AgentBusyError`` with ``reason == "counter_blocked"``.
        """
        from src.daemon.counter import ActiveAgentCounter

        spawner.counter = ActiveAgentCounter(max_global=5, max_per_agent=1)

        # The first spawn acquires the only per-agent slot.
        first_session = asyncio.run(
            spawner.spawn_full_agent("test-agent", "task", task_id="t1")
        )
        assert first_session

        # The second spawn for the same agent is blocked by the counter.
        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent("test-agent", "task2", task_id="t2")
            )
        assert exc_info.value.reason == "counter_blocked"

    def test_phase2_session_check_under_lock(self, spawner):
        """E11.4 Phase 2: a locked session raises and gives the counter slot back.

        ``_check_session_state`` is stubbed to report a live, unexpired lock.
        The spawn must raise ``AgentBusyError`` mentioning "session_locked",
        and the counter must be left with no active slots.
        """
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        # Simulate a locked session.
        spawner._check_session_state = lambda agent_id: {
            "status": "idle",
            "lock_pid_alive": True,
            "lock_expired": False,
        }

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )
        assert "session_locked" in exc_info.value.reason
        # The counter slot must have been released (no leak).
        assert counter.global_active == 0

    def test_phase2_multiple_blockers(self, spawner):
        """E11.5 Phase 2: simultaneous blockers are all collected.

        With a live lock and ``recent_compact`` both reported,
        ``detail["blockers"]`` must list "session_locked" as well as
        "session_compacting".
        """
        spawner._check_session_state = lambda agent_id: {
            "status": "idle",
            "lock_pid_alive": True,
            "lock_expired": False,
            "recent_compact": True,
        }

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )

        assert exc_info.value.detail is not None
        blockers = exc_info.value.detail.get("blockers", [])
        # Each blocker entry is indexed; element 0 is the reason string.
        blocker_reasons = [b[0] for b in blockers]
        assert "session_locked" in blocker_reasons
        assert "session_compacting" in blocker_reasons

    def test_phase3_on_checks_passed_exception_rollback(self, spawner):
        """E11.6 Phase 3: an exception in ``on_checks_passed`` releases the counter.

        The callback's ``RuntimeError`` must propagate to the caller and must
        not leave a counter slot held.
        """
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        def bad_callback():
            raise RuntimeError("callback failed")

        with pytest.raises(RuntimeError, match="callback failed"):
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1",
                    on_checks_passed=bad_callback,
                )
            )
        # The counter slot must have been released.
        assert counter.global_active == 0
# ---------------------------------------------------------------------------
# E13: Compact Hanging is not marked failed (added in v2.8 #07.3 ACT-2)
# ---------------------------------------------------------------------------
class TestCompactHanging:
    """E13: after compact_hanging the task is not marked failed; only the counter is released."""

    def test_compact_hanging_releases_counter(self, db_path, bb):
        """E13.1: releasing the counter after a spawn leaves no active slots.

        A working task ``t1`` is created, a dry-run spawner sharing the
        counter spawns for it, and the counter is then released by hand to
        mimic the compact_hanging path.

        Uses the per-test ``db_path`` / ``bb`` fixtures instead of a fixed
        file under ``/tmp``, so runs cannot collide and no manual cleanup is
        needed.
        """
        # Function-scope imports: neither name is imported at module level.
        from src.blackboard.models import Task
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        bb.create_task(
            Task(
                id="t1", title="T", status="working", assigned_by="d",
                current_agent="test-agent",
            )
        )
        spawner = AgentSpawner(db_path=db_path, dry_run=True, counter=counter)

        # Completion callback standing in for the compact_hanging outcome handler.
        outcomes = []

        async def mock_on_complete(aid, outcome):
            outcomes.append((aid, outcome))

        sid = asyncio.run(
            spawner.spawn_full_agent(
                "test-agent", "task", task_id="t1",
                on_complete=mock_on_complete,
            )
        )

        # Manually simulate the counter release (compact_hanging path).
        counter.release("test-agent", sid)
        assert counter.global_active == 0

    def test_retry_agent_busy_releases_counter(self, spawner):
        """E13.3: a spawn that hits ``AgentBusyError`` does not keep a counter slot.

        Models the retry scenario: the session reports running with a live
        lock, the spawn raises ``AgentBusyError``, and the counter must end
        with no active slots.
        """
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
        spawner.counter = counter

        # Simulate the retry scenario: the session is busy.
        spawner._check_session_state = lambda agent_id: {
            "status": "running",
            "lock_pid_alive": True,
            "lock_expired": False,
        }

        with pytest.raises(AgentBusyError):
            asyncio.run(
                spawner.spawn_full_agent(
                    "test-agent", "task",
                    task_id="t1", use_main_session=True,
                )
            )
        # The counter must not leak a slot.
        assert counter.global_active == 0
# ---------------------------------------------------------------------------
# E14: AgentBusyError 分类(v2.8 #07.1 O3 新增)
# ---------------------------------------------------------------------------
class TestAgentBusyErrorClassification:
    """E14: checks on the ``reason`` / ``detail`` carried by ``AgentBusyError``."""

    def test_counter_blocked_reason(self, spawner):
        """E14.1: a failed counter acquire yields ``reason == "counter_blocked"``."""
        from src.daemon.counter import ActiveAgentCounter

        counter = ActiveAgentCounter(max_global=1, max_per_agent=1)
        spawner.counter = counter

        # Take the only slot before spawning.
        asyncio.run(counter.acquire("test-agent"))

        with pytest.raises(AgentBusyError) as exc_info:
            asyncio.run(
                spawner.spawn_full_agent("test-agent", "task", task_id="t1")
            )
        assert exc_info.value.reason == "counter_blocked"
        assert exc_info.value.agent_id == "test-agent"

    def test_session_blocker_reasons(self, spawner):
        """E14.2: locked / running / compacting sessions give a specific reason.

        Each case is a ``(session_state, expected_reason)`` pair. The state
        is served by a stubbed ``_check_session_state``; the raised error
        must contain the expected reason and have ``detail`` set.
        """
        test_cases = [
            (
                {"status": "idle", "lock_pid_alive": True, "lock_expired": False},
                "session_locked",
            ),
            (
                {"status": "running", "lock_pid_alive": True, "lock_expired": False},
                "session_running",
            ),
            (
                {"status": "idle", "lock_pid_alive": False, "recent_compact": True},
                "session_compacting",
            ),
        ]

        for i, (state, expected) in enumerate(test_cases):
            # Bind the state as a default argument to avoid late-binding closures.
            spawner._check_session_state = lambda aid, s=state: s

            with pytest.raises(AgentBusyError) as exc_info:
                asyncio.run(
                    spawner.spawn_full_agent(
                        "test-agent", "task",
                        task_id="t1", use_main_session=True,
                    )
                )

            assert expected in exc_info.value.reason, \
                f"Case {i}: expected '{expected}' in '{exc_info.value.reason}'"
            assert exc_info.value.detail is not None, \
                f"Case {i}: expected detail to be set"

    def test_error_attributes(self):
        """E14 supplement: constructor arguments are exposed as attributes and in ``str()``."""
        err = AgentBusyError(
            "my-agent",
            reason="session_locked",
            detail={"blockers": [("session_locked", 12345)]},
        )

        assert err.agent_id == "my-agent"
        assert err.reason == "session_locked"
        assert err.detail["blockers"][0][0] == "session_locked"
        assert "my-agent" in str(err)
        assert "session_locked" in str(err)