Files
sanguo_moziplus_v2/tests/test_spawner.py
T
2026-06-01 22:58:08 +08:00

578 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F9 Agent Spawner 单元测试
按 test-plan-v2.6.md §F9 Spawner
- T1: spawn 成功(P0
- T2: 超时处理(P0
- T3: spawn 失败(P0
- T4: session 清理(P1
v2.8 新增(#07 Acquire-First + Compact Hanging + AgentBusyError):
- E11: Spawner Acquire-First Phase 0-46 个测试)
- E13: Compact Hanging 不标 failed3 个测试)
- E14: AgentBusyError 分类(3 个测试)
"""
import asyncio
import pytest
from pathlib import Path
from src.blackboard.operations import Blackboard
from src.daemon.spawner import AgentSpawner, AgentBusyError
@pytest.fixture
def db_path(tmp_path):
return tmp_path / "blackboard.db"
@pytest.fixture
def bb(db_path):
return Blackboard(db_path)
@pytest.fixture
def spawner(db_path):
return AgentSpawner(db_path=db_path, dry_run=True)
@pytest.fixture
def real_spawner(db_path):
return AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=2.0)
# ---------------------------------------------------------------------------
# T1: spawn 成功
# ---------------------------------------------------------------------------
class TestSpawnSuccess:
def test_dry_run_spawn(self, spawner):
"""dry_run 模式不实际 spawn"""
session_id = asyncio.run(
spawner.spawn_full_agent("test-agent", "do something", task_id="t1")
)
assert session_id
assert spawner.get_session(session_id) is not None
assert spawner.get_session(session_id)["agent_id"] == "test-agent"
def test_session_registered(self, spawner):
"""spawn 后 session 注册"""
asyncio.run(spawner.spawn_full_agent("agent-1", "task", task_id="t1"))
sessions = spawner.active_sessions
assert len(sessions) >= 1
def test_spawn_subagent_dry_run(self, spawner):
"""subagent dry_run"""
session_id = asyncio.run(
spawner.spawn_subagent("do task", task_id="t1")
)
assert session_id
def test_multiple_spawns(self, spawner):
"""多次 spawn 独立 session"""
ids = []
for i in range(3):
sid = asyncio.run(
spawner.spawn_full_agent(f"agent-{i}", f"task {i}")
)
ids.append(sid)
assert len(set(ids)) == 3 # 每个 session_id 唯一
# ---------------------------------------------------------------------------
# T2: 超时处理
# ---------------------------------------------------------------------------
class TestTimeout:
def test_timeout_kills_process(self, tmp_path):
"""超时后 kill 进程"""
db_path = tmp_path / "blackboard.db"
Blackboard(db_path) # init
spawner = AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=0.5)
# Spawn a long-running process (sleep 10)
session_id = asyncio.run(
spawner.spawn_full_agent(
"test-agent",
"sleep 10", # will be passed as --message, actual agent may ignore
task_id=None, # no task to avoid DB writes for non-existent task
)
)
# Wait for timeout
asyncio.run(asyncio.sleep(1.0))
session = spawner.get_session(session_id)
if session:
# Process should have been killed
assert session["status"] in ("timed_out", "running", "completed")
# ---------------------------------------------------------------------------
# T3: spawn 失败
# ---------------------------------------------------------------------------
class TestSpawnFailure:
def test_nonexistent_command(self, real_spawner, db_path, bb):
"""命令不存在 → spawn_failed"""
bb.create_task(
__import__("src.blackboard.models", fromlist=["Task"]).Task(
id="t1", title="T", status="pending", assigned_by="d"
)
)
# Spawner will try to run "openclaw" which may not exist in test env
# This test is about error handling, not the actual command
try:
asyncio.run(
real_spawner.spawn_full_agent("test", "msg", task_id="t1")
)
except Exception:
pass # Expected - command may fail
# ---------------------------------------------------------------------------
# T4: session 清理
# ---------------------------------------------------------------------------
class TestSessionCleanup:
def test_cleanup_removes_session(self, spawner):
"""cleanup 删除 session"""
sid = asyncio.run(spawner.spawn_full_agent("a", "m"))
assert spawner.get_session(sid) is not None
spawner.cleanup_session(sid)
assert spawner.get_session(sid) is None
def test_cleanup_nonexistent(self, spawner):
"""清理不存在的 session 不报错"""
spawner.cleanup_session("nonexistent-id") # no error
def test_active_sessions_excludes_completed(self, spawner):
"""active_sessions 排除已完成"""
sid = asyncio.run(spawner.spawn_full_agent("a", "m"))
session = spawner.get_session(sid)
session["status"] = "completed"
active = spawner.active_sessions
assert sid not in active
# ---------------------------------------------------------------------------
# E11: Spawner Acquire-First Phase 0-4v2.8 #07.1 新增)
# ---------------------------------------------------------------------------
class TestAcquireFirst:
"""E11: #07.1 Acquire-First 重构后的 Phase 0-4 测试"""
def test_phase0_revive_before_acquire(self, spawner):
"""E11.1 Phase 0: timeout/failed 状态 → revive → acquire 成功
Phase 0 在 counter acquire 之前执行,修复 timeout/failed 的 session。
这里用 dry_run spawner 测试 _revive_session 的调用路径。
"""
# 在 dry_run 模式下,Phase 0 检查 session state
# 如果 session 不存在(正常情况),status=None → 不触发 revive
# 测试正常路径:不超时/失败 → 直接 acquire + spawn
session_id = asyncio.run(
spawner.spawn_full_agent(
"test-agent", "do something",
task_id="t1", use_main_session=True,
)
)
assert session_id # dry_run 返回 "main" 或 session_id
def test_phase0_stuck_detection(self, spawner):
"""E11.2 Phase 0: status=running + lock PID 死 → revive
当 session state 显示 running 但 lock PID 已死,Phase 0 应自动 revive。
"""
# 模拟:通过 mock _check_session_state 返回假死状态
original_check = spawner._check_session_state
call_count = [0]
def mock_check(agent_id):
call_count[0] += 1
if call_count[0] == 1:
# Phase 0: 假死状态
return {"status": "running", "lock_pid_alive": False}
# Phase 2: revive 后正常
return {"status": "idle", "lock_pid_alive": False}
spawner._check_session_state = mock_check
revive_called = [False]
original_revive = spawner._revive_session
def mock_revive(agent_id):
revive_called[0] = True
return True
spawner._revive_session = mock_revive
session_id = asyncio.run(
spawner.spawn_full_agent(
"test-agent", "do something",
task_id="t1", use_main_session=True,
)
)
assert revive_called[0], "Phase 0 should have called _revive_session for stuck session"
def test_phase1_counter_acquire_exclusive(self, spawner):
"""E11.3 Phase 1: counter acquire 互斥
同一 agent 并发 spawn → 第二个 AgentBusyError(reason 含 counter)。
使用 max_concurrent_sessions=1 确保同 agent 第二次 acquire 失败。
"""
from src.daemon.counter import ActiveAgentCounter
counter = ActiveAgentCounter(max_global=5, max_concurrent_sessions=1)
spawner.counter = counter
# 第一次 acquire 成功
session_id = asyncio.run(
spawner.spawn_full_agent("test-agent", "task", task_id="t1")
)
assert session_id
# 第二次 acquire → counter blocked(同 agent 已有活跃 session
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent("test-agent", "task2", task_id="t2")
)
assert "counter" in exc_info.value.reason or "blocked" in exc_info.value.reason
def test_phase2_session_check_under_lock(self, spawner):
"""E11.4 Phase 2: session check 在锁保护下执行
counter acquire 后 → session locked → release counter → AgentBusyError。
"""
from src.daemon.counter import ActiveAgentCounter
counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
spawner.counter = counter
# 模拟 session locked
spawner._check_session_state = lambda agent_id: {
"status": "idle",
"lock_pid_alive": True,
"lock_expired": False,
}
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
assert "session_locked" in exc_info.value.reason
# counter 应被 release(不会泄漏)
assert counter.global_active == 0
def test_phase2_multiple_blockers(self, spawner):
"""E11.5 Phase 2: 多 blocker 并列收集
session locked + compact 同时存在 → detail.blockers 包含两者。
"""
spawner._check_session_state = lambda agent_id: {
"status": "idle",
"lock_pid_alive": True,
"lock_expired": False,
"recent_compact": True,
}
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
assert exc_info.value.detail is not None
blockers = exc_info.value.detail.get("blockers", [])
blocker_reasons = [b[0] for b in blockers]
assert "session_locked" in blocker_reasons
assert "session_compacting" in blocker_reasons
def test_phase3_on_checks_passed_exception_rollback(self, spawner):
"""E11.6 Phase 3: on_checks_passed 抛异常 → counter 自动 release
回调异常不应导致 counter 泄漏。
"""
from src.daemon.counter import ActiveAgentCounter
counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
spawner.counter = counter
def bad_callback():
raise RuntimeError("callback failed")
with pytest.raises(RuntimeError, match="callback failed"):
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1",
on_checks_passed=bad_callback,
)
)
# counter 应被 release
assert counter.global_active == 0
# ---------------------------------------------------------------------------
# E13: Compact Hanging 不标 failedv2.8 #07.3 ACT-2 新增)
# ---------------------------------------------------------------------------
class TestCompactHanging:
"""E13: compact_hanging 后不标 failed,只 release counter → 任务保持 working"""
def test_compact_hanging_releases_counter(self):
"""E13.1: compact 超限 → compact_hanging → release counter → 任务保持 working
compact_hanging outcome 时 counter 应被 release,任务不应被标 failed。
"""
from src.daemon.counter import ActiveAgentCounter
from src.blackboard.models import Task as TaskModel
counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
db_path = Path("/tmp/test_compact_hanging.db")
try:
bb = Blackboard(db_path)
bb.create_task(TaskModel(id="t1", title="T", status="working", assigned_by="d",
current_agent="test-agent"))
spawner = AgentSpawner(db_path=db_path, dry_run=True)
spawner.counter = counter
# 模拟 compact_hanging outcome 的 on_complete
outcomes = []
async def mock_on_complete(aid, outcome):
outcomes.append((aid, outcome))
sid = asyncio.run(spawner.spawn_full_agent(
"test-agent", "task", task_id="t1",
on_complete=mock_on_complete,
))
# 手动模拟 counter releasecompact_hanging 路径)
counter.release("test-agent", sid)
assert counter.global_active == 0
finally:
if db_path.exists():
db_path.unlink()
def test_retry_agent_busy_releases_counter(self, spawner):
"""E13.3: _do_retry 遇 AgentBusyError → release counter → 任务保持 working
retry 遇 session busy 时应 release counter,不持有。
"""
from src.daemon.counter import ActiveAgentCounter
counter = ActiveAgentCounter(max_global=5, max_per_agent=1)
spawner.counter = counter
# 模拟 retry 场景:spawn 遇 AgentBusyError
spawner._check_session_state = lambda agent_id: {
"status": "running",
"lock_pid_alive": True,
"lock_expired": False,
}
with pytest.raises(AgentBusyError):
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
# counter 不应泄漏
assert counter.global_active == 0
# ---------------------------------------------------------------------------
# E14: AgentBusyError 分类(v2.8 #07.1 O3 新增)
# ---------------------------------------------------------------------------
class TestAgentBusyErrorClassification:
"""E14: AgentBusyError reason/detail 分类验证"""
def test_counter_blocked_reason(self, spawner):
"""E14.1: counter acquire 失败 → reason=counter_blocked"""
from src.daemon.counter import ActiveAgentCounter
counter = ActiveAgentCounter(max_global=1, max_per_agent=1)
spawner.counter = counter
# Fill counter
asyncio.run(counter.acquire("test-agent"))
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent("test-agent", "task", task_id="t1")
)
assert exc_info.value.reason == "counter_blocked"
assert exc_info.value.agent_id == "test-agent"
def test_session_blocker_reasons(self, spawner):
"""E14.2: session locked/running/compacting → 具体 reason + detail.blockers"""
test_cases = [
{
# session locked: lock PID alive + not expired
"state": {"status": "idle", "lock_pid_alive": True, "lock_expired": False},
"expected": "session_locked",
},
{
# session running: status=running + lock alive
# 注意:running + lock alive 同时触发 session_locked 和 session_running
# primary_reason 取第一个 blockersession_locked
"state": {"status": "running", "lock_pid_alive": True, "lock_expired": False},
"expected": "session_locked", # session_locked 排在 session_running 之前
},
{
# session compacting: recent compact
"state": {"status": "idle", "lock_pid_alive": False, "recent_compact": True},
"expected": "session_compacting",
},
]
for i, tc in enumerate(test_cases):
state = tc["state"]
expected = tc["expected"]
spawner._check_session_state = lambda aid, s=state: s
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
assert expected in exc_info.value.reason, \
f"Case {i}: expected '{expected}' in '{exc_info.value.reason}'"
assert exc_info.value.detail is not None, \
f"Case {i}: expected detail to be set"
def test_session_running_in_blockers(self, spawner):
"""session_running 出现在 blockers 列表中(session_locked 优先)
status=running + lock alive → session_locked 和 session_running 同时存在,
session_locked 排第一(primary_reason),但 blockers 列表包含 session_running。
"""
spawner._check_session_state = lambda aid: {
"status": "running", "lock_pid_alive": True, "lock_expired": False,
}
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
# primary_reason 是 session_locked
assert "session_locked" in exc_info.value.reason
# blockers 列表包含 session_running
blockers = exc_info.value.detail.get("blockers", [])
blocker_reasons = [b[0] for b in blockers]
assert "session_running" in blocker_reasons
assert "session_locked" in blocker_reasons
def test_error_attributes(self):
"""E14 补充: AgentBusyError 属性验证"""
err = AgentBusyError(
"my-agent",
reason="session_locked",
detail={"blockers": [("session_locked", 12345)]},
)
assert err.agent_id == "my-agent"
assert err.reason == "session_locked"
assert err.detail["blockers"][0][0] == "session_locked"
assert "my-agent" in str(err)
assert "session_locked" in str(err)
# ---------------------------------------------------------------------------
# 司马懿评审补充:Phase 2.5 + session_stuckv2.8 #07.1 v1.1 兜底)
# ---------------------------------------------------------------------------
class TestPhase25AndStuck:
"""司马懿评审遗漏 #1 + session_stuck 遗漏补充"""
def test_phase25_stuck_fallback(self, spawner):
"""Phase 0 不触发(status 非 running),Phase 2 检测到假死 → revive → 成功 spawn
Phase 2.5 是 #07 v1.1 加的兜底:Phase 0 时 session 正常(idle),
但 Phase 2 检查时变为 running + lock PID 死。Phase 2.5 应 revive → 重检 → idle → spawn 成功。
"""
call_count = [0]
def mock_check(agent_id):
call_count[0] += 1
if call_count[0] <= 1:
# Phase 0: 正常 idle,不触发 revive
return {"status": "idle", "lock_pid_alive": False}
if call_count[0] == 2:
# Phase 2: 假死(Phase 0 和 Phase 2 之间进程变 stuck
return {"status": "running", "lock_pid_alive": False}
# Phase 2.5 重检:revive 后恢复 idle
return {"status": "idle", "lock_pid_alive": False}
spawner._check_session_state = mock_check
revive_called = [False]
def mock_revive(agent_id):
revive_called[0] = True
return True
spawner._revive_session = mock_revive
# Phase 2.5 应触发 revive → 重检 → idle → 正常 spawn
session_id = asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
assert revive_called[0], "Phase 2.5 should have revived stuck session"
assert session_id # spawn 成功
def test_session_stuck_after_failed_revive(self, spawner):
"""Phase 2.5 revive 失败 → session_stuck
假死 revive 后 status 仍为 running → AgentBusyError(session_stuck)。
"""
call_count = [0]
def mock_check(agent_id):
call_count[0] += 1
if call_count[0] <= 1:
return {"status": "idle", "lock_pid_alive": False}
# Phase 2: 假死
return {"status": "running", "lock_pid_alive": False}
spawner._check_session_state = mock_check
# revive 后 session 仍 stuck
def mock_revive(agent_id):
return True # revive 成功但 mock_check 不变,下次仍返回 running
# 让 revive 后 check 返回 stuck
revive_and_check_count = [0]
original_check = mock_check
def mock_check_v2(agent_id):
revive_and_check_count[0] += 1
if revive_and_check_count[0] <= 1:
return {"status": "idle", "lock_pid_alive": False}
if revive_and_check_count[0] == 2:
return {"status": "running", "lock_pid_alive": False}
# revive 后重检:仍 stuck
return {"status": "running", "lock_pid_alive": False}
spawner._check_session_state = mock_check_v2
spawner._revive_session = mock_revive
with pytest.raises(AgentBusyError) as exc_info:
asyncio.run(
spawner.spawn_full_agent(
"test-agent", "task",
task_id="t1", use_main_session=True,
)
)
assert "stuck" in exc_info.value.reason