"""#01 four-phase loop: end-to-end integration tests.

Requires a running daemon and RUN_INTEGRATION=1. Scenarios covered:
  E1  comment + @mention end to end
  E2  round finished -> Pangtong review spawn
  E3  multi-round iteration (round_count increases; the test itself creates
      the next sub task in place of Pangtong)
  E4  round limit forces a stop
  E5  mention retry (agent busy, @pytest.mark.flaky)
  E6  a failed sub task still triggers a review (BUG-2 verification)

Built on the helper functions and patterns of test_e2e_v31.py.
"""

import json
import os
import sqlite3
import sys
import time
import uuid
from contextlib import closing
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional

import pytest
import requests as http_requests

# -- path setup --
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))

from src.utils import get_data_root  # noqa: E402  (needs DEPLOY_DIR on sys.path)

# -- constants --
API_BASE = "http://localhost:8083"
POLL_INTERVAL = 5
MAX_WAIT_DISPATCH = 120
MAX_WAIT_AGENT = 300
E2E_PREFIX = "e2e-01-"
DATA_ROOT = get_data_root()

# Status sequences used to push a sub task to a terminal state by hand.
_DONE_PATH = ("claimed", "working", "review", "done")
_FAILED_PATH = ("claimed", "working", "failed")

# Shared guard: every test class in this module needs the real daemon.
requires_integration = pytest.mark.skipif(
    not os.environ.get("RUN_INTEGRATION"),
    reason="Set RUN_INTEGRATION=1 to run real agent tests",
)


# -- helper functions --

def _check_environment():
    """Skip the current test unless the daemon API reports a running ticker.

    Returns:
        The decoded ``/api/daemon/status`` payload when the daemon is ready.
    """
    try:
        resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
        data = resp.json()
    except (http_requests.RequestException, ValueError) as e:
        pytest.skip(f"Production API not available at {API_BASE}: {e}")
    if (not isinstance(data, dict)
            or data.get("status") != "running"
            or not data.get("ticker_running")):
        pytest.skip(f"Daemon not ready: {data}")
    return data


def _cleanup_project(pid: str):
    """Archive project *pid*; best effort, never fails the teardown."""
    try:
        http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5)
    except Exception as exc:
        # Teardown must not mask the real test outcome, so only report it.
        print(f"cleanup of project {pid} failed: {exc}")


def _create_project(project_list: list, name_prefix: str = "E01",
                    agents: Optional[list] = None) -> str:
    """Create a uniquely named project and register it for cleanup.

    Args:
        project_list: list the new project id is appended to (the fixture
            archives every id in it during teardown).
        name_prefix: prefix of the human-readable project name.
        agents: agent ids for the project config; a default trio is used
            when omitted or empty.

    Returns:
        The generated project id.
    """
    pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
    config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "zhaoyun-data"]}
    resp = http_requests.post(f"{API_BASE}/api/projects", json={
        "id": pid,
        "name": f"{name_prefix}-{pid}",
        "config": config,
    }, timeout=10)
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    project_list.append(pid)
    return pid


def _create_task(pid: str, **kwargs) -> str:
    """Create a pending task in project *pid* and return its id.

    An ``id`` keyword overrides the generated task id; all other keywords are
    sent as task fields.
    """
    tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
    body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
    )
    assert resp.status_code == 200, f"Create task failed: {resp.text}"
    return tid


def _get_task(pid: str, tid: str) -> Dict[str, Any]:
    """Fetch task *tid* of project *pid* through the API."""
    resp = http_requests.get(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
    )
    assert resp.status_code == 200, f"Get task failed: {resp.text}"
    return resp.json()


def _update_status(pid: str, tid: str, status: str,
                   agent: str = "test") -> Dict:
    """Post a status change for task *tid* and return the decoded response.

    Fails immediately when the API rejects the transition, so a refused
    transition is reported here instead of as a later polling timeout.
    """
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
        json={"status": status, "agent": agent}, timeout=10,
    )
    assert resp.status_code == 200, (
        f"Update status to {status!r} failed: {resp.text}"
    )
    return resp.json()


def _walk_statuses(pid: str, tid: str, statuses: Iterable[str],
                   agent: str = "test") -> None:
    """Apply each status in *statuses* to task *tid*, in order."""
    for status in statuses:
        _update_status(pid, tid, status, agent=agent)


