Files
sanguo_moziplus_v2/tests/test_e2e_v31.py
T
2026-05-23 07:38:58 +08:00

680 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""v3.1 端到端测试 — 新增场景覆盖
覆盖 v3.1 新增功能:
E9-4 广播认领:无 assignee → 广播 → Agent 认领 → done
E9-5 状态机:暂停 → 恢复 (resumed_from)
E9-6 状态机:cancelled → 重新启动 → done
E9-7 超时处理:claimed 超时 → pending (assignee 清空)
E9-8 缓存头:HTML no-cache + JS/CSS immutable
E10c 失败重试链:failed → pending → 广播 → done
E10d 完整生命周期:pending → claimed → working → review → done
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
"""
import json
import os
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
import pytest
import requests as http_requests
# ── Path setup ──
# Project modules are imported from the deployed copy under
# ~/.sanguo_projects/sanguo_moziplus_v2, so that directory is put first on
# sys.path; this must happen before the `src.utils` import below.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.utils import get_data_root
# ── Constants ──
API_BASE = "http://localhost:8083"  # production daemon API under test
POLL_INTERVAL = 5  # seconds between status polls
MAX_WAIT_DISPATCH = 120  # dispatch wait timeout (~4 ticks, leaves slack for tick timing)
MAX_WAIT_AGENT = 300  # seconds to wait for an agent to finish a task
E2E_PREFIX = "e2e-v31-"  # prefix of every project id these tests create
# Per-project data directory root as reported by the deployed project;
# each project's blackboard.db lives underneath it.
DATA_ROOT = get_data_root()
# ── 工具函数 ──
def _check_environment():
    """Skip the calling test unless the production daemon is usable.

    Queries ``{API_BASE}/api/daemon/status`` and calls ``pytest.skip`` when
    the daemon is not ``running``, its ticker is not running, or the status
    endpoint cannot be reached / decoded.

    Returns:
        The decoded status payload when the daemon is ready.
    """
    try:
        status_resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
        payload = status_resp.json()
        daemon_ready = (payload.get("status") == "running"
                        and payload.get("ticker_running"))
        if not daemon_ready:
            pytest.skip(f"Daemon not ready: {payload}")
    except Exception as exc:
        # NOTE: pytest.skip raises pytest's Skipped outcome, which (in current
        # pytest) is a BaseException, so the skip above is not re-caught here.
        pytest.skip(f"Production API not available at {API_BASE}: {exc}")
    return payload
def _cleanup_project(pid: str):
    """Archive a test project on the production API (best-effort teardown).

    Errors are printed rather than raised, so teardown of the remaining
    projects still runs. They are no longer silently swallowed, though:
    a failed archive leaves the test project behind on the live daemon and
    should be visible in the test log.
    """
    try:
        resp = http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5)
    except Exception as exc:
        print(f"⚠️ cleanup: archiving project {pid} failed: {exc}")
        return
    if not resp.ok:
        print(f"⚠️ cleanup: archiving project {pid} returned HTTP "
              f"{resp.status_code}: {resp.text[:200]}")
def _create_project(project_list: list, name_prefix: str = "E9",
                    agents: Optional[list] = None) -> str:
    """Create a throwaway project through the API and register it for teardown.

    Args:
        project_list: list the new project id is appended to, so the caller's
            fixture can archive it afterwards.
        name_prefix: prefix of the human-readable project name.
        agents: agent ids stored in the project config; ``None`` or an empty
            list falls back to ``["zhangfei-dev", "simayi-challenger"]``.

    Returns:
        The generated project id (``E2E_PREFIX`` followed by 6 hex chars).
    """
    project_id = E2E_PREFIX + uuid.uuid4().hex[:6]
    payload = {
        "id": project_id,
        "name": f"{name_prefix}-{project_id}",
        "config": {"agents": agents or ["zhangfei-dev", "simayi-challenger"]},
    }
    resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    # Register only after a successful create, so teardown never archives
    # an id the server does not know.
    project_list.append(project_id)
    return project_id
def _create_task(pid: str, **kwargs) -> str:
    """Create a task in project *pid* and return its id.

    A truthy ``id`` keyword is used as the task id; otherwise a random
    ``e2e-task-<8 hex>`` id is generated. The request body starts from
    ``status="pending"`` and ``priority=5``. All other keyword arguments
    (title, description, assignee, task_type, ...) are sent as-is and may
    override those defaults.
    """
    task_id = kwargs.pop("id", None)
    if not task_id:
        task_id = f"e2e-task-{uuid.uuid4().hex[:8]}"
    body = dict(id=task_id, status="pending", priority=5)
    body.update(kwargs)
    resp = http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks",
                              json=body, timeout=10)
    assert resp.status_code == 200, f"Create task failed: {resp.text}"
    return task_id
def _get_task(pid: str, tid: str) -> Dict[str, Any]:
    """Fetch task *tid* of project *pid* as JSON, asserting HTTP 200."""
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    resp = http_requests.get(task_url, timeout=10)
    assert resp.status_code == 200, f"Get task failed: {resp.text}"
    return resp.json()
def _update_status(pid: str, tid: str, status: str,
                   agent: str = "test") -> Dict:
    """Request a status transition for task *tid* through the API.

    Args:
        pid: project id.
        tid: task id.
        status: target status (e.g. "claimed", "working", "paused").
        agent: agent name reported as the actor of the transition.

    Returns:
        The decoded JSON response; callers inspect its ``ok`` field.

    Raises:
        AssertionError: the response body is not JSON (e.g. a proxy or server
            error page). The message carries the HTTP status and body, instead
            of surfacing as an opaque JSON decode error.
    """
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
        json={"status": status, "agent": agent}, timeout=10,
    )
    try:
        return resp.json()
    except ValueError as exc:  # requests' JSON decode errors subclass ValueError
        raise AssertionError(
            f"Status update {tid} -> {status} returned non-JSON "
            f"(HTTP {resp.status_code}): {resp.text[:500]}"
        ) from exc
def _poll_task(pid: str, tid: str, timeout: int,
               terminal_states: Optional[tuple] = None) -> Dict[str, Any]:
    """Poll a task until its status is in *terminal_states* or *timeout* expires.

    Args:
        pid: project id.
        tid: task id.
        timeout: seconds to keep polling; checks happen every POLL_INTERVAL s.
        terminal_states: statuses that end polling early; defaults to
            ``("done", "failed", "cancelled")``.

    Returns:
        The task JSON as soon as its status is in *terminal_states*.
        After the deadline, the result of one final fetch is returned whatever
        its status, or ``{"status": "unknown"}`` when that fetch raises or is
        not HTTP 200. Request/decoding errors while polling are ignored.
    """
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    stop_states = terminal_states or ("done", "failed", "cancelled")
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            resp = http_requests.get(task_url, timeout=10)
            if resp.status_code == 200:
                snapshot = resp.json()
                if snapshot.get("status") in stop_states:
                    return snapshot
        except Exception:
            pass  # transient API hiccup: keep polling until the deadline
        time.sleep(POLL_INTERVAL)
    # Deadline passed: report the latest state instead of failing here.
    try:
        resp = http_requests.get(task_url, timeout=10)
        if resp.status_code == 200:
            return resp.json()
    except Exception:
        pass
    return {"status": "unknown"}
def _get_db_path(pid: str) -> Path:
    """Return the path of project *pid*'s ``blackboard.db`` under DATA_ROOT."""
    return DATA_ROOT.joinpath(pid, "blackboard.db")
