auto-sync: 2026-05-29 11:42:08

This commit is contained in:
cfdaily
2026-05-29 11:42:08 +08:00
parent 73379bedeb
commit 3a5724fe38
+556
View File
@@ -0,0 +1,556 @@
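"""#01 Four-phase cycle E2E integration tests.

Requires a running daemon and RUN_INTEGRATION=1. Covers:
E1 comment + @mention end to end
E2 a round completes → Pangtong review spawn
E3 multi-round iteration (round_count increments; Pangtong's sub creation is mocked)
E4 round limit forces a stop
E5 mention retry (agent busy; marked @pytest.mark.flaky)
E6 a failed sub triggers review (BUG-2 verification)
Based on the helper functions and patterns of test_e2e_v31.py.
"""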
import json
import os
import sqlite3
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict
import pytest
import requests as http_requests
# ── Path setup ──
# Put the deployed checkout at the front of sys.path so the `src.utils`
# import below resolves.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.utils import get_data_root
# ── Constants ──
# Base URL of the daemon HTTP API that every helper talks to.
API_BASE = "http://localhost:8083"
# Seconds slept between two polls of the database (passed to time.sleep).
POLL_INTERVAL = 5
# Seconds added to time.time() to form the polling deadlines in the tests.
MAX_WAIT_DISPATCH = 120
# Not referenced by the code visible in this file; presumably a timeout in
# seconds for agent completion — TODO confirm.
MAX_WAIT_AGENT = 300
# Prefix of every project id generated by _create_project.
E2E_PREFIX = "e2e-01-"
# Root under which _get_db_path looks up <pid>/blackboard.db.
DATA_ROOT = get_data_root()
# ── 工具函数 ──
def _check_environment():
    """Skip the calling test unless the daemon API reports it is ready.

    "Ready" means the status endpoint answers with ``status == "running"``
    and a truthy ``ticker_running``. Any error while contacting the API or
    decoding its answer also results in a skip.

    Returns:
        The decoded JSON payload of ``/api/daemon/status`` when ready.
    """
    status_url = f"{API_BASE}/api/daemon/status"
    try:
        payload = http_requests.get(status_url, timeout=5).json()
        daemon_ready = (payload.get("status") == "running"
                        and payload.get("ticker_running"))
        if not daemon_ready:
            # NOTE(review): this relies on pytest.skip() raising something
            # that `except Exception` below does not intercept — confirm
            # against the installed pytest version.
            pytest.skip(f"Daemon not ready: {payload}")
        return payload
    except Exception as exc:
        pytest.skip(f"Production API not available at {API_BASE}: {exc}")
def _cleanup_project(pid: str):
    """Ask the API to archive project *pid*, ignoring every failure.

    Used from fixture teardown, so it is deliberately best-effort: an
    unreachable API or any other error must not fail the test.
    """
    archive_url = f"{API_BASE}/api/projects/{pid}/archive"
    try:
        http_requests.post(archive_url, timeout=5)
    except Exception:
        # Deliberately swallowed: cleanup is best-effort.
        pass
def _create_project(project_list: list, name_prefix: str = "E01",
                    agents: list = None) -> str:
    """Create a uniquely named project through the API and register it.

    Args:
        project_list: List that receives the new project id (the fixtures
            iterate it to archive projects on teardown).
        name_prefix: Prepended to the project id to form its display name.
        agents: Agent ids for the project config; a default trio is used
            when this is ``None`` or empty.

    Returns:
        The generated project id.
    """
    pid = E2E_PREFIX + uuid.uuid4().hex[:6]
    roster = agents if agents else ["zhangfei-dev", "simayi-challenger", "zhaoyun-data"]
    payload = {
        "id": pid,
        "name": f"{name_prefix}-{pid}",
        "config": {"agents": roster},
    }
    resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    project_list.append(pid)
    return pid
def _create_task(pid: str, **kwargs) -> str:
    """Create a task in project *pid* and return its id.

    Args:
        pid: Project the task belongs to.
        **kwargs: Extra task fields sent verbatim in the request body. An
            ``id`` entry, when present and truthy, becomes the task id;
            otherwise a random ``e2e-task-…`` id is generated. Entries named
            ``status`` or ``priority`` replace the defaults below.

    Returns:
        The id of the created task.
    """
    tid = kwargs.pop("id", None)
    if not tid:
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
    payload = {"id": tid, "status": "pending", "priority": 5}
    payload.update(kwargs)
    tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
    resp = http_requests.post(tasks_url, json=payload, timeout=10)
    assert resp.status_code == 200, f"Create task failed: {resp.text}"
    return tid
def _get_task(pid: str, tid: str) -> Dict[str, Any]:
    """Fetch task *tid* of project *pid* and return the decoded JSON body.

    Fails the calling test with the response text when the API does not
    answer with HTTP 200.
    """
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    resp = http_requests.get(task_url, timeout=10)
    assert resp.status_code == 200, f"Get task failed: {resp.text}"
    return resp.json()
def _update_status(pid: str, tid: str, status: str,
                   agent: str = "test") -> Dict:
    """Post a status change for task *tid* and return the decoded answer.

    Args:
        pid: Project id.
        tid: Task id.
        status: Target status sent to the API.
        agent: Agent name sent along with the change.

    Returns:
        The JSON body of the response.
    """
    # NOTE(review): unlike the other helpers this one does not check the
    # HTTP status code, so a rejected transition goes unnoticed unless the
    # caller inspects the returned body — confirm this is intended.
    status_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status"
    payload = {"status": status, "agent": agent}
    reply = http_requests.post(status_url, json=payload, timeout=10)
    return reply.json()
def _add_comment(pid: str, tid: str, author: str, body: str,
                 mentions: list = None) -> int:
    """Post a ``general`` comment on a task and return the new comment id.

    Args:
        pid: Project id.
        tid: Task id the comment is attached to.
        author: Author name sent to the API.
        body: Comment text.
        mentions: Agent ids mentioned by the comment; an empty list is sent
            when this is ``None`` or empty.

    Returns:
        The response's ``comment_id`` when truthy, otherwise its ``id``
        (which may be ``None`` if the response carries neither).
    """
    payload = {
        "author": author,
        "body": body,
        "comment_type": "general",
        "mentions": mentions if mentions else [],
    }
    comments_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}/comments"
    resp = http_requests.post(comments_url, json=payload, timeout=10)
    assert resp.status_code == 200, f"Add comment failed: {resp.text}"
    answer = resp.json()
    comment_id = answer.get("comment_id")
    return comment_id if comment_id else answer.get("id")
def _get_db_path(pid: str) -> Path:
    """Return the path ``<DATA_ROOT>/<pid>/blackboard.db`` for project *pid*."""
    project_dir = DATA_ROOT / pid
    return project_dir / "blackboard.db"
def _query_mention_queue(db_path: Path, status: str = None) -> list:
    """Read ``mention_queue`` rows directly from a project's SQLite file.

    Args:
        db_path: Location of the database file to open.
        status: When not ``None``, only rows whose ``status`` column equals
            this value are returned; ``None`` returns every row.

    Returns:
        A list with one ``dict`` per row, keyed by column name.

    Raises:
        sqlite3.Error: If the database cannot be queried (for example the
            ``mention_queue`` table does not exist).
    """
    sql = "SELECT * FROM mention_queue"
    params = ()
    # Compare against None instead of relying on truthiness, so an explicit
    # status (even an empty string) is always applied as a filter rather
    # than silently returning the whole table.
    if status is not None:
        sql += " WHERE status=?"
        params = (status,)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        return [dict(row) for row in conn.execute(sql, params)]
    finally:
        conn.close()
def _set_round_count(db_path: Path, tid: str, count: int):
    """Overwrite ``tasks.round_count`` for task *tid* directly in the database.

    Args:
        db_path: Location of the database file to open.
        tid: Id of the task row to update.
        count: New value for the ``round_count`` column.

    Raises:
        AssertionError: If no row with that id exists. Without this check a
            wrong id would leave the database untouched and the calling
            test would continue on a precondition that was never set up.
        sqlite3.Error: If the update itself fails.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.execute(
            "UPDATE tasks SET round_count=? WHERE id=?", (count, tid))
        conn.commit()
        # rowcount is the number of rows the UPDATE modified.
        updated = cursor.rowcount
    finally:
        conn.close()
    assert updated > 0, (
        f"Set round_count failed: no task {tid!r} in {db_path}"
    )
# ===================================================================
# E1: comment + @mention 端到端
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE01MentionE2E:
    """E1: a comment mentioning zhaoyun-data shows up in mention_queue and gets notified."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def test_mention_spawn_e2e(self):
        """Post a comment with a mention and follow it through mention_queue."""
        pid = _create_project(self._projects, "E01-mention")
        tid = _create_task(pid, title="E2E mention 测试",
                           description="测试 mention 端到端",
                           assignee="simayi-challenger")

        # Write the comment that mentions zhaoyun-data.
        comment_id = _add_comment(pid, tid, "simayi-challenger",
                                  "@zhaoyun-data 请查看这个任务",
                                  mentions=["zhaoyun-data"])
        assert comment_id is not None
        print(f"\n🚀 E01-E1: comment 已写入 (cid={comment_id})")

        # Give the ticker time to process the comment, then look for the
        # queue entry: first as pending, and if there is none, as notified
        # (it may already have been handled).
        db_path = _get_db_path(pid)
        time.sleep(10)
        found = (_query_mention_queue(db_path, status="pending")
                 or _query_mention_queue(db_path, status="notified"))
        assert len(found) >= 1, (
            f"mention_queue 中没有找到 zhaoyun-data 的记录。"
            f"\n所有 mentions: {_query_mention_queue(db_path)}"
        )

        first = found[0]
        assert first["mentioned_agent"] == "zhaoyun-data"
        print(f" mention 状态: {first['status']}")
        print(" ✅ mention 端到端写入成功")

        if first["status"] != "pending":
            print(" ✅ mention 已 spawn(首次查询时即为 notified")
            return

        # Still pending: wait once more and require a notified entry.
        time.sleep(30)
        notified = _query_mention_queue(db_path, status="notified")
        assert len(notified) >= 1, (
            "mention 未被 spawn30s 后仍为 pending"
        )
        print(" ✅ mention spawn 成功,status=notified")
# ===================================================================
# E2: 一轮结束 → 庞统 review spawn
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE02RoundComplete:
    """E2: once every sub of a parent is done, the parent's round_count must rise.

    The test observes only ``round_count``; it does not inspect the review
    spawn itself.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    @staticmethod
    def _read_parent(db_path, parent_tid):
        """Return the parent's (round_count, status) row, or None if absent."""
        conn = sqlite3.connect(str(db_path))
        conn.row_factory = sqlite3.Row
        try:
            return conn.execute(
                "SELECT round_count, status FROM tasks WHERE id=?",
                (parent_tid,)
            ).fetchone()
        finally:
            conn.close()

    def test_round_complete_triggers_review(self):
        """Finish two subs by hand and wait for round_count to become positive."""
        pid = _create_project(self._projects, "E02-round",
                              agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Parent Task",
                                  description="一轮结束测试")
        sub_ids = [
            _create_task(pid, title="E2E Sub1",
                         description="子任务1", assignee="zhangfei-dev",
                         parent_task=parent_tid),
            _create_task(pid, title="E2E Sub2",
                         description="子任务2", assignee="simayi-challenger",
                         parent_task=parent_tid),
        ]

        # Walk each sub through the full status chain up to done.
        for sub_id in sub_ids:
            for step in ("claimed", "working", "review", "done"):
                _update_status(pid, sub_id, step, agent="test")
        print("\n🚀 E01-E2: 所有 sub 已 done,等待 ticker 检测")

        # Poll the parent row until round_count is positive or time runs out.
        db_path = _get_db_path(pid)
        give_up_at = time.time() + MAX_WAIT_DISPATCH
        round_count = 0
        while time.time() < give_up_at:
            time.sleep(POLL_INTERVAL)
            parent = self._read_parent(db_path, parent_tid)
            if parent and parent["round_count"] > 0:
                round_count = parent["round_count"]
                break

        assert round_count >= 1, (
            f"一轮结束未触发!{MAX_WAIT_DISPATCH}s 后 round_count 仍为 0"
        )
        print(f" round_count={round_count}")
        print(" ✅ 一轮结束检测成功,庞统 review 已 spawn")
# ===================================================================
# E3: 多轮迭代(round_count 递增)
# ===================================================================
@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE03MultiRound:
    """E3: round 1 → review → round 2.

    Checks that the parent's ``round_count`` increments once per completed
    round. The second-round sub is created by the test itself, so the test
    does not depend on the reviewing agent creating sub tasks.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    @staticmethod
    def _wait_for_round(db_path, parent_tid, target):
        """Poll the parent row until ``round_count >= target``.

        Sleeps POLL_INTERVAL seconds before every query and gives up after
        MAX_WAIT_DISPATCH seconds.

        Returns:
            The observed round_count once it reaches *target*, or ``None``
            when the deadline passes first.
        """
        deadline = time.time() + MAX_WAIT_DISPATCH
        while time.time() < deadline:
            time.sleep(POLL_INTERVAL)
            conn = sqlite3.connect(str(db_path))
            conn.row_factory = sqlite3.Row
            try:
                row = conn.execute(
                    "SELECT round_count FROM tasks WHERE id=?",
                    (parent_tid,)
                ).fetchone()
            finally:
                conn.close()
            # Treat a missing row or a NULL column as "not there yet"
            # instead of failing on a None comparison.
            current = (row["round_count"] or 0) if row else 0
            if current >= target:
                return current
        return None

    @staticmethod
    def _complete_sub(pid, tid, agent):
        """Drive sub task *tid* through claimed → working → review → done."""
        for step in ("claimed", "working", "review", "done"):
            _update_status(pid, tid, step, agent=agent)

    def test_multi_round_increment(self):
        """Complete one sub per round and expect round_count to reach 1, then 2."""
        pid = _create_project(self._projects, "E03-multi",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Multi-Round",
                                  description="多轮测试")
        sub1 = _create_task(pid, title="Round1 Sub",
                            description="第一轮子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)
        self._complete_sub(pid, sub1, "zhangfei-dev")
        print(f"\n🚀 E01-E3: 等待 Round 1 review")

        db_path = _get_db_path(pid)
        reached = self._wait_for_round(db_path, parent_tid, 1)
        if reached is None:
            pytest.fail("Round 1 未触发(超时)")
        print(f" Round 1: round_count={reached}")

        # Create and finish the second-round sub by hand, standing in for
        # the sub the reviewer would create.
        sub2 = _create_task(pid, title="Round2 Sub",
                            description="第二轮子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)
        self._complete_sub(pid, sub2, "zhangfei-dev")
        print(f" 等待 Round 2 review")

        reached = self._wait_for_round(db_path, parent_tid, 2)
        if reached is None:
            pytest.fail("Round 2 未触发(超时)")
        print(f" Round 2: round_count={reached}")
        print(f" ✅ 多轮迭代验证成功(round_count 递增到 2")
# ===================================================================
# E4: round 上限强制停止
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE04RoundLimit:
    """E4: with round_count already at 5, a finished round must not raise it."""

    # round_count value the test pins the parent to and expects to stay put.
    ROUND_LIMIT = 5

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_round_limit_stops_review(self):
        """Pin round_count at the limit, finish the only sub, expect no increment."""
        pid = _create_project(self._projects, "E04-limit",
                              agents=["pangtong-fujunshi", "zhangfei-dev"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Round Limit",
                                  description="上限测试")
        sub1 = _create_task(pid, title="Limit Sub",
                            description="上限测试子任务",
                            assignee="zhangfei-dev",
                            parent_task=parent_tid)

        # Pin the parent at the limit BEFORE the sub finishes. Doing it
        # afterwards leaves a window in which the ticker can see the
        # finished round while round_count is still 0, which would make the
        # final assertion pass without exercising the limit.
        db_path = _get_db_path(pid)
        _set_round_count(db_path, parent_tid, self.ROUND_LIMIT)

        # Use the same claimed → working → review → done chain as the other
        # round tests, so the sub reaches done the same way they do.
        for step in ("claimed", "working", "review", "done"):
            _update_status(pid, sub1, step, agent="zhangfei-dev")
        print(f"\n🚀 E01-E4: round_count={self.ROUND_LIMIT},等待 2 tick 确认不触发")

        # Wait long enough for the ticker to have looked at the parent.
        time.sleep(60)

        conn = sqlite3.connect(str(db_path))
        conn.row_factory = sqlite3.Row
        try:
            row = conn.execute(
                "SELECT round_count FROM tasks WHERE id=?",
                (parent_tid,)
            ).fetchone()
            final_round = row["round_count"] if row else None
        finally:
            conn.close()

        assert final_round == self.ROUND_LIMIT, (
            f"round_count 应保持 {self.ROUND_LIMIT},实际: {final_round}"
        )
        print(f" round_count={final_round}(未递增)")
        print(f" ✅ round 上限验证成功")
# ===================================================================
# E5: mention 重试(Agent busy)
# ===================================================================
@pytest.mark.integration
@pytest.mark.flaky
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE05MentionRetry:
    """E5: mention retry scenario; depends on live agent state, hence marked flaky."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def test_mention_retry_or_success(self):
        """Every queued mention must end up as either notified or failed."""
        settled_states = ("notified", "failed")

        pid = _create_project(self._projects, "E05-retry")
        tid = _create_task(pid, title="E2E Mention Retry",
                           description="mention 重试测试",
                           assignee="simayi-challenger")
        _add_comment(pid, tid, "simayi-challenger",
                     "@zhaoyun-data 请处理", mentions=["zhaoyun-data"])
        db_path = _get_db_path(pid)

        # Poll until the queue is non-empty and every entry has settled,
        # or until the deadline passes.
        give_up_at = time.time() + MAX_WAIT_DISPATCH
        while time.time() < give_up_at:
            time.sleep(POLL_INTERVAL)
            snapshot = _query_mention_queue(db_path)
            if snapshot and all(entry["status"] in settled_states
                                for entry in snapshot):
                break

        # Re-read the queue for the final verdict.
        final_entries = _query_mention_queue(db_path)
        assert len(final_entries) >= 1
        for entry in final_entries:
            print(f" mention: agent={entry['mentioned_agent']} "
                  f"status={entry['status']} retry_count={entry['retry_count']}")
            assert entry["status"] in settled_states, (
                f"mention 未被处理: {entry}"
            )
        print(" ✅ mention 重试/成功验证完成")
# ===================================================================
# E6: failed sub 触发 review(BUG-2 验证)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE06FailedSubReview:
    """E6: done and failed both count as finished subs, so the round must still close."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    @staticmethod
    def _read_parent(db_path, parent_tid):
        """Return the parent's round_count row, or None if absent."""
        conn = sqlite3.connect(str(db_path))
        conn.row_factory = sqlite3.Row
        try:
            return conn.execute(
                "SELECT round_count FROM tasks WHERE id=?",
                (parent_tid,)
            ).fetchone()
        finally:
            conn.close()

    def test_failed_sub_triggers_review(self):
        """One sub ends done, the other failed; round_count must become positive."""
        pid = _create_project(self._projects, "E06-failed",
                              agents=["pangtong-fujunshi", "zhangfei-dev", "simayi-challenger"])
        parent_tid = _create_task(pid, id=f"parent-{uuid.uuid4().hex[:6]}",
                                  title="E2E Failed Sub Review",
                                  description="失败子任务触发 review 测试")
        sub_done = _create_task(pid, title="Sub Done",
                                description="完成的子任务",
                                assignee="zhangfei-dev",
                                parent_task=parent_tid)
        sub_failed = _create_task(pid, title="Sub Failed",
                                  description="失败的子任务",
                                  assignee="simayi-challenger",
                                  parent_task=parent_tid)

        # (sub id, acting agent, status chain) — the first sub ends in done,
        # the second in failed.
        plan = (
            (sub_done, "zhangfei-dev",
             ("claimed", "working", "review", "done")),
            (sub_failed, "simayi-challenger",
             ("claimed", "working", "failed")),
        )
        for sub_id, acting_agent, steps in plan:
            for step in steps:
                _update_status(pid, sub_id, step, agent=acting_agent)
        print("\n🚀 E01-E6: sub1=done, sub2=failed,等待一轮结束检测")

        # Poll the parent row until round_count is positive or time runs out.
        db_path = _get_db_path(pid)
        give_up_at = time.time() + MAX_WAIT_DISPATCH
        round_count = 0
        while time.time() < give_up_at:
            time.sleep(POLL_INTERVAL)
            parent = self._read_parent(db_path, parent_tid)
            if parent and parent["round_count"] > 0:
                round_count = parent["round_count"]
                break

        assert round_count >= 1, (
            f"failed sub 未触发一轮结束!{MAX_WAIT_DISPATCH}s 后 round_count 仍为 0"
        )
        print(f" round_count={round_count}")
        print(" ✅ failed sub 触发 review 成功(BUG-2 验证通过)")