"""#01 four-phase cycle E2E integration tests.

Requires a running daemon and RUN_INTEGRATION=1.

Coverage:
    E1  comment + @mention, end to end
    E2  round complete -> Pangtong review spawn
    E3  multi-round iteration (round_count increments; the sub creation
        that Pangtong would do is simulated by the test)
    E4  round limit forces a stop
    E5  mention retry (agent busy, @pytest.mark.flaky)
    E6  failed sub triggers review (BUG-2 verification)

Based on the helper functions and patterns of test_e2e_v31.py.
"""

import json
import os
import sqlite3
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import pytest
import requests as http_requests

# ── Path setup ──
# The deployed project directory must be importable before ``src.utils``.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))

from src.utils import get_data_root  # noqa: E402  (needs the sys.path entry above)

# ── Constants ──
API_BASE = "http://localhost:8083"
POLL_INTERVAL = 5          # seconds slept between two polls
MAX_WAIT_DISPATCH = 120    # seconds; upper bound used by the polling loops
MAX_WAIT_AGENT = 300       # presumably seconds as well; not used by the helpers below
E2E_PREFIX = "e2e-01-"
DATA_ROOT = get_data_root()


# ── Helper functions ──

def _check_environment():
    """Skip the calling test unless the daemon API answers and reports ready.

    Returns:
        The decoded JSON payload of ``/api/daemon/status`` when it reports
        ``status == "running"`` and a truthy ``ticker_running``.
    """
    # Keep the try body minimal: only the request and the JSON decoding can
    # legitimately fail here. The skip decisions are made outside of it so
    # they never depend on how pytest's skip outcome interacts with
    # ``except Exception``.
    try:
        resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
        data = resp.json()
    except Exception as e:  # any failure to reach/decode the API => skip
        pytest.skip(f"Production API not available at {API_BASE}: {e}")
    if (not isinstance(data, dict)
            or data.get("status") != "running"
            or not data.get("ticker_running")):
        pytest.skip(f"Daemon not ready: {data}")
    return data


def _cleanup_project(pid: str):
    """Archive project *pid* through the API (best effort, never raises)."""
    try:
        http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5)
    except Exception as exc:
        # Cleanup must not fail the test, but a leaked project should at
        # least be visible in the captured output.
        print(f"[cleanup] archive of project {pid} failed: {exc}")


def _create_project(project_list: list, name_prefix: str = "E01",
                    agents: Optional[list] = None) -> str:
    """Create a project with a random id and register it for cleanup.

    Args:
        project_list: List that receives the new project id (mutated in
            place) so the fixture teardown can archive it.
        name_prefix: Prefix of the project's display name.
        agents: Agent ids for the project config; a default trio is used
            when omitted or empty.

    Returns:
        The generated project id.
    """
    pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
    config = {"agents": agents or ["zhangfei-dev", "simayi-challenger", "zhaoyun-data"]}
    resp = http_requests.post(f"{API_BASE}/api/projects", json={
        "id": pid,
        "name": f"{name_prefix}-{pid}",
        "config": config,
    }, timeout=10)
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    project_list.append(pid)
    return pid


def _create_task(pid: str, **kwargs) -> str:
    """Create a task in project *pid* and return its id.

    ``id`` may be passed in *kwargs*; otherwise a random one is generated.
    All remaining keyword arguments are sent as task fields and may
    override the ``status``/``priority`` defaults.
    """
    tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
    body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks",
        json=body,
        timeout=10,
    )
    assert resp.status_code == 200, f"Create task failed: {resp.text}"
    return tid


def _get_task(pid: str, tid: str) -> Dict[str, Any]:
    """Fetch task *tid* of project *pid* and return the decoded JSON body."""
    resp = http_requests.get(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}",
        timeout=10,
    )
    assert resp.status_code == 200, f"Get task failed: {resp.text}"
    return resp.json()


def _update_status(pid: str, tid: str, status: str, agent: str = "test") -> Dict:
    """Post a status change for task *tid* and return the decoded JSON body.

    NOTE: unlike the other helpers this one does not assert on the HTTP
    status code, so a rejected transition is only visible in the return
    value.
    """
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
        json={"status": status, "agent": agent},
        timeout=10,
    )
    return resp.json()


def _add_comment(pid: str, tid: str, author: str, body: str,
                 mentions: Optional[list] = None) -> int:
    """Add a comment to task *tid* and return the comment id.

    The id is read from ``comment_id`` and, failing that, from ``id`` in
    the response body.
    """
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}/comments",
        json={"author": author, "body": body,
              "comment_type": "general", "mentions": mentions or []},
        timeout=10,
    )
    assert resp.status_code == 200, f"Add comment failed: {resp.text}"
    data = resp.json()
    return data.get("comment_id") or data.get("id")


