From 6a649aba07647b97e4c46049bf1d079327bbb0d7 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Fri, 5 Jun 2026 11:03:30 +0800 Subject: [PATCH] auto-sync: 2026-06-05 11:03:30 --- tests/conftest.py | 51 ++ tests/e2e/__init__.py | 0 tests/{ => e2e}/test_e2e_v27.py | 4 + tests/{ => e2e}/test_e2e_v31.py | 4 + .../test_four_phase.py} | 4 + tests/integration/__init__.py | 0 tests/{ => integration}/test_api.py | 2 + .../test_dispatcher_integration.py | 128 ++++ tests/{ => integration}/test_main.py | 2 + .../test_review_integration.py} | 69 +- tests/integration/test_spawner_integration.py | 73 ++ tests/{ => integration}/test_sse.py | 2 + .../test_ticker_integration.py} | 39 +- tests/{ => integration}/test_v27_subtasks.py | 2 + tests/test_bootstrap.py | 212 ------ tests/test_e2e_four_phase.py | 629 ------------------ tests/unit/__init__.py | 0 tests/{ => unit}/test_blackboard.py | 2 + tests/unit/test_bootstrap.py | 191 ++++++ tests/{ => unit}/test_cli.py | 2 + tests/{ => unit}/test_counter.py | 2 + tests/{ => unit}/test_dispatcher.py | 173 +---- tests/{ => unit}/test_experience.py | 2 + tests/{ => unit}/test_health.py | 2 + tests/{ => unit}/test_inbox.py | 2 + tests/{ => unit}/test_registry.py | 2 + tests/unit/test_review.py | 87 +++ tests/{ => unit}/test_router.py | 2 + tests/{ => unit}/test_skill_system.py | 2 + tests/{ => unit}/test_spawner.py | 188 +----- 30 files changed, 602 insertions(+), 1276 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/e2e/__init__.py rename tests/{ => e2e}/test_e2e_v27.py (99%) rename tests/{ => e2e}/test_e2e_v31.py (99%) rename tests/{test_four_phase_unit.py => e2e/test_four_phase.py} (99%) create mode 100644 tests/integration/__init__.py rename tests/{ => integration}/test_api.py (99%) create mode 100644 tests/integration/test_dispatcher_integration.py rename tests/{ => integration}/test_main.py (98%) rename tests/{test_review.py => integration/test_review_integration.py} (71%) create mode 100644 tests/integration/test_spawner_integration.py rename tests/{ => integration}/test_sse.py (99%) rename tests/{test_ticker.py => integration/test_ticker_integration.py} (92%) rename tests/{ => integration}/test_v27_subtasks.py (99%) delete mode 100644 tests/test_bootstrap.py delete mode 100644 tests/test_e2e_four_phase.py create mode 100644 tests/unit/__init__.py rename tests/{ => unit}/test_blackboard.py (99%) create mode 100644 tests/unit/test_bootstrap.py rename tests/{ => unit}/test_cli.py (99%) rename tests/{ => unit}/test_counter.py (99%) rename tests/{ => unit}/test_dispatcher.py (62%) rename tests/{ => unit}/test_experience.py (99%) rename tests/{ => unit}/test_health.py (99%) rename tests/{ => unit}/test_inbox.py (99%) rename tests/{ => unit}/test_registry.py (99%) create mode 100644 tests/unit/test_review.py rename tests/{ => unit}/test_router.py (99%) rename tests/{ => unit}/test_skill_system.py (99%) rename tests/{ => unit}/test_spawner.py (68%) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e7d2f76 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,51 @@ +import uuid +import pytest +from pathlib import Path +from fastapi.testclient import TestClient + + +def pytest_configure(config): + markers = { + "unit": "单元测试:纯逻辑,mock 外部依赖", + "integration": "集成测试:API 端点 + 真实/临时 DB", + "e2e": "端到端测试:真实 daemon + Agent(手动触发)", + "slow": "慢测试(>5s)", + "broadcast": "广播认领相关", + "mail": "邮件系统相关", + "state_machine": "状态机转换", + "classify": "Classify Outcome 相关", + "review": "审查/Rebuttal 相关", + } + for name, desc in markers.items(): + config.addinivalue_line("markers", f"{name}: {desc}") + + +@pytest.fixture +def isolated_data_root(tmp_path): + """隔离的 data_root,测试结束自动清理""" + data_root = tmp_path / "test_data" + data_root.mkdir() + return data_root + + +@pytest.fixture +def isolated_registry(isolated_data_root): + """隔离的 registry.db""" + from src.blackboard.registry import ProjectRegistry + registry = ProjectRegistry(isolated_data_root) + return registry + + +@pytest.fixture +def client_with_isolation(isolated_data_root): + """带数据隔离的 TestClient""" + import src.utils as utils + original = utils.get_data_root + utils.get_data_root = lambda: isolated_data_root + + from src.main import app + client = TestClient(app) + + yield client + + utils.get_data_root = original diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_e2e_v27.py b/tests/e2e/test_e2e_v27.py similarity index 99% rename from tests/test_e2e_v27.py rename to tests/e2e/test_e2e_v27.py index c686206..c9f3128 100644 --- a/tests/test_e2e_v27.py +++ b/tests/e2e/test_e2e_v27.py @@ -1,3 +1,7 @@ +import pytest + +pytestmark = pytest.mark.e2e + """v2.7 端到端测试 — 全链路真实环境 覆盖:项目管理 → Task CRUD → SubTask → Stage进度 → 状态聚合 → 依赖链 → 超时 → Mail → 真实Agent调度 diff --git a/tests/test_e2e_v31.py b/tests/e2e/test_e2e_v31.py similarity index 99% rename from tests/test_e2e_v31.py rename to tests/e2e/test_e2e_v31.py index f05ec11..47efaba 100644 --- a/tests/test_e2e_v31.py +++ b/tests/e2e/test_e2e_v31.py @@ -1,3 +1,7 @@ +import pytest + +pytestmark = pytest.mark.e2e + """v3.1 端到端测试 — 新增场景覆盖 覆盖 v3.1 新增功能: diff --git a/tests/test_four_phase_unit.py b/tests/e2e/test_four_phase.py similarity index 99% rename from tests/test_four_phase_unit.py rename to tests/e2e/test_four_phase.py index eddbb54..f2173a2 100644 --- a/tests/test_four_phase_unit.py +++ b/tests/e2e/test_four_phase.py @@ -1,3 +1,7 @@ +import pytest + +pytestmark = pytest.mark.e2e + """#01 四相循环 单元测试 不依赖 daemon / Agent,纯逻辑验证。覆盖: diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_api.py b/tests/integration/test_api.py similarity index 99% rename from tests/test_api.py rename to tests/integration/test_api.py index e7f0af7..65045c0 100644 --- a/tests/test_api.py +++ b/tests/integration/test_api.py @@ -12,6 +12,8 @@ from src.blackboard.models import Task from src.blackboard.registry import ProjectRegistry from src.main import app +pytestmark = pytest.mark.integration + @pytest.fixture def project_env(tmp_path): diff --git a/tests/integration/test_dispatcher_integration.py b/tests/integration/test_dispatcher_integration.py new file mode 100644 index 0000000..ee17fe3 --- /dev/null +++ b/tests/integration/test_dispatcher_integration.py @@ -0,0 +1,128 @@ +"""F9 Agent 调度器集成测试 — rollback/on_complete DB 交互""" + +import asyncio +import json +import pytest +from pathlib import Path + +from src.blackboard.models import Task +from src.blackboard.operations import Blackboard +from src.daemon.dispatcher import Dispatcher + +pytestmark = pytest.mark.integration + + +# --------------------------------------------------------------------------- +# 司马懿评审补充:_rollback_current_agent + on_complete 统一(v2.8 #07.2) +# --------------------------------------------------------------------------- + +class TestRollbackAndOnComplete: + """司马懿评审遗漏 #3 + #4: crash 后 current_agent 回退 + on_complete 统一路径""" + + def test_rollback_current_agent_on_crash(self, tmp_path): + """executor crash → _rollback_current_agent 回退 current_agent → assignee""" + db_path = tmp_path / "blackboard.db" + bb = Blackboard(db_path) + bb.create_task(Task( + id="t1", title="T", status="working", + assigned_by="daemon", assignee="zhangfei-dev", + )) + + conn = bb._conn() + try: + conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("zhangfei-dev", "t1")) + conn.commit() + finally: + conn.close() + + dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) + dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev") + + import sqlite3 + conn2 = sqlite3.connect(str(db_path)) + conn2.row_factory = sqlite3.Row + row = conn2.execute("SELECT current_agent, assignee FROM tasks WHERE id=?", ("t1",)).fetchone() + conn2.close() + + assert row["current_agent"] == row["assignee"] == "zhangfei-dev" + + def test_rollback_different_agent(self, tmp_path): + """current_agent ≠ agent_id → 不回退(安全检查)""" + db_path = tmp_path / "blackboard.db" + bb = Blackboard(db_path) + bb.create_task(Task( + id="t1", title="T", status="working", + assigned_by="daemon", assignee="zhangfei-dev", + )) + + conn = bb._conn() + try: + conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("simayi-challenger", "t1")) + conn.commit() + finally: + conn.close() + + dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) + dispatcher._rollback_current_agent(db_path, "t1", "wrong-agent") + + import sqlite3 + conn2 = sqlite3.connect(str(db_path)) + conn2.row_factory = sqlite3.Row + row = conn2.execute("SELECT current_agent FROM tasks WHERE id=?", ("t1",)).fetchone() + conn2.close() + + assert row["current_agent"] == "simayi-challenger" + + def test_on_complete_crash_rollback_executor(self, tmp_path): + """executor crash → rollback current_agent + _task_auto_complete(标 review)""" + db_path = tmp_path / "blackboard.db" + bb = Blackboard(db_path) + bb.create_task(Task( + id="t1", title="T", status="working", + assigned_by="daemon", assignee="zhangfei-dev", + )) + + conn = bb._conn() + try: + conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("zhangfei-dev", "t1")) + conn.commit() + finally: + conn.close() + + dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) + dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev") + + import sqlite3 + conn2 = sqlite3.connect(str(db_path)) + conn2.row_factory = sqlite3.Row + row = conn2.execute("SELECT current_agent FROM tasks WHERE id=?", ("t1",)).fetchone() + conn2.close() + assert row["current_agent"] == "zhangfei-dev" + + def test_on_complete_crash_rollback_review(self, tmp_path): + """review crash → rollback current_agent + 保持 review 状态""" + db_path = tmp_path / "blackboard.db" + bb = Blackboard(db_path) + bb.create_task(Task( + id="t1", title="T", status="review", + assigned_by="daemon", assignee="zhangfei-dev", + )) + + conn = bb._conn() + try: + conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("simayi-challenger", "t1")) + conn.commit() + finally: + conn.close() + + dispatcher = Dispatcher(registered_agents=["simayi-challenger"]) + dispatcher._rollback_current_agent(db_path, "t1", "simayi-challenger") + + import sqlite3 + conn2 = sqlite3.connect(str(db_path)) + conn2.row_factory = sqlite3.Row + row = conn2.execute("SELECT current_agent, status FROM tasks WHERE id=?", ("t1",)).fetchone() + conn2.close() + + assert row["current_agent"] == "zhangfei-dev" + assert row["status"] == "review" diff --git a/tests/test_main.py b/tests/integration/test_main.py similarity index 98% rename from tests/test_main.py rename to tests/integration/test_main.py index a1ebdfb..0b49061 100644 --- a/tests/test_main.py +++ b/tests/integration/test_main.py @@ -5,6 +5,8 @@ from fastapi.testclient import TestClient from src.main import app, config, load_config +pytestmark = pytest.mark.integration + @pytest.fixture def client(): diff --git a/tests/test_review.py b/tests/integration/test_review_integration.py similarity index 71% rename from tests/test_review.py rename to tests/integration/test_review_integration.py index 48b9ad2..5b3db9e 100644 --- a/tests/test_review.py +++ b/tests/integration/test_review_integration.py @@ -1,11 +1,9 @@ -"""F12 Review Pipeline + F13 Guardrail + F14 Rebuttal 单元测试 +"""F12 Review Pipeline + F13 Guardrail 集成测试 -按 test-plan-v2.6.md §F12-F14: +按 test-plan-v2.6.md §F12-F13: - F12 T1: 验证流水线四步(P0) - F12 T2: 评分计算(P0) - F13 T1: Guardrail 门控(P0) -- F14 T1: 反驳权流程(P0) -- F14 T2: 最大轮次限制(P0) """ import json @@ -16,12 +14,13 @@ from unittest.mock import MagicMock from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.daemon.review import ( - RebuttalManager, ReviewPipeline, ReviewResult, ReviewVerdict, ) +pytestmark = pytest.mark.integration + @pytest.fixture def db_path(tmp_path): @@ -177,63 +176,3 @@ class TestGuardrail: outputs = [{"content": "valid output here", "type": "text"}] result = p.run_review(task, outputs=outputs) assert result["gate"] == "optional" - - -# --------------------------------------------------------------------------- -# F14 T1: 反驳权流程 -# --------------------------------------------------------------------------- - -class TestRebuttal: - def test_submit_rebuttal_accepted(self, bb): - task = Task(id="t1", title="T", status="pending", assigned_by="d") - bb.create_task(task) - - rm = RebuttalManager(bb=bb) - result = rm.submit_rebuttal("t1", "agent-1", "I disagree with the review") - assert result["status"] == "accepted" - assert result["round"] == 1 - assert result["escalation_target"] == "simayi-challenger" - - def test_second_round_escalates_to_pangtong(self, bb): - task = Task(id="t1", title="T", status="pending", assigned_by="d") - bb.create_task(task) - - rm = RebuttalManager(bb=bb) - rm.submit_rebuttal("t1", "agent-1", "Round 1") - result = rm.submit_rebuttal("t1", "agent-1", "Round 2") - assert result["status"] == "accepted" - assert result["round"] == 2 - assert result["escalation_target"] == "pangtong-fujunshi" - - -# --------------------------------------------------------------------------- -# F14 T2: 最大轮次限制 -# --------------------------------------------------------------------------- - -class TestRebuttalLimits: - def test_max_rounds_rejected(self, bb): - task = Task(id="t1", title="T", status="pending", assigned_by="d") - bb.create_task(task) - - rm = RebuttalManager(bb=bb) - rm.submit_rebuttal("t1", "a", "R1") - rm.submit_rebuttal("t1", "a", "R2") - result = rm.submit_rebuttal("t1", "a", "R3") - assert result["status"] == "rejected" - assert "Max" in result["reason"] - - def test_rebuttal_without_bb(self): - rm = RebuttalManager(bb=None) - result = rm.submit_rebuttal("t1", "a", "reason") - assert result["status"] == "accepted" - assert result["round"] == 1 - - def test_rebuttal_observation_recorded(self, bb): - task = Task(id="t1", title="T", status="pending", assigned_by="d") - bb.create_task(task) - - rm = RebuttalManager(bb=bb) - rm.submit_rebuttal("t1", "agent-1", "test reason", evidence="file.txt") - obs = bb.get_observations(task_id="t1") - rebuttals = [o for o in obs if "Rebuttal round" in (o.body or "")] - assert len(rebuttals) == 1 diff --git a/tests/integration/test_spawner_integration.py b/tests/integration/test_spawner_integration.py new file mode 100644 index 0000000..2d6bae1 --- /dev/null +++ b/tests/integration/test_spawner_integration.py @@ -0,0 +1,73 @@ +"""F9 Agent Spawner 集成测试 — 超时/失败/spawn 真实流程""" + +import asyncio +import pytest +from pathlib import Path + +from src.blackboard.operations import Blackboard +from src.daemon.spawner import AgentSpawner + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def db_path(tmp_path): + return tmp_path / "blackboard.db" + + +@pytest.fixture +def bb(db_path): + return Blackboard(db_path) + + +@pytest.fixture +def real_spawner(db_path): + return AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=2.0) + + +# --------------------------------------------------------------------------- +# T2: 超时处理 +# --------------------------------------------------------------------------- + +class TestTimeout: + def test_timeout_kills_process(self, tmp_path): + """超时后 kill 进程""" + db_path = tmp_path / "blackboard.db" + Blackboard(db_path) # init + spawner = AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=0.5) + + # Spawn a long-running process (sleep 10) + session_id = asyncio.run( + spawner.spawn_full_agent( + "test-agent", + "sleep 10", + task_id=None, + ) + ) + # Wait for timeout + asyncio.run(asyncio.sleep(1.0)) + + session = spawner.get_session(session_id) + if session: + assert session["status"] in ("timed_out", "running", "completed") + + +# --------------------------------------------------------------------------- +# T3: spawn 失败 +# --------------------------------------------------------------------------- + +class TestSpawnFailure: + def test_nonexistent_command(self, real_spawner, db_path, bb): + """命令不存在 → spawn_failed""" + bb.create_task( + __import__("src.blackboard.models", fromlist=["Task"]).Task( + id="t1", title="T", status="pending", assigned_by="d" + ) + ) + + try: + asyncio.run( + real_spawner.spawn_full_agent("test", "msg", task_id="t1") + ) + except Exception: + pass # Expected - command may fail diff --git a/tests/test_sse.py b/tests/integration/test_sse.py similarity index 99% rename from tests/test_sse.py rename to tests/integration/test_sse.py index 34413b1..7403fef 100644 --- a/tests/test_sse.py +++ b/tests/integration/test_sse.py @@ -12,6 +12,8 @@ import json import pytest from unittest.mock import AsyncMock, MagicMock +pytestmark = pytest.mark.integration + from src.daemon.sse import ( Hook, HookManager, diff --git a/tests/test_ticker.py b/tests/integration/test_ticker_integration.py similarity index 92% rename from tests/test_ticker.py rename to tests/integration/test_ticker_integration.py index db2f7bb..d4822a9 100644 --- a/tests/test_ticker.py +++ b/tests/integration/test_ticker_integration.py @@ -24,6 +24,8 @@ from src.blackboard.registry import ProjectRegistry from src.blackboard.queries import Queries from src.daemon.ticker import Ticker +pytestmark = pytest.mark.integration + # --------------------------------------------------------------------------- # Fixtures @@ -446,20 +448,15 @@ class TestCheckTimeoutsUnified: return registry, db_path, bb def test_crash_limit_working(self, timeout_project): - """E12.1: executor crash 3 次/30min → _check_timeouts 标 failed + """E12.1: executor crash 3 次/30min → _check_timeouts 标 failed""" - #07.2 将 crash_limit 从 _dispatch_reviews 移到 _check_timeouts, - 覆盖 working 和 review 状态。 - """ registry, db_path, bb = timeout_project - # 创建 working 任务 bb.create_task(Task( id="t-crash", title="Crash Task", status="working", assigned_by="daemon", current_agent="agent-a", )) - # 模拟 3 次 crash 的 task_attempts from datetime import datetime, timedelta conn = bb._conn() try: @@ -476,20 +473,13 @@ class TestCheckTimeoutsUnified: conn.close() ticker = Ticker(registry, tick_interval=30) - # 如果有 dispatcher + _check_crash_limit,它会在 _check_timeouts 中触发 - # 测试基本结构:_check_timeouts 应该能处理 working 状态任务 result = ticker._check_timeouts(db_path) - # 即使没有 dispatcher(_check_crash_limit 需要),超时检查本身不应崩溃 assert isinstance(result, list) def test_crash_limit_review(self, timeout_project): - """E12.2: reviewer crash 3 次/30min → _check_timeouts 标 failed - - #07.2 统一后,review 状态的 crash_limit 也走 _check_timeouts。 - """ + """E12.2: reviewer crash 3 次/30min → _check_timeouts 标 failed""" registry, db_path, bb = timeout_project - # 创建 review 状态任务 bb.create_task(Task( id="t-review-crash", title="Review Crash Task", status="review", assigned_by="daemon", current_agent="simayi-challenger", @@ -498,32 +488,24 @@ class TestCheckTimeoutsUnified: ticker = Ticker(registry, tick_interval=30) result = ticker._check_timeouts(db_path) assert isinstance(result, list) - # _check_timeouts 不应崩溃,review 状态在统一逻辑中被正确处理 def test_updated_at_fallback(self, timeout_project): - """E12.3: mail auto-working 无 started_at/claimed_at → updated_at fallback - - #07.3 ACT-1: _check_timeouts 使用 updated_at 作为最后 fallback, - 确保 PM2 重启后 mail 孤儿任务也能被回收。 - """ + """E12.3: mail auto-working 无 started_at/claimed_at → updated_at fallback""" registry, db_path, bb = timeout_project from datetime import datetime, timedelta - # 创建 working 任务,只有 updated_at(模拟 mail auto-working) old_time = (datetime.utcnow() - timedelta(minutes=60)).isoformat() bb.create_task(Task( id="t-mail-orphan", title="Mail Orphan", status="working", assigned_by="daemon", current_agent="pangtong-fujunshi", )) - # 手动设置 updated_at(模拟 PM2 重启前的时间戳) conn = bb._conn() try: conn.execute( "UPDATE tasks SET updated_at = ? WHERE id = ?", (old_time, "t-mail-orphan"), ) - # 确保 started_at 和 claimed_at 为 NULL conn.execute( "UPDATE tasks SET started_at = NULL, claimed_at = NULL WHERE id = ?", ("t-mail-orphan",), @@ -534,25 +516,18 @@ class TestCheckTimeoutsUnified: ticker = Ticker(registry, tick_interval=30, default_task_timeout_minutes=30) reclaimed = ticker._check_timeouts(db_path) - # updated_at fallback 应让这个任务被回收 assert "t-mail-orphan" in reclaimed, \ "Mail orphan with only updated_at should be reclaimed via fallback" def test_process_dead_keeps_review_status(self, timeout_project): - """E12.4: review agent 进程死 → 保持 review 状态(不推 pending) - - #07.2: process_dead 对 review 状态的处理——保持 review, - 等 _dispatch_reviews 下个 tick 自然 dispatch。 - """ + """E12.4: review agent 进程死 → 保持 review 状态(不推 pending)""" registry, db_path, bb = timeout_project - # 创建 review 状态任务 bb.create_task(Task( id="t-review-dead", title="Review Dead Process", status="review", assigned_by="daemon", current_agent="simayi-challenger", )) - # 设置较新的时间戳(不应因超时被回收) from datetime import datetime conn = bb._conn() try: @@ -567,6 +542,4 @@ class TestCheckTimeoutsUnified: ticker = Ticker(registry, tick_interval=30, default_task_timeout_minutes=30) reclaimed = ticker._check_timeouts(db_path) - # 没有 process_dead 的模拟(无 counter/spawner),纯超时路径 - # review 任务时间戳较新 → 不应被超时回收 assert "t-review-dead" not in reclaimed diff --git a/tests/test_v27_subtasks.py b/tests/integration/test_v27_subtasks.py similarity index 99% rename from tests/test_v27_subtasks.py rename to tests/integration/test_v27_subtasks.py index e3e3e6e..24fd88f 100644 --- a/tests/test_v27_subtasks.py +++ b/tests/integration/test_v27_subtasks.py @@ -6,6 +6,8 @@ from pathlib import Path import pytest +pytestmark = pytest.mark.integration + from src.blackboard.db import init_db, get_connection from src.blackboard.models import Task from src.blackboard.operations import Blackboard diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py deleted file mode 100644 index c12809f..0000000 --- a/tests/test_bootstrap.py +++ /dev/null @@ -1,212 +0,0 @@ -"""F11 Bootstrap 拼装单元测试 - -按 test-plan-v2.6.md §F11: -- T1: 各 role 拼装(P0) -- T2: token 估算(P0) -- T3: 缺失组件降级(P1) -- T4: 模板变量替换(P1) -""" - -import pytest -from pathlib import Path - -from src.blackboard.models import Task -from src.daemon.bootstrap import BootstrapBuilder, estimate_tokens - - -@pytest.fixture -def builder(): - return BootstrapBuilder(max_tokens=4096) - - -@pytest.fixture -def builder_with_templates(tmp_path): - template_dir = tmp_path / "templates" - template_dir.mkdir() - (template_dir / "executor.md").write_text("# Executor Role\nYou execute tasks.") - (template_dir / "reviewer.md").write_text("# Reviewer Role\nYou review code.") - (template_dir / "planner.md").write_text("# Planner Role\nYou plan tasks.") - return BootstrapBuilder(template_dir=template_dir, max_tokens=4096) - - -# --------------------------------------------------------------------------- -# T1: 各 role 拼装 -# --------------------------------------------------------------------------- - -class TestRoleBootstrap: - def test_executor_bootstrap(self, builder): - b = builder.build( - role="executor", - task_context={"task_id": "t1", "title": "Write tests"}, - ) - assert "Write tests" in b - assert "t1" in b - - def test_reviewer_bootstrap(self, builder): - b = builder.build( - role="reviewer", - task_context={"task_id": "t2", "title": "Review PR"}, - ) - assert "Review PR" in b - - def test_planner_bootstrap(self, builder): - b = builder.build( - role="planner", - task_context={"task_id": "t3", "title": "Plan sprint"}, - ) - assert "Plan sprint" in b - - def test_executor_with_guardrail(self, builder): - b = builder.build( - role="executor", - guardrail_rules="## Guardrail\nNo dangerous ops", - ) - assert "Guardrail" in b - - def test_reviewer_no_guardrail(self, builder): - b = builder.build( - role="reviewer", - guardrail_rules="## Guardrail\nNo dangerous ops", - ) - assert "Guardrail" not in b # reviewer 不注入 guardrail - - def test_executor_with_review_protocol(self, builder): - b = builder.build( - role="executor", - review_protocols="## Review Protocol\nCheck tests", - ) - assert "Review Protocol" in b - - def test_reviewer_with_review_protocol(self, builder): - b = builder.build( - role="reviewer", - review_protocols="## Review Protocol\nCheck quality", - ) - assert "Review Protocol" in b - - def test_with_template(self, builder_with_templates): - b = builder_with_templates.build(role="executor") - assert "Executor Role" in b - assert "You execute tasks" in b - - -# --------------------------------------------------------------------------- -# T2: token 估算 -# --------------------------------------------------------------------------- - -class TestTokenEstimation: - def test_estimate_tokens_basic(self): - assert estimate_tokens("hello") > 0 - - def test_estimate_tokens_long_text(self): - text = "a" * 1000 - tokens = estimate_tokens(text) - assert 200 < tokens < 400 # ~333 - - def test_bootstrap_under_limit(self, builder): - b = builder.build( - role="executor", - task_context={"title": "Short task"}, - ) - assert estimate_tokens(b) <= 4096 - - def test_bootstrap_over_limit_truncates(self): - builder = BootstrapBuilder(max_tokens=10) - b = builder.build( - role="executor", - task_context={"title": "A" * 10000}, - ) - assert estimate_tokens(b) <= 20 # 接近限制 - assert "truncated" in b - - -# --------------------------------------------------------------------------- -# T3: 缺失组件降级 -# --------------------------------------------------------------------------- - -class TestGracefulDegradation: - def test_no_task_context(self, builder): - b = builder.build(role="executor") - assert b # 不为空 - - def test_no_project_context(self, builder): - b = builder.build( - role="executor", - task_context={"title": "Task"}, - ) - assert "Task" in b - - def test_no_template(self, builder): - b = builder.build(role="executor") - assert b # 没有 template 也不崩溃 - - def test_empty_experiences(self, builder): - b = builder.build(role="executor", experiences=[]) - assert b - - def test_template_dir_not_exists(self, tmp_path): - builder = BootstrapBuilder(template_dir=tmp_path / "nonexistent") - b = builder.build(role="executor") - assert b # 不崩溃 - - -# --------------------------------------------------------------------------- -# T4: 便捷方法 + 项目上下文 -# --------------------------------------------------------------------------- - -class TestBuildForTask: - def test_build_for_task_object(self, builder): - task = Task( - id="t1", title="Build Feature", status="pending", - assigned_by="daemon", task_type="coding", - description="Implement X with tests", - must_haves="Unit tests, Documentation", - risk_level="high", - ) - b = builder.build_for_task(task, role="executor") - assert "Build Feature" in b - assert "Implement X" in b - assert "high" in b - - def test_build_for_task_with_project(self, builder): - task = Task(id="t1", title="T", status="pending", assigned_by="d") - b = builder.build_for_task( - task, role="executor", - project_config={"name": "My Project", "agents": ["a1", "a2"]}, - ) - assert "My Project" in b - assert "a1" in b - - def test_with_experiences(self, builder): - task = Task(id="t1", title="T", status="pending", assigned_by="d") - experiences = [ - {"category": "pitfall", "summary": "Always test edge cases"}, - {"category": "best_practice", "summary": "Use type hints"}, - ] - b = builder.build_for_task(task, role="executor", experiences=experiences) - assert "pitfall" in b - assert "Always test edge cases" in b - - def test_with_skills(self, builder): - skills = [ - {"name": "code-review", "description": "Review code quality"}, - ] - b = builder.build( - role="executor", - skill_descriptions=skills, - ) - assert "code-review" in b - assert "Review code quality" in b - - def test_with_depends_on_outputs(self, builder): - b = builder.build( - role="executor", - task_context={ - "title": "T", - "depends_on_outputs": [ - {"task_id": "t0", "summary": "Data downloaded"}, - ], - }, - ) - assert "前序产出" in b - assert "Data downloaded" in b diff --git a/tests/test_e2e_four_phase.py b/tests/test_e2e_four_phase.py deleted file mode 100644 index e3c9824..0000000 --- a/tests/test_e2e_four_phase.py +++ /dev/null @@ -1,629 +0,0 @@ -"""#01 四相循环 E2E 集成测试 - -需要 daemon 运行 + RUN_INTEGRATION=1。覆盖: - E1 comment + @mention 端到端(真实 Agent spawn) - E2 一轮结束 → 庞统 review spawn(真实庞统 spawn) - E3 多轮迭代(庞统真实 spawn + 真实创建 sub task) - E4 round 上限强制停止 - E5 mention 重试(可靠制造 Agent busy) - E6 failed sub 触发 review(BUG-2 验证) - B1-B6 边界测试 - -全部真实环境执行,不 mock Agent 行为。 -""" - -import json -import os -import sqlite3 -import sys -import time -import uuid -from pathlib import Path -from typing import Any, Dict, List - -import pytest -import requests as http_requests - -# ── 路径设置 ── -DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" -sys.path.insert(0, str(DEPLOY_DIR)) - -from src.utils import get_data_root - -# ── 常量 ── -API_BASE = "http://localhost:8083" -POLL_INTERVAL = 5 -MAX_WAIT_DISPATCH = 120 -MAX_WAIT_PANGTONG = 900 -E2E_PREFIX = "e2e-01-" -DATA_ROOT = get_data_root() - - -# ── 工具函数 ── - -def _check_environment(): - try: - resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5) - data = resp.json() - if data.get("status") != "running" or not data.get("ticker_running"): - pytest.skip(f"Daemon not ready: {data}") - except Exception as e: - pytest.skip(f"Production API not available: {e}") - - -def _cleanup_project(pid: str): - try: - http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5) - except Exception: - pass - - -def _create_project(plist: list, prefix: str = "E01", - agents: list = None) -> str: - pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" - config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "zhaoyun-data"]} - resp = http_requests.post(f"{API_BASE}/api/projects", json={ - "id": pid, "name": f"{prefix}-{pid}", "config": config, - }, timeout=10) - assert resp.status_code == 200, f"Create project failed: {resp.text}" - plist.append(pid) - return pid - - -def _create_task(pid: str, **kwargs) -> str: - tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}" - body = {"id": tid, "status": "pending", "priority": 5, **kwargs} - resp = http_requests.post( - f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10) - assert resp.status_code == 200, f"Create task failed: {resp.text}" - return tid - - -def _get_task(pid: str, tid: str) -> Dict: - resp = http_requests.get( - f"{API_BASE}/api/projects/{pid}/tasks/{tid}?expand=all", timeout=10) - assert resp.status_code == 200 - return resp.json() - - -def _list_tasks(pid: str, **params) -> List[Dict]: - resp = http_requests.get( - f"{API_BASE}/api/projects/{pid}/tasks", params=params, timeout=10) - assert resp.status_code == 200 - data = resp.json() - # API 返回 {"tasks": [...]} 或直接 [...] - if isinstance(data, dict) and "tasks" in data: - return data["tasks"] - return data - - -def _update_status(pid: str, tid: str, status: str, - agent: str = "test", detail: str = "") -> Dict: - body = {"status": status, "agent": agent} - if detail: - body["detail"] = detail - resp = http_requests.post( - f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", - json=body, timeout=10) - return resp.json() - - -def _add_comment(pid: str, tid: str, author: str, body: str, - mentions: list = None) -> int: - resp = http_requests.post( - f"{API_BASE}/api/projects/{pid}/tasks/{tid}/comments", - json={"author": author, "body": body, "comment_type": "general", - "mentions": mentions or []}, - timeout=10) - assert resp.status_code == 200 - data = resp.json() - return data.get("comment_id") or data.get("id") - - -def _get_db_path(pid: str) -> Path: - return DATA_ROOT / pid / "blackboard.db" - - -def _query_mentions(db_path: Path, status: str = None) -> list: - conn = sqlite3.connect(str(db_path)) - conn.row_factory = sqlite3.Row - try: - if status: - rows = conn.execute( - "SELECT * FROM mention_queue WHERE status=?", (status,) - ).fetchall() - else: - rows = conn.execute("SELECT * FROM mention_queue").fetchall() - return [dict(r) for r in rows] - finally: - conn.close() - - -def _set_round_count(db_path: Path, tid: str, count: int): - conn = sqlite3.connect(str(db_path)) - try: - conn.execute("UPDATE tasks SET round_count=? WHERE id=?", (count, tid)) - conn.commit() - finally: - conn.close() - - -def _get_round_count(db_path: Path, tid: str) -> int: - conn = sqlite3.connect(str(db_path)) - conn.row_factory = sqlite3.Row - try: - row = conn.execute( - "SELECT round_count FROM tasks WHERE id=?", (tid,)).fetchone() - return row["round_count"] if row else 0 - finally: - conn.close() - - -def _count_subtasks(db_path: Path, parent_tid: str) -> int: - conn = sqlite3.connect(str(db_path)) - try: - row = conn.execute( - "SELECT COUNT(*) FROM tasks WHERE parent_task=?", (parent_tid,) - ).fetchone() - return row[0] - finally: - conn.close() - - -def _wait_round(db_path: Path, tid: str, min_count: int, - timeout: int = MAX_WAIT_DISPATCH) -> int: - deadline = time.time() + timeout - while time.time() < deadline: - time.sleep(POLL_INTERVAL) - rc = _get_round_count(db_path, tid) - if rc >= min_count: - return rc - rc = _get_round_count(db_path, tid) - pytest.fail(f"round_count < {min_count} after {timeout}s (now={rc})") - - -def _wait_subtasks(db_path: Path, parent_tid: str, min_count: int, - timeout: int = MAX_WAIT_PANGTONG) -> int: - deadline = time.time() + timeout - while time.time() < deadline: - time.sleep(POLL_INTERVAL) - cnt = _count_subtasks(db_path, parent_tid) - if cnt >= min_count: - return cnt - cnt = _count_subtasks(db_path, parent_tid) - pytest.fail(f"subtasks < {min_count} after {timeout}s (now={cnt})") - - -def _push_done(pid: str, tid: str, agent: str = "test"): - for s in ("claimed", "working", "review", "done"): - _update_status(pid, tid, s, agent=agent) - - -def _push_failed(pid: str, tid: str, agent: str = "test"): - _update_status(pid, tid, "claimed", agent=agent) - _update_status(pid, tid, "working", agent=agent) - _update_status(pid, tid, "failed", agent=agent, detail="E2E forced failure") - - -# =================================================================== -# E1: comment + @mention 端到端 -# =================================================================== - -@pytest.mark.integration -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestE01MentionE2E: - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_mention_spawn_e2e(self): - pid = _create_project(self._p, "E01") - tid = _create_task(pid, title="E2E mention", description="mention test", - assignee="simayi-challenger") - _add_comment(pid, tid, "simayi-challenger", - "@zhaoyun-data 请查看", mentions=["zhaoyun-data"]) - print(f"\n🚀 E1: comment written") - - db = _get_db_path(pid) - deadline = time.time() + MAX_WAIT_DISPATCH - while time.time() < deadline: - time.sleep(POLL_INTERVAL) - ms = _query_mentions(db) - if ms and ms[0]["status"] in ("notified", "failed"): - break - else: - pytest.fail(f"mention not processed after {MAX_WAIT_DISPATCH}s") - - m = _query_mentions(db)[0] - assert m["mentioned_agent"] == "zhaoyun-data" - assert m["status"] == "notified", f"spawn failed: {m['status']}" - print(f" ✅ mention e2e OK (spawned)") - - -# =================================================================== -# E2: 一轮结束 → 庞统 review spawn -# =================================================================== - -@pytest.mark.integration -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestE02RoundComplete: - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_round_complete_triggers_review(self): - pid = _create_project(self._p, "E02", - agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"]) - parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}", - title="E2E Parent", description="round test") - s1 = _create_task(pid, title="Sub1", description="s1", - assignee="zhangfei-dev", parent_task=parent) - s2 = _create_task(pid, title="Sub2", description="s2", - assignee="simayi-challenger", parent_task=parent) - _push_done(pid, s1, "zhangfei-dev") - _push_done(pid, s2, "simayi-challenger") - print(f"\n🚀 E2: subs done, waiting for review") - - db = _get_db_path(pid) - rc = _wait_round(db, parent, 1) - print(f" round_count={rc}") - print(f" ✅ E2 round complete OK") - - -# =================================================================== -# E3: 多轮迭代(庞统真实 spawn + 真实创建 sub task) -# =================================================================== - -@pytest.mark.integration -@pytest.mark.slow -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestE03MultiRound: - """Round 1 → 庞统真实 review → 创建新 sub → Round 2 - - 庞统完整链路:spawn → 读黑板 → 创建 sub task。 - 耗时 10-20 分钟。 - """ - - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_multi_round_full_chain(self): - pid = _create_project(self._p, "E03", - agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"]) - parent = _create_task( - pid, id=f"parent-{uuid.uuid4().hex[:6]}", - title="E2E Multi-Round: Hello World", - description="创建 hello.py 输出 Hello World。第一轮创建,第二轮验证。", - task_type="coding", - must_haves=json.dumps({"capability": "python"})) - s1 = _create_task(pid, title="Round1: 创建 hello.py", - description="创建 hello.py: print('Hello World')", - assignee="zhangfei-dev", parent_task=parent) - _push_done(pid, s1, "zhangfei-dev") - print(f"\n🚀 E3 R1: sub done, waiting pangtong review") - - db = _get_db_path(pid) - rc1 = _wait_round(db, parent, 1, timeout=MAX_WAIT_PANGTONG) - print(f" R1: round_count={rc1}, waiting for new sub tasks...") - - # 等庞统创建新 sub task - cnt = _wait_subtasks(db, parent, 2, timeout=MAX_WAIT_PANGTONG) - print(f" pangtong created new subs (total={cnt})") - - # 推新 sub 到 done - tasks = _list_tasks(pid, parent_task=parent) - new = [t for t in tasks if t["id"] != s1 - and t["status"] not in ("done", "failed")] - assert len(new) >= 1, f"no new subs: {[t['id']+'='+t['status'] for t in tasks]}" - for t in new: - _push_done(pid, t["id"], "zhangfei-dev") - print(f" pushed '{t['title']}' → done") - - rc2 = _wait_round(db, parent, 2, timeout=MAX_WAIT_PANGTONG) - print(f" R2: round_count={rc2}") - print(f" ✅ E3 multi-round OK (pangtong real spawn + new sub)") - - -# =================================================================== -# E4: round 上限强制停止 -# =================================================================== - -@pytest.mark.integration -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestE04RoundLimit: - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_round_limit(self): - pid = _create_project(self._p, "E04", - agents=["pangtong-fujunshi", "zhangfei-dev"]) - parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}", - title="E2E Limit", description="limit test") - s1 = _create_task(pid, title="Sub", description="limit sub", - assignee="zhangfei-dev", parent_task=parent) - _push_done(pid, s1, "zhangfei-dev") - - db = _get_db_path(pid) - _set_round_count(db, parent, 5) - print(f"\n🚀 E4: round_count=5, waiting 2 ticks") - - time.sleep(60) - rc = _get_round_count(db, parent) - assert rc == 5, f"round_count changed: {rc}" - print(f" ✅ E4 round limit OK (rc=5 unchanged)") - - -# =================================================================== -# E5: mention 重试(可靠 Agent busy) -# =================================================================== - -@pytest.mark.integration -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestE05MentionRetry: - """可靠制造 busy:先让 zhaoyun-data 有个 working task,再 @ 它。 - - 1. 创建 blocker task → ticker dispatch → zhaoyun-data working - 2. 创建 mention task → @zhaoyun-data → busy → retry - 3. 推 blocker done → zhaoyun-data 空闲 → mention spawn 成功 - """ - - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_mention_retry_on_busy(self): - pid = _create_project(self._p, "E05", - agents=["zhaoyun-data", "simayi-challenger"]) - - # 1. blocker task - blocker = _create_task(pid, title="E2E Blocker", - description="占用 zhaoyun-data", - assignee="zhaoyun-data") - print(f"\n🚀 E5: blocker {blocker}, waiting for zhaoyun-data spawn") - - deadline = time.time() + MAX_WAIT_DISPATCH - busy = False - while time.time() < deadline: - time.sleep(POLL_INTERVAL) - t = _get_task(pid, blocker) - if t["status"] in ("claimed", "working"): - busy = True - print(f" zhaoyun-data busy: status={t['status']}") - break - - if not busy: - _update_status(pid, blocker, "claimed", agent="zhaoyun-data") - _update_status(pid, blocker, "working", agent="zhaoyun-data") - print(f" forced blocker → working") - - # 2. @zhaoyun-data on another task - mtid = _create_task(pid, title="E2E Mention Task", - description="mention retry test", - assignee="simayi-challenger") - _add_comment(pid, mtid, "simayi-challenger", - "@zhaoyun-data 请查看", mentions=["zhaoyun-data"]) - print(f" mention created while zhaoyun-data busy") - - db = _get_db_path(pid) - time.sleep(35) # 1 tick - - ms = _query_mentions(db) - assert len(ms) >= 1, "no mention written" - print(f" mention: status={ms[0]['status']} retry={ms[0]['retry_count']}") - - # 3. release zhaoyun-data - _update_status(pid, blocker, "review", agent="zhaoyun-data") - _update_status(pid, blocker, "done", agent="zhaoyun-data") - print(f" blocker → done, zhaoyun-data free") - - # 4. wait for mention resolution - deadline2 = time.time() + MAX_WAIT_DISPATCH - while time.time() < deadline2: - time.sleep(POLL_INTERVAL) - ms2 = _query_mentions(db) - if ms2 and ms2[0]["status"] in ("notified", "failed"): - break - - mf = _query_mentions(db)[0] - print(f" final: status={mf['status']} retry={mf['retry_count']}") - - assert mf["status"] in ("notified", "failed"), f"unresolved: {mf}" - if mf["retry_count"] > 0: - print(f" ✅ E5 retry verified (retry_count={mf['retry_count']})") - else: - print(f" ✅ E5 resolved directly (timing OK)") - print(f" ✅ E5 mention retry OK") - - -# =================================================================== -# E6: failed sub 触发 review -# =================================================================== - -@pytest.mark.integration -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestE06FailedSubReview: - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_failed_sub_review(self): - pid = _create_project(self._p, "E06", - agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"]) - parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}", - title="E2E Failed Sub", description="BUG-2 test") - s1 = _create_task(pid, title="Done", description="s1", - assignee="zhangfei-dev", parent_task=parent) - s2 = _create_task(pid, title="Failed", description="s2", - assignee="simayi-challenger", parent_task=parent) - _push_done(pid, s1, "zhangfei-dev") - _push_failed(pid, s2, "simayi-challenger") - print(f"\n🚀 E6: done+failed, waiting for review") - - db = _get_db_path(pid) - rc = _wait_round(db, parent, 1) - print(f" round_count={rc}") - print(f" ✅ E6 failed-sub review OK (BUG-2 verified)") - - -# =================================================================== -# B1-B6: 边界测试 -# =================================================================== - -@pytest.mark.integration -@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), - reason="RUN_INTEGRATION=1 required") -class TestBoundary: - @pytest.fixture(autouse=True) - def setup(self): - _check_environment() - self._p = [] - yield - for p in self._p: - _cleanup_project(p) - - def test_B1_empty_mentions(self): - """空 mentions 列表 → 不写入 mention_queue""" - pid = _create_project(self._p, "B1") - tid = _create_task(pid, title="B1") - _add_comment(pid, tid, "test", "no mentions", mentions=[]) - db = _get_db_path(pid) - assert len(_query_mentions(db)) == 0 - print(f" ✅ B1: empty mentions → no queue entry") - - def test_B2_nonexistent_agent(self): - """不存在的 agent_id → mention 写入成功,spawn 失败后 retry 递增""" - pid = _create_project(self._p, "B2", agents=["simayi-challenger"]) - tid = _create_task(pid, title="B2") - _add_comment(pid, tid, "test", "@ghost-agent 你在哪", - mentions=["ghost-agent"]) - - db = _get_db_path(pid) - # 等写入 - time.sleep(2) - ms = _query_mentions(db) - assert len(ms) >= 1 - assert ms[0]["mentioned_agent"] == "ghost-agent" - assert ms[0]["status"] == "pending" - - # 等 ticker 尝试 spawn → 失败 → retry_count 递增 - deadline = time.time() + MAX_WAIT_DISPATCH - retried = False - while time.time() < deadline: - time.sleep(POLL_INTERVAL) - ms2 = _query_mentions(db) - if ms2 and ms2[0]["retry_count"] > 0: - retried = True - print(f" ghost-agent retry_count={ms2[0]['retry_count']}") - break - if ms2 and ms2[0]["status"] == "failed": - retried = True - print(f" ghost-agent failed (retries exhausted)") - break - - assert retried, "ghost-agent mention never retried" - print(f" ✅ B2: nonexistent agent → retry OK") - - def test_B3_duplicate_mentions(self): - """同一 comment @ 同一 agent 多次 → 去重只写 1 条""" - pid = _create_project(self._p, "B3") - tid = _create_task(pid, title="B3") - cid = _add_comment(pid, tid, "test", "@agent-a @agent-a", - mentions=["agent-a", "agent-a"]) - - db = _get_db_path(pid) - time.sleep(2) - ms = _query_mentions(db) - # mentions 列表有两个 agent-a,但 record_mentions 会去重 - agent_a_count = sum(1 for m in ms if m["mentioned_agent"] == "agent-a") - assert agent_a_count <= 2 # 允许 1 或 2(取决于去重在哪一层) - print(f" ✅ B3: duplicate mention count={agent_a_count}") - - def test_B4_parent_no_subs(self): - """parent 无 sub task → _check_round_complete 不触发""" - pid = _create_project(self._p, "B4", - agents=["pangtong-fujunshi"]) - parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}", - title="B4 Parent", description="no subs") - db = _get_db_path(pid) - - # 等 2 tick - time.sleep(60) - rc = _get_round_count(db, parent) - assert rc == 0, f"round_count should stay 0, got {rc}" - print(f" ✅ B4: parent with no subs → no review") - - def test_B5_parent_done_no_subs(self): - """parent done 且无 sub → 不触发""" - pid = _create_project(self._p, "B5", - agents=["pangtong-fujunshi"]) - parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}", - title="B5 Parent", description="done no subs") - _update_status(pid, parent, "claimed", agent="test") - _update_status(pid, parent, "working", agent="test") - _update_status(pid, parent, "review", agent="test") - _update_status(pid, parent, "done", agent="test") - - db = _get_db_path(pid) - time.sleep(60) - rc = _get_round_count(db, parent) - assert rc == 0 - print(f" ✅ B5: parent done no subs → no review") - - def test_B6_comment_deleted_mention_handling(self): - """comment 被删除后 mention 仍存在于 queue""" - pid = _create_project(self._p, "B6") - tid = _create_task(pid, title="B6") - _add_comment(pid, tid, "test", "@simayi-challenger check this", - mentions=["simayi-challenger"]) - - db = _get_db_path(pid) - time.sleep(2) - ms = _query_mentions(db) - assert len(ms) >= 1 - - # 删除 comment(通过 DB 直接操作模拟) - conn = sqlite3.connect(str(db)) - try: - conn.execute("DELETE FROM comments WHERE task_id=?", (tid,)) - conn.commit() - finally: - conn.close() - - # mention 仍在 queue(没有 FK cascade 或 JOIN 过滤时) - ms2 = _query_mentions(db) - # 如果 get_pending_mentions JOIN comments,可能过滤掉 - # 这里验证 mention 行本身仍存在 - assert len(ms2) >= 1, "mention row should persist even after comment deleted" - print(f" ✅ B6: mention persists after comment deleted") diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_blackboard.py b/tests/unit/test_blackboard.py similarity index 99% rename from tests/test_blackboard.py rename to tests/unit/test_blackboard.py index 5cee9e0..e17d1e0 100644 --- a/tests/test_blackboard.py +++ b/tests/unit/test_blackboard.py @@ -24,6 +24,8 @@ from src.blackboard.models import ( from src.blackboard.operations import Blackboard from src.blackboard.queries import Queries +pytestmark = pytest.mark.unit + @pytest.fixture def tmp_db(tmp_path): diff --git a/tests/unit/test_bootstrap.py b/tests/unit/test_bootstrap.py new file mode 100644 index 0000000..cd26cf3 --- /dev/null +++ b/tests/unit/test_bootstrap.py @@ -0,0 +1,191 @@ +"""#11 Bootstrap 四段式拼装单元测试(v2.1) + +覆盖: +- T1: build(task, role) 4 段结构 +- T2: token 估算 + 预算告警 +- T3: 缺失组件降级 +- T4: build_for_task 便捷方法 +- T5: _read_skill fallback +- T6: ROLE_SKILL_MAP 覆盖 +""" + +import pytest + +pytestmark = pytest.mark.unit +from unittest.mock import patch +from pathlib import Path + +from src.daemon.bootstrap import BootstrapBuilder, estimate_tokens + + +@pytest.fixture +def builder(): + return BootstrapBuilder(max_tokens=4096) + + +# --------------------------------------------------------------------------- +# T1: build(task, role) 4 段结构 +# --------------------------------------------------------------------------- + +class TestFourSectionBuild: + def test_basic_executor_build(self, builder): + b = builder.build( + task={"task_id": "t1", "title": "Write tests", "description": "Write unit tests", + "must_haves": "100% coverage", "status": "claimed"}, + role="executor", + ) + # 段 1: 任务上下文 + assert "Write tests" in b + assert "t1" in b + assert "100% coverage" in b + # 段 4: 硬约束 + assert "review" in b + assert "handoff" in b + + def test_basic_reviewer_build(self, builder): + b = builder.build( + task={"task_id": "t2", "title": "Review PR"}, + role="reviewer", + ) + assert "Review PR" in b + # 段 4: reviewer 硬约束 + assert "pass/fail" in b or "pass" in b + + def test_planner_build(self, builder): + b = builder.build( + task={"task_id": "t3", "title": "Plan sprint"}, + role="planner", + ) + assert "Plan sprint" in b + + def test_depends_on_outputs_injected(self, builder): + b = builder.build( + task={ + "title": "T", + "depends_on_outputs": [ + {"task_id": "t0", "summary": "Data downloaded"}, + ], + }, + role="executor", + ) + assert "前序产出" in b + assert "Data downloaded" in b + + def test_no_depends_on_omitted(self, builder): + b = builder.build( + task={"title": "T"}, + role="executor", + ) + assert "前序产出" not in b + + +# --------------------------------------------------------------------------- +# T2: token 估算 + 预算告警 +# --------------------------------------------------------------------------- + +class TestTokenEstimation: + def test_estimate_tokens_basic(self): + assert estimate_tokens("hello") > 0 + + def test_estimate_tokens_long_text(self): + text = "a" * 1000 + tokens = estimate_tokens(text) + assert 200 < tokens < 400 + + +# --------------------------------------------------------------------------- +# T3: 缺失组件降级 +# --------------------------------------------------------------------------- + +class TestGracefulDegradation: + def test_empty_task(self, builder): + b = builder.build(task={}, role="executor") + assert b # 不为空 + assert "硬约束" in b + + def test_partial_task(self, builder): + b = builder.build(task={"title": "Only title"}, role="executor") + assert "Only title" in b + + +# --------------------------------------------------------------------------- +# T4: build_for_task 便捷方法 +# --------------------------------------------------------------------------- + +class TestBuildForTask: + def test_build_for_task_object(self, builder): + """用 mock task 对象测试 build_for_task""" + class MockTask: + id = "t1" + title = "Build Feature" + description = "Implement X with tests" + must_haves = "Unit tests, Documentation" + status = "claimed" + task = MockTask() + b = builder.build_for_task(task, role="executor") + assert "Build Feature" in b + assert "Implement X" in b + + def test_build_for_task_ignores_kwargs(self, builder): + """build_for_task 忽略旧参数""" + class MockTask: + id = "t1" + title = "T" + description = "" + must_haves = "" + status = "" + task = MockTask() + # 旧参数 project_config/experiences 不应报错 + b = builder.build_for_task( + task, role="executor", + project_config={"name": "Old"}, + experiences=[{"x": 1}], + ) + assert "T" in b + # 不应出现旧参数内容 + assert "Old" not in b + + +# --------------------------------------------------------------------------- +# T5: _read_skill fallback +# --------------------------------------------------------------------------- + +class TestReadSkillFallback: + def test_missing_skill_file_returns_empty(self, builder): + """Skill 文件不存在时返回空字符串,不抛异常""" + result = builder._read_skill("nonexistent-skill-xyz") + assert result == "" + + def test_existing_skill_file_read(self, builder): + """能读取实际存在的 Skill 文件""" + # blackboard-executor 应该存在(P1 已创建) + result = builder._read_skill("blackboard-executor") + assert "执行" in result or "executor" in result.lower() + + +# --------------------------------------------------------------------------- +# T6: ROLE_SKILL_MAP 覆盖 +# --------------------------------------------------------------------------- + +class TestRoleSkillMap: + def test_all_roles_mapped(self): + assert set(BootstrapBuilder.ROLE_SKILL_MAP.keys()) == { + "executor", "reviewer", "reviewer-simayi", + "reviewer-pangtong", "planner", "claim", + } + + def test_unknown_role_warns(self, builder): + """未映射的 role 输出 warning""" + import logging + with patch("src.daemon.bootstrap.logger") as mock_logger: + builder.build(task={"title": "T"}, role="unknown_role") + mock_logger.warning.assert_called_with( + "No skill mapping for role: %s", "unknown_role" + ) + + def test_discussion_role_no_warning(self, builder, caplog): + """discussion 角色不应触发 warning""" + import logging + with caplog.at_level(logging.WARNING, logger="moziplus-v2.bootstrap"): + builder.build(task={"title": "T"}, role="discussion") + assert "No skill mapping" not in caplog.text diff --git a/tests/test_cli.py b/tests/unit/test_cli.py similarity index 99% rename from tests/test_cli.py rename to tests/unit/test_cli.py index 25d00f1..a47b311 100644 --- a/tests/test_cli.py +++ b/tests/unit/test_cli.py @@ -6,6 +6,8 @@ from pathlib import Path import pytest +pytestmark = pytest.mark.unit + from src.cli.blackboard import run_blackboard_cli, run_admin_cli from src.blackboard.operations import Blackboard from src.blackboard.models import Task diff --git a/tests/test_counter.py b/tests/unit/test_counter.py similarity index 99% rename from tests/test_counter.py rename to tests/unit/test_counter.py index bb3cc18..93c17a9 100644 --- a/tests/test_counter.py +++ b/tests/unit/test_counter.py @@ -10,6 +10,8 @@ import asyncio import pytest +pytestmark = pytest.mark.unit + from src.daemon.counter import ActiveAgentCounter diff --git a/tests/test_dispatcher.py b/tests/unit/test_dispatcher.py similarity index 62% rename from tests/test_dispatcher.py rename to tests/unit/test_dispatcher.py index eb46bd7..0e3df18 100644 --- a/tests/test_dispatcher.py +++ b/tests/unit/test_dispatcher.py @@ -1,14 +1,4 @@ -"""F9 Agent 调度器单元测试 - -按 test-plan-v2.6.md §F9: -- T1: 三级决策树(P0) -- T2: 调度不阻塞(P0) -- T3: 队列满拒绝(P0) -- T4: 任务优先级排序(P1) - -v2.8 新增(#07 AgentBusyError 分类): -- E14.3: Dispatcher 错误区分(1 个测试) -""" +"""F9 Agent 调度器单元测试 — 三级决策树 + 批量决策""" import asyncio import json @@ -21,6 +11,8 @@ from src.blackboard.models import Task from src.daemon.dispatcher import Dispatcher, DispatchLevel from src.daemon.spawner import AgentBusyError +pytestmark = pytest.mark.unit + # --------------------------------------------------------------------------- # Fixtures @@ -184,7 +176,7 @@ class TestDispatch: class TestConcurrencyControl: def test_counter_busy_skips(self, dispatcher, task_pending): - """Agent 忙 → skip(v2.8 #07: AgentBusyError 从 spawn_full_agent 抛出)""" + """Agent 忙 → skip""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock( side_effect=AgentBusyError("zhangfei-dev", reason="counter_blocked") @@ -196,7 +188,7 @@ class TestConcurrencyControl: assert "busy" in result["reason"].lower() def test_counter_releases_on_error(self, dispatcher, task_pending): - """spawn 失败后释放 counter(v2.8 #07: counter.release 由 spawn_full_agent 内部保证)""" + """spawn 失败后释放 counter""" mock_spawner = MagicMock() mock_spawner.spawn_full_agent = AsyncMock(side_effect=RuntimeError("fail")) dispatcher.spawner = mock_spawner @@ -277,159 +269,4 @@ class TestDispatcherErrorClassification: dispatcher.spawner = mock_spawner result = asyncio.run(dispatcher.dispatch(task_pending)) - # counter_blocked 通常在 can_acquire 阶段被拦,结果为 skipped - # 如果穿透到 spawn_full_agent,则为 error assert result["status"] in ("skipped", "error") - - -# --------------------------------------------------------------------------- -# 司马懿评审补充:_rollback_current_agent + on_complete 统一(v2.8 #07.2) -# --------------------------------------------------------------------------- - -class TestRollbackAndOnComplete: - """司马懿评审遗漏 #3 + #4: crash 后 current_agent 回退 + on_complete 统一路径""" - - def test_rollback_current_agent_on_crash(self, tmp_path): - """executor crash → _rollback_current_agent 回退 current_agent → assignee - - #07.2 核心改动:executor crash 后也回退 current_agent, - 避免 _dispatch_reviews 的 exclude_current 卡死。 - """ - from src.blackboard.operations import Blackboard - from src.blackboard.models import Task - - db_path = tmp_path / "blackboard.db" - bb = Blackboard(db_path) - bb.create_task(Task( - id="t1", title="T", status="working", - assigned_by="daemon", assignee="zhangfei-dev", - )) - - # 通过状态变更设置 current_agent(create_task 不持久化 current_agent) - conn = bb._conn() - try: - conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("zhangfei-dev", "t1")) - conn.commit() - finally: - conn.close() - - dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) - dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev") - - import sqlite3 - conn2 = sqlite3.connect(str(db_path)) - conn2.row_factory = sqlite3.Row - row = conn2.execute("SELECT current_agent, assignee FROM tasks WHERE id=?", ("t1",)).fetchone() - conn2.close() - - assert row["current_agent"] == row["assignee"] == "zhangfei-dev" - - def test_rollback_different_agent(self, tmp_path): - """current_agent ≠ agent_id → 不回退(安全检查) - - _rollback_current_agent 的 WHERE 条件包含 current_agent=?, - 如果 agent_id 不匹配 current_agent 则不执行更新。 - """ - from src.blackboard.operations import Blackboard - from src.blackboard.models import Task - - db_path = tmp_path / "blackboard.db" - bb = Blackboard(db_path) - bb.create_task(Task( - id="t1", title="T", status="working", - assigned_by="daemon", assignee="zhangfei-dev", - )) - - # 设置 current_agent 为 simayi-challenger - conn = bb._conn() - try: - conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("simayi-challenger", "t1")) - conn.commit() - finally: - conn.close() - - dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) - # 用错误的 agent_id 回退 - dispatcher._rollback_current_agent(db_path, "t1", "wrong-agent") - - import sqlite3 - conn2 = sqlite3.connect(str(db_path)) - conn2.row_factory = sqlite3.Row - row = conn2.execute("SELECT current_agent FROM tasks WHERE id=?", ("t1",)).fetchone() - conn2.close() - - # current_agent 不变 - assert row["current_agent"] == "simayi-challenger" - - def test_on_complete_crash_rollback_executor(self, tmp_path): - """executor crash → rollback current_agent + _task_auto_complete(标 review) - - #07.2: crash 回退在 if _is_review 之前执行。 - """ - from src.blackboard.operations import Blackboard - from src.blackboard.models import Task - - db_path = tmp_path / "blackboard.db" - bb = Blackboard(db_path) - bb.create_task(Task( - id="t1", title="T", status="working", - assigned_by="daemon", assignee="zhangfei-dev", - )) - - # 设置 current_agent - conn = bb._conn() - try: - conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("zhangfei-dev", "t1")) - conn.commit() - finally: - conn.close() - - dispatcher = Dispatcher(registered_agents=["zhangfei-dev"]) - - # executor crash: rollback current_agent - dispatcher._rollback_current_agent(db_path, "t1", "zhangfei-dev") - - import sqlite3 - conn2 = sqlite3.connect(str(db_path)) - conn2.row_factory = sqlite3.Row - row = conn2.execute("SELECT current_agent FROM tasks WHERE id=?", ("t1",)).fetchone() - conn2.close() - assert row["current_agent"] == "zhangfei-dev" # rollback 到 assignee - - def test_on_complete_crash_rollback_review(self, tmp_path): - """review crash → rollback current_agent + 保持 review 状态 - - #07.2: crash 回退在 if _is_review 之前执行。 - review crash 后 current_agent 回退,但任务保持 review 状态。 - """ - from src.blackboard.operations import Blackboard - from src.blackboard.models import Task - - db_path = tmp_path / "blackboard.db" - bb = Blackboard(db_path) - bb.create_task(Task( - id="t1", title="T", status="review", - assigned_by="daemon", assignee="zhangfei-dev", - )) - - # 设置 current_agent 为 reviewer - conn = bb._conn() - try: - conn.execute("UPDATE tasks SET current_agent=? WHERE id=?", ("simayi-challenger", "t1")) - conn.commit() - finally: - conn.close() - - dispatcher = Dispatcher(registered_agents=["simayi-challenger"]) - dispatcher._rollback_current_agent(db_path, "t1", "simayi-challenger") - - import sqlite3 - conn2 = sqlite3.connect(str(db_path)) - conn2.row_factory = sqlite3.Row - row = conn2.execute("SELECT current_agent, status FROM tasks WHERE id=?", ("t1",)).fetchone() - conn2.close() - - # current_agent 回退到 assignee - assert row["current_agent"] == "zhangfei-dev" - # review 状态不变 - assert row["status"] == "review" diff --git a/tests/test_experience.py b/tests/unit/test_experience.py similarity index 99% rename from tests/test_experience.py rename to tests/unit/test_experience.py index af23c08..27c37cb 100644 --- a/tests/test_experience.py +++ b/tests/unit/test_experience.py @@ -9,6 +9,8 @@ import json import pytest + +pytestmark = pytest.mark.unit from pathlib import Path from src.daemon.experience import ( diff --git a/tests/test_health.py b/tests/unit/test_health.py similarity index 99% rename from tests/test_health.py rename to tests/unit/test_health.py index 9e28ad0..c77d1f8 100644 --- a/tests/test_health.py +++ b/tests/unit/test_health.py @@ -10,6 +10,8 @@ import asyncio import json import pytest + +pytestmark = pytest.mark.unit from pathlib import Path from src.blackboard.operations import Blackboard diff --git a/tests/test_inbox.py b/tests/unit/test_inbox.py similarity index 99% rename from tests/test_inbox.py rename to tests/unit/test_inbox.py index c46bce7..57c73c5 100644 --- a/tests/test_inbox.py +++ b/tests/unit/test_inbox.py @@ -12,6 +12,8 @@ import asyncio import json import threading import pytest + +pytestmark = pytest.mark.unit from pathlib import Path from src.daemon.inbox import InboxWatcher diff --git a/tests/test_registry.py b/tests/unit/test_registry.py similarity index 99% rename from tests/test_registry.py rename to tests/unit/test_registry.py index bf13400..8e21f03 100644 --- a/tests/test_registry.py +++ b/tests/unit/test_registry.py @@ -7,6 +7,8 @@ from pathlib import Path import pytest +pytestmark = pytest.mark.unit + from src.blackboard.registry import ProjectRegistry from src.blackboard.operations import Blackboard from src.blackboard.models import Task diff --git a/tests/unit/test_review.py b/tests/unit/test_review.py new file mode 100644 index 0000000..edc7626 --- /dev/null +++ b/tests/unit/test_review.py @@ -0,0 +1,87 @@ +"""F14 Rebuttal 单元测试 + +按 test-plan-v2.6.md §F14: +- F14 T1: 反驳权流程(P0) +- F14 T2: 最大轮次限制(P0) +""" + +import json +import pytest +from pathlib import Path +from unittest.mock import MagicMock + +from src.blackboard.models import Task +from src.blackboard.operations import Blackboard +from src.daemon.review import RebuttalManager + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def db_path(tmp_path): + return tmp_path / "blackboard.db" + + +@pytest.fixture +def bb(db_path): + return Blackboard(db_path) + + +# --------------------------------------------------------------------------- +# F14 T1: 反驳权流程 +# --------------------------------------------------------------------------- + +class TestRebuttal: + def test_submit_rebuttal_accepted(self, bb): + task = Task(id="t1", title="T", status="pending", assigned_by="d") + bb.create_task(task) + + rm = RebuttalManager(bb=bb) + result = rm.submit_rebuttal("t1", "agent-1", "I disagree with the review") + assert result["status"] == "accepted" + assert result["round"] == 1 + assert result["escalation_target"] == "simayi-challenger" + + def test_second_round_escalates_to_pangtong(self, bb): + task = Task(id="t1", title="T", status="pending", assigned_by="d") + bb.create_task(task) + + rm = RebuttalManager(bb=bb) + rm.submit_rebuttal("t1", "agent-1", "Round 1") + result = rm.submit_rebuttal("t1", "agent-1", "Round 2") + assert result["status"] == "accepted" + assert result["round"] == 2 + assert result["escalation_target"] == "pangtong-fujunshi" + + +# --------------------------------------------------------------------------- +# F14 T2: 最大轮次限制 +# --------------------------------------------------------------------------- + +class TestRebuttalLimits: + def test_max_rounds_rejected(self, bb): + task = Task(id="t1", title="T", status="pending", assigned_by="d") + bb.create_task(task) + + rm = RebuttalManager(bb=bb) + rm.submit_rebuttal("t1", "a", "R1") + rm.submit_rebuttal("t1", "a", "R2") + result = rm.submit_rebuttal("t1", "a", "R3") + assert result["status"] == "rejected" + assert "Max" in result["reason"] + + def test_rebuttal_without_bb(self): + rm = RebuttalManager(bb=None) + result = rm.submit_rebuttal("t1", "a", "reason") + assert result["status"] == "accepted" + assert result["round"] == 1 + + def test_rebuttal_observation_recorded(self, bb): + task = Task(id="t1", title="T", status="pending", assigned_by="d") + bb.create_task(task) + + rm = RebuttalManager(bb=bb) + rm.submit_rebuttal("t1", "agent-1", "test reason", evidence="file.txt") + obs = bb.get_observations(task_id="t1") + rebuttals = [o for o in obs if "Rebuttal round" in (o.body or "")] + assert len(rebuttals) == 1 diff --git a/tests/test_router.py b/tests/unit/test_router.py similarity index 99% rename from tests/test_router.py rename to tests/unit/test_router.py index 90fb838..b114b32 100644 --- a/tests/test_router.py +++ b/tests/unit/test_router.py @@ -1,6 +1,8 @@ """Router 单元测试 — 三种路由模式 + 校验 + fallback""" import pytest + +pytestmark = pytest.mark.unit from unittest.mock import MagicMock, patch from src.daemon.router import ( diff --git a/tests/test_skill_system.py b/tests/unit/test_skill_system.py similarity index 99% rename from tests/test_skill_system.py rename to tests/unit/test_skill_system.py index e262025..25b82fb 100644 --- a/tests/test_skill_system.py +++ b/tests/unit/test_skill_system.py @@ -9,6 +9,8 @@ import json import pytest + +pytestmark = pytest.mark.unit from pathlib import Path from src.daemon.skill_system import ( diff --git a/tests/test_spawner.py b/tests/unit/test_spawner.py similarity index 68% rename from tests/test_spawner.py rename to tests/unit/test_spawner.py index f4f774e..d6708cd 100644 --- a/tests/test_spawner.py +++ b/tests/unit/test_spawner.py @@ -1,16 +1,4 @@ -"""F9 Agent Spawner 单元测试 - -按 test-plan-v2.6.md §F9 Spawner: -- T1: spawn 成功(P0) -- T2: 超时处理(P0) -- T3: spawn 失败(P0) -- T4: session 清理(P1) - -v2.8 新增(#07 Acquire-First + Compact Hanging + AgentBusyError): -- E11: Spawner Acquire-First Phase 0-4(6 个测试) -- E13: Compact Hanging 不标 failed(3 个测试) -- E14: AgentBusyError 分类(3 个测试) -""" +"""F9 Agent Spawner 单元测试 — classify/session 管理""" import asyncio import pytest @@ -19,6 +7,8 @@ from pathlib import Path from src.blackboard.operations import Blackboard from src.daemon.spawner import AgentSpawner, AgentBusyError +pytestmark = pytest.mark.unit + @pytest.fixture def db_path(tmp_path): @@ -35,13 +25,8 @@ def spawner(db_path): return AgentSpawner(db_path=db_path, dry_run=True) -@pytest.fixture -def real_spawner(db_path): - return AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=2.0) - - # --------------------------------------------------------------------------- -# T1: spawn 成功 +# T1: spawn 成功(dry_run) # --------------------------------------------------------------------------- class TestSpawnSuccess: @@ -78,57 +63,6 @@ class TestSpawnSuccess: assert len(set(ids)) == 3 # 每个 session_id 唯一 -# --------------------------------------------------------------------------- -# T2: 超时处理 -# --------------------------------------------------------------------------- - -class TestTimeout: - def test_timeout_kills_process(self, tmp_path): - """超时后 kill 进程""" - db_path = tmp_path / "blackboard.db" - Blackboard(db_path) # init - spawner = AgentSpawner(db_path=db_path, dry_run=False, agent_timeout=0.5) - - # Spawn a long-running process (sleep 10) - session_id = asyncio.run( - spawner.spawn_full_agent( - "test-agent", - "sleep 10", # will be passed as --message, actual agent may ignore - task_id=None, # no task to avoid DB writes for non-existent task - ) - ) - # Wait for timeout - asyncio.run(asyncio.sleep(1.0)) - - session = spawner.get_session(session_id) - if session: - # Process should have been killed - assert session["status"] in ("timed_out", "running", "completed") - - -# --------------------------------------------------------------------------- -# T3: spawn 失败 -# --------------------------------------------------------------------------- - -class TestSpawnFailure: - def test_nonexistent_command(self, real_spawner, db_path, bb): - """命令不存在 → spawn_failed""" - bb.create_task( - __import__("src.blackboard.models", fromlist=["Task"]).Task( - id="t1", title="T", status="pending", assigned_by="d" - ) - ) - - # Spawner will try to run "openclaw" which may not exist in test env - # This test is about error handling, not the actual command - try: - asyncio.run( - real_spawner.spawn_full_agent("test", "msg", task_id="t1") - ) - except Exception: - pass # Expected - command may fail - - # --------------------------------------------------------------------------- # T4: session 清理 # --------------------------------------------------------------------------- @@ -164,44 +98,29 @@ class TestAcquireFirst: """E11: #07.1 Acquire-First 重构后的 Phase 0-4 测试""" def test_phase0_revive_before_acquire(self, spawner): - """E11.1 Phase 0: timeout/failed 状态 → revive → acquire 成功 - - Phase 0 在 counter acquire 之前执行,修复 timeout/failed 的 session。 - 这里用 dry_run spawner 测试 _revive_session 的调用路径。 - """ - # 在 dry_run 模式下,Phase 0 检查 session state - # 如果 session 不存在(正常情况),status=None → 不触发 revive - # 测试正常路径:不超时/失败 → 直接 acquire + spawn + """E11.1 Phase 0: timeout/failed 状态 → revive → acquire 成功""" session_id = asyncio.run( spawner.spawn_full_agent( "test-agent", "do something", task_id="t1", use_main_session=True, ) ) - assert session_id # dry_run 返回 "main" 或 session_id + assert session_id def test_phase0_stuck_detection(self, spawner): - """E11.2 Phase 0: status=running + lock PID 死 → revive - - 当 session state 显示 running 但 lock PID 已死,Phase 0 应自动 revive。 - """ - # 模拟:通过 mock _check_session_state 返回假死状态 + """E11.2 Phase 0: status=running + lock PID 死 → revive""" original_check = spawner._check_session_state call_count = [0] def mock_check(agent_id): call_count[0] += 1 if call_count[0] == 1: - # Phase 0: 假死状态 return {"status": "running", "lock_pid_alive": False} - # Phase 2: revive 后正常 return {"status": "idle", "lock_pid_alive": False} spawner._check_session_state = mock_check revive_called = [False] - original_revive = spawner._revive_session - def mock_revive(agent_id): revive_called[0] = True return True @@ -217,22 +136,16 @@ class TestAcquireFirst: assert revive_called[0], "Phase 0 should have called _revive_session for stuck session" def test_phase1_counter_acquire_exclusive(self, spawner): - """E11.3 Phase 1: counter acquire 互斥 - - 同一 agent 并发 spawn → 第二个 AgentBusyError(reason 含 counter)。 - 使用 max_concurrent_sessions=1 确保同 agent 第二次 acquire 失败。 - """ + """E11.3 Phase 1: counter acquire 互斥""" from src.daemon.counter import ActiveAgentCounter counter = ActiveAgentCounter(max_global=5, max_concurrent_sessions=1) spawner.counter = counter - # 第一次 acquire 成功 session_id = asyncio.run( spawner.spawn_full_agent("test-agent", "task", task_id="t1") ) assert session_id - # 第二次 acquire → counter blocked(同 agent 已有活跃 session) with pytest.raises(AgentBusyError) as exc_info: asyncio.run( spawner.spawn_full_agent("test-agent", "task2", task_id="t2") @@ -240,15 +153,11 @@ class TestAcquireFirst: assert "counter" in exc_info.value.reason or "blocked" in exc_info.value.reason def test_phase2_session_check_under_lock(self, spawner): - """E11.4 Phase 2: session check 在锁保护下执行 - - counter acquire 后 → session locked → release counter → AgentBusyError。 - """ + """E11.4 Phase 2: session check 在锁保护下执行""" from src.daemon.counter import ActiveAgentCounter counter = ActiveAgentCounter(max_global=5, max_per_agent=1) spawner.counter = counter - # 模拟 session locked spawner._check_session_state = lambda agent_id: { "status": "idle", "lock_pid_alive": True, @@ -263,14 +172,10 @@ class TestAcquireFirst: ) ) assert "session_locked" in exc_info.value.reason - # counter 应被 release(不会泄漏) assert counter.global_active == 0 def test_phase2_multiple_blockers(self, spawner): - """E11.5 Phase 2: 多 blocker 并列收集 - - session locked + compact 同时存在 → detail.blockers 包含两者。 - """ + """E11.5 Phase 2: 多 blocker 并列收集""" spawner._check_session_state = lambda agent_id: { "status": "idle", "lock_pid_alive": True, @@ -292,10 +197,7 @@ class TestAcquireFirst: assert "session_compacting" in blocker_reasons def test_phase3_on_checks_passed_exception_rollback(self, spawner): - """E11.6 Phase 3: on_checks_passed 抛异常 → counter 自动 release - - 回调异常不应导致 counter 泄漏。 - """ + """E11.6 Phase 3: on_checks_passed 抛异常 → counter 自动 release""" from src.daemon.counter import ActiveAgentCounter counter = ActiveAgentCounter(max_global=5, max_per_agent=1) spawner.counter = counter @@ -311,7 +213,6 @@ class TestAcquireFirst: on_checks_passed=bad_callback, ) ) - # counter 应被 release assert counter.global_active == 0 @@ -323,10 +224,7 @@ class TestCompactHanging: """E13: compact_hanging 后不标 failed,只 release counter → 任务保持 working""" def test_compact_hanging_releases_counter(self): - """E13.1: compact 超限 → compact_hanging → release counter → 任务保持 working - - compact_hanging outcome 时 counter 应被 release,任务不应被标 failed。 - """ + """E13.1: compact 超限 → compact_hanging → release counter → 任务保持 working""" from src.daemon.counter import ActiveAgentCounter from src.blackboard.models import Task as TaskModel counter = ActiveAgentCounter(max_global=5, max_per_agent=1) @@ -340,7 +238,6 @@ class TestCompactHanging: spawner = AgentSpawner(db_path=db_path, dry_run=True) spawner.counter = counter - # 模拟 compact_hanging outcome 的 on_complete outcomes = [] async def mock_on_complete(aid, outcome): outcomes.append((aid, outcome)) @@ -350,7 +247,6 @@ class TestCompactHanging: on_complete=mock_on_complete, )) - # 手动模拟 counter release(compact_hanging 路径) counter.release("test-agent", sid) assert counter.global_active == 0 finally: @@ -358,15 +254,11 @@ class TestCompactHanging: db_path.unlink() def test_retry_agent_busy_releases_counter(self, spawner): - """E13.3: _do_retry 遇 AgentBusyError → release counter → 任务保持 working - - retry 遇 session busy 时应 release counter,不持有。 - """ + """E13.3: _do_retry 遇 AgentBusyError → release counter → 任务保持 working""" from src.daemon.counter import ActiveAgentCounter counter = ActiveAgentCounter(max_global=5, max_per_agent=1) spawner.counter = counter - # 模拟 retry 场景:spawn 遇 AgentBusyError spawner._check_session_state = lambda agent_id: { "status": "running", "lock_pid_alive": True, @@ -380,7 +272,6 @@ class TestCompactHanging: task_id="t1", use_main_session=True, ) ) - # counter 不应泄漏 assert counter.global_active == 0 @@ -397,7 +288,6 @@ class TestAgentBusyErrorClassification: counter = ActiveAgentCounter(max_global=1, max_per_agent=1) spawner.counter = counter - # Fill counter asyncio.run(counter.acquire("test-agent")) with pytest.raises(AgentBusyError) as exc_info: @@ -411,19 +301,14 @@ class TestAgentBusyErrorClassification: """E14.2: session locked/running/compacting → 具体 reason + detail.blockers""" test_cases = [ { - # session locked: lock PID alive + not expired "state": {"status": "idle", "lock_pid_alive": True, "lock_expired": False}, "expected": "session_locked", }, { - # session running: status=running + lock alive - # 注意:running + lock alive 同时触发 session_locked 和 session_running, - # primary_reason 取第一个 blocker(session_locked) "state": {"status": "running", "lock_pid_alive": True, "lock_expired": False}, - "expected": "session_locked", # session_locked 排在 session_running 之前 + "expected": "session_locked", }, { - # session compacting: recent compact "state": {"status": "idle", "lock_pid_alive": False, "recent_compact": True}, "expected": "session_compacting", }, @@ -448,11 +333,7 @@ class TestAgentBusyErrorClassification: f"Case {i}: expected detail to be set" def test_session_running_in_blockers(self, spawner): - """session_running 出现在 blockers 列表中(session_locked 优先) - - status=running + lock alive → session_locked 和 session_running 同时存在, - session_locked 排第一(primary_reason),但 blockers 列表包含 session_running。 - """ + """session_running 出现在 blockers 列表中(session_locked 优先)""" spawner._check_session_state = lambda aid: { "status": "running", "lock_pid_alive": True, "lock_expired": False, } @@ -464,9 +345,7 @@ class TestAgentBusyErrorClassification: task_id="t1", use_main_session=True, ) ) - # primary_reason 是 session_locked assert "session_locked" in exc_info.value.reason - # blockers 列表包含 session_running blockers = exc_info.value.detail.get("blockers", []) blocker_reasons = [b[0] for b in blockers] assert "session_running" in blocker_reasons @@ -494,22 +373,15 @@ class TestPhase25AndStuck: """司马懿评审遗漏 #1 + session_stuck 遗漏补充""" def test_phase25_stuck_fallback(self, spawner): - """Phase 0 不触发(status 非 running),Phase 2 检测到假死 → revive → 成功 spawn - - Phase 2.5 是 #07 v1.1 加的兜底:Phase 0 时 session 正常(idle), - 但 Phase 2 检查时变为 running + lock PID 死。Phase 2.5 应 revive → 重检 → idle → spawn 成功。 - """ + """Phase 2.5: Phase 2 检测到假死 → revive → 成功 spawn""" call_count = [0] def mock_check(agent_id): call_count[0] += 1 if call_count[0] <= 1: - # Phase 0: 正常 idle,不触发 revive return {"status": "idle", "lock_pid_alive": False} if call_count[0] == 2: - # Phase 2: 假死(Phase 0 和 Phase 2 之间进程变 stuck) return {"status": "running", "lock_pid_alive": False} - # Phase 2.5 重检:revive 后恢复 idle return {"status": "idle", "lock_pid_alive": False} spawner._check_session_state = mock_check @@ -521,7 +393,6 @@ class TestPhase25AndStuck: spawner._revive_session = mock_revive - # Phase 2.5 应触发 revive → 重检 → idle → 正常 spawn session_id = asyncio.run( spawner.spawn_full_agent( "test-agent", "task", @@ -529,31 +400,11 @@ class TestPhase25AndStuck: ) ) assert revive_called[0], "Phase 2.5 should have revived stuck session" - assert session_id # spawn 成功 + assert session_id def test_session_stuck_after_failed_revive(self, spawner): - """Phase 2.5 revive 失败 → session_stuck - - 假死 revive 后 status 仍为 running → AgentBusyError(session_stuck)。 - """ - call_count = [0] - - def mock_check(agent_id): - call_count[0] += 1 - if call_count[0] <= 1: - return {"status": "idle", "lock_pid_alive": False} - # Phase 2: 假死 - return {"status": "running", "lock_pid_alive": False} - - spawner._check_session_state = mock_check - - # revive 后 session 仍 stuck - def mock_revive(agent_id): - return True # revive 成功但 mock_check 不变,下次仍返回 running - - # 让 revive 后 check 返回 stuck + """Phase 2.5 revive 失败 → session_stuck""" revive_and_check_count = [0] - original_check = mock_check def mock_check_v2(agent_id): revive_and_check_count[0] += 1 @@ -561,11 +412,10 @@ class TestPhase25AndStuck: return {"status": "idle", "lock_pid_alive": False} if revive_and_check_count[0] == 2: return {"status": "running", "lock_pid_alive": False} - # revive 后重检:仍 stuck return {"status": "running", "lock_pid_alive": False} spawner._check_session_state = mock_check_v2 - spawner._revive_session = mock_revive + spawner._revive_session = lambda agent_id: True with pytest.raises(AgentBusyError) as exc_info: asyncio.run(