"""#01 Four-phase loop: unit tests.

Pure-logic checks that need no running daemon or agents. The project's
``src`` package is imported from the deployed tree (see ``DEPLOY_DIR``).
Coverage:

U1  mention_queue write and query
U2  mention retry limit
U3  round-complete detection (all subtasks terminal)
U4  round complete with a failed subtask
U5  round_count upper limit
U6  mentions grouped per agent (mock spawner)
U7  subtask_summary aggregation
U8  increment_round_count
U9  mention prompt construction
U10 review prompt construction
U11 ``reviewing`` intermediate state
"""
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

# Opt-in gate. The marker keeps ``-m e2e`` selection working. The module-level
# skip must run *before* the project imports below; a skipif marker alone
# cannot stop those imports, so a machine without the deployed tree would get
# a collection error instead of a clean skip.
_SKIP_REASON = "Set RUN_INTEGRATION=1 to run E2E tests"
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.skipif(
        not os.environ.get("RUN_INTEGRATION"),
        reason=_SKIP_REASON,
    ),
]
if not os.environ.get("RUN_INTEGRATION"):
    pytest.skip(_SKIP_REASON, allow_module_level=True)

# ── Path setup ──
# Deployed project root. Set SANGUO_DEPLOY_DIR to test a different checkout;
# the default is the original hard-coded location.
DEPLOY_DIR = Path(
    os.environ.get(
        "SANGUO_DEPLOY_DIR",
        str(Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"),
    )
).expanduser()
SRC_DIR = DEPLOY_DIR / "src"
# Both are prepended; DEPLOY_DIR is inserted last so it ends up first, which is
# the entry that the ``from src.<pkg> import ...`` lines below resolve against.
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))
if str(DEPLOY_DIR) not in sys.path:
    sys.path.insert(0, str(DEPLOY_DIR))

from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker


# ── Fixtures ──

@pytest.fixture
def data_root(tmp_path):
    """Projects root for this test, inside pytest's per-test ``tmp_path``."""
    return tmp_path / "projects"


@pytest.fixture
def registry(data_root):
    """A ``ProjectRegistry`` rooted at ``data_root``."""
    return ProjectRegistry(data_root)


@pytest.fixture
def project_env(data_root, registry):
    """Create a project with its database and Blackboard.

    Returns:
        ``(pid, db_path, bb)``: the project id, the path to its
        ``blackboard.db``, and a ``Blackboard`` bound to that file.
    """
    pid = "test-proj"
    registry.create_project(pid, "Test Project", agents=["agent-a", "agent-b"])
    db_path = data_root / pid / "blackboard.db"
    bb = Blackboard(db_path)
    return pid, db_path, bb


def _make_task(bb: Blackboard, tid: str, **kwargs: Any) -> str:
    """Create a task and return its id.

    Defaults to a ``pending`` ``coding`` task assigned by ``daemon``. Any
    keyword argument (e.g. ``parent_task``, ``title``) overrides a default.
    """
    fields: Dict[str, Any] = {
        "id": tid,
        "title": f"Task {tid}",
        "status": "pending",
        "assigned_by": "daemon",
        "task_type": "coding",
        **kwargs,
    }
    bb.create_task(Task(**fields))
    return tid


def _push_status(bb: Blackboard, tid: str, *statuses: str,
                 agent: str = "agent-a") -> None:
    """Walk task *tid* through *statuses* in order, acting as *agent*.

    A successful transition returns ``True`` (as the reviewing-state tests
    assert). An explicit ``False`` is treated as a rejected transition and
    fails immediately, so a broken status chain surfaces here rather than as a
    confusing downstream assertion.
    """
    for status in statuses:
        ok = bb.update_task_status(tid, status, agent=agent)
        assert ok is not False, f"{tid}: transition to {status!r} was rejected"
# ===================================================================
# Helpers shared by the round-completion tests (U3-U5)
# ===================================================================


def _make_review_spawner() -> MagicMock:
    """Build a spawner mock whose ``spawn_full_agent`` can be awaited.

    Every spawn returns ``"session-1"``. ``api_host``/``api_port`` mirror the
    connection attributes of the real spawner.
    """
    spawner = MagicMock()
    spawner.spawn_full_agent = AsyncMock(return_value="session-1")
    spawner.api_host = "127.0.0.1"
    spawner.api_port = 8083
    return spawner