def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str):
    """Overwrite a task's ``claimed_at`` directly in the project's SQLite DB.

    Used to simulate a stale claim without waiting for the real timeout.

    Args:
        pid: project id (selects the blackboard.db file).
        tid: id of the task row to patch.
        claimed_at: timestamp string written verbatim into ``claimed_at``.

    Raises:
        AssertionError: the DB file does not exist, or no task row with id
            *tid* was updated. The latter would otherwise only surface later
            as a confusing "still claimed" polling timeout.
    """
    db_path = _get_db_path(pid)
    assert db_path.exists(), f"DB not found: {db_path}"
    conn = sqlite3.connect(str(db_path))
    try:
        with conn:  # commits on success, rolls back if execute raises
            cur = conn.execute(
                "UPDATE tasks SET claimed_at=? WHERE id=?",
                (claimed_at, tid),
            )
        assert cur.rowcount > 0, (
            f"No task row with id {tid!r} in {db_path}; claimed_at not patched"
        )
    finally:
        # The connection context manager does not close; do it explicitly.
        conn.close()
# ===================================================================
# E9-4: 广播认领
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE94BroadcastClaim:
    """E9-4: task without assignee → broadcast claim → agent executes → done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_broadcast_claim(self):
        """Create a task with no assignee; it must be claimed via broadcast."""
        pid = _create_project(self._projects, "E9-4",
                              agents=["zhangfei-dev", "simayi-challenger"])
        tid = _create_task(
            pid,
            title="E2E广播认领任务:echo broadcast",
            description=(
                "这是一个E2E测试的广播认领任务。\n"
                "请执行 echo broadcast 并标记done。\n"
                "这是E2E自动化测试,不需要做其他事。"
            ),
            task_type="coding",
            # No assignee on purpose: that is what triggers the broadcast claim.
        )
        print(f"\n🚀 E9-4: 等待广播认领 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # _poll_task returns {"status": "unknown"} when the task could not be
        # read after the deadline; that is not evidence of a claim.
        assert status != "unknown", (
            f"Could not read task {tid} after {MAX_WAIT_AGENT}s: {result}"
        )
        # Must have been claimed (i.e. no longer pending).
        assert status != "pending", (
            f"广播认领未生效!任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending。"
            f"\n请检查:1) Ticker广播 2) Agent spawn 3) _get_idle_agents()"
        )
        # Must not have been blocked.
        assert status != "blocked", f"广播任务被错误拦截: {result}"
        # A claimed task must carry the claiming agent as assignee.
        assignee = result.get("assignee")
        print(f" 认领Agent: {assignee}")
        assert assignee, f"任务已离开pending但assignee为空: {result}"
        if status == "done":
            print(" ✅ 广播认领执行成功")
        else:
            print(f" ⚠️ 广播认领后状态: {status}")
# ===================================================================
# E9-5: 暂停→恢复 (resumed_from)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE95PauseResume:
    """E9-5: drive a task manually to working → paused → resume; check resumed_from."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_pause_resume_resumed_from(self):
        """working → paused → back to working; resumed_from must record 'working'."""
        pid = _create_project(self._projects, "E9-5")
        tid = _create_task(
            pid,
            title="E2E暂停恢复测试",
            description="测试暂停恢复功能",
            assignee="zhangfei-dev",
        )
        # Drive the task manually: claimed → working.
        r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev")
        assert r1.get("ok"), f"claimed失败: {r1}"
        r2 = _update_status(pid, tid, "working", agent="zhangfei-dev")
        assert r2.get("ok"), f"working失败: {r2}"
        # Pause.
        r3 = _update_status(pid, tid, "paused", agent="test")
        assert r3.get("ok"), f"paused失败: {r3}"
        # Pausing must record the state it paused from.
        task = _get_task(pid, tid)
        resumed_from = task.get("resumed_from")
        print(f"\n🚀 E9-5: 暂停后 resumed_from={resumed_from}")
        assert resumed_from == "working", (
            f"resumed_from 应为 'working',实际: {resumed_from}"
        )
        assert task.get("status") == "paused"
        # Resume to working.
        r4 = _update_status(pid, tid, "working", agent="zhangfei-dev")
        assert r4.get("ok"), f"恢复working失败: {r4}"
        # Check the status after resuming.
        task2 = _get_task(pid, tid)
        print(f" 恢复后 status={task2.get('status')}")
        assert task2.get("status") == "working", (
            f"恢复后状态应为 working,实际: {task2.get('status')}"
        )
        print(" ✅ 暂停恢复流程正确")

    def test_review_pause_resume(self):
        """review → paused → back to review; resumed_from must record 'review'."""
        pid = _create_project(self._projects, "E9-5b")
        tid = _create_task(
            pid,
            title="E2E Review暂停恢复",
            assignee="simayi-challenger",
        )
        # Every setup transition must succeed. Otherwise the resumed_from
        # assertion below fails with a misleading message.
        for setup_status in ("claimed", "working", "review"):
            r_setup = _update_status(pid, tid, setup_status,
                                     agent="simayi-challenger")
            assert r_setup.get("ok"), f"{setup_status}失败: {r_setup}"
        # Pause.
        r = _update_status(pid, tid, "paused", agent="test")
        assert r.get("ok"), f"paused失败: {r}"
        task = _get_task(pid, tid)
        assert task.get("resumed_from") == "review", (
            f"resumed_from 应为 'review',实际: {task.get('resumed_from')}"
        )
        # Resume to review.
        r2 = _update_status(pid, tid, "review", agent="simayi-challenger")
        assert r2.get("ok"), f"恢复review失败: {r2}"
        task2 = _get_task(pid, tid)
        assert task2.get("status") == "review"
        print("\n ✅ Review暂停恢复流程正确 (resumed_from=review)")
