Files
sanguo_moziplus_v2/tests/test_e2e_four_phase.py
T
2026-05-29 12:26:59 +08:00

630 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""#01 四相循环 E2E 集成测试
需要 daemon 运行 + RUN_INTEGRATION=1。覆盖:
E1 comment + @mention 端到端(真实 Agent spawn)
E2 一轮结束 → 庞统 review spawn(真实庞统 spawn)
E3 多轮迭代(庞统真实 spawn + 真实创建 sub task)
E4 round 上限强制停止
E5 mention 重试(可靠制造 Agent busy)
E6 failed sub 触发 review(BUG-2 验证)
B1-B6 边界测试
全部真实环境执行,不 mock Agent 行为。
"""
import json
import os
import sqlite3
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List
import pytest
import requests as http_requests
# ── Path setup ──
# Put the deployed copy of the project first on sys.path so that the
# `src.utils` import below resolves from there. The insert must stay above
# that import.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.utils import get_data_root
# ── Constants ──
# Base URL of the daemon HTTP API that every helper below talks to.
API_BASE = "http://localhost:8083"
# Seconds slept between two polls in the wait loops.
POLL_INTERVAL = 5
# Default polling budget (seconds) for dispatch / mention / round waits.
MAX_WAIT_DISPATCH = 120
# Polling budget (seconds) for waits that involve a real pangtong run.
MAX_WAIT_PANGTONG = 900
# Prefix of every project id created by this module.
E2E_PREFIX = "e2e-01-"
# Root directory holding per-project data; `_get_db_path` reads
# DATA_ROOT / <pid> / "blackboard.db".
DATA_ROOT = get_data_root()
# ── 工具函数 ──
def _check_environment():
    """Skip the current test unless the daemon API is reachable and ticking.

    ``/api/daemon/status`` must report ``status == "running"`` together with a
    truthy ``ticker_running``. Any error while fetching or decoding that status
    (connection refused, timeout, non-JSON body, unexpected payload type, ...)
    also results in a skip.
    """
    try:
        status_url = f"{API_BASE}/api/daemon/status"
        info = http_requests.get(status_url, timeout=5).json()
        is_ready = info.get("status") == "running" and info.get("ticker_running")
        if not is_ready:
            # pytest.skip raises an outcome exception derived from
            # BaseException, so it escapes the `except Exception` below.
            pytest.skip(f"Daemon not ready: {info}")
    except Exception as err:
        pytest.skip(f"Production API not available: {err}")
def _cleanup_project(pid: str):
    """Archive project *pid* on a best-effort basis.

    Called from fixture teardown. Every error is deliberately ignored so that
    cleanup problems never mask the real outcome of the test.
    """
    try:
        archive_url = f"{API_BASE}/api/projects/{pid}/archive"
        http_requests.post(archive_url, timeout=5)
    except Exception:
        return
def _create_project(plist: list, prefix: str = "E01",
                    agents: list = None) -> str:
    """Create a throwaway E2E project and return its id.

    The id is ``E2E_PREFIX`` plus 6 random hex chars. It is appended to *plist*
    so the calling fixture can archive it later. *agents* defaults to
    zhangfei-dev / simayi-challenger / zhaoyun-data when empty or ``None``.
    Fails the test (assert) unless the API answers 200.
    """
    project_id = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
    roster = agents if agents else ["zhangfei-dev", "simayi-challenger", "zhaoyun-data"]
    payload = {
        "id": project_id,
        "name": f"{prefix}-{project_id}",
        "config": {"agents": roster},
    }
    reply = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
    assert reply.status_code == 200, f"Create project failed: {reply.text}"
    plist.append(project_id)
    return project_id
def _create_task(pid: str, **kwargs) -> str:
    """Create a task in project *pid* and return its id.

    Defaults are ``status="pending"`` and ``priority=5``. Any extra keyword
    argument is sent as a JSON field and may override those defaults. An
    explicit ``id`` keyword is honoured; otherwise a random
    ``e2e-task-xxxxxxxx`` id is generated. Fails the test (assert) unless the
    API answers 200.
    """
    task_id = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
    payload = dict(id=task_id, status="pending", priority=5)
    payload.update(kwargs)  # caller-supplied fields win over the defaults
    endpoint = f"{API_BASE}/api/projects/{pid}/tasks"
    reply = http_requests.post(endpoint, json=payload, timeout=10)
    assert reply.status_code == 200, f"Create task failed: {reply.text}"
    return task_id
def _get_task(pid: str, tid: str) -> Dict:
    """Fetch task *tid* of project *pid* (``?expand=all``) and return its JSON.

    Fails the test (assert) with the response body when the API does not
    answer 200, matching the diagnostics of ``_create_task``.
    """
    resp = http_requests.get(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}?expand=all", timeout=10)
    assert resp.status_code == 200, f"Get task failed: {resp.text}"
    return resp.json()
def _list_tasks(pid: str, **params) -> List[Dict]:
    """List the tasks of project *pid*, forwarding *params* as query filters.

    The API may answer either ``{"tasks": [...]}`` or a bare list; both shapes
    are normalised to the list. Fails the test (assert) with the response body
    when the API does not answer 200.
    """
    resp = http_requests.get(
        f"{API_BASE}/api/projects/{pid}/tasks", params=params, timeout=10)
    assert resp.status_code == 200, f"List tasks failed: {resp.text}"
    data = resp.json()
    if isinstance(data, dict) and "tasks" in data:
        return data["tasks"]
    return data
def _update_status(pid: str, tid: str, status: str,
                   agent: str = "test", detail: str = "") -> Dict:
    """POST a status transition for task *tid* and return the decoded JSON reply.

    ``detail`` is only sent when non-empty. The HTTP status code is not
    checked, so a rejected transition shows up only in the returned payload.
    None of the callers in this module inspect that payload.
    """
    extra = {"detail": detail} if detail else {}
    endpoint = f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status"
    reply = http_requests.post(
        endpoint, json={"status": status, "agent": agent, **extra}, timeout=10)
    return reply.json()
def _add_comment(pid: str, tid: str, author: str, body: str,
                 mentions: list = None) -> int:
    """Post a ``general`` comment on task *tid* and return the new comment id.

    *mentions* (agent ids) is sent as given; ``None``/empty is sent as ``[]``.
    The id is read from ``comment_id``, falling back to ``id`` when that key
    is missing or falsy. Fails the test (assert) with the response body when
    the API does not answer 200.
    """
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}/comments",
        json={"author": author, "body": body, "comment_type": "general",
              "mentions": mentions or []},
        timeout=10)
    assert resp.status_code == 200, f"Add comment failed: {resp.text}"
    data = resp.json()
    return data.get("comment_id") or data.get("id")
def _get_db_path(pid: str) -> Path:
    """Return the location of project *pid*'s ``blackboard.db`` under DATA_ROOT."""
    project_dir = DATA_ROOT / pid
    return project_dir / "blackboard.db"
