From 83e1d3b2526062c3306c25a3b7cc9c55b1cb810b Mon Sep 17 00:00:00 2001 From: cfdaily Date: Fri, 5 Jun 2026 23:24:58 +0800 Subject: [PATCH] auto-sync: 2026-06-05 23:24:58 --- tests/e2e/test_e2e_scenarios.py | 1590 +++++++++++++++++++++++++++++++ 1 file changed, 1590 insertions(+) create mode 100644 tests/e2e/test_e2e_scenarios.py diff --git a/tests/e2e/test_e2e_scenarios.py b/tests/e2e/test_e2e_scenarios.py new file mode 100644 index 0000000..21aed4b --- /dev/null +++ b/tests/e2e/test_e2e_scenarios.py @@ -0,0 +1,1590 @@ +"""S9-S21 E2E 场景测试 + +从 test_e2e_v27.py (E9-E14) 和 test_e2e_v31.py (E94-E98/E10c/E10d/E15) 合并。 +通过生产 HTTP API 测试真实 daemon + Agent 场景。 + +需要 RUN_INTEGRATION=1 + 生产 daemon 运行。 +""" + +import json +import os +import re +import sqlite3 +import sys +import time +import uuid +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict + +import pytest +import requests as http_requests + +# 指向部署目录 +DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" +sys.path.insert(0, str(DEPLOY_DIR)) + +from src.utils import get_data_root + +# ── 常量 ── + +API_BASE = os.environ.get("API_BASE", "http://localhost:8083") +POLL_INTERVAL = 5 +MAX_WAIT_DISPATCH = 120 +MAX_WAIT_AGENT = 300 +E2E_PREFIX = "e2e-v30-" +DATA_ROOT = get_data_root() + +pytestmark = pytest.mark.e2e + + +# ── E2E gate ── + +skip_no_integration = pytest.mark.skipif( + not os.environ.get("RUN_INTEGRATION"), + reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon", +) + + +# ── 工具函数 ── + +def _check_environment(): + """环境前置检查:daemon 运行 + ticker 活跃 + 8083 可达""" + try: + resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5) + data = resp.json() + if data.get("status") != "running" or not data.get("ticker_running"): + pytest.skip(f"Daemon not ready: {data}") + return data + except Exception as e: + pytest.skip(f"Production API not available at {API_BASE}: {e}") + + +def _cleanup_project(pid: str): + """清理测试项目""" + try: + http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5) + except Exception: + pass + + +def _poll_task(pid, tid, timeout, terminal_states=None): + """轮询任务状态直到终态或超时""" + terminal = terminal_states or ("done", "failed", "cancelled") + deadline = time.time() + timeout + last_status = None + while time.time() < deadline: + try: + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10 + ) + if resp.status_code == 200: + data = resp.json() + last_status = data.get("status") + if last_status in terminal: + return data + except Exception: + pass + time.sleep(POLL_INTERVAL) + # 超时,返回最后状态 + try: + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10 + ) + return resp.json() if resp.status_code == 200 else {"status": "unknown"} + except Exception: + return {"status": "unknown"} + + +def _get_db_path(pid: str) -> Path: + """获取项目的 blackboard.db 路径""" + return DATA_ROOT / pid / "blackboard.db" + + +def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str): + """直接操作 DB 设置 claimed_at 时间戳(模拟超时)""" + db_path = _get_db_path(pid) + assert db_path.exists(), f"DB not found: {db_path}" + conn = sqlite3.connect(str(db_path)) + try: + conn.execute( + "UPDATE tasks SET claimed_at=? WHERE id=?", + (claimed_at, tid), + ) + conn.commit() + finally: + conn.close() + + +# =================================================================== +# S9: 真实 Agent 调度 +# =================================================================== + +@skip_no_integration +class TestS9RealAgentDispatch: + """S9: 真实 Agent 调度测试 + + 通过生产 HTTP API 创建任务,依赖生产 Ticker 自动调度, + 真实 Agent spawn 执行,全程不手动推动状态。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S9") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200, f"Create project failed: {resp.text}" + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10 + ) + assert resp.status_code == 200, f"Create task failed: {resp.text}" + return tid + + def test_s91_simple_task_agent_execute(self): + """S9-1: 简单任务 → 生产Ticker调度 → 真实Agent执行 → 状态自动流转到终态""" + pid = self._create_project("S9-1") + tid = self._create_task( + pid, + title="E2E简单任务:echo hello", + description=( + "请执行以下操作:\n" + "1. 运行命令 echo hello\n" + "2. 把输出结果写入黑板\n" + "这是E2E自动化测试,完成后请标记done。\n" + "重要:不需要做其他任何事。" + ), + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 S9-1: 等待调度 + Agent执行 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "pending", ( + f"Agent未被调度!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。" + ) + assert status != "blocked", f"普通任务被错误拦截: {result}" + + if status == "failed": + print(f" ⚠️ Agent执行失败,detail: {result}") + else: + print(f" ✅ Agent执行成功") + + print(f" 性能: pending → {status}") + + def test_s92_review_task_dispatch(self): + """S9-2: review任务 → 路由到can_review Agent → 真实执行""" + pid = self._create_project("S9-2") + tid = self._create_task( + pid, + title="E2E Review:审查测试", + description=( + "这是一个E2E测试的review任务。\n" + "请简单回复:\"审查通过,E2E测试正常。\"\n" + "然后标记done即可。不需要做其他事。" + ), + assignee="simayi-challenger", + task_type="review", + ) + + print(f"\n🚀 S9-2: 等待review调度 + Agent执行 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "pending", ( + f"Review Agent未被调度!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。" + ) + assert status != "blocked", f"Review任务被错误拦截: {result}" + + if status == "failed": + print(f" ⚠️ Review Agent执行失败") + else: + print(f" ✅ Review Agent执行成功") + + def test_s93_guardrail_block(self): + """S9-3: 实盘交易任务 → Guardrails拦截 → status=blocked""" + pid = self._create_project("S9-3") + tid = self._create_task( + pid, + title="执行实盘买入SH600000", + description="用真金白银买入浦发银行1000股", + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 S9-3: 等待Guardrails拦截 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("done", "failed", "cancelled", "blocked", "claimed", "working"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status not in ("claimed", "working", "done"), ( + f"Guardrails未拦截实盘任务!状态: {status}" + ) + print(f" ✅ Guardrails拦截生效,状态: {status}") + + +# =================================================================== +# S10: 全链路集成 +# =================================================================== + +@skip_no_integration +class TestS10FullChain: + """S10: 项目 → 父子Task → 生产Ticker → 聚合 → 依赖 → Mail → 前端API""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S10") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev", "simayi-challenger"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_s10a_logic_chain(self): + """S10a: 父子Task + 依赖推进 + 状态聚合 + Mail(不依赖Agent完成)""" + pid = self._create_project("S10a") + + # 1. 创建父任务(带stages) + parent_id = f"{pid}-parent" + http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={ + "id": parent_id, + "title": "S10a全链路父任务", + "status": "pending", + "stages_json": json.dumps([ + {"id": "setup", "label": "Setup"}, + {"id": "execute", "label": "Execute"}, + {"id": "verify", "label": "Verify"}, + ]), + }, timeout=10) + + # 2. 创建3个子任务 + child_ids = [] + for i, stage in enumerate(["setup", "execute", "verify"]): + cid = f"{pid}-child-{i}" + child_ids.append(cid) + http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={ + "id": cid, + "title": f"子任务-{stage}", + "status": "pending", + "parent_task": parent_id, + "stage": stage, + }, timeout=10) + + # 3. 创建依赖任务(blocked) + dep_id = f"{pid}-dep" + http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={ + "id": dep_id, + "title": "依赖任务", + "depends_on": json.dumps([child_ids[0]]), + }, timeout=10) + + # 4. 推进依赖任务到 blocked + for status in ["claimed", "working", "blocked"]: + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{dep_id}/status", + json={"status": status, "agent": "test"}, timeout=10, + ) + + # 5. 推进 setup 子任务到 done(触发依赖推进条件) + for status in ["claimed", "working", "review", "done"]: + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{child_ids[0]}/status", + json={"status": status, "agent": "test"}, timeout=10, + ) + + # 6. 等待 Ticker 依赖推进 + print(f"\n🚀 S10a: 等待依赖推进 (pid={pid})") + dep_result = _poll_task( + pid, dep_id, timeout=MAX_WAIT_DISPATCH, + terminal_states=("pending", "claimed", "working", "done"), + ) + dep_status = dep_result.get("status") + print(f" 依赖任务状态: blocked → {dep_status}") + assert dep_status != "blocked", ( + f"依赖推进未生效!{dep_id} 在 {MAX_WAIT_DISPATCH}s 后仍为 blocked" + ) + + # 7. 发送 Mail + http_requests.post(f"{API_BASE}/api/mail", json={ + "title": f"S10a全链路通知-{pid}", + "text": f"项目 {pid} setup阶段完成", + "from": "simayi-challenger", + "to": "pangtong-fujunshi", + "type": "inform", + }, timeout=10) + + # 8. 验证 Mail + resp = http_requests.get( + f"{API_BASE}/api/mail?from_agent=simayi-challenger", timeout=10 + ) + mails = resp.json()["mails"] + assert any(m["title"].startswith("S10a全链路") for m in mails), \ + f"Mail未找到,当前mails: {[m['title'] for m in mails]}" + + # 9. 验证 Stage 进度 + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}/progress", timeout=10 + ) + if resp.status_code == 200: + progress = resp.json() + stages = {s["id"]: s for s in progress.get("stages", [])} + assert stages["setup"]["done"] == 1, f"setup stage should be done" + assert stages["execute"]["done"] == 0 + + # 10. 验证父状态聚合 + parent_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}", timeout=10 + ) + parent_data = parent_resp.json() + print(f" 父任务状态: {parent_data.get('status')}") + assert parent_data.get("status") in ("pending", "working"), \ + f"父任务状态异常: {parent_data['status']}" + + print(f" ✅ S10a 全链路测试通过") + + def test_s10b_agent_full_chain(self): + """S10b: 真实Agent全链路 — 创建子任务 → Agent执行 → 父状态自动聚合""" + pid = self._create_project("S10b") + + # 1. 创建父任务 + parent_id = f"{pid}-parent" + http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={ + "id": parent_id, + "title": "S10b Agent全链路父任务", + "status": "pending", + "stages_json": json.dumps([ + {"id": "step1", "label": "Step 1"}, + {"id": "step2", "label": "Step 2"}, + ]), + }, timeout=10) + + # 2. 创建子任务(真实Agent执行) + child_id = f"{pid}-child-0" + http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={ + "id": child_id, + "title": "E2E子任务:echo test", + "description": ( + "请执行 echo test 并标记done。" + "这是E2E测试,不需要做其他事。" + ), + "status": "pending", + "parent_task": parent_id, + "stage": "step1", + "assignee": "zhangfei-dev", + "task_type": "coding", + }, timeout=10) + + # 3. 等待Agent执行完成 + print(f"\n🚀 S10b: 等待Agent执行子任务 (pid={pid}, tid={child_id})") + result = _poll_task( + pid, child_id, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + child_status = result.get("status") + print(f" 子任务最终状态: {child_status}") + + assert child_status != "pending", ( + f"子任务未被调度!{MAX_WAIT_AGENT}s后仍为pending" + ) + + # 4. 轮询父任务状态变化 + parent_result = _poll_task( + pid, parent_id, timeout=MAX_WAIT_DISPATCH, + terminal_states=("working", "review", "done", "failed"), + ) + parent_data = parent_result + print(f" 父任务状态: {parent_data.get('status')}") + + # 5. 验证 Stage 进度 + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}/progress", timeout=10 + ) + if resp.status_code == 200: + progress = resp.json() + stages = {s["id"]: s for s in progress.get("stages", [])} + print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}") + + print(f" ✅ S10b 真实Agent全链路完成") + + +# =================================================================== +# S11: Acquire-First 真实 Agent E2E +# =================================================================== + +@skip_no_integration +class TestS11AcquireFirstE2E: + """S11: Acquire-First Phase 1-4 真实 Agent E2E + + 验证真实 daemon 调度路径中 counter acquire + session check + spawn 的完整流程。 + """ + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S11") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_s111a_dispatch_with_routing_audit(self): + """S11a: 任务调度 → 检查 routing_decisions 记录了 dispatch + selected_agent""" + pid = self._create_project("S11a") + tid = self._create_task( + pid, + title="E2E Acquire-First:echo hello", + description=( + "请执行 echo hello-world 并标记done。\n" + "这是E2E测试 Acquire-First,不需要做其他事。" + ), + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 S11a: 等待调度 + Agent执行 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "pending", ( + f"任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending,调度未生效" + ) + + # 验证 routing_decisions 有记录 + db_path = DATA_ROOT / pid / "blackboard.db" + if db_path.exists(): + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + conn.row_factory = sq3.Row + try: + tables = [r[0] for r in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ).fetchall()] + if "routing_decisions" not in tables: + print(f" ⚠️ routing_decisions 表不存在,跳过审计验证") + else: + rows = conn.execute( + "SELECT * FROM routing_decisions WHERE task_id=? ORDER BY id DESC LIMIT 1", + (tid,), + ).fetchall() + if len(rows) > 0: + row = rows[0] + print(f" routing: mode={row['mode']} agent={row['selected_agent']} outcome={row['outcome']}") + assert row["selected_agent"] == "zhangfei-dev" + assert row["outcome"] == "dispatched" + else: + print(f" ⚠️ routing_decisions 无记录 for {tid}") + finally: + conn.close() + + print(f" ✅ Acquire-First 调度审计验证通过") + + def test_s111b_concurrent_dispatch_counter_block(self): + """S11b: 同 agent 连续两个任务 → 第二个应被 skip""" + pid = self._create_project("S11b") + + tid1 = self._create_task( + pid, + title="E2E Counter-Block 任务1:echo first", + description="请执行 echo first 并标记done。E2E测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + tid2 = self._create_task( + pid, + title="E2E Counter-Block 任务2:echo second", + description="请执行 echo second 并标记done。E2E测试,不需要做其他事。", + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 S11b: 等待两个任务调度 (pid={pid})") + + result1 = _poll_task( + pid, tid1, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status1 = result1.get("status") + print(f" 任务1状态: {status1}") + assert status1 != "pending", "任务1未被调度" + + result2 = _poll_task( + pid, tid2, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status2 = result2.get("status") + print(f" 任务2状态: {status2}") + + db_path = DATA_ROOT / pid / "blackboard.db" + if db_path.exists(): + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + conn.row_factory = sq3.Row + try: + rows = conn.execute( + "SELECT outcome, detail FROM routing_decisions WHERE task_id=? ORDER BY id", + (tid2,), + ).fetchall() + outcomes = [r["outcome"] for r in rows] + print(f" 任务2 routing outcomes: {outcomes}") + assert len(rows) > 0, "任务2无 routing 记录" + finally: + conn.close() + + print(f" ✅ Counter block 并发调度验证完成") + + +# =================================================================== +# S12: _check_timeouts 统一 + crash_limit E2E +# =================================================================== + +@skip_no_integration +class TestS12TimeoutsUnifiedE2E: + """S12: _check_timeouts 统一超时 + crash_limit + updated_at fallback""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S12") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_s121a_crash_limit_marks_failed(self): + """S12a: 3 次 crash → _check_timeouts 标 failed""" + pid = self._create_project("S12a") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "Crash Limit 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 写 3 条 crash attempt + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + for i in range(3): + conn.execute( + "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) " + "VALUES (?, ?, ?, ?, datetime('now', ?))", + (tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 S12a: 3次crash已写入,等待ticker处理 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("failed", "done", "cancelled"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status == "failed", ( + f"3次crash后任务应为failed,实际: {status}" + ) + print(f" ✅ crash_limit 统一检查验证通过") + + def test_s121b_updated_at_fallback_reclaim(self): + """S12b: working 任务无 started_at → updated_at fallback → 超时回收""" + pid = self._create_project("S12b") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "updated_at Fallback 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 清空 started_at/claimed_at,设 updated_at 为 60 分钟前 + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + conn.execute( + "UPDATE tasks SET started_at=NULL, claimed_at=NULL, " + "updated_at=datetime('now','-60 minutes') WHERE id=?", + (tid,), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 S12b: updated_at 设为60min前,等待ticker回收 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("failed", "done", "cancelled"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "working", ( + f"超时任务应被回收,实际仍为 working" + ) + print(f" ✅ updated_at fallback 回收验证通过") + + +# =================================================================== +# S13: Compact Hanging 不标 failed E2E +# =================================================================== + +@skip_no_integration +class TestS13CompactHangingE2E: + """S13: compact_hanging outcome → 任务保持 working(不标 failed)""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S13") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_s131a_compact_hanging_keeps_working(self): + """S13a: compact_hanging attempt → ticker 不应立即标 failed""" + pid = self._create_project("S13a") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "Compact Hanging 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 写入 compact_hanging attempt + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + conn.execute( + "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) " + "VALUES (?, ?, ?, ?, datetime('now'))", + (tid, 1, "zhangfei-dev", "compact_hanging"), + ) + conn.execute( + "UPDATE tasks SET started_at=datetime('now','-5 minutes') WHERE id=?", + (tid,), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 S13a: compact_hanging attempt 已写入,等一个 tick 验证不被标 failed") + time.sleep(45) + + resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + assert resp.status_code == 200 + status = resp.json().get("status") + print(f" 一个 tick 后状态: {status}") + + assert status == "working", ( + f"compact_hanging 后任务应保持 working,实际: {status}" + ) + print(f" ✅ compact_hanging 不标 failed 验证通过") + + +# =================================================================== +# S15: Crash Rollback E2E(原 E14 → S15) +# =================================================================== + +@skip_no_integration +class TestS15CrashRollback: + """S15: crash 后 current_agent 回退验证""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S15") -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": {"agents": ["zhangfei-dev"]}, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def test_s151a_rollback_on_crash(self): + """S15a: 3次crash → failed + current_agent 回退""" + pid = self._create_project("S15a") + + tid = f"e2e-task-{uuid.uuid4().hex[:8]}" + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", + json={"id": tid, "title": "Rollback 测试", "status": "pending", + "assignee": "zhangfei-dev"}, + timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + + # 设置 current_agent 并写 3 次 crash attempt + db_path = DATA_ROOT / pid / "blackboard.db" + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + conn.execute( + "UPDATE tasks SET current_agent='zhangfei-dev' WHERE id=?", + (tid,), + ) + for i in range(3): + conn.execute( + "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) " + "VALUES (?, ?, ?, ?, datetime('now', ?))", + (tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"), + ) + conn.commit() + finally: + conn.close() + + print(f"\n🚀 S15a: 3次crash + current_agent 已设置,等待ticker (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("failed", "done", "cancelled"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status == "failed", f"应为 failed,实际: {status}" + + import sqlite3 as sq3 + conn2 = sq3.connect(str(db_path)) + conn2.row_factory = sq3.Row + try: + events = conn2.execute( + "SELECT detail FROM events WHERE task_id=? AND event_type='status_change' ORDER BY id DESC LIMIT 1", + (tid,), + ).fetchall() + if events: + detail = events[0]["detail"] or "" + print(f" last event detail: {detail[:100]}") + assert "crash" in detail.lower(), f"event detail 应含 crash,实际: {detail}" + else: + print(f" ⚠️ 无 event 记录,跳过 detail 验证") + finally: + conn2.close() + + print(f" ✅ crash_limit 标 failed 验证通过") + + +# =================================================================== +# S16: 广播认领(E94 + E15 合并) +# =================================================================== + +@skip_no_integration +class TestS16BroadcastClaim: + """S16: 广播认领 — 无 assignee 广播 + Prompt v3.0 三级响应""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S16", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, + "name": f"{name_prefix}-{pid}", + "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200, f"Create task failed: {resp.text}" + return tid + + def test_s161_broadcast_claim_no_assignee(self): + """S16-1: 创建不指定 assignee 的任务,等待广播认领并执行完成""" + pid = self._create_project("S16-1") + tid = self._create_task( + pid, + title="E2E广播认领任务:echo broadcast", + description=( + "这是一个E2E测试的广播认领任务。\n" + "请执行 echo broadcast 并标记done。\n" + "这是E2E自动化测试,不需要做其他事。" + ), + task_type="coding", + ) + + print(f"\n🚀 S16-1: 等待广播认领 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "pending", ( + f"广播认领未生效!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。" + ) + assert status != "blocked", f"广播任务被错误拦截: {result}" + + assignee = result.get("assignee") + print(f" 认领Agent: {assignee}") + assert assignee, f"任务已离开pending但assignee为空: {result}" + + if status == "done": + print(f" ✅ 广播认领执行成功") + else: + print(f" ⚠️ 广播认领后状态: {status}") + + def test_s162_broadcast_observation_comment(self): + """S16-2: 广播任务 → Agent 写 observation comment(Prompt v3.0 三级响应)""" + pid = self._create_project("S16-2", agents=["simayi-challenger"]) + tid = self._create_task( + pid, + title="E2E Prompt v3.0:观察型任务", + description=( + "这是一个编码任务,但 assignee 是司马懿。\n" + "按照 Prompt v3.0 三级响应:\n" + "- 如果你认为应该由其他人执行,请写 observation comment\n" + "- 不需要实际执行编码\n" + "- 标记 done 即可\n" + "这是E2E测试,验证广播三级响应。" + ), + assignee="simayi-challenger", + task_type="coding", + ) + + print(f"\n🚀 S16-2: 等待广播认领+Agent响应 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "pending", "任务未被调度" + + db_path = _get_db_path(pid) + if db_path.exists(): + import sqlite3 as sq3 + conn = sq3.connect(str(db_path)) + try: + comments = conn.execute( + "SELECT author, comment_type, body FROM comments " + "WHERE task_id=? ORDER BY id DESC LIMIT 5", + (tid,), + ).fetchall() + print(f" Comments ({len(comments)}):") + for c in comments: + print(f" [{c[0]}] {c[1]}: {c[2][:80]}...") + assert len(comments) > 0, ( + f"Agent 未写任何 comment,Prompt v3.0 三级响应可能未生效" + ) + finally: + conn.close() + + print(f" ✅ Prompt v3.0 广播响应验证完成") + + def test_s163_broadcast_claim_by_matching_agent(self): + """S16-3: 广播任务 → 匹配 Agent 执行 claim → done""" + pid = self._create_project("S16-3", agents=["zhangfei-dev"]) + tid = self._create_task( + pid, + title="E2E Prompt v3.0:认领型任务", + description=( + "请执行 echo claim-test 并标记done。\n" + "这是E2E测试,验证正确 assignee 的任务被认领执行。\n" + "不需要做其他事。" + ), + assignee="zhangfei-dev", + task_type="coding", + ) + + print(f"\n🚀 S16-3: 等待正确Agent认领 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 最终状态: {status}") + + assert status != "pending", "任务未被认领" + assert status != "blocked", "任务被错误拦截" + + print(f" ✅ 正确 assignee 认领执行验证通过") + + +# =================================================================== +# S17: 暂停→恢复 +# =================================================================== + +@skip_no_integration +class TestS17PauseResume: + """S17: 手动推状态到 working → paused → 恢复 → 验证 resumed_from""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S17", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, "name": f"{name_prefix}-{pid}", "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_s171_pause_resume_resumed_from(self): + """S17-1: working → paused → 恢复 working,验证 resumed_from 字段""" + pid = self._create_project("S17-1") + tid = self._create_task( + pid, + title="E2E暂停恢复测试", + description="测试暂停恢复功能", + assignee="zhangfei-dev", + ) + + # 手动推到 claimed → working + r1 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + assert r1.json().get("ok") + r2 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + assert r2.json().get("ok") + + # 暂停 + r3 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "paused", "agent": "test"}, timeout=10, + ) + assert r3.json().get("ok") + + # 验证 resumed_from == "working" + task_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + task = task_resp.json() + resumed_from = task.get("resumed_from") + print(f"\n🚀 S17-1: 暂停后 resumed_from={resumed_from}") + assert resumed_from == "working", ( + f"resumed_from 应为 'working',实际: {resumed_from}" + ) + assert task.get("status") == "paused" + + # 恢复到 working + r4 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + assert r4.json().get("ok") + + task2_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + task2 = task2_resp.json() + print(f" 恢复后 status={task2.get('status')}") + assert task2.get("status") == "working" + + print(f" ✅ 暂停恢复流程正确") + + def test_s172_review_pause_resume(self): + """S17-2: review → paused → 恢复 review""" + pid = self._create_project("S17-2") + tid = self._create_task( + pid, + title="E2E Review暂停恢复", + assignee="simayi-challenger", + ) + + for s in ["claimed", "working", "review"]: + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": s, "agent": "simayi-challenger"}, timeout=10, + ) + + # 暂停 + r = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "paused", "agent": "test"}, timeout=10, + ) + assert r.json().get("ok") + + task_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + task = task_resp.json() + assert task.get("resumed_from") == "review" + + # 恢复到 review + r2 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "review", "agent": "simayi-challenger"}, timeout=10, + ) + assert r2.json().get("ok") + + task2_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + task2 = task2_resp.json() + assert task2.get("status") == "review" + + print(f"\n ✅ Review暂停恢复流程正确 (resumed_from=review)") + + +# =================================================================== +# S18: cancelled → 重新启动 +# =================================================================== + +@skip_no_integration +class TestS18CancelledRestart: + """S18: cancelled → pending(重新启动)→ Agent 执行 → done""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S18", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, "name": f"{name_prefix}-{pid}", "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_s181_cancelled_to_pending_restart(self): + """S18-1: cancelled → pending → 等待调度执行""" + pid = self._create_project("S18-1") + tid = self._create_task( + pid, + title="E2E取消重启任务:echo restart", + description=( + "请执行 echo restart 并标记done。" + "这是E2E测试,不需要做其他事。" + ), + assignee="zhangfei-dev", + ) + + # 手动推到 cancelled + r1 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "cancelled", "agent": "test"}, timeout=10, + ) + assert r1.json().get("ok") + task_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + assert task_resp.json().get("status") == "cancelled" + + # 重新启动 → pending + r2 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "pending", "agent": "test"}, timeout=10, + ) + assert r2.json().get("ok") + task2_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + task2 = task2_resp.json() + assert task2.get("status") == "pending" + assert task2.get("assignee") is None or task2.get("assignee") == "" + + print(f"\n🚀 S18-1: 等待重新调度执行 (pid={pid}, tid={tid})") + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 重启后最终状态: {status}") + + assert status != "pending", ( + f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending" + ) + + if status == "done": + print(f" ✅ 取消重启流程正确") + else: + print(f" ⚠️ 重启后状态: {status}") + + +# =================================================================== +# S19: Retry Chain(E97 claimed timeout + E10c retry chain 合并) +# =================================================================== + +@skip_no_integration +class TestS19RetryChain: + """S19: claimed 超时回收 + failed → pending 重试链""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S19", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, "name": f"{name_prefix}-{pid}", "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_s191_claimed_timeout_to_pending(self): + """S19-1: claimed 任务超时 → ticker 重置为 pending → assignee 清空""" + pid = self._create_project("S19-1") + tid = self._create_task( + pid, + title="E2E超时测试任务", + description="测试claimed超时处理", + assignee="zhangfei-dev", + ) + + # 手动推到 claimed + r1 = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + assert r1.json().get("ok") + + # 直接操作 DB:把 claimed_at 设为 2 小时前 + two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat() + _patch_db_claimed_at(pid, tid, two_hours_ago) + print(f"\n🚀 S19-1: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})") + + result = _poll_task( + pid, tid, timeout=MAX_WAIT_DISPATCH, + terminal_states=("pending", "escalated"), + ) + status = result.get("status") + print(f" 超时后状态: {status}") + + assert status != "claimed", ( + f"超时处理未生效!任务 {tid} 在 {MAX_WAIT_DISPATCH}s 后仍为 claimed" + ) + + assignee = result.get("assignee") + print(f" assignee: {assignee}") + assert assignee is None or assignee == "", ( + f"超时重置后assignee应清空,实际: {assignee}" + ) + + print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)") + + def test_s192_failed_to_pending_retry(self): + """S19-2: 手动模拟失败 → 重试 → 等待调度完成""" + pid = self._create_project("S19-2") + tid = self._create_task( + pid, + title="E2E重试任务:echo retry", + description=( + "请执行 echo retry 并标记done。" + "这是E2E测试,不需要做其他事。" + ), + assignee="zhangfei-dev", + ) + + # 手动推到 failed(模拟 Agent 执行失败) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "claimed", "agent": "zhangfei-dev"}, timeout=10, + ) + http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "working", "agent": "zhangfei-dev"}, timeout=10, + ) + r_fail = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "failed", "agent": "zhangfei-dev"}, timeout=10, + ) + assert r_fail.json().get("ok") + + task_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + assert task_resp.json().get("status") == "failed" + print(f"\n🚀 S19-2: 任务已标记failed,准备重试") + + # 手动重试 → pending + r_retry = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status", + json={"status": "pending", "agent": "test"}, timeout=10, + ) + assert r_retry.json().get("ok") + + task2_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10, + ) + task2 = task2_resp.json() + assert task2.get("status") == "pending" + assert task2.get("assignee") is None or task2.get("assignee") == "" + retry_count = task2.get("retry_count", 0) or 0 + print(f" retry_count: {retry_count}") + + result = _poll_task( + pid, tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + status = result.get("status") + print(f" 重试后最终状态: {status}") + + assert status != "pending", ( + f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending" + ) + + if status == "done": + print(f" ✅ 失败重试链正确") + else: + print(f" ⚠️ 重试后状态: {status}") + + +# =================================================================== +# S20: 完整生命周期(广播认领版) +# =================================================================== + +@skip_no_integration +class TestS20FullLifecycle: + """S20: 无 assignee → 广播认领 → claimed → working → review → done""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + self._projects = [] + yield + for pid in self._projects: + _cleanup_project(pid) + + def _create_project(self, name_prefix="S20", agents=None) -> str: + pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}" + config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]} + resp = http_requests.post(f"{API_BASE}/api/projects", json={ + "id": pid, "name": f"{name_prefix}-{pid}", "config": config, + }, timeout=10) + assert resp.status_code == 200 + self._projects.append(pid) + return pid + + def _create_task(self, pid, **kwargs) -> str: + tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}" + body = {"id": tid, "status": "pending", "priority": 5, **kwargs} + resp = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10, + ) + assert resp.status_code == 200 + return tid + + def test_s201_full_lifecycle_with_review(self): + """S20-1: 完整生命周期:创建 → 广播 → 认领 → 执行 → review → done""" + pid = self._create_project("S20-1") + + # 第一步:编码任务(广播认领) + code_tid = self._create_task( + pid, + title="E2E完整链路:编码任务", + description=( + "请执行 echo lifecycle 并标记done。" + "这是E2E完整生命周期测试,不需要做其他事。" + ), + task_type="coding", + ) + + print(f"\n🚀 S20-1: 等待编码任务广播认领 (pid={pid}, tid={code_tid})") + result = _poll_task( + pid, code_tid, timeout=MAX_WAIT_AGENT, + terminal_states=("done", "failed", "cancelled", "blocked"), + ) + code_status = result.get("status") + print(f" 编码任务最终状态: {code_status}") + + assert code_status != "pending", "编码任务未被认领" + + if code_status == "done": + events_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events", + timeout=10, + ) + if events_resp.status_code == 200: + events = events_resp.json() + event_types = [e.get("event_type") for e in events.get("events", [])] + print(f" Events: {event_types}") + assert any("claimed" in str(e) or "started" in str(e) + for e in event_types), ( + f"缺少状态变化事件: {event_types}" + ) + + # 第二步:review 任务(手动推) + review_tid = self._create_task( + pid, + title="E2E完整链路:review任务", + description="测试review状态", + assignee="simayi-challenger", + ) + + for s in ["claimed", "working", "review", "done"]: + r = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{review_tid}/status", + json={"status": s, "agent": "simayi-challenger"}, timeout=10, + ) + assert r.json().get("ok") + + task_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{review_tid}", timeout=10, + ) + assert task_resp.json().get("status") == "done" + print(f" Review任务手动生命周期: ✅") + + # 第三步:done → cancelled + r_cancel = http_requests.post( + f"{API_BASE}/api/projects/{pid}/tasks/{review_tid}/status", + json={"status": "cancelled", "agent": "test"}, timeout=10, + ) + assert r_cancel.json().get("ok") + task3_resp = http_requests.get( + f"{API_BASE}/api/projects/{pid}/tasks/{review_tid}", timeout=10, + ) + assert task3_resp.json().get("status") == "cancelled" + print(f" done→cancelled: ✅") + + print(f" ✅ S20-1 完整生命周期测试通过") + + +# =================================================================== +# S21: 缓存头验证 +# =================================================================== + +@skip_no_integration +class TestS21CacheHeaders: + """S21: 验证 CachedStaticFiles 缓存头""" + + @pytest.fixture(autouse=True) + def setup_env(self): + _check_environment() + + def test_s211_html_no_cache(self): + """S21-1: HTML 页面应为 no-cache""" + resp = http_requests.get(f"{API_BASE}/", timeout=10) + if resp.status_code != 200: + pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}") + + cache_control = resp.headers.get("cache-control", "") + print(f"\n🚀 S21-1: HTML Cache-Control: {cache_control}") + assert "no-cache" in cache_control or "no-store" in cache_control, ( + f"HTML 应为 no-cache/no-store,实际: {cache_control}" + ) + + def test_s212_js_immutable(self): + """S21-2: JS 文件应为 immutable + 长缓存""" + html_resp = http_requests.get(f"{API_BASE}/", timeout=10) + if html_resp.status_code != 200: + pytest.skip("Frontend not available") + + js_matches = re.findall(r'src="(/assets/[^"]+\.js)"', html_resp.text) + if not js_matches: + pytest.skip("No JS files found in HTML") + + js_path = js_matches[0] + resp = http_requests.get(f"{API_BASE}{js_path}", timeout=10) + cache_control = resp.headers.get("cache-control", "") + print(f" S21-2: JS ({js_path}) Cache-Control: {cache_control}") + assert "immutable" in cache_control, ( + f"JS 应含 immutable,实际: {cache_control}" + ) + assert "31536000" in cache_control, ( + f"JS max-age 应为 31536000,实际: {cache_control}" + ) + + def test_s213_css_immutable(self): + """S21-3: CSS 文件应为 immutable + 长缓存""" + html_resp = http_requests.get(f"{API_BASE}/", timeout=10) + if html_resp.status_code != 200: + pytest.skip("Frontend not available") + + css_matches = re.findall(r'href="(/assets/[^"]+\.css)"', html_resp.text) + if not css_matches: + pytest.skip("No CSS files found in HTML") + + css_path = css_matches[0] + resp = http_requests.get(f"{API_BASE}{css_path}", timeout=10) + cache_control = resp.headers.get("cache-control", "") + print(f" S21-3: CSS ({css_path}) Cache-Control: {cache_control}") + assert "immutable" in cache_control, ( + f"CSS 应含 immutable,实际: {cache_control}" + ) + + print(f" ✅ 缓存头验证通过")