def _add_comment(pid: str, tid: str, author: str, body: str,
                 mentions: Optional[list] = None) -> int:
    """Add a comment to task *tid* and return the new comment id."""
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}/comments",
        json={"author": author, "body": body, "comment_type": "general",
              "mentions": mentions or []},
        timeout=10,
    )
    assert resp.status_code == 200, f"Add comment failed: {resp.text}"
    data = resp.json()
    return data.get("comment_id") or data.get("id")


def _get_db_path(pid: str) -> Path:
    """Return the path of the blackboard database of project *pid*."""
    return DATA_ROOT / pid / "blackboard.db"


def _connect(db_path: Path, readonly: bool = True) -> sqlite3.Connection:
    """Open *db_path* without ever creating it.

    A plain ``sqlite3.connect(path)`` creates an empty database when the file
    is missing; the URI modes ``ro``/``rw`` raise ``sqlite3.OperationalError``
    instead, so the tests cannot leave stray files in the data root.
    """
    mode = "ro" if readonly else "rw"
    conn = sqlite3.connect(f"{db_path.resolve().as_uri()}?mode={mode}", uri=True)
    conn.row_factory = sqlite3.Row
    return conn


def _query_mention_queue(db_path: Path, status: Optional[str] = None) -> list:
    """Read mention_queue rows straight from the DB as a list of dicts.

    Args:
        db_path: path of the project's blackboard database.
        status: when given (and non-empty), only rows with this status.
    """
    sql = "SELECT * FROM mention_queue"
    params: tuple = ()
    if status:
        sql += " WHERE status=?"
        params = (status,)
    with closing(_connect(db_path)) as conn:
        return [dict(r) for r in conn.execute(sql, params).fetchall()]


def _mentions_for(db_path: Path, agent: str,
                  statuses: Iterable[str]) -> List[dict]:
    """Return mention_queue rows for *agent* whose status is in *statuses*."""
    wanted = tuple(statuses)
    return [
        m for m in _query_mention_queue(db_path)
        if m["mentioned_agent"] == agent and m["status"] in wanted
    ]


def _set_round_count(db_path: Path, tid: str, count: int):
    """Overwrite round_count of task *tid* directly in the DB."""
    with closing(_connect(db_path, readonly=False)) as conn:
        conn.execute(
            "UPDATE tasks SET round_count=? WHERE id=?", (count, tid))
        conn.commit()


def _get_round_count(db_path: Path, tid: str) -> int:
    """Return round_count of task *tid*; 0 when the row is missing or NULL."""
    with closing(_connect(db_path)) as conn:
        row = conn.execute(
            "SELECT round_count FROM tasks WHERE id=?", (tid,)
        ).fetchone()
    return (row["round_count"] or 0) if row else 0


def _wait_for(probe: Callable[[], Any], timeout: float,
              interval: float = POLL_INTERVAL) -> Any:
    """Call *probe* until it returns a truthy value or *timeout* elapses.

    Returns:
        The last value returned by *probe* (falsy on timeout).
    """
    deadline = time.monotonic() + timeout
    result = probe()
    while not result and time.monotonic() < deadline:
        time.sleep(interval)
        result = probe()
    return result


def _wait_for_round_count(db_path: Path, tid: str, minimum: int,
                          timeout: float = MAX_WAIT_DISPATCH) -> int:
    """Wait until round_count of *tid* reaches *minimum* (which must be >= 1).

    Returns:
        The observed round_count once it is >= *minimum*, or 0 on timeout.
    """
    def probe() -> int:
        current = _get_round_count(db_path, tid)
        return current if current >= minimum else 0

    return _wait_for(probe, timeout)