def _force_task_done(db_path: Path, task_id: str,
                     round_count: Optional[int] = None) -> None:
    """Set ``status='done'`` (and optionally ``round_count``) via raw SQL.

    Bypasses the Blackboard state machine so a parent can be put directly into
    a terminal state before ``_check_round_complete`` runs. The connection is
    closed even if the statement fails.
    """
    from src.blackboard.db import get_connection

    conn = get_connection(db_path)
    try:
        if round_count is None:
            conn.execute("UPDATE tasks SET status='done' WHERE id=?",
                         (task_id,))
        else:
            conn.execute(
                "UPDATE tasks SET status='done', round_count=? WHERE id=?",
                (round_count, task_id),
            )
        conn.commit()
    finally:
        conn.close()


def _run_round_check(ticker: Ticker, db_path: Path, pid: str):
    """Drive ``Ticker._check_round_complete`` to completion; return its result."""
    return asyncio.run(ticker._check_round_complete(db_path, pid))


# ===================================================================
# U1: mention_queue write and query
# ===================================================================
class TestU1MentionWriteQuery:
    """U1: mention write, de-duplication and status query."""

    def test_record_and_query_mentions(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1")
        cid = bb.add_comment("t1", "author1", "Hello @agent-a @agent-b",
                             mentions=["agent-a", "agent-b"])
        count = bb.record_mentions(cid, "t1", ["agent-a", "agent-b"])
        assert count == 2

        pending = bb.get_pending_mentions()
        assert len(pending) == 2
        agents = {m["mentioned_agent"] for m in pending}
        assert agents == {"agent-a", "agent-b"}

    def test_dedup_same_comment_same_agent(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1")
        cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(cid, "t1", ["agent-a"])
        count2 = bb.record_mentions(cid, "t1", ["agent-a"])
        assert count2 == 0  # already recorded; not written again

        pending = bb.get_pending_mentions()
        assert len(pending) == 1

    def test_mark_notified_and_requery(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1")
        cid = bb.add_comment("t1", "author1", "Hello",
                             mentions=["agent-a", "agent-b"])
        bb.record_mentions(cid, "t1", ["agent-a", "agent-b"])

        pending = bb.get_pending_mentions()
        assert len(pending) == 2

        # Mark one of them as notified; only the other stays pending.
        bb.mark_mention_notified(pending[0]["id"])
        pending2 = bb.get_pending_mentions()
        assert len(pending2) == 1

    def test_empty_mentions(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1")
        count = bb.record_mentions(1, "t1", [])
        assert count == 0


# ===================================================================
# U2: mention retry limit
# ===================================================================
class TestU2MentionRetryLimit:
    """U2: retry_count increments; past the limit a mention is no longer returned."""

    def test_retry_and_limit(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1")
        cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(cid, "t1", ["agent-a"])

        mention = bb.get_pending_mentions()[0]
        assert mention["retry_count"] == 0

        # Retry 4 times (retry_count -> 4).
        for _ in range(4):
            bb.mark_mention_retry(mention["id"])

        # Still visible (4 < 5).
        pending = bb.get_pending_mentions(max_retries=5)
        assert len(pending) == 1
        assert pending[0]["retry_count"] == 4

        # One more retry -> retry_count = 5.
        bb.mark_mention_retry(mention["id"])

        # Limit reached: no longer returned.
        pending2 = bb.get_pending_mentions(max_retries=5)
        assert len(pending2) == 0

    def test_mark_failed(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1")
        cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(cid, "t1", ["agent-a"])

        mention = bb.get_pending_mentions()[0]
        bb.mark_mention_failed(mention["id"])

        # Failed mentions are not pending.
        pending = bb.get_pending_mentions()
        assert len(pending) == 0


# ===================================================================
# U3: round-complete detection (all subtasks terminal -> review)
# ===================================================================
class TestU3RoundComplete:
    """U3: all subtasks of a parent terminal -> the Pang Tong review is triggered."""

    def test_all_subs_done_triggers_review(self, project_env):
        pid, db_path, bb = project_env
        subs = ("sub1", "sub2", "sub3")

        # parent + 3 subtasks
        _make_task(bb, "parent", title="Parent Task")
        for sub in subs:
            _make_task(bb, sub, parent_task="parent")

        # every subtask -> done
        for sub in subs:
            _push_status(bb, sub, "claimed", "working", "review", "done")

        # Put the parent straight into 'done' (stands in for status aggregation).
        _force_task_done(db_path, "parent")

        spawner = _make_review_spawner()
        # MagicMock registry, as in U4/U5, instead of a real ProjectRegistry
        # pointed at a hard-coded /tmp path outside pytest's tmp_path.
        ticker = Ticker(registry=MagicMock(), spawner=spawner,
                        dispatcher=MagicMock())
        # Mocked: the real _build_review_prompt uses an f-string that is not
        # compatible with Python 3.9.
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        reviewed = _run_round_check(ticker, db_path, pid)
        assert "parent" in reviewed

        # round_count was incremented.
        task = bb.get_task("parent")
        assert task.round_count == 1

        # The reviewer agent was spawned exactly once.
        spawner.spawn_full_agent.assert_called_once()
        call_kwargs = spawner.spawn_full_agent.call_args[1]
        assert call_kwargs["agent_id"] == "pangtong-fujunshi"

    def test_not_all_terminal_no_review(self, project_env):
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        # sub2 is still pending.

        spawner = _make_review_spawner()
        ticker = Ticker(registry=MagicMock(), spawner=spawner,
                        dispatcher=MagicMock())

        reviewed = _run_round_check(ticker, db_path, pid)
        assert reviewed == []
        spawner.spawn_full_agent.assert_not_called()


# ===================================================================
# U4: round complete with a failed subtask
# ===================================================================
class TestU4RoundWithFailed:
    """U4: done and failed are both terminal, so the review is triggered."""

    def test_mixed_done_failed_triggers_review(self, project_env):
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "failed")

        _force_task_done(db_path, "parent")

        ticker = Ticker(registry=MagicMock(), spawner=_make_review_spawner(),
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        reviewed = _run_round_check(ticker, db_path, pid)
        assert "parent" in reviewed


# ===================================================================
# U5: round_count upper limit
# ===================================================================
class TestU5RoundLimit:
    """U5: once round_count >= MAX_ROUNDS (5), no further review is triggered."""

    def test_at_limit_no_review(self, project_env):
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")

        # Set round_count = 5 directly.
        _force_task_done(db_path, "parent", round_count=5)

        spawner = _make_review_spawner()
        ticker = Ticker(registry=MagicMock(), spawner=spawner,
                        dispatcher=MagicMock())

        reviewed = _run_round_check(ticker, db_path, pid)
        assert reviewed == []  # round_count=5 >= 5: not triggered
        spawner.spawn_full_agent.assert_not_called()

    def test_below_limit_triggers(self, project_env):
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")

        _force_task_done(db_path, "parent", round_count=4)

        ticker = Ticker(registry=MagicMock(), spawner=_make_review_spawner(),
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        reviewed = _run_round_check(ticker, db_path, pid)
        assert "parent" in reviewed  # round_count=4 < 5: round 5 is triggered
# ===================================================================
# U6: mentions grouped per agent
# ===================================================================
class TestU6MentionGrouping:
    """U6: ``_process_mentions`` groups by agent.

    Several pending mentions of the same agent are merged into one spawn.
    """

    def test_grouped_by_agent(self, project_env):
        pid, db_path, bb = project_env
        _make_task(bb, "t1")
        _make_task(bb, "t2")

        # comment 1 mentions agent-a
        cid1 = bb.add_comment("t1", "author1", "msg1", mentions=["agent-a"])
        bb.record_mentions(cid1, "t1", ["agent-a"])

        # comment 2 mentions agent-a and agent-b
        cid2 = bb.add_comment("t1", "author2", "msg2",
                              mentions=["agent-a", "agent-b"])
        bb.record_mentions(cid2, "t1", ["agent-a", "agent-b"])

        # agent-a now has 2 pending mentions, agent-b has 1.
        pending = bb.get_pending_mentions()
        assert len(pending) == 3

        # Spawner mock that records spawn calls.
        spawner = MagicMock()
        spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        spawner.api_host = "127.0.0.1"
        spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=spawner,
                        dispatcher=MagicMock())

        processed = asyncio.run(ticker._process_mentions(db_path, pid))

        # Two spawns: one for agent-a, one for agent-b.
        assert spawner.spawn_full_agent.call_count == 2
        assert set(processed) == {"agent-a", "agent-b"}

        # Every mention is now notified.
        pending_after = bb.get_pending_mentions()
        assert len(pending_after) == 0


# ===================================================================
# U7: subtask_summary aggregation
# ===================================================================
class TestU7SubtaskSummary:
    """U7: ``get_subtasks_summary`` returns correct per-status counts."""

    def test_mixed_statuses(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "parent", title="Parent")
        for sub in ("sub1", "sub2", "sub3", "sub4"):
            _make_task(bb, sub, parent_task="parent")

        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "review", "done")
        _push_status(bb, "sub3", "claimed", "working", "failed")
        # sub4 is still pending.

        summary = bb.get_subtasks_summary("parent")
        assert summary is not None
        assert summary["total"] == 4
        assert summary["done"] == 2
        assert summary["failed"] == 1
        assert summary["other"] == 1  # the pending one
        assert summary["all_terminal"] is False

    def test_all_terminal(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "failed")

        summary = bb.get_subtasks_summary("parent")
        assert summary["all_terminal"] is True
        assert summary["done"] == 1
        assert summary["failed"] == 1

    def test_no_subs_returns_none(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "lonely", title="No subs")
        assert bb.get_subtasks_summary("lonely") is None

    def test_nonexistent_parent_returns_none(self, project_env):
        _, _, bb = project_env
        assert bb.get_subtasks_summary("nonexistent") is None


# ===================================================================
# U8: increment_round_count
# ===================================================================
class TestU8IncrementRound:
    """U8: round_count increments and is persisted."""

    def test_increment(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "parent", title="Parent")
        assert bb.get_task("parent").round_count == 0

        assert bb.increment_round_count("parent") == 1
        assert bb.increment_round_count("parent") == 2

        # Persisted: a fresh read sees the new value.
        assert bb.get_task("parent").round_count == 2


# ===================================================================
# U9: mention prompt construction
# ===================================================================
class TestU9MentionPrompt:
    """U9: ``_build_mention_prompt`` includes the key content."""

    def test_prompt_content(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "t1", title="测试任务标题")

        ticker = Ticker(registry=MagicMock())
        ticker.spawner = MagicMock()
        ticker.spawner.api_host = "127.0.0.1"
        ticker.spawner.api_port = 8083

        task = bb.get_task("t1")
        mention_lines = ["- [agent-a] 这是一条 mention 消息"]
        prompt = ticker._build_mention_prompt(
            "agent-b", task, mention_lines, "test-proj")

        assert "agent-b" in prompt
        assert "测试任务标题" in prompt
        assert "mention" in prompt.lower() or "@" in prompt or "被" in prompt
        assert "test-proj" in prompt
        assert "8083" in prompt  # API port


# ===================================================================
# U10: review prompt construction
# ===================================================================
class TestU10ReviewPrompt:
    """U10: input-contract checks for ``_build_review_prompt``.

    The real ``_build_review_prompt`` uses an f-string that breaks on
    Python 3.9, so these tests replace it with a MagicMock and check only the
    arguments it is called with. Prompt text is meant to be tested on
    Python 3.12+.

    NOTE(review): because both the call and the assertions target the same
    MagicMock, these tests cannot fail on a change to the real builder. They
    only document the expected call shape.
    """

    def test_review_prompt_basic(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal Task", description="这是目标描述")
        parent_task = bb.get_task("parent")

        spawner = MagicMock()
        spawner.api_host = "127.0.0.1"
        spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=spawner)
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        summary = {"done": 2, "failed": 1, "cancelled": 0, "total": 3}
        prompt = ticker._build_review_prompt(
            parent_task, summary, [], [], 1, project_id="test-proj")

        # Arguments were passed through in the expected positions.
        args, kwargs = ticker._build_review_prompt.call_args
        assert args[0].id == "parent"  # parent_task
        assert args[1] == summary      # summary
        assert args[4] == 1            # round_num
        assert kwargs["project_id"] == "test-proj"
        assert prompt == "Review prompt mock"

    def test_review_prompt_with_failures(self, project_env):
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal", description="目标")
        parent_task = bb.get_task("parent")

        spawner = MagicMock()
        spawner.api_host = "127.0.0.1"
        spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=spawner)
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        summary = {"done": 1, "failed": 2, "cancelled": 0, "total": 3}
        ticker._build_review_prompt(
            parent_task, summary, [], [], 2, project_id="proj-1")

        # A summary containing failures is passed through unchanged.
        args, _ = ticker._build_review_prompt.call_args
        assert args[1]["failed"] == 2
        assert args[4] == 2  # round_num


# ===================================================================
# U11: reviewing intermediate state
# ===================================================================
class TestU11ReviewingState:
    """U11: the ``reviewing`` intermediate state prevents duplicate triggers."""

    def test_reviewing_skipped_in_round_check(self, project_env):
        """A parent in ``reviewing`` is not picked up by the round-complete check."""
        pid, db_path, bb = project_env
        parent = _make_task(bb, "parent-1")
        _make_task(bb, "s1", parent_task=parent)

        # subtask -> done
        _push_status(bb, "s1", "claimed", "working", "review", "done",
                     agent="test")
        # parent -> done -> reviewing
        _push_status(bb, "parent-1", "claimed", "working", "review", "done",
                     agent="test")
        _push_status(bb, "parent-1", "reviewing", agent="daemon")

        # The summary reports the parent as reviewing.
        summary = bb.get_subtasks_summary("parent-1")
        assert summary is not None
        assert summary["parent_status"] == "reviewing"
        # reviewing is not one of ("done", "failed").
        assert summary["parent_status"] not in ("done", "failed")

        # ... and _check_round_complete therefore skips it.
        spawner = MagicMock()
        spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        spawner.api_host = "127.0.0.1"
        spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=spawner,
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert reviewed == []
        spawner.spawn_full_agent.assert_not_called()

    def test_reviewing_not_overwritten_by_aggregation(self, project_env):
        """``reviewing`` is a MANUAL_STATUS; ``compute_parent_status`` keeps it."""
        _, _, bb = project_env
        parent = _make_task(bb, "parent-2")
        _make_task(bb, "s1", parent_task=parent)

        # subtask -> done
        _push_status(bb, "s1", "claimed", "working", "review", "done",
                     agent="test")
        # parent -> done -> reviewing
        _push_status(bb, "parent-2", "claimed", "working", "review", "done",
                     agent="test")
        _push_status(bb, "parent-2", "reviewing", agent="daemon")

        # Aggregation must return reviewing, not overwrite it.
        from src.blackboard.queries import Queries
        q = Queries(bb.db_path)
        assert q.compute_parent_status("parent-2") == "reviewing"

    def test_reviewing_to_done_transition(self, project_env):
        """reviewing -> done is a legal transition (used on GOAL_ACHIEVED)."""
        _, _, bb = project_env
        _make_task(bb, "parent-3")
        _push_status(bb, "parent-3", "claimed", "working", "review", "done",
                     agent="test")
        _push_status(bb, "parent-3", "reviewing", agent="daemon")

        # reviewing -> done (after GOAL_ACHIEVED)
        result = bb.update_task_status("parent-3", "done", agent="daemon")
        assert result is True
        assert bb.get_task("parent-3").status == "done"

    def test_reviewing_to_working_transition(self, project_env):
        """reviewing -> working is a legal transition (starting the next round)."""
        _, _, bb = project_env
        _make_task(bb, "parent-4")
        _push_status(bb, "parent-4", "claimed", "working", "review", "done",
                     agent="test")
        _push_status(bb, "parent-4", "reviewing", agent="daemon")

        # reviewing -> working (continue with the next round)
        result = bb.update_task_status("parent-4", "working", agent="daemon")
        assert result is True
        assert bb.get_task("parent-4").status == "working"