def _query_mentions(db_path: Path, status: str = None) -> list:
    """Read ``mention_queue`` rows from the SQLite file at *db_path*.

    Each row is returned as a plain ``dict`` keyed by column name. When
    *status* is truthy only rows with that status are returned; ``None`` or
    ``""`` means no filter. The connection is always closed.
    """
    sql = "SELECT * FROM mention_queue"
    args = ()
    if status:
        sql += " WHERE status=?"
        args = (status,)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.execute(sql, args)
        return list(map(dict, cursor.fetchall()))
    finally:
        conn.close()
def _set_round_count(db_path: Path, tid: str, count: int):
    """Overwrite ``tasks.round_count`` of task *tid* directly in SQLite.

    Bypasses the HTTP API so a test can start from an arbitrary round. An
    unknown *tid* is a silent no-op. The connection is always closed.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        with conn:  # commits on normal exit, rolls back if the UPDATE raises
            conn.execute("UPDATE tasks SET round_count=? WHERE id=?", (count, tid))
    finally:
        conn.close()
def _get_round_count(db_path: Path, tid: str) -> int:
    """Return ``tasks.round_count`` for task *tid*, or 0 when the task is absent.

    The value is read directly from the SQLite file. The connection is always
    closed.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        hit = conn.execute(
            "SELECT round_count FROM tasks WHERE id=?", (tid,)).fetchone()
    finally:
        conn.close()
    return 0 if hit is None else hit[0]