class _E2EBase:
    """Shared per-test setup: environment check plus project cleanup."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)


# ===================================================================
# E1: comment + @mention end to end
# ===================================================================

@pytest.mark.integration
@requires_integration
class TestE01MentionE2E(_E2EBase):
    """E1: comment mentioning @zhaoyun-data -> mention queued -> agent spawn."""

    def test_mention_spawn_e2e(self):
        pid = _create_project(self._projects, "E01-mention")
        tid = _create_task(pid, title="E2E mention 测试",
                           description="测试 mention 端到端",
                           assignee="simayi-challenger")

        # Write a comment that mentions zhaoyun-data.
        cid = _add_comment(pid, tid, "simayi-challenger",
                           "@zhaoyun-data 请查看这个任务",
                           mentions=["zhaoyun-data"])
        assert cid is not None
        print(f"\n🚀 E01-E1: comment 已写入 (cid={cid})")

        # Poll instead of sleeping a fixed time: the row may show up as
        # pending, or already as notified if a tick has processed it.
        db_path = _get_db_path(pid)
        queued = _wait_for(
            lambda: _mentions_for(db_path, "zhaoyun-data",
                                  ("pending", "notified")),
            MAX_WAIT_DISPATCH,
        )
        assert queued, (
            f"mention_queue 中没有找到 zhaoyun-data 的记录。"
            f"\n所有 mentions: {_query_mention_queue(db_path)}"
        )

        mention = queued[0]
        print(f" mention 状态: {mention['status']}")
        print(" ✅ mention 端到端写入成功")

        if mention["status"] == "pending":
            # Still pending: wait for a later tick to spawn the agent.
            notified = _wait_for(
                lambda: _mentions_for(db_path, "zhaoyun-data", ("notified",)),
                MAX_WAIT_DISPATCH,
            )
            assert notified, (
                f"mention 未被 spawn({MAX_WAIT_DISPATCH}s 后仍为 pending)"
            )
            print(" ✅ mention spawn 成功,status=notified")
        else:
            print(" ✅ mention 已 spawn(首次查询时即为 notified)")


# ===================================================================
# E2: round finished -> Pangtong review spawn
# ===================================================================

@pytest.mark.integration
@requires_integration
class TestE02RoundComplete(_E2EBase):
    """E2: every sub task of a parent is done -> the round-end check fires."""

    def test_round_complete_triggers_review(self):
        pid = _create_project(self._projects, "E02-round",
                              agents=["pangtong-fujunshi", "zhangfei-dev",
                                      "simayi-challenger"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Parent Task",
                                  description="一轮结束测试")
        sub1 = _create_task(pid, title="E2E Sub1",
                            description="子任务1", assignee="zhangfei-dev",
                            parent_task=parent_tid)
        sub2 = _create_task(pid, title="E2E Sub2",
                            description="子任务2", assignee="simayi-challenger",
                            parent_task=parent_tid)

        # Push every sub task to done by hand.
        for tid in (sub1, sub2):
            _walk_statuses(pid, tid, _DONE_PATH, agent="test")

        print("\n🚀 E01-E2: 所有 sub 已 done,等待 ticker 检测")

        # The test observes the round-end detection through round_count.
        db_path = _get_db_path(pid)
        round_count = _wait_for_round_count(db_path, parent_tid, 1)

        assert round_count >= 1, (
            f"一轮结束未触发!{MAX_WAIT_DISPATCH}s 后 round_count 仍为 0"
        )
        print(f" round_count={round_count}")
        print(" ✅ 一轮结束检测成功,庞统 review 已 spawn")


# ===================================================================
# E3: multi-round iteration (round_count increases)
# ===================================================================

@pytest.mark.integration
@pytest.mark.slow
@requires_integration
class TestE03MultiRound(_E2EBase):
    """E3: round 1 -> review -> round 2.

    Checks that round_count increases across rounds. It does not depend on
    Pangtong really creating sub tasks: the test creates the second-round
    sub task itself.
    """

    def test_multi_round_increment(self):
        pid = _create_project(self._projects, "E03-multi",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Multi-Round",
                                  description="多轮测试")
        sub1 = _create_task(pid, title="Round1 Sub",
                            description="第一轮子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)
        _walk_statuses(pid, sub1, _DONE_PATH, agent="zhangfei-dev")

        print("\n🚀 E01-E3: 等待 Round 1 review")

        db_path = _get_db_path(pid)
        first = _wait_for_round_count(db_path, parent_tid, 1)
        if first < 1:
            pytest.fail("Round 1 未触发(超时)")
        print(f" Round 1: round_count={first}")

        # Create and finish a second-round sub task by hand, simulating the
        # sub task Pangtong would have created.
        sub2 = _create_task(pid, title="Round2 Sub",
                            description="第二轮子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)
        _walk_statuses(pid, sub2, _DONE_PATH, agent="zhangfei-dev")

        print(" 等待 Round 2 review")
        second = _wait_for_round_count(db_path, parent_tid, 2)
        if second < 2:
            pytest.fail("Round 2 未触发(超时)")
        print(f" Round 2: round_count={second}")

        print(" ✅ 多轮迭代验证成功(round_count 递增到 2)")


# ===================================================================
# E4: round limit forces a stop
# ===================================================================

@pytest.mark.integration
@requires_integration
class TestE04RoundLimit(_E2EBase):
    """E4: once round_count is 5 no further review is triggered."""

    def test_round_limit_stops_review(self):
        pid = _create_project(self._projects, "E04-limit",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Round Limit",
                                  description="上限测试")
        sub1 = _create_task(pid, title="Limit Sub",
                            description="上限测试子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)

        # Put the limit in place BEFORE the sub task finishes; otherwise a
        # tick could start a review while round_count is still 0.
        db_path = _get_db_path(pid)
        _set_round_count(db_path, parent_tid, 5)
        assert _get_round_count(db_path, parent_tid) == 5, (
            "failed to preset round_count=5 on the parent task"
        )

        # Same transition path as the other scenarios (through review).
        _walk_statuses(pid, sub1, _DONE_PATH, agent="zhangfei-dev")

        print("\n🚀 E01-E4: round_count=5,等待 2 tick 确认不触发")

        # Negative check: nothing to poll for, so wait out two ticks.
        time.sleep(60)

        final_round = _get_round_count(db_path, parent_tid)
        assert final_round == 5, (
            f"round_count 应保持 5,实际: {final_round}"
        )
        print(f" round_count={final_round}(未递增)")
        print(" ✅ round 上限验证成功")


# ===================================================================
# E5: mention retry (agent busy)
# ===================================================================

@pytest.mark.integration
@pytest.mark.flaky
@requires_integration
class TestE05MentionRetry(_E2EBase):
    """E5: mention retry scenario (depends on agent state, marked flaky)."""

    def test_mention_retry_or_success(self):
        """Check that every mention ends up processed (notified or failed)."""
        pid = _create_project(self._projects, "E05-retry")
        tid = _create_task(pid, title="E2E Mention Retry",
                           description="mention 重试测试",
                           assignee="simayi-challenger")

        _add_comment(pid, tid, "simayi-challenger",
                     "@zhaoyun-data 请处理", mentions=["zhaoyun-data"])

        db_path = _get_db_path(pid)
        resolved = ("notified", "failed")

        def settled() -> list:
            # Truthy only when at least one row exists and all are resolved.
            rows = _query_mention_queue(db_path)
            if rows and all(r["status"] in resolved for r in rows):
                return rows
            return []

        _wait_for(settled, MAX_WAIT_DISPATCH)

        mentions = _query_mention_queue(db_path)
        assert len(mentions) >= 1

        for m in mentions:
            print(f" mention: agent={m['mentioned_agent']} "
                  f"status={m['status']} retry_count={m['retry_count']}")
            assert m["status"] in resolved, (
                f"mention 未被处理: {m}"
            )

        print(" ✅ mention 重试/成功验证完成")


# ===================================================================
# E6: failed sub task triggers a review (BUG-2 verification)
# ===================================================================

@pytest.mark.integration
@requires_integration
class TestE06FailedSubReview(_E2EBase):
    """E6: done and failed are both terminal and trigger the round-end check."""

    def test_failed_sub_triggers_review(self):
        pid = _create_project(self._projects, "E06-failed",
                              agents=["pangtong-fujunshi", "zhangfei-dev",
                                      "simayi-challenger"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Failed Sub Review",
                                  description="失败子任务触发 review 测试")
        sub1 = _create_task(pid, title="Sub Done",
                            description="完成的子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)
        sub2 = _create_task(pid, title="Sub Failed",
                            description="失败的子任务",
                            assignee="simayi-challenger",
                            parent_task=parent_tid)

        # sub1 -> done, sub2 -> failed
        _walk_statuses(pid, sub1, _DONE_PATH, agent="zhangfei-dev")
        _walk_statuses(pid, sub2, _FAILED_PATH, agent="simayi-challenger")

        print("\n🚀 E01-E6: sub1=done, sub2=failed,等待一轮结束检测")

        db_path = _get_db_path(pid)
        round_count = _wait_for_round_count(db_path, parent_tid, 1)

        assert round_count >= 1, (
            f"failed sub 未触发一轮结束!{MAX_WAIT_DISPATCH}s 后 round_count 仍为 0"
        )
        print(f" round_count={round_count}")
        print(" ✅ failed sub 触发 review 成功(BUG-2 验证通过)")