# ===================================================================
# E9-6: cancelled → 重新启动
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE96CancelledRestart:
    """E9-6: cancelled → pending (restart) → agent executes → done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_cancelled_to_pending_restart(self):
        """cancelled → pending → wait for the task to be dispatched again."""
        pid = _create_project(self._projects, "E9-6")
        tid = _create_task(
            pid,
            title="E2E取消重启任务:echo restart",
            description=(
                "请执行 echo restart 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )
        # Move the task to cancelled manually.
        r1 = _update_status(pid, tid, "cancelled")
        assert r1.get("ok"), f"cancelled失败: {r1}"
        task = _get_task(pid, tid)
        assert task.get("status") == "cancelled"
        # Restart → pending.
        r2 = _update_status(pid, tid, "pending")
        assert r2.get("ok"), f"pending重启失败: {r2}"
        task2 = _get_task(pid, tid)
        assert task2.get("status") == "pending"
        # v3.1: moving to pending clears the assignee.
        assert task2.get("assignee") is None or task2.get("assignee") == "", (
            f"重新启动后assignee应清空,实际: {task2.get('assignee')}"
        )
        print(f"\n🚀 E9-6: 等待重新调度执行 (pid={pid}, tid={tid})")
        # Wait for the task to be dispatched and executed.
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重启后最终状态: {status}")
        # _poll_task's {"status": "unknown"} sentinel means the task could not
        # be read at all; it must not count as "dispatched".
        assert status != "unknown", (
            f"Could not read task {tid} after {MAX_WAIT_AGENT}s: {result}"
        )
        assert status != "pending", (
            f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        if status == "done":
            print(" ✅ 取消重启流程正确")
        else:
            print(f" ⚠️ 重启后状态: {status}")
# ===================================================================
# E9-7: claimed 超时 → pending (assignee 清空)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE97ClaimedTimeout:
    """E9-7: claimed timeout → pending (assignee cleared)."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_claimed_timeout_to_pending(self):
        """A timed-out claimed task is reset by the ticker to pending with no assignee."""
        from datetime import timezone

        pid = _create_project(self._projects, "E9-7")
        tid = _create_task(
            pid,
            title="E2E超时测试任务",
            description="测试claimed超时处理",
            assignee="zhangfei-dev",
        )
        # Move the task to claimed manually.
        r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev")
        assert r1.get("ok"), f"claimed失败: {r1}"
        # Confirm it is claimed.
        task = _get_task(pid, tid)
        assert task.get("status") == "claimed"
        # Backdate claimed_at by 2 hours directly in the DB to simulate a timeout.
        # This produces the same naive ISO-8601 UTC string utcnow().isoformat()
        # did (utcnow() is deprecated since Python 3.12). The daemon is assumed
        # to read naive timestamps as UTC — TODO confirm.
        two_hours_ago = (
            datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=2)
        ).isoformat()
        _patch_db_claimed_at(pid, tid, two_hours_ago)
        print(f"\n🚀 E9-7: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})")
        # Wait for the ticker (1-2 ticks): poll until the task leaves claimed
        # (becomes pending, or escalated).
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "escalated"),
        )
        status = result.get("status")
        print(f" 超时后状态: {status}")
        # _poll_task's {"status": "unknown"} sentinel carries no assignee, so
        # without this check an unreachable API would pass both asserts below.
        assert status != "unknown", (
            f"Could not read task {tid} after {MAX_WAIT_DISPATCH}s: {result}"
        )
        # Expected: back to pending (or escalated once retry_count >= 3).
        assert status != "claimed", (
            f"超时处理未生效!任务 {tid}{MAX_WAIT_DISPATCH}s 后仍为 claimed"
        )
        # The assignee must be cleared.
        assignee = result.get("assignee")
        print(f" assignee: {assignee}")
        assert assignee is None or assignee == "", (
            f"超时重置后assignee应清空,实际: {assignee}"
        )
        print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)")