def _count_subtasks(db_path: Path, parent_tid: str) -> int:
    """Count rows in ``tasks`` whose ``parent_task`` equals *parent_tid*.

    The value is read directly from the SQLite file. The connection is always
    closed.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        (total,) = conn.execute(
            "SELECT COUNT(*) FROM tasks WHERE parent_task=?", (parent_tid,)
        ).fetchone()
    finally:
        conn.close()
    return total
def _wait_round(db_path: Path, tid: str, min_count: int,
                timeout: int = MAX_WAIT_DISPATCH) -> int:
    """Poll until ``round_count`` of task *tid* reaches at least *min_count*.

    Sleeps ``POLL_INTERVAL`` seconds before each read, for up to *timeout*
    seconds, and returns the first value that meets the bound. One more read
    is taken after the deadline and is authoritative. The test only fails
    (``pytest.fail``) if that read is still below *min_count*.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        time.sleep(POLL_INTERVAL)
        rc = _get_round_count(db_path, tid)
        if rc >= min_count:
            return rc
    # The daemon may advance the counter between the last poll and the
    # deadline check. Failing with a value that already satisfies the bound
    # would give a self-contradictory error, so honour this final read.
    rc = _get_round_count(db_path, tid)
    if rc >= min_count:
        return rc
    pytest.fail(f"round_count < {min_count} after {timeout}s (now={rc})")
