diff --git a/tests/test_e2e_v31.py b/tests/test_e2e_v31.py new file mode 100644 index 0000000..60407a7 --- /dev/null +++ b/tests/test_e2e_v31.py @@ -0,0 +1,678 @@ +"""v3.1 端到端测试 — 新增场景覆盖 + +覆盖 v3.1 新增功能: + E9-4 广播认领:无 assignee → 广播 → Agent 认领 → done + E9-5 状态机:暂停 → 恢复 (resumed_from) + E9-6 状态机:cancelled → 重新启动 → done + E9-7 超时处理:claimed 超时 → pending (assignee 清空) + E9-8 缓存头:HTML no-cache + JS/CSS immutable + E10c 失败重试链:failed → pending → 广播 → done + E10d 完整生命周期:pending → claimed → working → review → done + +需要 RUN_INTEGRATION=1 + 生产 daemon 运行。 +""" + +import json +import os +import sqlite3 +import sys +import time +import uuid +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, Optional + +import pytest +import requests as http_requests + +# ── 路径设置 ── + +DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" +sys.path.insert(0, str(DEPLOY_DIR)) + +from src.utils import get_data_root + +# ── 常量 ── + +API_BASE = "http://localhost:8083" +POLL_INTERVAL = 5 # 轮询间隔秒 +MAX_WAIT_DISPATCH = 60 # 等待调度超时(~2个tick) +MAX_WAIT_AGENT = 300 # 等待 Agent 完成超时 +E2E_PREFIX = "e2e-v31-" +DATA_ROOT = get_data_root() + + +# ── 工具函数 ── + +def _check_environment(): + """环境前置检查:daemon 运行 + ticker 活跃 + 8083 可达""" + try: + resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5) + data = resp.json() + if data.get("status") != "running" or not data.get("ticker_running"): + pytest.skip(f"Daemon not ready: {data}") + return data + except Exception as e: + pytest.skip(f"Production API not available at {API_BASE}: {e}") + + +def _cleanup_project(pid: str): + """清理测试项目""" + try: + http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5) + except Exception: + pass + + +def _create_project(project_list: list, name_prefix: str = "E9", + agents: list = None) -> str: + """创建测试项目,自动注册到 project_list 用于 teardown""" + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": config, + }, timeout=10) + assert resp.status_code == 200, f"Create project failed: {resp.text}" + project_list.append(pid) + return pid + + +def _create_task(pid: str, **kwargs) -> str: + """创建测试任务""" + tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200, f"Create task failed: {resp.text}" + return tid + + +def _get_task(pid: str, tid: str) -> Dict[str, Any]: + """获取任务详情""" + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + assert resp.status_code == 200, f"Get task failed: {resp.text}" + return resp.json() + + +def _update_status(pid: str, tid: str, status: str, + agent: str = "test") -> Dict: + """手动更新任务状态""" + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": status, "agent": agent}, timeout=10, + ) + return resp.json() + + +def _poll_task(pid: str, tid: str, timeout: int, + terminal_states: tuple = None) -> Dict[str, Any]: + """轮询任务状态直到终态或超时""" + terminal = terminal_states or ("done", "failed", "cancelled") + deadline = time.time() + timeout + last_status = None + while time.time() < deadline: + try: + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + if resp.status_code == 200: + data = resp.json() + last_status = data.get("status") + if last_status in terminal: + return data + except Exception: + pass + time.sleep(POLL_INTERVAL) + # 超时返回最后状态 + try: + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + return resp.json() if resp.status_code == 200 else {"status": "unknown"} + except Exception: + return {"status": "unknown"} + + +def _get_db_path(pid: str) -> Path: + """获取项目的 blackboard.db 路径""" + return DATA_ROOT / pid / "blackboard.db" + + +def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str): + """直接操作 DB 设置 claimed_at 时间戳(模拟超时)""" + db_path = _get_db_path(pid) + assert db_path.exists(), f"DB not found: {db_path}" + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + "UPDATE tasks SET claimed_at=? WHERE id=?", + (claimed_at, tid), + ) + conn.commit() + finally: + conn.close() + + +# =================================================================== +# E9-4: 广播认领 +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE94BroadcastClaim: + """E9-4: 无 assignee 任务 → 广播认领 → Agent 执行 → done""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def test_broadcast_claim(self): + """创建不指定 assignee 的任务,等待广播认领并执行完成""" + pid = _create_project(self._projects, "E9-4", + agents=["zhangfei-dev", "simayi-challenger"]) + tid = _create_task( + pid, + title="E2E广播认领任务:echo broadcast", + description=( + "这是一个E2E测试的广播认领任务。\n" + "请执行 echo broadcast 并标记done。\n" + "这是E2E自动化测试,不需要做其他事。" + ), + task_type="coding", + # 不指定 assignee → 触发广播认领 + ) + + print(f"\n🚀 E9-4: 等待广播认领 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + # 必须被认领(不是 pending) + assert status != "pending", ( + f"广播认领未生效!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。" + f"\n请检查:1) Ticker广播 2) Agent spawn 3) _get_idle_agents()" + ) + # 不能被拦截 + assert status != "blocked", f"广播任务被错误拦截: {result}" + + # 验证 assignee 已设置 + assignee = result.get("assignee") + print(f" 认领Agent: {assignee}") + assert assignee, f"任务已离开pending但assignee为空: {result}" + + if status == "done": + print(f" ✅ 广播认领执行成功") + else: + print(f" ⚠️ 广播认领后状态: {status}") + + +# =================================================================== +# E9-5: 暂停→恢复 (resumed_from) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE95PauseResume: + """E9-5: 手动推状态到 working → paused → 恢复 → 验证 resumed_from""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def test_pause_resume_resumed_from(self): + """working → paused → 恢复 working,验证 resumed_from 字段""" + pid = _create_project(self._projects, "E9-5") + tid = _create_task( + pid, + title="E2E暂停恢复测试", + description="测试暂停恢复功能", + assignee="zhangfei-dev", + ) + + # 手动推到 claimed → working + r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev") + assert r1.get("ok"), f"claimed失败: {r1}" + r2 = _update_status(pid, tid, "working", agent="zhangfei-dev") + assert r2.get("ok"), f"working失败: {r2}" + + # 暂停 + r3 = _update_status(pid, tid, "paused", agent="test") + assert r3.get("ok"), f"paused失败: {r3}" + + # 验证 resumed_from == "working" + task = _get_task(pid, tid) + resumed_from = task.get("resumed_from") + print(f"\n🚀 E9-5: 暂停后 resumed_from={resumed_from}") + assert resumed_from == "working", ( + f"resumed_from 应为 'working',实际: {resumed_from}" + ) + assert task.get("status") == "paused" + + # 恢复到 working + r4 = _update_status(pid, tid, "working", agent="zhangfei-dev") + assert r4.get("ok"), f"恢复working失败: {r4}" + + # 验证恢复后状态 + task2 = _get_task(pid, tid) + print(f" 恢复后 status={task2.get('status')}") + assert task2.get("status") == "working", ( + f"恢复后状态应为 working,实际: {task2.get('status')}" + ) + + print(f" ✅ 暂停恢复流程正确") + + def test_review_pause_resume(self): + """review → paused → 恢复 review""" + pid = _create_project(self._projects, "E9-5b") + tid = _create_task( + pid, + title="E2E Review暂停恢复", + assignee="simayi-challenger", + ) + + _update_status(pid, tid, "claimed", agent="simayi-challenger") + _update_status(pid, tid, "working", agent="simayi-challenger") + _update_status(pid, tid, "review", agent="simayi-challenger") + + # 暂停 + r = _update_status(pid, tid, "paused", agent="test") + assert r.get("ok"), f"paused失败: {r}" + + task = _get_task(pid, tid) + assert task.get("resumed_from") == "review", ( + f"resumed_from 应为 'review',实际: {task.get('resumed_from')}" + ) + + # 恢复到 review + r2 = _update_status(pid, tid, "review", agent="simayi-challenger") + assert r2.get("ok"), f"恢复review失败: {r2}" + + task2 = _get_task(pid, tid) + assert task2.get("status") == "review" + + print(f"\n ✅ Review暂停恢复流程正确 (resumed_from=review)") + + +# =================================================================== +# E9-6: cancelled → 重新启动 +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE96CancelledRestart: + """E9-6: cancelled → pending(重新启动)→ Agent 执行 → done""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def test_cancelled_to_pending_restart(self): + """cancelled → pending → 等待调度执行""" + pid = _create_project(self._projects, "E9-6") + tid = _create_task( + pid, + title="E2E取消重启任务:echo restart", + description=( + "请执行 echo restart 并标记done。" + "这是E2E测试,不需要做其他事。" + ), + assignee="zhangfei-dev", + ) + + # 手动推到 cancelled + r1 = _update_status(pid, tid, "cancelled") + assert r1.get("ok"), f"cancelled失败: {r1}" + task = _get_task(pid, tid) + assert task.get("status") == "cancelled" + + # 重新启动 → pending + r2 = _update_status(pid, tid, "pending") + assert r2.get("ok"), f"pending重启失败: {r2}" + task2 = _get_task(pid, tid) + assert task2.get("status") == "pending" + # assignee 应被清空(v3.1: pending时清空assignee) + assert task2.get("assignee") is None or task2.get("assignee") == "", ( + f"重新启动后assignee应清空,实际: {task2.get('assignee')}" + ) + + print(f"\n🚀 E9-6: 等待重新调度执行 (pid={pid}, tid={tid})") + # 等待调度执行 + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 重启后最终状态: {status}") + + assert status != "pending", ( + f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending" + ) + + if status == "done": + print(f" ✅ 取消重启流程正确") + else: + print(f" ⚠️ 重启后状态: {status}") + + +# =================================================================== +# E9-7: claimed 超时 → pending (assignee 清空) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE97ClaimedTimeout: + """E9-7: claimed 超时 → pending (assignee 清空)""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def test_claimed_timeout_to_pending(self): + """claimed 任务超时 → ticker 重置为 pending → assignee 清空""" + pid = _create_project(self._projects, "E9-7") + tid = _create_task( + pid, + title="E2E超时测试任务", + description="测试claimed超时处理", + assignee="zhangfei-dev", + ) + + # 手动推到 claimed + r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev") + assert r1.get("ok"), f"claimed失败: {r1}" + + # 验证 claimed + task = _get_task(pid, tid) + assert task.get("status") == "claimed" + + # 直接操作 DB:把 claimed_at 设为 2 小时前(模拟超时) + two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat() + _patch_db_claimed_at(pid, tid, two_hours_ago) + print(f"\n🚀 E9-7: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})") + + # 等待 ticker 处理(1-2 个 tick) + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("pending", "claimed", "escalated"), + ) + status = result.get("status") + print(f" 超时后状态: {status}") + + # 应该回到 pending(或 escalated 如果 retry_count >= 3) + assert status != "claimed", ( + f"超时处理未生效!任务 {tid} 在 {MAX_WAIT_DISPATCH}s 后仍为 claimed" + ) + + # assignee 应被清空 + assignee = result.get("assignee") + print(f" assignee: {assignee}") + assert assignee is None or assignee == "", ( + f"超时重置后assignee应清空,实际: {assignee}" + ) + + print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)") + + +# =================================================================== +# E9-8: 缓存头验证 +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE98CacheHeaders: + """E9-8: 验证 CachedStaticFiles 缓存头""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + + def test_html_no_cache(self): + """HTML 页面应为 no-cache""" + resp = http_requests.get(f"{API_BASE}/", timeout=10) + if resp.status_code != 200: + pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}") + + cache_control = resp.headers.get("cache-control", "") + print(f"\n🚀 E9-8a: HTML Cache-Control: {cache_control}") + assert "no-cache" in cache_control or "no-store" in cache_control, ( + f"HTML 应为 no-cache/no-store,实际: {cache_control}" + ) + + def test_js_immutable(self): + """JS 文件应为 immutable + 长缓存""" + # 先获取 HTML 找到 JS 文件路径 + html_resp = http_requests.get(f"{API_BASE}/", timeout=10) + if html_resp.status_code != 200: + pytest.skip(f"Frontend not available") + + import re + js_matches = re.findall(r'src="(/assets/[^"]+\.js)"', html_resp.text) + if not js_matches: + pytest.skip("No JS files found in HTML") + + js_path = js_matches[0] + resp = http_requests.get(f"{API_BASE}{js_path}", timeout=10) + cache_control = resp.headers.get("cache-control", "") + print(f" E9-8b: JS ({js_path}) Cache-Control: {cache_control}") + assert "immutable" in cache_control, ( + f"JS 应含 immutable,实际: {cache_control}" + ) + assert "31536000" in cache_control, ( + f"JS max-age 应为 31536000,实际: {cache_control}" + ) + + def test_css_immutable(self): + """CSS 文件应为 immutable + 长缓存""" + html_resp = http_requests.get(f"{API_BASE}/", timeout=10) + if html_resp.status_code != 200: + pytest.skip("Frontend not available") + + import re + css_matches = re.findall(r'href="(/assets/[^"]+\.css)"', html_resp.text) + if not css_matches: + pytest.skip("No CSS files found in HTML") + + css_path = css_matches[0] + resp = http_requests.get(f"{API_BASE}{css_path}", timeout=10) + cache_control = resp.headers.get("cache-control", "") + print(f" E9-8c: CSS ({css_path}) Cache-Control: {cache_control}") + assert "immutable" in cache_control, ( + f"CSS 应含 immutable,实际: {cache_control}" + ) + + print(f" ✅ 缓存头验证通过") + + +# =================================================================== +# E10c: 失败重试链 +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE10cRetryChain: + """E10c: failed → pending(手动重试)→ 广播 → 认领 → done""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def test_failed_to_pending_retry(self): + """手动模拟失败 → 重试 → 等待调度完成""" + pid = _create_project(self._projects, "E10c", + agents=["zhangfei-dev", "simayi-challenger"]) + tid = _create_task( + pid, + title="E2E重试任务:echo retry", + description=( + "请执行 echo retry 并标记done。" + "这是E2E测试,不需要做其他事。" + ), + assignee="zhangfei-dev", + ) + + # 手动推到 failed(模拟 Agent 执行失败) + _update_status(pid, tid, "claimed", agent="zhangfei-dev") + _update_status(pid, tid, "working", agent="zhangfei-dev") + r_fail = _update_status(pid, tid, "failed", agent="zhangfei-dev") + assert r_fail.get("ok"), f"failed失败: {r_fail}" + + task = _get_task(pid, tid) + assert task.get("status") == "failed" + print(f"\n🚀 E10c: 任务已标记failed,准备重试") + + # 手动重试 → pending + r_retry = _update_status(pid, tid, "pending") + assert r_retry.get("ok"), f"重试pending失败: {r_retry}" + + task2 = _get_task(pid, tid) + assert task2.get("status") == "pending" + # assignee 应被清空 + assert task2.get("assignee") is None or task2.get("assignee") == "", ( + f"重试后assignee应清空,实际: {task2.get('assignee')}" + ) + # retry_count 应递增 + retry_count = task2.get("retry_count", 0) or 0 + print(f" retry_count: {retry_count}") + + # 等待重新调度执行 + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 重试后最终状态: {status}") + + assert status != "pending", ( + f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending" + ) + + if status == "done": + print(f" ✅ 失败重试链正确") + else: + print(f" ⚠️ 重试后状态: {status}") + + +# =================================================================== +# E10d: 完整生命周期(广播认领版) +# =================================================================== + +@pytest.mark.integration +@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run real agent tests") +class TestE10dFullLifecycle: + """E10d: 无 assignee → 广播认领 → claimed → working → review → done + + 验证完整状态转换链 + events 记录。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def test_full_lifecycle_with_review(self): + """完整生命周期:创建 → 广播 → 认领 → 执行 → review → done""" + pid = _create_project(self._projects, "E10d", + agents=["zhangfei-dev", "simayi-challenger"]) + + # 第一步:编码任务(张飞执行) + code_tid = _create_task( + pid, + title="E2E完整链路:编码任务", + description=( + "请执行 echo lifecycle 并标记done。" + "这是E2E完整生命周期测试,不需要做其他事。" + ), + task_type="coding", + # 不指定 assignee → 广播认领 + ) + + print(f"\n🚀 E10d: 等待编码任务广播认领 (pid={pid}, tid={code_tid})") + result = _poll_task( + pid, code_tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + code_status = result.get("status") + print(f" 编码任务最终状态: {code_status}") + + assert code_status != "pending", "编码任务未被认领" + + if code_status == "done": + # 验证 events 记录存在 + events_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events", + timeout=10, + ) + if events_resp.status_code == 200: + events = events_resp.json() + event_types = [e.get("event_type") for e in events.get("events", [])] + print(f" Events: {event_types}") + # 应该有状态变化事件 + assert any("claimed" in str(e) or "started" in str(e) + for e in event_types), ( + f"缺少状态变化事件: {event_types}" + ) + + # 第二步:review 任务(不依赖 Agent 执行,手动推) + review_tid = _create_task( + pid, + title="E2E完整链路:review任务", + description="测试review状态", + assignee="simayi-challenger", + ) + + # 手动推完整生命周期 + transitions = ["claimed", "working", "review", "done"] + for s in transitions: + r = _update_status(pid, review_tid, s, agent="simayi-challenger") + assert r.get("ok"), f"{s}失败: {r}" + + task = _get_task(pid, review_tid) + assert task.get("status") == "done" + print(f" Review任务手动生命周期: ✅") + + # 第三步:验证 done → cancelled(取消已完成任务) + r_cancel = _update_status(pid, review_tid, "cancelled") + assert r_cancel.get("ok"), f"done→cancelled失败: {r_cancel}" + task3 = _get_task(pid, review_tid) + assert task3.get("status") == "cancelled" + print(f" done→cancelled: ✅") + + print(f" ✅ E10d 完整生命周期测试通过")