# ===================================================================
# E9-8: 缓存头验证
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE98CacheHeaders:
    """E9-8: verify the Cache-Control headers set by CachedStaticFiles."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready."""
        _check_environment()

    @staticmethod
    def _first_asset(pattern: str, kind: str) -> str:
        """Return the first asset path matching *pattern* in the served index HTML.

        Skips the test when the frontend is not served, or when the HTML
        references no asset of this *kind* ("JS"/"CSS").
        """
        import re

        html_resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if html_resp.status_code != 200:
            pytest.skip("Frontend not available")
        matches = re.findall(pattern, html_resp.text)
        if not matches:
            pytest.skip(f"No {kind} files found in HTML")
        return matches[0]

    @staticmethod
    def _asset_cache_control(path: str, kind: str) -> str:
        """GET *path* and return its Cache-Control header.

        Asserts HTTP 200 first, so a missing asset is not misreported as a
        wrong cache header.
        """
        resp = http_requests.get(f"{API_BASE}{path}", timeout=10)
        assert resp.status_code == 200, (
            f"{kind} asset {path} returned HTTP {resp.status_code}"
        )
        return resp.headers.get("cache-control", "")

    def test_html_no_cache(self):
        """The HTML page must be served no-cache (or no-store)."""
        resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if resp.status_code != 200:
            pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}")
        cache_control = resp.headers.get("cache-control", "")
        print(f"\n🚀 E9-8a: HTML Cache-Control: {cache_control}")
        assert "no-cache" in cache_control or "no-store" in cache_control, (
            f"HTML 应为 no-cache/no-store,实际: {cache_control}"
        )

    def test_js_immutable(self):
        """JS files must be immutable with a one-year max-age."""
        js_path = self._first_asset(r'src="(/assets/[^"]+\.js)"', "JS")
        cache_control = self._asset_cache_control(js_path, "JS")
        print(f" E9-8b: JS ({js_path}) Cache-Control: {cache_control}")
        assert "immutable" in cache_control, (
            f"JS 应含 immutable,实际: {cache_control}"
        )
        assert "31536000" in cache_control, (
            f"JS max-age 应为 31536000,实际: {cache_control}"
        )

    def test_css_immutable(self):
        """CSS files must be immutable with a one-year max-age, like JS."""
        css_path = self._first_asset(r'href="(/assets/[^"]+\.css)"', "CSS")
        cache_control = self._asset_cache_control(css_path, "CSS")
        print(f" E9-8c: CSS ({css_path}) Cache-Control: {cache_control}")
        assert "immutable" in cache_control, (
            f"CSS 应含 immutable,实际: {cache_control}"
        )
        # The docstring promises a long cache for CSS too; check it the same
        # way as the JS test.
        assert "31536000" in cache_control, (
            f"CSS max-age 应为 31536000,实际: {cache_control}"
        )
        print(" ✅ 缓存头验证通过")
