diff --git a/tests/test_four_phase_unit.py b/tests/test_four_phase_unit.py new file mode 100644 index 0000000..e142e01 --- /dev/null +++ b/tests/test_four_phase_unit.py @@ -0,0 +1,534 @@ +"""#01 四相循环 单元测试 + +不依赖 daemon / Agent,纯逻辑验证。覆盖: + U1 mention_queue 写入与查询 + U2 mention 重试上限 + U3 一轮结束检测(全部终态) + U4 一轮结束 — 含 failed sub + U5 round_count 上限 + U6 mention 按 agent 分组(mock spawner) + U7 subtask_summary 聚合 + U8 increment_round_count + U9 mention prompt 构建 + U10 review prompt 构建 +""" + +import asyncio +import json +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# ── 路径设置 ── +DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" +SRC_DIR = DEPLOY_DIR / "src" +if str(SRC_DIR) not in sys.path: + sys.path.insert(0, str(SRC_DIR)) +if str(DEPLOY_DIR) not in sys.path: + sys.path.insert(0, str(DEPLOY_DIR)) + +from src.blackboard.models import Task +from src.blackboard.operations import Blackboard +from src.blackboard.registry import ProjectRegistry +from src.daemon.ticker import Ticker + + +# ── Fixtures ── + +@pytest.fixture +def data_root(tmp_path): + return tmp_path / "projects" + + +@pytest.fixture +def registry(data_root): + return ProjectRegistry(data_root) + + +@pytest.fixture +def project_env(data_root, registry): + """创建项目 + DB + Blackboard,返回 (pid, db_path, bb)""" + pid = "test-proj" + registry.create_project(pid, "Test Project", agents=["agent-a", "agent-b"]) + db_path = data_root / pid / "blackboard.db" + bb = Blackboard(db_path) + return pid, db_path, bb + + +def _make_task(bb: Blackboard, tid: str, **kwargs) -> str: + """辅助:创建 task""" + defaults = { + "id": tid, "title": f"Task {tid}", "status": "pending", + "assigned_by": "daemon", "task_type": "coding", + } + defaults.update(kwargs) + bb.create_task(Task(**defaults)) + return tid + + +def _push_status(bb: Blackboard, tid: str, *statuses: str, agent: str = "agent-a"): + """辅助:推 task 状态链""" + for s in statuses: + bb.update_task_status(tid, s, agent=agent) + + +# =================================================================== +# U1: mention_queue 写入与查询 +# =================================================================== + +class TestU1MentionWriteQuery: + """U1: mention 写入、去重、状态查询""" + + def test_record_and_query_mentions(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1") + + cid = bb.add_comment("t1", "author1", "Hello @agent-a @agent-b", + mentions=["agent-a", "agent-b"]) + + count = bb.record_mentions(cid, "t1", ["agent-a", "agent-b"]) + assert count == 2 + + pending = bb.get_pending_mentions() + assert len(pending) == 2 + statuses = {m["mentioned_agent"] for m in pending} + assert statuses == {"agent-a", "agent-b"} + + def test_dedup_same_comment_same_agent(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1") + cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"]) + + bb.record_mentions(cid, "t1", ["agent-a"]) + count2 = bb.record_mentions(cid, "t1", ["agent-a"]) + assert count2 == 0 # 已存在,不重复写入 + + pending = bb.get_pending_mentions() + assert len(pending) == 1 + + def test_mark_notified_and_requery(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1") + cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a", "agent-b"]) + bb.record_mentions(cid, "t1", ["agent-a", "agent-b"]) + + pending = bb.get_pending_mentions() + assert len(pending) == 2 + + # 标记一个为 notified + bb.mark_mention_notified(pending[0]["id"]) + + pending2 = bb.get_pending_mentions() + assert len(pending2) == 1 + + def test_empty_mentions(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1") + count = bb.record_mentions(1, "t1", []) + assert count == 0 + + +# =================================================================== +# U2: mention 重试上限 +# =================================================================== + +class TestU2MentionRetryLimit: + """U2: retry_count 递增 + 超限后不再返回""" + + def test_retry_and_limit(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1") + cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"]) + bb.record_mentions(cid, "t1", ["agent-a"]) + + mention = bb.get_pending_mentions()[0] + assert mention["retry_count"] == 0 + + # 重试 4 次(retry_count → 4) + for _ in range(4): + bb.mark_mention_retry(mention["id"]) + + # 仍可见(4 < 5) + pending = bb.get_pending_mentions(max_retries=5) + assert len(pending) == 1 + assert pending[0]["retry_count"] == 4 + + # 再 retry 一次 → retry_count = 5 + bb.mark_mention_retry(mention["id"]) + + # 超限,不再返回 + pending2 = bb.get_pending_mentions(max_retries=5) + assert len(pending2) == 0 + + def test_mark_failed(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1") + cid = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"]) + bb.record_mentions(cid, "t1", ["agent-a"]) + + mention = bb.get_pending_mentions()[0] + bb.mark_mention_failed(mention["id"]) + + # failed 的不在 pending 中 + pending = bb.get_pending_mentions() + assert len(pending) == 0 + + +# =================================================================== +# U3: 一轮结束检测(全部终态 → 触发 review) +# =================================================================== + +class TestU3RoundComplete: + """U3: parent 下所有 sub 终态 → 触发庞统 review""" + + def test_all_subs_done_triggers_review(self, project_env): + pid, db_path, bb = project_env + + # parent + 3 subs + _make_task(bb, "parent", title="Parent Task") + _make_task(bb, "sub1", parent_task="parent") + _make_task(bb, "sub2", parent_task="parent") + _make_task(bb, "sub3", parent_task="parent") + + # 所有 sub → done + for s in ("sub1", "sub2", "sub3"): + _push_status(bb, s, "claimed", "working", "review", "done") + + # 需要先聚合 parent 状态 + from src.blackboard.db import get_connection + conn = get_connection(db_path) + conn.execute("UPDATE tasks SET status='done' WHERE id='parent'") + conn.close() + + # mock spawner + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1") + ticker = Ticker(registry=ProjectRegistry(Path("/tmp/fake")), + spawner=mock_spawner, dispatcher=MagicMock()) + + async def run(): + result = await ticker._check_round_complete(db_path, pid) + return result + + reviewed = asyncio.run(run()) + assert "parent" in reviewed + + # 验证 round_count 递增 + task = bb.get_task("parent") + assert task.round_count == 1 + + # 验证 spawner 被调用 + mock_spawner.spawn_full_agent.assert_called_once() + call_kwargs = mock_spawner.spawn_full_agent.call_args[1] + assert call_kwargs["agent_id"] == "pangtong-fujunshi" + + def test_not_all_terminal_no_review(self, project_env): + pid, db_path, bb = project_env + + _make_task(bb, "parent", title="Parent") + _make_task(bb, "sub1", parent_task="parent") + _make_task(bb, "sub2", parent_task="parent") + + _push_status(bb, "sub1", "claimed", "working", "review", "done") + # sub2 仍 pending + + ticker = Ticker(registry=MagicMock(), spawner=MagicMock(), + dispatcher=MagicMock()) + + async def run(): + return await ticker._check_round_complete(db_path, pid) + + reviewed = asyncio.run(run()) + assert reviewed == [] + + +# =================================================================== +# U4: 一轮结束 — 含 failed sub +# =================================================================== + +class TestU4RoundWithFailed: + """U4: done + failed 都是终态,触发 review""" + + def test_mixed_done_failed_triggers_review(self, project_env): + pid, db_path, bb = project_env + + _make_task(bb, "parent", title="Parent") + _make_task(bb, "sub1", parent_task="parent") + _make_task(bb, "sub2", parent_task="parent") + + _push_status(bb, "sub1", "claimed", "working", "review", "done") + _push_status(bb, "sub2", "claimed", "working", "failed") + + from src.blackboard.db import get_connection + conn = get_connection(db_path) + conn.execute("UPDATE tasks SET status='done' WHERE id='parent'") + conn.close() + + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1") + ticker = Ticker(registry=MagicMock(), spawner=mock_spawner, + dispatcher=MagicMock()) + + async def run(): + return await ticker._check_round_complete(db_path, pid) + + reviewed = asyncio.run(run()) + assert "parent" in reviewed + + +# =================================================================== +# U5: round_count 上限 +# =================================================================== + +class TestU5RoundLimit: + """U5: round_count >= MAX_ROUNDS(5) 后不再触发""" + + def test_at_limit_no_review(self, project_env): + pid, db_path, bb = project_env + + _make_task(bb, "parent", title="Parent") + _make_task(bb, "sub1", parent_task="parent") + + _push_status(bb, "sub1", "claimed", "working", "review", "done") + + # 手动设 round_count = 5 + from src.blackboard.db import get_connection + conn = get_connection(db_path) + conn.execute("UPDATE tasks SET status='done', round_count=5 WHERE id='parent'") + conn.close() + + ticker = Ticker(registry=MagicMock(), spawner=MagicMock(), + dispatcher=MagicMock()) + + async def run(): + return await ticker._check_round_complete(db_path, pid) + + reviewed = asyncio.run(run()) + assert reviewed == [] # round_count=5 >= 5,不触发 + + def test_below_limit_triggers(self, project_env): + pid, db_path, bb = project_env + + _make_task(bb, "parent", title="Parent") + _make_task(bb, "sub1", parent_task="parent") + _push_status(bb, "sub1", "claimed", "working", "review", "done") + + from src.blackboard.db import get_connection + conn = get_connection(db_path) + conn.execute("UPDATE tasks SET status='done', round_count=4 WHERE id='parent'") + conn.close() + + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1") + ticker = Ticker(registry=MagicMock(), spawner=mock_spawner, + dispatcher=MagicMock()) + + async def run(): + return await ticker._check_round_complete(db_path, pid) + + reviewed = asyncio.run(run()) + assert "parent" in reviewed # round_count=4 < 5,触发第 5 轮 + + +# =================================================================== +# U6: mention 按 agent 分组 +# =================================================================== + +class TestU6MentionGrouping: + """U6: _process_mentions 按 agent 分组,同 agent 多条 mention 合并一次 spawn""" + + def test_grouped_by_agent(self, project_env): + pid, db_path, bb = project_env + + _make_task(bb, "t1") + _make_task(bb, "t2") + + # comment1 @agent-a + cid1 = bb.add_comment("t1", "author1", "msg1", mentions=["agent-a"]) + bb.record_mentions(cid1, "t1", ["agent-a"]) + + # comment2 @agent-a + @agent-b + cid2 = bb.add_comment("t1", "author2", "msg2", mentions=["agent-a", "agent-b"]) + bb.record_mentions(cid2, "t1", ["agent-a", "agent-b"]) + + # agent-a 有 2 条 mention,agent-b 有 1 条 + pending = bb.get_pending_mentions() + assert len(pending) == 3 + + # mock spawner 追踪调用 + mock_spawner = MagicMock() + mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1") + mock_spawner.api_host = "127.0.0.1" + mock_spawner.api_port = 8083 + + ticker = Ticker(registry=MagicMock(), spawner=mock_spawner, + dispatcher=MagicMock()) + + async def run(): + return await ticker._process_mentions(db_path, pid) + + processed = asyncio.run(run()) + + # spawn 应被调用 2 次(agent-a 一次,agent-b 一次) + assert mock_spawner.spawn_full_agent.call_count == 2 + assert set(processed) == {"agent-a", "agent-b"} + + # 所有 mentions 应该是 notified + pending_after = bb.get_pending_mentions() + assert len(pending_after) == 0 + + +# =================================================================== +# U7: subtask_summary 聚合 +# =================================================================== + +class TestU7SubtaskSummary: + """U7: get_subtasks_summary 返回正确的状态计数""" + + def test_mixed_statuses(self, project_env): + _, _, bb = project_env + + _make_task(bb, "parent", title="Parent") + _make_task(bb, "sub1", parent_task="parent") + _make_task(bb, "sub2", parent_task="parent") + _make_task(bb, "sub3", parent_task="parent") + _make_task(bb, "sub4", parent_task="parent") + + _push_status(bb, "sub1", "claimed", "working", "review", "done") + _push_status(bb, "sub2", "claimed", "working", "review", "done") + _push_status(bb, "sub3", "claimed", "working", "failed") + # sub4 仍 pending + + summary = bb.get_subtasks_summary("parent") + assert summary is not None + assert summary["total"] == 4 + assert summary["done"] == 2 + assert summary["failed"] == 1 + assert summary["other"] == 1 # pending + assert summary["all_terminal"] is False + + def test_all_terminal(self, project_env): + _, _, bb = project_env + + _make_task(bb, "parent", title="Parent") + _make_task(bb, "sub1", parent_task="parent") + _make_task(bb, "sub2", parent_task="parent") + + _push_status(bb, "sub1", "claimed", "working", "review", "done") + _push_status(bb, "sub2", "claimed", "working", "failed") + + summary = bb.get_subtasks_summary("parent") + assert summary["all_terminal"] is True + assert summary["done"] == 1 + assert summary["failed"] == 1 + + def test_no_subs_returns_none(self, project_env): + _, _, bb = project_env + _make_task(bb, "lonely", title="No subs") + + summary = bb.get_subtasks_summary("lonely") + assert summary is None + + def test_nonexistent_parent_returns_none(self, project_env): + _, _, bb = project_env + summary = bb.get_subtasks_summary("nonexistent") + assert summary is None + + +# =================================================================== +# U8: increment_round_count +# =================================================================== + +class TestU8IncrementRound: + """U8: round_count 递增并持久化""" + + def test_increment(self, project_env): + _, _, bb = project_env + _make_task(bb, "parent", title="Parent") + + task = bb.get_task("parent") + assert task.round_count == 0 + + r1 = bb.increment_round_count("parent") + assert r1 == 1 + + r2 = bb.increment_round_count("parent") + assert r2 == 2 + + # 持久化验证 + task2 = bb.get_task("parent") + assert task2.round_count == 2 + + +# =================================================================== +# U9: mention prompt 构建 +# =================================================================== + +class TestU9MentionPrompt: + """U9: _build_mention_prompt 包含关键内容""" + + def test_prompt_content(self, project_env): + _, _, bb = project_env + _make_task(bb, "t1", title="测试任务标题") + + ticker = Ticker(registry=MagicMock()) + ticker.spawner = MagicMock() + ticker.spawner.api_host = "127.0.0.1" + ticker.spawner.api_port = 8083 + + task = bb.get_task("t1") + mention_lines = ["- [agent-a] 这是一条 mention 消息"] + + prompt = ticker._build_mention_prompt( + "agent-b", task, mention_lines, "test-proj") + + assert "agent-b" in prompt + assert "测试任务标题" in prompt + assert "mention" in prompt.lower() or "@" in prompt or "被" in prompt + assert "test-proj" in prompt + assert "8083" in prompt # API 端口 + + +# =================================================================== +# U10: review prompt 构建 +# =================================================================== + +class TestU10ReviewPrompt: + """U10: _build_review_prompt 包含三问框架 + 失败处理""" + + def test_review_prompt_basic(self, project_env): + _, _, bb = project_env + _make_task(bb, "parent", title="Goal Task", + description="这是目标描述") + + parent_task = bb.get_task("parent") + ticker = Ticker(registry=MagicMock()) + + summary = {"done": 2, "failed": 1, "cancelled": 0, "total": 3} + prompt = ticker._build_review_prompt( + parent_task, summary, [], [], 1, project_id="test-proj") + + assert "Goal" in prompt + assert "三问" in prompt + assert "test-proj" in prompt + assert "第 1 轮" in prompt + assert "Round 上限" in prompt + + def test_review_prompt_with_failures(self, project_env): + _, _, bb = project_env + _make_task(bb, "parent", title="Goal", + description="目标") + + parent_task = bb.get_task("parent") + ticker = Ticker(registry=MagicMock()) + + summary = {"done": 1, "failed": 2, "cancelled": 0, "total": 3} + prompt = ticker._build_review_prompt( + parent_task, summary, [], [], 2, project_id="proj-1") + + assert "失败处理" in prompt + assert "2 个" in prompt # "有 2 个 sub task failed" + assert "第 2 轮" in prompt