def _wait_subtasks(db_path: Path, parent_tid: str, min_count: int,
                   timeout: int = MAX_WAIT_PANGTONG) -> int:
    """Poll until task *parent_tid* has at least *min_count* sub tasks.

    Sleeps ``POLL_INTERVAL`` seconds before each read, for up to *timeout*
    seconds, and returns the first count that meets the bound. One more read
    is taken after the deadline and is authoritative. The test only fails
    (``pytest.fail``) if that read is still short.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        time.sleep(POLL_INTERVAL)
        cnt = _count_subtasks(db_path, parent_tid)
        if cnt >= min_count:
            return cnt
    # The daemon may create sub tasks between the last poll and the deadline
    # check. Failing with a count that already meets the bound would give a
    # self-contradictory error, so honour this final read.
    cnt = _count_subtasks(db_path, parent_tid)
    if cnt >= min_count:
        return cnt
    pytest.fail(f"subtasks < {min_count} after {timeout}s (now={cnt})")
def _push_done(pid: str, tid: str, agent: str = "test"):
    """Walk task *tid* through claimed → working → review → done as *agent*.

    Each transition's reply is discarded, so a rejected step is not detected.
    """
    _update_status(pid, tid, "claimed", agent=agent)
    _update_status(pid, tid, "working", agent=agent)
    _update_status(pid, tid, "review", agent=agent)
    _update_status(pid, tid, "done", agent=agent)
def _push_failed(pid: str, tid: str, agent: str = "test"):
    """Claim and start task *tid* as *agent*, then mark it failed with a fixed detail.

    Each transition's reply is discarded, so a rejected step is not detected.
    """
    for step in ("claimed", "working"):
        _update_status(pid, tid, step, agent=agent)
    _update_status(pid, tid, "failed", agent=agent, detail="E2E forced failure")
# ===================================================================
# E1: comment + @mention 端到端
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestE01MentionE2E:
    """E1: an @mention in a comment is queued and the mentioned agent is spawned."""

    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for created in self._p:
            _cleanup_project(created)

    def test_mention_spawn_e2e(self):
        project = _create_project(self._p, "E01")
        task = _create_task(project, title="E2E mention", description="mention test",
                            assignee="simayi-challenger")
        _add_comment(project, task, "simayi-challenger",
                     "@zhaoyun-data 请查看", mentions=["zhaoyun-data"])
        print("\n🚀 E1: comment written")
        db = _get_db_path(project)

        # Poll until the first queued mention reaches a terminal status.
        settled = False
        give_up_at = time.time() + MAX_WAIT_DISPATCH
        while not settled and time.time() < give_up_at:
            time.sleep(POLL_INTERVAL)
            queued = _query_mentions(db)
            settled = bool(queued) and queued[0]["status"] in ("notified", "failed")
        if not settled:
            pytest.fail(f"mention not processed after {MAX_WAIT_DISPATCH}s")

        first = _query_mentions(db)[0]
        assert first["mentioned_agent"] == "zhaoyun-data"
        assert first["status"] == "notified", f"spawn failed: {first['status']}"
        print(" ✅ mention e2e OK (spawned)")
# ===================================================================
# E2: 一轮结束 → 庞统 review spawn
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestE02RoundComplete:
    """E2: finishing every sub task of a parent closes the round.

    Only the ``round_count >= 1`` outcome is asserted; the pangtong review
    spawn itself is not checked here.
    """

    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for created in self._p:
            _cleanup_project(created)

    def test_round_complete_triggers_review(self):
        project = _create_project(self._p, "E02",
                                  agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"])
        parent = _create_task(project, id=f"parent-{uuid.uuid4().hex[:6]}",
                              title="E2E Parent", description="round test")
        # (sub task id, agent that completes it), created in this order.
        subs = [
            (_create_task(project, title="Sub1", description="s1",
                          assignee="zhangfei-dev", parent_task=parent), "zhangfei-dev"),
            (_create_task(project, title="Sub2", description="s2",
                          assignee="simayi-challenger", parent_task=parent), "simayi-challenger"),
        ]
        for sub_id, owner in subs:
            _push_done(project, sub_id, owner)
        print("\n🚀 E2: subs done, waiting for review")
        reached = _wait_round(_get_db_path(project), parent, 1)
        print(f" round_count={reached}")
        print(" ✅ E2 round complete OK")
# ===================================================================
# E3: 多轮迭代(庞统真实 spawn + 真实创建 sub task)
# ===================================================================
@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestE03MultiRound:
    """Round 1 → real pangtong review → new sub tasks → Round 2.

    Exercises the full pangtong chain: spawn → read the blackboard → create
    sub tasks. Takes roughly 10-20 minutes.
    """
    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for p in self._p:
            _cleanup_project(p)
    def test_multi_round_full_chain(self):
        pid = _create_project(self._p, "E03",
                              agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"])
        parent = _create_task(
            pid, id=f"parent-{uuid.uuid4().hex[:6]}",
            title="E2E Multi-Round: Hello World",
            description="创建 hello.py 输出 Hello World。第一轮创建,第二轮验证。",
            task_type="coding",
            must_haves=json.dumps({"capability": "python"}))
        # Round 1: a single sub task, pushed straight to done by the test.
        s1 = _create_task(pid, title="Round1: 创建 hello.py",
                          description="创建 hello.py: print('Hello World')",
                          assignee="zhangfei-dev", parent_task=parent)
        _push_done(pid, s1, "zhangfei-dev")
        print(f"\n🚀 E3 R1: sub done, waiting pangtong review")
        db = _get_db_path(pid)
        rc1 = _wait_round(db, parent, 1, timeout=MAX_WAIT_PANGTONG)
        print(f" R1: round_count={rc1}, waiting for new sub tasks...")
        # Wait for pangtong to create at least one new sub task (2 in total
        # under this parent, counting s1).
        cnt = _wait_subtasks(db, parent, 2, timeout=MAX_WAIT_PANGTONG)
        print(f" pangtong created new subs (total={cnt})")
        # Push every new, still-open sub task to done. All pushes are made as
        # zhangfei-dev regardless of each sub's actual assignee.
        tasks = _list_tasks(pid, parent_task=parent)
        new = [t for t in tasks if t["id"] != s1
               and t["status"] not in ("done", "failed")]
        assert len(new) >= 1, f"no new subs: {[t['id']+'='+t['status'] for t in tasks]}"
        for t in new:
            _push_done(pid, t["id"], "zhangfei-dev")
            print(f" pushed '{t['title']}' → done")
        # Round 2 closes once all new subs are done.
        rc2 = _wait_round(db, parent, 2, timeout=MAX_WAIT_PANGTONG)
        print(f" R2: round_count={rc2}")
        print(f" ✅ E3 multi-round OK (pangtong real spawn + new sub)")
# ===================================================================
# E4: round 上限强制停止
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestE04RoundLimit:
    """E4: a parent already at round_count=5 must not advance when a round completes."""

    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for p in self._p:
            _cleanup_project(p)

    def test_round_limit(self):
        pid = _create_project(self._p, "E04",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                              title="E2E Limit", description="limit test")
        s1 = _create_task(pid, title="Sub", description="limit sub",
                          assignee="zhangfei-dev", parent_task=parent)
        db = _get_db_path(pid)
        # Pin the counter at the limit BEFORE the sub completes. Setting it
        # afterwards races the ticker. If a tick handles the finished round
        # first, the overwrite to 5 hides that increment, and the test passes
        # without ever exercising the limit check.
        _set_round_count(db, parent, 5)
        _push_done(pid, s1, "zhangfei-dev")
        print("\n🚀 E4: round_count=5, waiting 2 ticks")
        time.sleep(60)
        rc = _get_round_count(db, parent)
        assert rc == 5, f"round_count changed: {rc}"
        print(" ✅ E4 round limit OK (rc=5 unchanged)")
# ===================================================================
# E5: mention 重试(可靠制造 Agent busy)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestE05MentionRetry:
    """Reliably make the agent busy: give zhaoyun-data a working task, then @ it.

    1. Create a blocker task → ticker dispatch → zhaoyun-data working.
    2. Create a mention task → @zhaoyun-data → busy → retry.
    3. Push the blocker to done → zhaoyun-data free → mention spawn succeeds.
    """
    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for p in self._p:
            _cleanup_project(p)
    def test_mention_retry_on_busy(self):
        pid = _create_project(self._p, "E05",
                              agents=["zhaoyun-data", "simayi-challenger"])
        # 1. Blocker task meant to keep zhaoyun-data occupied.
        blocker = _create_task(pid, title="E2E Blocker",
                               description="占用 zhaoyun-data",
                               assignee="zhaoyun-data")
        print(f"\n🚀 E5: blocker {blocker}, waiting for zhaoyun-data spawn")
        deadline = time.time() + MAX_WAIT_DISPATCH
        busy = False
        while time.time() < deadline:
            time.sleep(POLL_INTERVAL)
            t = _get_task(pid, blocker)
            if t["status"] in ("claimed", "working"):
                busy = True
                print(f" zhaoyun-data busy: status={t['status']}")
                break
        # Dispatch did not happen in time: force the blocker to "working"
        # ourselves. The transition replies are not checked.
        if not busy:
            _update_status(pid, blocker, "claimed", agent="zhaoyun-data")
            _update_status(pid, blocker, "working", agent="zhaoyun-data")
            print(f" forced blocker → working")
        # 2. @zhaoyun-data on a different task while it is busy.
        mtid = _create_task(pid, title="E2E Mention Task",
                            description="mention retry test",
                            assignee="simayi-challenger")
        _add_comment(pid, mtid, "simayi-challenger",
                     "@zhaoyun-data 请查看", mentions=["zhaoyun-data"])
        print(f" mention created while zhaoyun-data busy")
        db = _get_db_path(pid)
        time.sleep(35)  # ~1 ticker tick (tick length assumed ~30 s; confirm)
        ms = _query_mentions(db)
        assert len(ms) >= 1, "no mention written"
        print(f" mention: status={ms[0]['status']} retry={ms[0]['retry_count']}")
        # 3. Release zhaoyun-data by finishing the blocker.
        _update_status(pid, blocker, "review", agent="zhaoyun-data")
        _update_status(pid, blocker, "done", agent="zhaoyun-data")
        print(f" blocker → done, zhaoyun-data free")
        # 4. Wait for the mention to reach a terminal status. A timeout is not
        # reported here; the assert below catches an unresolved mention.
        deadline2 = time.time() + MAX_WAIT_DISPATCH
        while time.time() < deadline2:
            time.sleep(POLL_INTERVAL)
            ms2 = _query_mentions(db)
            if ms2 and ms2[0]["status"] in ("notified", "failed"):
                break
        mf = _query_mentions(db)[0]
        print(f" final: status={mf['status']} retry={mf['retry_count']}")
        # Both "notified" and "failed" pass. retry_count == 0 is also accepted
        # and only reported, as a timing-dependent outcome.
        assert mf["status"] in ("notified", "failed"), f"unresolved: {mf}"
        if mf["retry_count"] > 0:
            print(f" ✅ E5 retry verified (retry_count={mf['retry_count']})")
        else:
            print(f" ✅ E5 resolved directly (timing OK)")
        print(f" ✅ E5 mention retry OK")
# ===================================================================
# E6: failed sub 触发 review
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestE06FailedSubReview:
    """E6 / BUG-2: a round ending with one done and one failed sub still closes."""

    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for created in self._p:
            _cleanup_project(created)

    def test_failed_sub_review(self):
        project = _create_project(self._p, "E06",
                                  agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"])
        parent = _create_task(project, id=f"parent-{uuid.uuid4().hex[:6]}",
                              title="E2E Failed Sub", description="BUG-2 test")
        ok_sub = _create_task(project, title="Done", description="s1",
                              assignee="zhangfei-dev", parent_task=parent)
        bad_sub = _create_task(project, title="Failed", description="s2",
                               assignee="simayi-challenger", parent_task=parent)
        _push_done(project, ok_sub, "zhangfei-dev")
        _push_failed(project, bad_sub, "simayi-challenger")
        print("\n🚀 E6: done+failed, waiting for review")
        closed_round = _wait_round(_get_db_path(project), parent, 1)
        print(f" round_count={closed_round}")
        print(" ✅ E6 failed-sub review OK (BUG-2 verified)")
# ===================================================================
# B1-B6: 边界测试
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="RUN_INTEGRATION=1 required")
class TestBoundary:
    """B1-B6: boundary cases for mention queueing and round completion."""

    @pytest.fixture(autouse=True)
    def setup(self):
        # Skip when the daemon/ticker is down; archive created projects after the test.
        _check_environment()
        self._p = []
        yield
        for p in self._p:
            _cleanup_project(p)

    def test_B1_empty_mentions(self):
        """An empty mentions list must not write anything to mention_queue."""
        pid = _create_project(self._p, "B1")
        tid = _create_task(pid, title="B1")
        _add_comment(pid, tid, "test", "no mentions", mentions=[])
        db = _get_db_path(pid)
        # Allow the same 2 s for the write that B2/B3/B6 allow. Asserting
        # absence immediately could pass before a (wrong) row lands.
        time.sleep(2)
        assert len(_query_mentions(db)) == 0
        print(" ✅ B1: empty mentions → no queue entry")

    def test_B2_nonexistent_agent(self):
        """Unknown agent id: the mention is queued, spawning fails, retries increase."""
        pid = _create_project(self._p, "B2", agents=["simayi-challenger"])
        tid = _create_task(pid, title="B2")
        _add_comment(pid, tid, "test", "@ghost-agent 你在哪",
                     mentions=["ghost-agent"])
        db = _get_db_path(pid)
        # Wait for the queue write.
        time.sleep(2)
        ms = _query_mentions(db)
        assert len(ms) >= 1
        assert ms[0]["mentioned_agent"] == "ghost-agent"
        # NOTE(review): assumes no ticker pass lands within these 2 s;
        # otherwise the row may already have left "pending".
        assert ms[0]["status"] == "pending"
        # Let the ticker try to spawn. A failed spawn should bump retry_count
        # or, once retries are exhausted, mark the mention failed.
        deadline = time.time() + MAX_WAIT_DISPATCH
        retried = False
        while time.time() < deadline:
            time.sleep(POLL_INTERVAL)
            ms2 = _query_mentions(db)
            if ms2 and ms2[0]["retry_count"] > 0:
                retried = True
                print(f" ghost-agent retry_count={ms2[0]['retry_count']}")
                break
            if ms2 and ms2[0]["status"] == "failed":
                retried = True
                print(" ghost-agent failed (retries exhausted)")
                break
        assert retried, "ghost-agent mention never retried"
        print(" ✅ B2: nonexistent agent → retry OK")

    def test_B3_duplicate_mentions(self):
        """Mentioning the same agent twice in one comment yields 1 (deduped) or 2 rows."""
        pid = _create_project(self._p, "B3")
        tid = _create_task(pid, title="B3")
        _add_comment(pid, tid, "test", "@agent-a @agent-a",
                     mentions=["agent-a", "agent-a"])
        db = _get_db_path(pid)
        time.sleep(2)
        ms = _query_mentions(db)
        agent_a_count = sum(1 for m in ms if m["mentioned_agent"] == "agent-a")
        # Depending on which layer dedups, 1 or 2 rows are acceptable. 0 means
        # the mention was lost, which a bare "<= 2" bound would wrongly accept.
        assert 1 <= agent_a_count <= 2, (
            f"expected 1 or 2 agent-a mentions, got {agent_a_count}")
        print(f" ✅ B3: duplicate mention count={agent_a_count}")

    def test_B4_parent_no_subs(self):
        """A parent with no sub tasks must not trigger round completion."""
        pid = _create_project(self._p, "B4",
                              agents=["pangtong-fujunshi"])
        parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                              title="B4 Parent", description="no subs")
        db = _get_db_path(pid)
        # Wait about 2 ticks.
        time.sleep(60)
        rc = _get_round_count(db, parent)
        assert rc == 0, f"round_count should stay 0, got {rc}"
        print(" ✅ B4: parent with no subs → no review")

    def test_B5_parent_done_no_subs(self):
        """A parent that is itself done but has no sub tasks must not trigger a round."""
        pid = _create_project(self._p, "B5",
                              agents=["pangtong-fujunshi"])
        parent = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                              title="B5 Parent", description="done no subs")
        _update_status(pid, parent, "claimed", agent="test")
        _update_status(pid, parent, "working", agent="test")
        _update_status(pid, parent, "review", agent="test")
        _update_status(pid, parent, "done", agent="test")
        db = _get_db_path(pid)
        time.sleep(60)
        rc = _get_round_count(db, parent)
        assert rc == 0
        print(" ✅ B5: parent done no subs → no review")

    def test_B6_comment_deleted_mention_handling(self):
        """A queued mention row survives deletion of its comment."""
        pid = _create_project(self._p, "B6")
        tid = _create_task(pid, title="B6")
        _add_comment(pid, tid, "test", "@simayi-challenger check this",
                     mentions=["simayi-challenger"])
        db = _get_db_path(pid)
        time.sleep(2)
        ms = _query_mentions(db)
        assert len(ms) >= 1
        # Simulate comment removal by deleting it directly in the DB.
        conn = sqlite3.connect(str(db))
        try:
            conn.execute("DELETE FROM comments WHERE task_id=?", (tid,))
            conn.commit()
        finally:
            conn.close()
        # NOTE: this connection never enables PRAGMA foreign_keys, and SQLite
        # keeps FK enforcement off by default. Any ON DELETE CASCADE on
        # mention_queue therefore would not fire here. Only the raw row's
        # survival is checked; whether get_pending_mentions JOINs comments
        # and filters the row out is not covered.
        ms2 = _query_mentions(db)
        assert len(ms2) >= 1, "mention row should persist even after comment deleted"
        print(" ✅ B6: mention persists after comment deleted")