def _get_db_path(pid: str) -> Path:
    """Return the path of the project's ``blackboard.db`` under DATA_ROOT."""
    return DATA_ROOT / pid / "blackboard.db"


def _query_mention_queue(db_path: Path, status: Optional[str] = None) -> list:
    """Read ``mention_queue`` rows directly from the project database.

    Args:
        db_path: Path of the SQLite database file.
        status: When given (and non-empty), only rows with this status are
            returned; otherwise all rows.

    Returns:
        A list of plain dicts, one per row.
    """
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        if status:
            rows = conn.execute(
                "SELECT * FROM mention_queue WHERE status=?", (status,)
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM mention_queue").fetchall()
        return [dict(r) for r in rows]
    finally:
        conn.close()


def _wait_for_mentions(db_path: Path, statuses: tuple, timeout: float) -> list:
    """Poll ``mention_queue`` until a row in one of *statuses* shows up.

    Polls every ``POLL_INTERVAL`` seconds (or sooner when *timeout* is
    shorter) and returns as soon as at least one matching row exists. When
    *timeout* seconds elapse the result of the last poll is returned, which
    may be an empty list.
    """
    deadline = time.time() + timeout
    while True:
        time.sleep(min(POLL_INTERVAL, timeout))
        rows = [m for m in _query_mention_queue(db_path)
                if m["status"] in statuses]
        if rows or time.time() >= deadline:
            return rows


def _set_round_count(db_path: Path, tid: str, count: int):
    """Overwrite ``round_count`` of task *tid* directly in the database."""
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute(
            "UPDATE tasks SET round_count=? WHERE id=?", (count, tid))
        conn.commit()
    finally:
        conn.close()


# ===================================================================
# E1: comment + @mention, end to end
# ===================================================================

@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE01MentionE2E:
    """E1: write a comment @zhaoyun-data -> mention recorded -> agent spawn."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_mention_spawn_e2e(self):
        pid = _create_project(self._projects, "E01-mention")
        tid = _create_task(pid, title="E2E mention 测试",
                           description="测试 mention 端到端",
                           assignee="simayi-challenger")

        # Write a comment that @-mentions zhaoyun-data.
        cid = _add_comment(pid, tid, "simayi-challenger",
                           "@zhaoyun-data 请查看这个任务",
                           mentions=["zhaoyun-data"])
        assert cid is not None
        print(f"\n🚀 E01-E1: comment 已写入 (cid={cid})")

        # Give the ticker up to 10 s to record the mention. Polling instead
        # of one fixed sleep returns as soon as the row is there. The row
        # may already be "notified" if the spawn happened in the meantime.
        db_path = _get_db_path(pid)
        mentions = _wait_for_mentions(db_path, ("pending", "notified"),
                                      timeout=10)
        assert len(mentions) >= 1, (
            f"mention_queue 中没有找到 zhaoyun-data 的记录。"
            f"\n所有 mentions: {_query_mention_queue(db_path)}"
        )
        mention = mentions[0]
        assert mention["mentioned_agent"] == "zhaoyun-data"
        print(f" mention 状态: {mention['status']}")
        print(" ✅ mention 端到端写入成功")

        # Wait for the spawn (up to 30 s) when it has not happened yet.
        if mention["status"] == "pending":
            mentions2 = _wait_for_mentions(db_path, ("notified",), timeout=30)
            assert len(mentions2) >= 1, (
                "mention 未被 spawn(30s 后仍为 pending)"
            )
            print(" ✅ mention spawn 成功,status=notified")
        else:
            print(" ✅ mention 已 spawn(首次查询时即为 notified)")


# ===================================================================
# E2: round complete -> Pangtong review spawn
# ===================================================================
# ── round_count polling helpers shared by E2-E4 ──

def _read_round_count(db_path: Path, tid: str):
    """Read ``round_count`` of task *tid* straight from the project database.

    Returns:
        The stored value, or ``None`` when no row with that id exists (a
        NULL column value is returned as ``None`` too).
    """
    conn = sqlite3.connect(str(db_path))
    try:
        row = conn.execute(
            "SELECT round_count FROM tasks WHERE id=?", (tid,)
        ).fetchone()
    finally:
        conn.close()
    return row[0] if row else None


def _wait_for_round_count(db_path: Path, tid: str, target: int,
                          timeout: float) -> int:
    """Poll until ``round_count`` of task *tid* reaches *target*.

    Polls every ``POLL_INTERVAL`` seconds for at most *timeout* seconds.

    Returns:
        The last observed round_count (a missing row or NULL counts as 0).
        The caller decides whether a value below *target* is a failure.
    """
    deadline = time.time() + timeout
    observed = 0
    while time.time() < deadline:
        time.sleep(POLL_INTERVAL)
        # ``or 0`` keeps the comparison safe when the column is NULL.
        observed = _read_round_count(db_path, tid) or 0
        if observed >= target:
            break
    return observed


@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE02RoundComplete:
    """E2: every sub of a parent is done -> round complete -> Pangtong review spawn."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_round_complete_triggers_review(self):
        pid = _create_project(self._projects, "E02-round",
                              agents=["pangtong-fujunshi", "zhangfei-dev",
                                      "simayi-challenger"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Parent Task",
                                  description="一轮结束测试")
        sub1 = _create_task(pid, title="E2E Sub1", description="子任务1",
                            assignee="zhangfei-dev", parent_task=parent_tid)
        sub2 = _create_task(pid, title="E2E Sub2", description="子任务2",
                            assignee="simayi-challenger",
                            parent_task=parent_tid)

        # Drive every sub to done by hand.
        for tid in (sub1, sub2):
            for status in ("claimed", "working", "review", "done"):
                _update_status(pid, tid, status, agent="test")

        print("\n🚀 E01-E2: 所有 sub 已 done,等待 ticker 检测")

        # Wait for the ticker to aggregate the parent and detect the end of
        # the round (2-3 ticks). The test observes this via round_count.
        db_path = _get_db_path(pid)
        round_count = _wait_for_round_count(db_path, parent_tid, 1,
                                            MAX_WAIT_DISPATCH)
        assert round_count >= 1, (
            f"一轮结束未触发!{MAX_WAIT_DISPATCH}s 后 round_count 仍为 0"
        )
        print(f" round_count={round_count}")
        print(" ✅ 一轮结束检测成功,庞统 review 已 spawn")


# ===================================================================
# E3: multi-round iteration (round_count increments)
# ===================================================================

@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE03MultiRound:
    """E3: round 1 -> Pangtong review -> round 2.

    Checks that round_count increments for each finished round. The test
    does not rely on Pangtong really creating sub tasks: the second-round
    sub is created by the test itself.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_multi_round_increment(self):
        pid = _create_project(self._projects, "E03-multi",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Multi-Round",
                                  description="多轮测试")
        sub1 = _create_task(pid, title="Round1 Sub",
                            description="第一轮子任务",
                            assignee="zhangfei-dev", parent_task=parent_tid)
        for status in ("claimed", "working", "review", "done"):
            _update_status(pid, sub1, status, agent="zhangfei-dev")

        print("\n🚀 E01-E3: 等待 Round 1 review")
        db_path = _get_db_path(pid)

        # Wait for round_count >= 1.
        round1 = _wait_for_round_count(db_path, parent_tid, 1,
                                       MAX_WAIT_DISPATCH)
        if round1 < 1:
            pytest.fail("Round 1 未触发(超时)")
        print(f" Round 1: round_count={round1}")

        # Create a second-round sub by hand and finish it, simulating the
        # new sub that Pangtong would have created.
        sub2 = _create_task(pid, title="Round2 Sub",
                            description="第二轮子任务",
                            assignee="zhangfei-dev", parent_task=parent_tid)
        for status in ("claimed", "working", "review", "done"):
            _update_status(pid, sub2, status, agent="zhangfei-dev")

        print(" 等待 Round 2 review")
        round2 = _wait_for_round_count(db_path, parent_tid, 2,
                                       MAX_WAIT_DISPATCH)
        if round2 < 2:
            pytest.fail("Round 2 未触发(超时)")
        print(f" Round 2: round_count={round2}")
        print(" ✅ 多轮迭代验证成功(round_count 递增到 2)")


# ===================================================================
# E4: round limit forces a stop
# ===================================================================

@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE04RoundLimit:
    """E4: once round_count is 5 no further review round is triggered."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_round_limit_stops_review(self):
        pid = _create_project(self._projects, "E04-limit",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Round Limit",
                                  description="上限测试")
        sub1 = _create_task(pid, title="Limit Sub",
                            description="上限测试子任务",
                            assignee="zhangfei-dev", parent_task=parent_tid)

        # Pin round_count at 5 BEFORE the sub finishes. If the sub were
        # completed first, a tick could run an ordinary round (0 -> 1) ahead
        # of the pin and the test would pass without ever exercising the
        # limit.
        db_path = _get_db_path(pid)
        _set_round_count(db_path, parent_tid, 5)

        # Same transition path as the other scenarios in this file.
        # NOTE(review): _update_status does not check the HTTP result, so a
        # rejected transition would let this test pass vacuously.
        for status in ("claimed", "working", "review", "done"):
            _update_status(pid, sub1, status, agent="zhangfei-dev")

        print("\n🚀 E01-E4: round_count=5,等待 2 tick 确认不触发")

        # Wait two ticks, then confirm the counter did not move.
        time.sleep(60)
        final_round = _read_round_count(db_path, parent_tid)
        assert final_round == 5, (
            f"round_count 应保持 5,实际: {final_round}"
        )
        print(f" round_count={final_round}(未递增)")
        print(" ✅ round 上限验证成功")
# ===================================================================
# E5: mention retry (agent busy)
# ===================================================================

@pytest.mark.integration
@pytest.mark.flaky
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE05MentionRetry:
    """E5: mention retry scenario (depends on agent state, hence marked flaky)."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_mention_retry_or_success(self):
        """Check that the mention ends up handled (notified or failed)."""
        pid = _create_project(self._projects, "E05-retry")
        tid = _create_task(pid, title="E2E Mention Retry",
                           description="mention 重试测试",
                           assignee="simayi-challenger")
        _add_comment(pid, tid, "simayi-challenger", "@zhaoyun-data 请处理",
                     mentions=["zhaoyun-data"])

        db_path = _get_db_path(pid)
        terminal = ("notified", "failed")

        # Poll until the queue is non-empty and every row is terminal, or
        # until the wait budget is used up.
        give_up_at = time.time() + MAX_WAIT_DISPATCH
        while time.time() < give_up_at:
            time.sleep(POLL_INTERVAL)
            snapshot = _query_mention_queue(db_path)
            if snapshot and all(m["status"] in terminal for m in snapshot):
                break

        # Re-read the queue and verify each row individually.
        mentions = _query_mention_queue(db_path)
        assert len(mentions) >= 1
        for m in mentions:
            print(f" mention: agent={m['mentioned_agent']} "
                  f"status={m['status']} retry_count={m['retry_count']}")
            assert m["status"] in terminal, (
                f"mention 未被处理: {m}"
            )
        print(" ✅ mention 重试/成功验证完成")


# ===================================================================
# E6: failed sub triggers review (BUG-2 verification)
# ===================================================================

@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE06FailedSubReview:
    """E6: done and failed are both terminal states and trigger round-end detection."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_failed_sub_triggers_review(self):
        pid = _create_project(self._projects, "E06-failed",
                              agents=["pangtong-fujunshi", "zhangfei-dev",
                                      "simayi-challenger"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Failed Sub Review",
                                  description="失败子任务触发 review 测试")
        sub1 = _create_task(pid, title="Sub Done",
                            description="完成的子任务",
                            assignee="zhangfei-dev", parent_task=parent_tid)
        sub2 = _create_task(pid, title="Sub Failed",
                            description="失败的子任务",
                            assignee="simayi-challenger",
                            parent_task=parent_tid)

        # sub1 -> done
        for step in ("claimed", "working", "review", "done"):
            _update_status(pid, sub1, step, agent="zhangfei-dev")

        # sub2 -> failed
        for step in ("claimed", "working", "failed"):
            _update_status(pid, sub2, step, agent="simayi-challenger")

        print("\n🚀 E01-E6: sub1=done, sub2=failed,等待一轮结束检测")

        db_path = _get_db_path(pid)
        round_count = 0
        give_up_at = time.time() + MAX_WAIT_DISPATCH
        # Keep polling while no round has been counted and time remains.
        while round_count == 0 and time.time() < give_up_at:
            time.sleep(POLL_INTERVAL)
            conn = sqlite3.connect(str(db_path))
            conn.row_factory = sqlite3.Row
            try:
                row = conn.execute(
                    "SELECT round_count FROM tasks WHERE id=?", (parent_tid,)
                ).fetchone()
            finally:
                conn.close()
            if row and row["round_count"] > 0:
                round_count = row["round_count"]

        assert round_count >= 1, (
            f"failed sub 未触发一轮结束!{MAX_WAIT_DISPATCH}s 后 round_count 仍为 0"
        )
        print(f" round_count={round_count}")
        print(" ✅ failed sub 触发 review 成功(BUG-2 验证通过)")