# ===================================================================
# E10c: 失败重试链
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE10cRetryChain:
    """E10c: failed → pending (manual retry) → broadcast → claim → done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_failed_to_pending_retry(self):
        """Simulate a failure manually, retry, then wait for the task to be dispatched."""
        pid = _create_project(self._projects, "E10c",
                              agents=["zhangfei-dev", "simayi-challenger"])
        tid = _create_task(
            pid,
            title="E2E重试任务:echo retry",
            description=(
                "请执行 echo retry 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )
        # Drive the task to failed manually (simulating an agent failure).
        # Each step must succeed, or the "failed" transition below is
        # rejected for the wrong reason.
        for setup_status in ("claimed", "working"):
            r_setup = _update_status(pid, tid, setup_status, agent="zhangfei-dev")
            assert r_setup.get("ok"), f"{setup_status}失败: {r_setup}"
        r_fail = _update_status(pid, tid, "failed", agent="zhangfei-dev")
        assert r_fail.get("ok"), f"failed失败: {r_fail}"
        task = _get_task(pid, tid)
        assert task.get("status") == "failed"
        print("\n🚀 E10c: 任务已标记failed,准备重试")
        # Manual retry → pending.
        r_retry = _update_status(pid, tid, "pending")
        assert r_retry.get("ok"), f"重试pending失败: {r_retry}"
        task2 = _get_task(pid, tid)
        assert task2.get("status") == "pending"
        # The assignee must be cleared on retry.
        assert task2.get("assignee") is None or task2.get("assignee") == "", (
            f"重试后assignee应清空,实际: {task2.get('assignee')}"
        )
        # retry_count is expected to increase on retry. It is only logged here,
        # not asserted — TODO confirm the daemon's semantics before asserting.
        retry_count = task2.get("retry_count", 0) or 0
        print(f" retry_count: {retry_count}")
        # Wait for the task to be dispatched again.
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重试后最终状态: {status}")
        # _poll_task's {"status": "unknown"} sentinel means the task could not
        # be read at all; it must not count as "dispatched".
        assert status != "unknown", (
            f"Could not read task {tid} after {MAX_WAIT_AGENT}s: {result}"
        )
        assert status != "pending", (
            f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        if status == "done":
            print(" ✅ 失败重试链正确")
        else:
            print(f" ⚠️ 重试后状态: {status}")
# ===================================================================
# E10d: 完整生命周期(广播认领版)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE10dFullLifecycle:
    """E10d: no assignee → broadcast claim → claimed → working → review → done.

    Verifies the full status-transition chain plus the recorded events.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_full_lifecycle_with_review(self):
        """Full lifecycle: create → broadcast → claim → execute → review → done."""
        pid = _create_project(self._projects, "E10d",
                              agents=["zhangfei-dev", "simayi-challenger"])
        # Step 1: coding task executed by an agent (claimed via broadcast).
        code_tid = _create_task(
            pid,
            title="E2E完整链路:编码任务",
            description=(
                "请执行 echo lifecycle 并标记done。"
                "这是E2E完整生命周期测试,不需要做其他事。"
            ),
            task_type="coding",
            # No assignee on purpose: triggers the broadcast claim.
        )
        print(f"\n🚀 E10d: 等待编码任务广播认领 (pid={pid}, tid={code_tid})")
        result = _poll_task(
            pid, code_tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        code_status = result.get("status")
        print(f" 编码任务最终状态: {code_status}")
        # _poll_task's {"status": "unknown"} sentinel means the task could not
        # be read at all; it must not count as "claimed".
        assert code_status != "unknown", (
            f"Could not read task {code_tid} after {MAX_WAIT_AGENT}s: {result}"
        )
        assert code_status != "pending", "编码任务未被认领"
        if code_status == "done":
            # Check that events were recorded (skipped if the endpoint is not 200).
            events_resp = http_requests.get(
                f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events",
                timeout=10,
            )
            if events_resp.status_code == 200:
                # Assumes the endpoint returns {"events": [{"event_type": ...}]}.
                events = events_resp.json()
                event_types = [e.get("event_type") for e in events.get("events", [])]
                print(f" Events: {event_types}")
                # There must be at least one status-change event.
                assert any("claimed" in str(e) or "started" in str(e)
                           for e in event_types), (
                    f"缺少状态变化事件: {event_types}"
                )
        # Step 2: review task, driven manually (no agent execution needed).
        review_tid = _create_task(
            pid,
            title="E2E完整链路:review任务",
            description="测试review状态",
            assignee="simayi-challenger",
        )
        # Drive the whole lifecycle manually.
        transitions = ["claimed", "working", "review", "done"]
        for s in transitions:
            r = _update_status(pid, review_tid, s, agent="simayi-challenger")
            assert r.get("ok"), f"{s}失败: {r}"
        task = _get_task(pid, review_tid)
        assert task.get("status") == "done"
        print(" Review任务手动生命周期: ✅")
        # Step 3: a completed task can still be cancelled (done → cancelled).
        r_cancel = _update_status(pid, review_tid, "cancelled")
        assert r_cancel.get("ok"), f"done→cancelled失败: {r_cancel}"
        task3 = _get_task(pid, review_tid)
        assert task3.get("status") == "cancelled"
        print(" done→cancelled: ✅")
        print(" ✅ E10d 完整生命周期测试通过")