Files
2026-06-05 23:22:03 +08:00

667 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""#01 Four-phase loop — unit tests.

Pure-logic checks that do not need a running daemon or Agent. Coverage:
    U1  mention_queue write and query
    U2  mention retry limit
    U3  round-end detection (all subtasks terminal)
    U4  round end with a failed subtask
    U5  round_count limit
    U6  mentions grouped by agent (mock spawner)
    U7  subtask_summary aggregation
    U8  increment_round_count
    U9  mention prompt construction
    U10 review prompt construction
    U11 "reviewing" intermediate state prevents duplicate triggers

NOTE: although these are logic-level tests, the module imports the deployed
sources from ~/.sanguo_projects/sanguo_moziplus_v2, so it is marked ``e2e``
and only runs when the RUN_INTEGRATION environment variable is set.
"""
import os

import pytest

# Applied to every test in this module: tag as e2e and skip unless opted in.
pytestmark = [
    pytest.mark.e2e,
    pytest.mark.skipif(
        not os.environ.get("RUN_INTEGRATION"),
        reason="Set RUN_INTEGRATION=1 to run E2E tests",
    ),
]
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ── Path setup ──
# The modules under test are imported from the deployed copy of the project
# rather than from this checkout.
#
# The module-level ``pytestmark`` skip only applies to collected tests; it does
# not stop the imports below from running. Skip the whole module up front so a
# machine without the deployment reports a skip instead of a collection error.
if not os.environ.get("RUN_INTEGRATION"):
    pytest.skip("Set RUN_INTEGRATION=1 to run E2E tests", allow_module_level=True)

DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
SRC_DIR = DEPLOY_DIR / "src"
# Both directories are prepended to sys.path (when not already present);
# DEPLOY_DIR is inserted last, so it ends up ahead of SRC_DIR and the
# ``src.*`` imports below resolve against the deployment.
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))
if str(DEPLOY_DIR) not in sys.path:
    sys.path.insert(0, str(DEPLOY_DIR))
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
# ── Fixtures ──
@pytest.fixture
def data_root(tmp_path):
    """Per-test projects root: ``<tmp_path>/projects`` (path only, not created here)."""
    projects_dir = tmp_path / "projects"
    return projects_dir
@pytest.fixture
def registry(data_root):
    """A ``ProjectRegistry`` rooted at the per-test ``data_root``."""
    project_registry = ProjectRegistry(data_root)
    return project_registry
@pytest.fixture
def project_env(data_root, registry):
    """Create a project, its DB path and a Blackboard on it.

    Registers project ``test-proj`` (agents ``agent-a`` and ``agent-b``) in
    ``registry``.

    Returns:
        ``(pid, db_path, bb)``, where ``db_path`` is
        ``<data_root>/test-proj/blackboard.db`` and ``bb`` is a ``Blackboard``
        opened on that path.
    """
    project_id = "test-proj"
    registry.create_project(
        project_id, "Test Project", agents=["agent-a", "agent-b"]
    )
    blackboard_path = data_root / project_id / "blackboard.db"
    return project_id, blackboard_path, Blackboard(blackboard_path)
def _make_task(bb: Blackboard, tid: str, **kwargs) -> str:
    """Create a task on ``bb`` and return ``tid``.

    The task defaults to a pending ``coding`` task titled ``Task <tid>`` and
    assigned by ``daemon``. Any keyword argument overrides the matching
    ``Task`` field. Note that ``tid`` is returned even if ``kwargs`` supplies a
    different ``id``.
    """
    base_fields = dict(
        id=tid,
        title=f"Task {tid}",
        status="pending",
        assigned_by="daemon",
        task_type="coding",
    )
    task_fields = {**base_fields, **kwargs}
    bb.create_task(Task(**task_fields))
    return tid
def _push_status(bb: Blackboard, tid: str, *statuses: str, agent: str = "agent-a"):
    """Walk task ``tid`` through ``statuses`` in order, acting as ``agent``.

    Return values of ``update_task_status`` are ignored.
    """
    for next_status in statuses:
        bb.update_task_status(tid, next_status, agent=agent)
# ===================================================================
# U1: mention_queue write and query
# ===================================================================
class TestU1MentionWriteQuery:
    """U1: writing mentions, de-duplicating them, and querying pending ones."""

    def test_record_and_query_mentions(self, project_env):
        """Both agents mentioned in one comment are recorded as pending."""
        _pid, _db, bb = project_env
        _make_task(bb, "t1")
        comment_id = bb.add_comment(
            "t1", "author1", "Hello @agent-a @agent-b",
            mentions=["agent-a", "agent-b"],
        )
        written = bb.record_mentions(comment_id, "t1", ["agent-a", "agent-b"])
        assert written == 2
        queued = bb.get_pending_mentions()
        assert len(queued) == 2
        mentioned_agents = {row["mentioned_agent"] for row in queued}
        assert mentioned_agents == {"agent-a", "agent-b"}

    def test_dedup_same_comment_same_agent(self, project_env):
        """Recording the same (comment, agent) pair twice writes it once."""
        _pid, _db, bb = project_env
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(comment_id, "t1", ["agent-a"])
        second_write = bb.record_mentions(comment_id, "t1", ["agent-a"])
        assert second_write == 0  # already present, nothing new written
        queued = bb.get_pending_mentions()
        assert len(queued) == 1

    def test_mark_notified_and_requery(self, project_env):
        """A mention marked notified drops out of the pending set."""
        _pid, _db, bb = project_env
        _make_task(bb, "t1")
        comment_id = bb.add_comment(
            "t1", "author1", "Hello", mentions=["agent-a", "agent-b"]
        )
        bb.record_mentions(comment_id, "t1", ["agent-a", "agent-b"])
        queued = bb.get_pending_mentions()
        assert len(queued) == 2
        # Mark one of the two as notified; only the other stays pending.
        bb.mark_mention_notified(queued[0]["id"])
        remaining = bb.get_pending_mentions()
        assert len(remaining) == 1

    def test_empty_mentions(self, project_env):
        """Recording an empty agent list writes nothing."""
        _pid, _db, bb = project_env
        _make_task(bb, "t1")
        written = bb.record_mentions(1, "t1", [])
        assert written == 0
# ===================================================================
# U2: mention retry limit
# ===================================================================
class TestU2MentionRetryLimit:
    """U2: retry_count increments, and mentions past the limit are hidden."""

    def test_retry_and_limit(self, project_env):
        """Visible at retry_count 4, hidden at 5 when max_retries=5."""
        _pid, _db, bb = project_env
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(comment_id, "t1", ["agent-a"])
        first = bb.get_pending_mentions()[0]
        assert first["retry_count"] == 0
        mention_id = first["id"]

        # Four retries -> retry_count == 4, still below the limit (4 < 5).
        for _attempt in range(4):
            bb.mark_mention_retry(mention_id)
        still_pending = bb.get_pending_mentions(max_retries=5)
        assert len(still_pending) == 1
        assert still_pending[0]["retry_count"] == 4

        # One more retry -> retry_count == 5, which hits the limit.
        bb.mark_mention_retry(mention_id)
        after_limit = bb.get_pending_mentions(max_retries=5)
        assert len(after_limit) == 0

    def test_mark_failed(self, project_env):
        """A mention marked failed is no longer pending."""
        _pid, _db, bb = project_env
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(comment_id, "t1", ["agent-a"])
        target = bb.get_pending_mentions()[0]
        bb.mark_mention_failed(target["id"])
        remaining = bb.get_pending_mentions()
        assert len(remaining) == 0
# ===================================================================
# U3: round-end detection (all subtasks terminal -> trigger review)
# ===================================================================
class TestU3RoundComplete:
    """U3: once every sub of a parent is terminal, the Pang Tong review fires."""

    def test_all_subs_done_triggers_review(self, project_env):
        """All subs done -> parent reviewed, round_count bumped, reviewer spawned."""
        pid, db_path, bb = project_env
        # parent + 3 subs
        _make_task(bb, "parent", title="Parent Task")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _make_task(bb, "sub3", parent_task="parent")
        # Drive every sub to done.
        for s in ("sub1", "sub2", "sub3"):
            _push_status(bb, s, "claimed", "working", "review", "done")
        # The parent's aggregated status has to be done first; set it directly.
        # try/finally so the connection is closed even if the update fails.
        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute("UPDATE tasks SET status='done' WHERE id='parent'")
            conn.commit()
        finally:
            conn.close()
        # Mock spawner
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        # _check_round_complete receives db_path/pid directly, so a MagicMock
        # registry suffices (as in the other round-check tests) and no real
        # ProjectRegistry is built on a shared path outside tmp_path.
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())
        # Stub _build_review_prompt (its f-string has compatibility problems
        # on Python 3.9).
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert "parent" in reviewed
        # round_count was incremented.
        task = bb.get_task("parent")
        assert task.round_count == 1
        # The reviewer agent was spawned exactly once.
        mock_spawner.spawn_full_agent.assert_called_once()
        call_kwargs = mock_spawner.spawn_full_agent.call_args[1]
        assert call_kwargs["agent_id"] == "pangtong-fujunshi"

    def test_not_all_terminal_no_review(self, project_env):
        """A sub still pending -> no parent is reviewed."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        # sub2 is still pending.
        ticker = Ticker(registry=MagicMock(), spawner=MagicMock(),
                        dispatcher=MagicMock())
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert reviewed == []
# ===================================================================
# U4: round end with a failed sub
# ===================================================================
class TestU4RoundWithFailed:
    """U4: done and failed are both terminal, so the review is triggered."""

    def test_mixed_done_failed_triggers_review(self, project_env):
        """One sub done and one failed -> parent is reviewed."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "failed")
        # Force the parent to done; close the connection even on failure.
        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute("UPDATE tasks SET status='done' WHERE id='parent'")
            conn.commit()
        finally:
            conn.close()
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert "parent" in reviewed
# ===================================================================
# U5: round_count limit
# ===================================================================
class TestU5RoundLimit:
    """U5: once round_count >= MAX_ROUNDS (5), no further review is triggered."""

    def test_at_limit_no_review(self, project_env):
        """round_count=5 -> no review."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        # Set the parent to done with round_count = 5 directly.
        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute("UPDATE tasks SET status='done', round_count=5 WHERE id='parent'")
            conn.commit()
        finally:
            conn.close()
        ticker = Ticker(registry=MagicMock(), spawner=MagicMock(),
                        dispatcher=MagicMock())
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert reviewed == []  # round_count=5 >= 5, not triggered

    def test_below_limit_triggers(self, project_env):
        """round_count=4 -> the 5th round is still triggered."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute("UPDATE tasks SET status='done', round_count=4 WHERE id='parent'")
            conn.commit()
        finally:
            conn.close()
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert "parent" in reviewed  # round_count=4 < 5, triggers round 5
# ===================================================================
# U6: mentions grouped by agent
# ===================================================================
class TestU6MentionGrouping:
    """U6: _process_mentions groups by agent; one spawn per mentioned agent."""

    def test_grouped_by_agent(self, project_env):
        """Three mentions across two agents -> two spawns, all notified."""
        pid, db_path, bb = project_env
        _make_task(bb, "t1")
        _make_task(bb, "t2")
        # comment1 mentions agent-a
        cid1 = bb.add_comment("t1", "author1", "msg1", mentions=["agent-a"])
        bb.record_mentions(cid1, "t1", ["agent-a"])
        # comment2 mentions agent-a and agent-b
        cid2 = bb.add_comment("t1", "author2", "msg2", mentions=["agent-a", "agent-b"])
        bb.record_mentions(cid2, "t1", ["agent-a", "agent-b"])
        # agent-a has 2 mentions, agent-b has 1
        pending = bb.get_pending_mentions()
        assert len(pending) == 3
        # Mock spawner that records its calls.
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())
        processed = asyncio.run(ticker._process_mentions(db_path, pid))
        # One spawn per agent: agent-a once, agent-b once.
        assert mock_spawner.spawn_full_agent.call_count == 2
        assert set(processed) == {"agent-a", "agent-b"}
        # Every mention should now be marked notified.
        pending_after = bb.get_pending_mentions()
        assert len(pending_after) == 0
# ===================================================================
# U7: subtask_summary aggregation
# ===================================================================
class TestU7SubtaskSummary:
    """U7: get_subtasks_summary reports the right per-status counts."""

    def test_mixed_statuses(self, project_env):
        """2 done + 1 failed + 1 pending -> counted, not all terminal."""
        _pid, _db, bb = project_env
        _make_task(bb, "parent", title="Parent")
        for sub_id in ("sub1", "sub2", "sub3", "sub4"):
            _make_task(bb, sub_id, parent_task="parent")
        for sub_id in ("sub1", "sub2"):
            _push_status(bb, sub_id, "claimed", "working", "review", "done")
        _push_status(bb, "sub3", "claimed", "working", "failed")
        # sub4 is left pending.
        summary = bb.get_subtasks_summary("parent")
        assert summary is not None
        expected_counts = {"total": 4, "done": 2, "failed": 1, "other": 1}
        for key, count in expected_counts.items():
            assert summary[key] == count
        assert summary["all_terminal"] is False

    def test_all_terminal(self, project_env):
        """One done + one failed -> all_terminal is True."""
        _pid, _db, bb = project_env
        _make_task(bb, "parent", title="Parent")
        for sub_id in ("sub1", "sub2"):
            _make_task(bb, sub_id, parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "failed")
        summary = bb.get_subtasks_summary("parent")
        assert summary["all_terminal"] is True
        assert summary["done"] == 1
        assert summary["failed"] == 1

    def test_no_subs_returns_none(self, project_env):
        """A task with no subs has no summary."""
        _pid, _db, bb = project_env
        _make_task(bb, "lonely", title="No subs")
        assert_target = bb.get_subtasks_summary("lonely")
        assert assert_target is None

    def test_nonexistent_parent_returns_none(self, project_env):
        """An unknown parent id has no summary."""
        _pid, _db, bb = project_env
        missing = bb.get_subtasks_summary("nonexistent")
        assert missing is None
# ===================================================================
# U8: increment_round_count
# ===================================================================
class TestU8IncrementRound:
    """U8: increment_round_count bumps round_count and persists it."""

    def test_increment(self, project_env):
        """0 -> 1 -> 2, and the stored task reflects 2."""
        _pid, _db, bb = project_env
        _make_task(bb, "parent", title="Parent")
        fresh = bb.get_task("parent")
        assert fresh.round_count == 0
        for expected_round in (1, 2):
            new_round = bb.increment_round_count("parent")
            assert new_round == expected_round
        # Re-read from the blackboard to confirm persistence.
        reloaded = bb.get_task("parent")
        assert reloaded.round_count == 2
# ===================================================================
# U9: mention prompt construction
# ===================================================================
class TestU9MentionPrompt:
    """U9: _build_mention_prompt includes the key content."""

    def test_prompt_content(self, project_env):
        """Agent, task title, mention marker, project id and API port appear."""
        _, _, bb = project_env
        _make_task(bb, "t1", title="测试任务标题")
        ticker = Ticker(registry=MagicMock())
        ticker.spawner = MagicMock()
        ticker.spawner.api_host = "127.0.0.1"
        ticker.spawner.api_port = 8083
        task = bb.get_task("t1")
        mention_lines = ["- [agent-a] 这是一条 mention 消息"]
        prompt = ticker._build_mention_prompt(
            "agent-b", task, mention_lines, "test-proj")
        assert "agent-b" in prompt
        assert "测试任务标题" in prompt
        # NOTE(review): the third alternative used to be an empty string
        # literal. Because ``"" in s`` is always True, that made this whole
        # assertion impossible to fail. The lost character was most likely the
        # full-width at sign U+FF20; it is written as an escape so it cannot be
        # silently stripped again. Confirm against the prompt template.
        assert ("mention" in prompt.lower()
                or "@" in prompt
                or "\uff20" in prompt)
        assert "test-proj" in prompt
        assert "8083" in prompt  # API port
# ===================================================================
# U10: review prompt construction
# ===================================================================
class TestU10ReviewPrompt:
    """U10: _build_review_prompt input contract.

    The f-string in _build_review_prompt has compatibility problems on
    Python 3.9. The first two tests therefore replace the method with a
    MagicMock and check the arguments it receives.

    NOTE(review): those two tests call the mock themselves, so they only
    confirm that MagicMock records its arguments. They do not exercise Ticker.
    ``test_review_prompt_renders`` calls the real method and runs only on
    Python 3.12+.
    """

    def test_review_prompt_basic(self, project_env):
        """Arguments are forwarded as given (mocked method)."""
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal Task",
                   description="这是目标描述")
        parent_task = bb.get_task("parent")
        mock_spawner = MagicMock()
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner)
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        summary = {"done": 2, "failed": 1, "cancelled": 0, "total": 3}
        prompt = ticker._build_review_prompt(
            parent_task, summary, [], [], 1, project_id="test-proj")
        # Check the call contract (arguments passed through correctly).
        call_args = ticker._build_review_prompt.call_args
        assert call_args[0][0].id == "parent"  # parent_task
        assert call_args[0][1] == summary  # summary
        assert call_args[0][4] == 1  # round_num
        assert call_args[1]["project_id"] == "test-proj"
        assert prompt == "Review prompt mock"

    def test_review_prompt_with_failures(self, project_env):
        """A summary containing failures is forwarded as given (mocked method)."""
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal",
                   description="目标")
        parent_task = bb.get_task("parent")
        mock_spawner = MagicMock()
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner)
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        summary = {"done": 1, "failed": 2, "cancelled": 0, "total": 3}
        ticker._build_review_prompt(
            parent_task, summary, [], [], 2, project_id="proj-1")
        # The summary with failures is passed through unchanged.
        call_args = ticker._build_review_prompt.call_args
        assert call_args[0][1]["failed"] == 2
        assert call_args[0][4] == 2  # round_num

    def test_review_prompt_renders(self, project_env):
        """The real method returns a non-empty string for a real summary."""
        if sys.version_info < (3, 12):
            pytest.skip("review prompt content is only tested on Python 3.12+")
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal Task",
                   description="这是目标描述")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "failed")
        parent_task = bb.get_task("parent")
        # Use the blackboard's own summary so the dict has the real key set.
        summary = bb.get_subtasks_summary("parent")
        mock_spawner = MagicMock()
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner)
        prompt = ticker._build_review_prompt(
            parent_task, summary, [], [], 1, project_id="test-proj")
        assert isinstance(prompt, str)
        assert prompt.strip()
# ===================================================================
# U11: "reviewing" intermediate state
# ===================================================================
class TestU11ReviewingState:
    """U11: the ``reviewing`` intermediate status prevents duplicate triggers."""

    def test_reviewing_skipped_in_round_check(self, project_env):
        """A parent in ``reviewing`` is not picked up by the round-end check."""
        pid, db_path, bb = project_env
        parent = _make_task(bb, "parent-1")
        _make_task(bb, "s1", parent_task=parent)
        # sub -> done
        _push_status(bb, "s1", "claimed", "working", "review", "done",
                     agent="test")
        # parent -> done -> reviewing
        _push_status(bb, parent, "claimed", "working", "review", "done",
                     agent="test")
        bb.update_task_status(parent, "reviewing", agent="daemon")
        # The summary reports the parent as reviewing ...
        summary = bb.get_subtasks_summary(parent)
        assert summary is not None
        assert summary["parent_status"] == "reviewing"
        # reviewing is not in ("done", "failed") ...
        assert summary["parent_status"] not in ("done", "failed")
        # ... so _check_round_complete must not return this parent.
        ticker = Ticker(registry=MagicMock(), spawner=MagicMock(),
                        dispatcher=MagicMock())
        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert parent not in reviewed

    def test_reviewing_not_overwritten_by_aggregation(self, project_env):
        """``reviewing`` is a manual status; compute_parent_status keeps it."""
        _, _, bb = project_env
        parent = _make_task(bb, "parent-2")
        _make_task(bb, "s1", parent_task=parent)
        # sub -> done
        _push_status(bb, "s1", "claimed", "working", "review", "done",
                     agent="test")
        # parent -> done -> reviewing
        _push_status(bb, parent, "claimed", "working", "review", "done",
                     agent="test")
        bb.update_task_status(parent, "reviewing", agent="daemon")
        # compute_parent_status should report reviewing (not overwrite it).
        from src.blackboard.queries import Queries
        q = Queries(bb.db_path)
        computed = q.compute_parent_status(parent)
        assert computed == "reviewing"

    def test_reviewing_to_done_transition(self, project_env):
        """reviewing -> done is allowed (when the goal is achieved)."""
        _, _, bb = project_env
        parent = _make_task(bb, "parent-3")
        _push_status(bb, parent, "claimed", "working", "review", "done",
                     agent="test")
        bb.update_task_status(parent, "reviewing", agent="daemon")
        # reviewing -> done (after GOAL_ACHIEVED)
        result = bb.update_task_status(parent, "done", agent="daemon")
        assert result is True
        task = bb.get_task(parent)
        assert task.status == "done"

    def test_reviewing_to_working_transition(self, project_env):
        """reviewing -> working is allowed (continuing to the next round)."""
        _, _, bb = project_env
        parent = _make_task(bb, "parent-4")
        _push_status(bb, parent, "claimed", "working", "review", "done",
                     agent="test")
        bb.update_task_status(parent, "reviewing", agent="daemon")
        # reviewing -> working (next round)
        result = bb.update_task_status(parent, "working", agent="daemon")
        assert result is True
        task = bb.get_task(parent)
        assert task.status == "working"