1820 lines
68 KiB
Python
1820 lines
68 KiB
Python
"""S9-S21 E2E scenario tests.

Merged from test_e2e_v27.py (E9-E14) and test_e2e_v31.py (E94-E98/E10c/E10d/E15).
Exercises real daemon + agent scenarios through the production HTTP API.

Requires RUN_INTEGRATION=1 and a running production daemon.
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import sys
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Any, Dict
|
||
|
||
import pytest
|
||
import requests as http_requests
|
||
|
||
# 指向部署目录
|
||
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
|
||
sys.path.insert(0, str(DEPLOY_DIR))
|
||
|
||
from src.utils import get_data_root
|
||
|
||
# ── Constants ──

# Base URL of the HTTP API under test; overridable through the environment.
API_BASE = os.environ.get("API_BASE", "http://localhost:8083")
# Seconds slept between two task polls (passed to time.sleep).
POLL_INTERVAL = 5
# Poll timeout (seconds) used when waiting for dispatch / ticker-driven changes.
MAX_WAIT_DISPATCH = 120
# Poll timeout (seconds) used when waiting for an agent to finish a task.
MAX_WAIT_AGENT = 300
# Prefix of every project id generated by these tests.
E2E_PREFIX = "e2e-v30-"
# Directory under which each project keeps its blackboard.db.
DATA_ROOT = get_data_root()

# Every test in this module carries the "e2e" marker.
pytestmark = pytest.mark.e2e


# ── E2E gate ──

# Skip marker applied to each test class unless RUN_INTEGRATION is set.
skip_no_integration = pytest.mark.skipif(
    not os.environ.get("RUN_INTEGRATION"),
    reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
)
|
||
|
||
|
||
# ── 工具函数 ──
|
||
|
||
def _check_environment():
    """Skip the calling test unless the daemon behind API_BASE is ready.

    Fetches ``{API_BASE}/api/daemon/status`` and requires the payload to report
    ``status == "running"`` together with a truthy ``ticker_running``.

    Returns:
        The decoded status payload when the daemon is ready.
    """
    status_url = f"{API_BASE}/api/daemon/status"
    try:
        payload = http_requests.get(status_url, timeout=5).json()
        daemon_ready = payload.get("status") == "running" and payload.get("ticker_running")
        if not daemon_ready:
            # NOTE(review): relies on pytest's skip outcome not being an
            # ``Exception`` subclass, so it passes the handler below untouched.
            pytest.skip(f"Daemon not ready: {payload}")
        return payload
    except Exception as e:
        # Any transport / decoding problem means the API is unusable: skip.
        pytest.skip(f"Production API not available at {API_BASE}: {e}")
|
||
|
||
|
||
def _cleanup_project(pid: str):
    """Archive test project ``pid``; failures are deliberately ignored."""
    archive_url = f"{API_BASE}/api/projects/{pid}/archive"
    try:
        http_requests.post(archive_url, timeout=5)
    except Exception:
        # Best-effort teardown: a failed archive must not fail the test.
        pass
|
||
|
||
|
||
def _poll_task(pid, tid, timeout, terminal_states=None):
    """Poll a task until it reaches one of ``terminal_states`` or time runs out.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Maximum number of seconds to keep polling.
        terminal_states: Statuses that end the wait; defaults to
            ``("done", "failed", "cancelled")`` when omitted or empty.

    Returns:
        The task payload as soon as its ``status`` is terminal. On timeout the
        task is fetched one last time and that payload is returned, or
        ``{"status": "unknown"}`` when the final fetch fails or is not HTTP 200.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        try:
            resp = http_requests.get(task_url, timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                if data.get("status") in terminal:
                    return data
        except Exception:
            # Transient API errors are tolerated; keep polling until deadline.
            pass
        # Never sleep past the deadline.
        time.sleep(min(POLL_INTERVAL, max(0.0, deadline - time.monotonic())))

    # Timed out: report whatever state the task is in right now.
    try:
        resp = http_requests.get(task_url, timeout=10)
        return resp.json() if resp.status_code == 200 else {"status": "unknown"}
    except Exception:
        return {"status": "unknown"}
|
||
|
||
|
||
def _get_db_path(pid: str) -> Path:
    """Return the location of project ``pid``'s ``blackboard.db`` under DATA_ROOT."""
    project_dir = DATA_ROOT / pid
    return project_dir / "blackboard.db"
|
||
|
||
|
||
def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str):
    """Overwrite ``tasks.claimed_at`` for one task directly in the project DB.

    Used to simulate a timeout by back-dating the claim timestamp.

    Args:
        pid: Project id whose ``blackboard.db`` is modified.
        tid: Id of the task row to update.
        claimed_at: Value written to the ``claimed_at`` column.
    """
    db_file = _get_db_path(pid)
    assert db_file.exists(), f"DB not found: {db_file}"

    connection = sqlite3.connect(str(db_file))
    try:
        connection.execute(
            "UPDATE tasks SET claimed_at=? WHERE id=?",
            (claimed_at, tid),
        )
        connection.commit()
    finally:
        connection.close()
|
||
|
||
|
||
# ===================================================================
|
||
# S9: 真实 Agent 调度
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS9RealAgentDispatch:
    """S9: real agent dispatch.

    Tasks are created through the production HTTP API and are expected to be
    dispatched by the production ticker to a really spawned agent; the tests
    never push task status by hand.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Gate on a ready daemon, then archive every created project."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S9") -> str:
        """Create a uniquely named project and register it for teardown."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending task in ``pid``; ``kwargs`` override the defaults."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5}
        body.update(kwargs)
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
        resp = http_requests.post(tasks_url, json=body, timeout=10)
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_s91_simple_task_agent_execute(self):
        """S9-1: simple task -> ticker dispatch -> real agent run -> end state."""
        pid = self._create_project("S9-1")
        instructions = (
            "请执行以下操作:\n"
            "1. 运行命令 echo hello\n"
            "2. 把输出结果写入黑板\n"
            "这是E2E自动化测试,完成后请标记done。\n"
            "重要:不需要做其他任何事。"
        )
        tid = self._create_task(
            pid,
            title="E2E简单任务:echo hello",
            description=instructions,
            assignee="zhangfei-dev",
            task_type="coding",
        )

        print(f"\n🚀 S9-1: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
        stop_on = ("done", "failed", "cancelled", "blocked")
        result = _poll_task(pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=stop_on)
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status != "pending", (
            f"Agent未被调度!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"普通任务被错误拦截: {result}"

        # A failed run is only reported, not treated as a test failure.
        outcome_line = (
            f" ⚠️ Agent执行失败,detail: {result}"
            if status == "failed"
            else " ✅ Agent执行成功"
        )
        print(outcome_line)

        print(f" 性能: pending → {status}")

    def test_s92_review_task_dispatch(self):
        """S9-2: review task -> dispatched to the review agent -> real run."""
        pid = self._create_project("S9-2")
        instructions = (
            "这是一个E2E测试的review任务。\n"
            '请简单回复:"审查通过,E2E测试正常。"\n'
            "然后标记done即可。不需要做其他事。"
        )
        tid = self._create_task(
            pid,
            title="E2E Review:审查测试",
            description=instructions,
            assignee="simayi-challenger",
            task_type="review",
        )

        print(f"\n🚀 S9-2: 等待review调度 + Agent执行 (pid={pid}, tid={tid})")
        stop_on = ("done", "failed", "cancelled", "blocked")
        result = _poll_task(pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=stop_on)
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status != "pending", (
            f"Review Agent未被调度!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"Review任务被错误拦截: {result}"

        # A failed run is only reported, not treated as a test failure.
        print(
            " ⚠️ Review Agent执行失败"
            if status == "failed"
            else " ✅ Review Agent执行成功"
        )

    def test_s93_guardrail_block(self):
        """S9-3: live-trading task must be intercepted and never executed."""
        pid = self._create_project("S9-3")
        tid = self._create_task(
            pid,
            title="执行实盘买入SH600000",
            description="用真金白银买入浦发银行1000股",
            assignee="zhangfei-dev",
            task_type="coding",
        )

        print(f"\n🚀 S9-3: 等待Guardrails拦截 (pid={pid}, tid={tid})")
        # claimed/working also end the wait so a missed interception is caught
        # immediately instead of after the full timeout.
        stop_on = ("done", "failed", "cancelled", "blocked", "claimed", "working")
        result = _poll_task(pid, tid, timeout=MAX_WAIT_DISPATCH, terminal_states=stop_on)
        status = result.get("status")
        print(f" 最终状态: {status}")

        executed_states = ("claimed", "working", "done")
        assert status not in executed_states, (
            f"Guardrails未拦截实盘任务!状态: {status}"
        )
        print(f" ✅ Guardrails拦截生效,状态: {status}")
|
||
|
||
|
||
# ===================================================================
|
||
# S10: 全链路集成
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS10FullChain:
    """S10: project -> parent/child tasks -> ticker -> aggregation -> deps -> mail -> API."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Gate on a ready daemon, then archive every created project."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S10") -> str:
        """Create a two-agent project and register it for teardown."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev", "simayi-challenger"]},
        }, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def test_s10a_logic_chain(self):
        """S10a: parent/child tasks + dependency release + aggregation + mail.

        Statuses are pushed by hand through the API, so this test does not
        depend on an agent completing any work.
        """
        pid = self._create_project("S10a")
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"

        # 1. Parent task carrying three stages.
        parent_id = f"{pid}-parent"
        http_requests.post(tasks_url, json={
            "id": parent_id,
            "title": "S10a全链路父任务",
            "status": "pending",
            "stages_json": json.dumps([
                {"id": "setup", "label": "Setup"},
                {"id": "execute", "label": "Execute"},
                {"id": "verify", "label": "Verify"},
            ]),
        }, timeout=10)

        # 2. One child task per stage.
        child_ids = []
        for i, stage in enumerate(["setup", "execute", "verify"]):
            cid = f"{pid}-child-{i}"
            child_ids.append(cid)
            http_requests.post(tasks_url, json={
                "id": cid,
                "title": f"子任务-{stage}",
                "status": "pending",
                "parent_task": parent_id,
                "stage": stage,
            }, timeout=10)

        # 3. Task that depends on the setup child.
        dep_id = f"{pid}-dep"
        http_requests.post(tasks_url, json={
            "id": dep_id,
            "title": "依赖任务",
            "depends_on": json.dumps([child_ids[0]]),
        }, timeout=10)

        # 4. Drive the dependent task into "blocked".
        for status in ["claimed", "working", "blocked"]:
            http_requests.post(
                f"{tasks_url}/{dep_id}/status",
                json={"status": status, "agent": "test"}, timeout=10,
            )

        # 5. Drive the setup child to "done" (the dependency release condition).
        for status in ["claimed", "working", "review", "done"]:
            http_requests.post(
                f"{tasks_url}/{child_ids[0]}/status",
                json={"status": status, "agent": "test"}, timeout=10,
            )

        # 6. Wait for the ticker to move the dependent task out of "blocked".
        print(f"\n🚀 S10a: 等待依赖推进 (pid={pid})")
        dep_result = _poll_task(
            pid, dep_id, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "claimed", "working", "done"),
        )
        dep_status = dep_result.get("status")
        print(f" 依赖任务状态: blocked → {dep_status}")
        assert dep_status != "blocked", (
            f"依赖推进未生效!{dep_id} 在 {MAX_WAIT_DISPATCH}s 后仍为 blocked"
        )

        # 7. Send a mail.
        http_requests.post(f"{API_BASE}/api/mail", json={
            "title": f"S10a全链路通知-{pid}",
            "text": f"项目 {pid} setup阶段完成",
            "from": "simayi-challenger",
            "to": "pangtong-fujunshi",
            "type": "inform",
        }, timeout=10)

        # 8. The mail must be listed for its sender.
        resp = http_requests.get(
            f"{API_BASE}/api/mail?from_agent=simayi-challenger", timeout=10
        )
        mails = resp.json()["mails"]
        assert any(m["title"].startswith("S10a全链路") for m in mails), \
            f"Mail未找到,当前mails: {[m['title'] for m in mails]}"

        # 9. Stage progress (only checked when the endpoint answers 200).
        resp = http_requests.get(f"{tasks_url}/{parent_id}/progress", timeout=10)
        if resp.status_code == 200:
            progress = resp.json()
            stages = {s["id"]: s for s in progress.get("stages", [])}
            assert stages["setup"]["done"] == 1, "setup stage should be done"
            assert stages["execute"]["done"] == 0

        # 10. Aggregated parent status.
        parent_resp = http_requests.get(f"{tasks_url}/{parent_id}", timeout=10)
        parent_data = parent_resp.json()
        parent_status = parent_data.get("status")
        print(f" 父任务状态: {parent_status}")
        # The message uses the value already read with .get(): indexing
        # parent_data['status'] here would raise KeyError and hide the real
        # assertion failure when the key is absent.
        assert parent_status in ("pending", "working"), \
            f"父任务状态异常: {parent_status}"

        print(" ✅ S10a 全链路测试通过")

    def test_s10b_agent_full_chain(self):
        """S10b: real agent runs a child task, then the parent is re-polled.

        Only the "child left pending" condition is asserted; parent status and
        stage progress are printed for inspection.
        """
        pid = self._create_project("S10b")
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"

        # 1. Parent task with two stages.
        parent_id = f"{pid}-parent"
        http_requests.post(tasks_url, json={
            "id": parent_id,
            "title": "S10b Agent全链路父任务",
            "status": "pending",
            "stages_json": json.dumps([
                {"id": "step1", "label": "Step 1"},
                {"id": "step2", "label": "Step 2"},
            ]),
        }, timeout=10)

        # 2. Child task to be executed by a real agent.
        child_id = f"{pid}-child-0"
        http_requests.post(tasks_url, json={
            "id": child_id,
            "title": "E2E子任务:echo test",
            "description": (
                "请执行 echo test 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            "status": "pending",
            "parent_task": parent_id,
            "stage": "step1",
            "assignee": "zhangfei-dev",
            "task_type": "coding",
        }, timeout=10)

        # 3. Wait for the agent to finish the child.
        print(f"\n🚀 S10b: 等待Agent执行子任务 (pid={pid}, tid={child_id})")
        result = _poll_task(
            pid, child_id, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        child_status = result.get("status")
        print(f" 子任务最终状态: {child_status}")

        assert child_status != "pending", (
            f"子任务未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )

        # 4. Poll the parent for an aggregated status change.
        parent_data = _poll_task(
            pid, parent_id, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("working", "review", "done", "failed"),
        )
        print(f" 父任务状态: {parent_data.get('status')}")

        # 5. Stage progress (printed only).
        resp = http_requests.get(f"{tasks_url}/{parent_id}/progress", timeout=10)
        if resp.status_code == 200:
            progress = resp.json()
            stages = {s["id"]: s for s in progress.get("stages", [])}
            print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}")

        print(" ✅ S10b 真实Agent全链路完成")
|
||
|
||
|
||
# ===================================================================
|
||
# S11: Acquire-First 真实 Agent E2E
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS11AcquireFirstE2E:
    """S11: Acquire-First phases 1-4 against a real agent.

    Exercises the real daemon dispatch path and inspects the
    ``routing_decisions`` rows it leaves in the project database.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Gate on a ready daemon, then archive every created project."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S11") -> str:
        """Create a single-agent project and register it for teardown."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending task in ``pid``; ``kwargs`` override the defaults."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5}
        body.update(kwargs)
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
        resp = http_requests.post(tasks_url, json=body, timeout=10)
        assert resp.status_code == 200
        return tid

    def test_s111a_dispatch_with_routing_audit(self):
        """S11a: dispatched task leaves a routing_decisions row naming the agent."""
        pid = self._create_project("S11a")
        tid = self._create_task(
            pid,
            title="E2E Acquire-First:echo hello",
            description=(
                "请执行 echo hello-world 并标记done。\n"
                "这是E2E测试 Acquire-First,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )

        print(f"\n🚀 S11a: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
        stop_on = ("done", "failed", "cancelled", "blocked")
        result = _poll_task(pid, tid, timeout=MAX_WAIT_AGENT, terminal_states=stop_on)
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status != "pending", (
            f"任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending,调度未生效"
        )

        # Audit check; silently skipped when the DB file is absent.
        db_file = _get_db_path(pid)
        if db_file.exists():
            conn = sqlite3.connect(str(db_file))
            conn.row_factory = sqlite3.Row
            try:
                table_names = {
                    record[0]
                    for record in conn.execute(
                        "SELECT name FROM sqlite_master WHERE type='table'"
                    ).fetchall()
                }
                if "routing_decisions" in table_names:
                    latest = conn.execute(
                        "SELECT * FROM routing_decisions WHERE task_id=? ORDER BY id DESC LIMIT 1",
                        (tid,),
                    ).fetchone()
                    if latest is None:
                        print(f" ⚠️ routing_decisions 无记录 for {tid}")
                    else:
                        print(f" routing: mode={latest['mode']} agent={latest['selected_agent']} outcome={latest['outcome']}")
                        assert latest["selected_agent"] == "zhangfei-dev"
                        assert latest["outcome"] == "dispatched"
                else:
                    print(" ⚠️ routing_decisions 表不存在,跳过审计验证")
            finally:
                conn.close()

        print(" ✅ Acquire-First 调度审计验证通过")

    def test_s111b_concurrent_dispatch_counter_block(self):
        """S11b: two tasks for one agent; the second must leave routing records."""
        pid = self._create_project("S11b")

        first_tid = self._create_task(
            pid,
            title="E2E Counter-Block 任务1:echo first",
            description="请执行 echo first 并标记done。E2E测试,不需要做其他事。",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        second_tid = self._create_task(
            pid,
            title="E2E Counter-Block 任务2:echo second",
            description="请执行 echo second 并标记done。E2E测试,不需要做其他事。",
            assignee="zhangfei-dev",
            task_type="coding",
        )

        print(f"\n🚀 S11b: 等待两个任务调度 (pid={pid})")
        stop_on = ("done", "failed", "cancelled", "blocked")

        status1 = _poll_task(
            pid, first_tid, timeout=MAX_WAIT_AGENT, terminal_states=stop_on
        ).get("status")
        print(f" 任务1状态: {status1}")
        assert status1 != "pending", "任务1未被调度"

        status2 = _poll_task(
            pid, second_tid, timeout=MAX_WAIT_AGENT, terminal_states=stop_on
        ).get("status")
        print(f" 任务2状态: {status2}")

        # Routing history of the second task; skipped when the DB is absent.
        db_file = _get_db_path(pid)
        if db_file.exists():
            conn = sqlite3.connect(str(db_file))
            conn.row_factory = sqlite3.Row
            try:
                history = conn.execute(
                    "SELECT outcome, detail FROM routing_decisions WHERE task_id=? ORDER BY id",
                    (second_tid,),
                ).fetchall()
                outcomes = [entry["outcome"] for entry in history]
                print(f" 任务2 routing outcomes: {outcomes}")
                assert len(history) > 0, "任务2无 routing 记录"
            finally:
                conn.close()

        print(" ✅ Counter block 并发调度验证完成")
|
||
|
||
|
||
# ===================================================================
|
||
# S12: _check_timeouts 统一 + crash_limit E2E
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS12TimeoutsUnifiedE2E:
    """S12: unified _check_timeouts handling, crash limit and updated_at fallback."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Gate on a ready daemon, then archive every created project."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S12") -> str:
        """Create a single-agent project and register it for teardown."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _start_working_task(self, pid, title) -> str:
        """Create a task and push it pending -> claimed -> working via the API.

        Responses are not checked, matching the setup the tests rely on.
        """
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
        http_requests.post(
            tasks_url,
            json={"id": tid, "title": title, "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        for next_status in ("claimed", "working"):
            http_requests.post(
                f"{tasks_url}/{tid}/status",
                json={"status": next_status, "agent": "zhangfei-dev"}, timeout=10,
            )
        return tid

    def test_s121a_crash_limit_marks_failed(self):
        """S12a: three crashed attempts -> the ticker marks the task failed."""
        pid = self._create_project("S12a")
        tid = self._start_working_task(pid, "Crash Limit 测试")

        # Record three crashed attempts, started 25, 20 and 15 minutes ago.
        conn = sqlite3.connect(str(_get_db_path(pid)))
        try:
            for attempt_no, minutes_ago in enumerate((25, 20, 15), start=1):
                conn.execute(
                    "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                    "VALUES (?, ?, ?, ?, datetime('now', ?))",
                    (tid, attempt_no, "zhangfei-dev", "crashed", f"-{minutes_ago} minutes"),
                )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 S12a: 3次crash已写入,等待ticker处理 (pid={pid}, tid={tid})")
        stop_on = ("failed", "done", "cancelled")
        status = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH, terminal_states=stop_on
        ).get("status")
        print(f" 最终状态: {status}")

        assert status == "failed", (
            f"3次crash后任务应为failed,实际: {status}"
        )
        print(" ✅ crash_limit 统一检查验证通过")

    def test_s121b_updated_at_fallback_reclaim(self):
        """S12b: working task without started_at is reclaimed via updated_at."""
        pid = self._create_project("S12b")
        tid = self._start_working_task(pid, "updated_at Fallback 测试")

        # Clear started_at/claimed_at and back-date updated_at by 60 minutes.
        conn = sqlite3.connect(str(_get_db_path(pid)))
        try:
            conn.execute(
                "UPDATE tasks SET started_at=NULL, claimed_at=NULL, "
                "updated_at=datetime('now','-60 minutes') WHERE id=?",
                (tid,),
            )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 S12b: updated_at 设为60min前,等待ticker回收 (pid={pid}, tid={tid})")
        stop_on = ("failed", "done", "cancelled")
        status = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH, terminal_states=stop_on
        ).get("status")
        print(f" 最终状态: {status}")

        assert status != "working", (
            "超时任务应被回收,实际仍为 working"
        )
        print(" ✅ updated_at fallback 回收验证通过")
|
||
|
||
|
||
# ===================================================================
|
||
# S13: Compact Hanging 不标 failed E2E
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS13CompactHangingE2E:
    """S13: a compact_hanging attempt must leave the task in ``working``."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Gate on a ready daemon, then archive every created project."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S13") -> str:
        """Create a single-agent project and register it for teardown."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def test_s131a_compact_hanging_keeps_working(self):
        """S13a: compact_hanging attempt -> ticker must not mark the task failed."""
        pid = self._create_project("S13a")
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
        task_url = f"{tasks_url}/{tid}"

        # Create the task and push it to "working" (responses are not checked).
        http_requests.post(
            tasks_url,
            json={"id": tid, "title": "Compact Hanging 测试", "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        for next_status in ("claimed", "working"):
            http_requests.post(
                f"{task_url}/status",
                json={"status": next_status, "agent": "zhangfei-dev"}, timeout=10,
            )

        # Record a compact_hanging attempt and back-date started_at by 5 minutes.
        conn = sqlite3.connect(str(_get_db_path(pid)))
        try:
            conn.execute(
                "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                "VALUES (?, ?, ?, ?, datetime('now'))",
                (tid, 1, "zhangfei-dev", "compact_hanging"),
            )
            conn.execute(
                "UPDATE tasks SET started_at=datetime('now','-5 minutes') WHERE id=?",
                (tid,),
            )
            conn.commit()
        finally:
            conn.close()

        print("\n🚀 S13a: compact_hanging attempt 已写入,等一个 tick 验证不被标 failed")
        # Fixed wait intended to cover one ticker cycle.
        time.sleep(45)

        resp = http_requests.get(task_url, timeout=10)
        assert resp.status_code == 200
        status = resp.json().get("status")
        print(f" 一个 tick 后状态: {status}")

        assert status == "working", (
            f"compact_hanging 后任务应保持 working,实际: {status}"
        )
        print(" ✅ compact_hanging 不标 failed 验证通过")
|
||
|
||
|
||
# ===================================================================
|
||
# S14: AgentBusyError 分类 E2E
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS14AgentBusyError:
    """S14: AgentBusyError reason/detail classification.

    Covers three views of the real daemon dispatch path:
    - S14.1 counter_blocked: a second task for the same agent while one runs.
    - S14.2 session blockers: reason/blockers recorded for an occupied session.
    - S14.3 dispatcher classification: reasons written to routing_decisions.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Gate on a ready daemon, then archive every created project."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S14", agents=None) -> str:
        """Create a project (default agent: zhangfei-dev) and register it."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": agents or ["zhangfei-dev"]},
        }
        resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs):
        """Create a pending task; ``title`` defaults to one derived from its id."""
        # NOTE(review): _tid is not defined in the visible part of this file;
        # presumably a module-level id generator defined elsewhere - confirm.
        tid = _tid()
        title = kwargs.pop("title", f"S14-task-{tid[:8]}")
        body = {"id": tid, "title": title, "status": "pending", "priority": 5}
        body.update(kwargs)
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
        resp = http_requests.post(tasks_url, json=body, timeout=10)
        assert resp.status_code == 200
        return tid

    def test_s141_counter_blocked(self):
        """S14.1: second task for a busy agent -> look for reason=counter_blocked.

        Only the dispatch of the first task is asserted; the counter_blocked
        lookup is reported but does not fail the test.
        """
        pid = self._create_project(agents=["zhangfei-dev"])
        dispatched_states = ("working", "done", "failed")

        # First task: wait until it has been dispatched.
        tid1 = self._create_task(pid, assignee="zhangfei-dev")
        print(f" 任务1 {tid1} 已创建,等待 dispatch...")
        status1 = _poll_task(
            pid, tid1, timeout=MAX_WAIT_DISPATCH, terminal_states=dispatched_states
        ).get("status", "")
        print(f" 任务1 状态: {status1}")
        assert status1 in ("working", "done"), (
            f"任务1 应被 dispatch,实际: {status1}"
        )

        # Second task targets the same agent to provoke a counter conflict.
        tid2 = self._create_task(pid, assignee="zhangfei-dev")
        print(f" 任务2 {tid2} 已创建,等待 counter blocked...")
        status2 = _poll_task(
            pid, tid2, timeout=MAX_WAIT_DISPATCH, terminal_states=dispatched_states
        ).get("status", "")
        print(f" 任务2 状态: {status2}")

        # The second task may stay queued or run later; what matters is whether
        # routing_decisions recorded a counter_blocked reason for it.
        db_file = _get_db_path(pid)
        if not db_file.exists():
            print(" ⚠️ DB 不存在,跳过 routing_decisions 检查")
            return

        conn = sqlite3.connect(str(db_file))
        try:
            detail_rows = conn.execute(
                "SELECT detail FROM routing_decisions WHERE task_id = ?",
                (tid2,)
            ).fetchall()
            reasons = []
            for (raw_detail,) in detail_rows:
                try:
                    detail = json.loads(raw_detail) if raw_detail else {}
                    reason = detail.get("reason", "")
                    if reason:
                        reasons.append(reason)
                except (json.JSONDecodeError, TypeError):
                    # Malformed detail payloads are ignored.
                    pass
            print(f" routing_decisions 中任务2 的 reasons: {reasons}")
            if "counter_blocked" in reasons:
                print(" ✅ counter_blocked 分类验证通过")
            else:
                print(f" ⚠️ 未检测到 counter_blocked,可能任务2 已排队执行(reasons: {reasons})")
        finally:
            conn.close()

    def test_s142_session_blocker(self):
        """S14.2: collect reason / blocker types recorded for a dispatched task.

        Session state cannot be controlled precisely at the E2E layer, so the
        test gathers ``reason`` and ``blockers`` entries from routing_decisions
        and prints which ones match the known blocker types. Nothing is
        asserted on the result.
        """
        pid = self._create_project(agents=["zhangfei-dev"])

        tid = self._create_task(pid, assignee="zhangfei-dev")
        print(f" 任务 {tid} 已创建,等待 dispatch...")

        task = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("working", "done", "failed"),
        )
        print(f" 任务状态: {task.get('status', '')}")

        db_file = _get_db_path(pid)
        if not db_file.exists():
            print(" ⚠️ DB 不存在,跳过 blocker 检查")
            return

        conn = sqlite3.connect(str(db_file))
        try:
            detail_rows = conn.execute(
                "SELECT detail FROM routing_decisions WHERE task_id = ?",
                (tid,)
            ).fetchall()
            blocker_types = set()
            for (raw_detail,) in detail_rows:
                try:
                    detail = json.loads(raw_detail) if raw_detail else {}
                    reason = detail.get("reason", "")
                    blockers = detail.get("blockers", [])
                    if reason:
                        blocker_types.add(reason)
                    for entry in blockers:
                        # Blockers are read as sequences whose first item is the type.
                        if isinstance(entry, (list, tuple)) and len(entry) > 0:
                            blocker_types.add(entry[0])
                except (json.JSONDecodeError, TypeError):
                    # Malformed detail payloads are ignored.
                    pass

            known_blockers = {
                "session_locked", "session_running", "session_compacting",
                "counter_blocked",
            }
            matched = blocker_types & known_blockers
            print(f" 检测到的 blocker 类型: {blocker_types}")
            print(f" 匹配已知类型: {matched}")
            print(" ✅ session blocker 分类结构验证通过")
        finally:
            conn.close()

    def test_s143_dispatcher_error_classification(self):
        """S14.3: routing_decisions has rows and every reason is a known one.

        Asserts that the project DB exists, that routing_decisions contains at
        least one row, and that each non-empty ``reason`` found in the
        ``detail`` JSON belongs to the known set.
        """
        pid = self._create_project(agents=["zhangfei-dev"])

        # Two tasks for one agent: a dispatch plus a potential busy condition.
        tid1 = self._create_task(pid, assignee="zhangfei-dev")
        tid2 = self._create_task(pid, assignee="zhangfei-dev")

        _poll_task(
            pid, tid1, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("working", "done", "failed"),
        )
        _poll_task(
            pid, tid2, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("working", "done", "failed", "pending", "claimed"),
        )

        db_file = _get_db_path(pid)
        assert db_file.exists(), f"DB 不存在: {db_file}"

        conn = sqlite3.connect(str(db_file))
        try:
            rows = conn.execute(
                "SELECT task_id, detail FROM routing_decisions"
            ).fetchall()
            print(f" routing_decisions 总记录数: {len(rows)}")

            reasons_found = set()
            for task_id, detail_str in rows:
                try:
                    detail = json.loads(detail_str) if detail_str else {}
                    reason = detail.get("reason", "")
                    if reason:
                        reasons_found.add(reason)
                        print(f" 任务 {task_id}: reason={reason}")
                except (json.JSONDecodeError, TypeError):
                    # Malformed detail payloads are ignored.
                    pass

            # The test expects a dispatch to write a row even without a busy error.
            assert len(rows) > 0, (
                "routing_decisions 应有至少一条记录"
            )

            known_reasons = {
                "counter_blocked", "session_locked", "session_running",
                "session_compacting", "dispatched", "spawned",
            }
            for r in reasons_found:
                assert r in known_reasons or r == "", (
                    f"未知 reason: {r},不在已知 AgentBusyError 分类中"
                )

            print(f" ✅ Dispatcher 错误分类验证通过 (reasons: {reasons_found})")
        finally:
            conn.close()
|
||
|
||
|
||
# ===================================================================
|
||
# S15: Crash Rollback E2E(原 E14 → S15)
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS15CrashRollback:
    """S15: a task with three crashed attempts must end up ``failed``.

    NOTE(review): the scenario title mentions a ``current_agent`` rollback,
    but nothing here reads ``current_agent`` back after the ticker runs; only
    the final status and the last ``status_change`` event are verified.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S15") -> str:
        """Create a throw-away project with ``zhangfei-dev`` as its only agent.

        The project id is registered for archival in ``setup_env`` teardown.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def test_s151a_rollback_on_crash(self):
        """S15a: three crashed attempts -> ticker marks the task ``failed``."""
        pid = self._create_project("S15a")

        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        created = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": "Rollback 测试", "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        assert created.status_code == 200, f"Create task failed: {created.text}"

        # Drive the task to ``working`` by hand. Each step is checked so that a
        # rejected transition is reported here rather than as a misleading
        # "expected failed" assertion further down.
        for state in ("claimed", "working"):
            moved = http_requests.post(
                f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
                json={"status": state, "agent": "zhangfei-dev"}, timeout=10,
            )
            assert moved.json().get("ok"), (
                f"Transition to {state} failed: {moved.text}"
            )

        # Set current_agent and record three crashed attempts directly in the
        # project database (started 25, 20 and 15 minutes ago).
        db_path = DATA_ROOT / pid / "blackboard.db"
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(
                "UPDATE tasks SET current_agent='zhangfei-dev' WHERE id=?",
                (tid,),
            )
            for attempt_number, minutes_ago in enumerate((25, 20, 15), start=1):
                conn.execute(
                    "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                    "VALUES (?, ?, ?, ?, datetime('now', ?))",
                    (tid, attempt_number, "zhangfei-dev", "crashed",
                     f"-{minutes_ago} minutes"),
                )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 S15a: 3次crash + current_agent 已设置,等待ticker (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status == "failed", f"应为 failed,实际: {status}"

        # The most recent status_change event, when present, must mention the
        # crash. A missing event is tolerated and only reported.
        conn2 = sqlite3.connect(str(db_path))
        conn2.row_factory = sqlite3.Row
        try:
            last_event = conn2.execute(
                "SELECT detail FROM events WHERE task_id=? AND event_type='status_change' ORDER BY id DESC LIMIT 1",
                (tid,),
            ).fetchone()
            if last_event:
                detail = last_event["detail"] or ""
                print(f" last event detail: {detail[:100]}")
                assert "crash" in detail.lower(), f"event detail 应含 crash,实际: {detail}"
            else:
                print(" ⚠️ 无 event 记录,跳过 detail 验证")
        finally:
            conn2.close()

        print(" ✅ crash_limit 标 failed 验证通过")
|
||
|
||
|
||
# ===================================================================
|
||
# S16: 广播认领(E94 + E15 合并)
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS16BroadcastClaim:
    """S16: broadcast claim — unassigned tasks and Prompt v3.0 three-level response."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S16", agents=None) -> str:
        """Create a throw-away project and register it for teardown.

        Args:
            name_prefix: Prefix of the human-readable project name.
            agents: Agent ids for the project config; defaults to
                ``["zhangfei-dev", "simayi-challenger"]``.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task; ``kwargs`` override or extend the body.

        Returns:
            The task id (``kwargs["id"]`` when given, otherwise generated).
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_s161_broadcast_claim_no_assignee(self):
        """S16-1: a task created without an assignee gets claimed via broadcast."""
        pid = self._create_project("S16-1")
        tid = self._create_task(
            pid,
            title="E2E广播认领任务:echo broadcast",
            description=(
                "这是一个E2E测试的广播认领任务。\n"
                "请执行 echo broadcast 并标记done。\n"
                "这是E2E自动化测试,不需要做其他事。"
            ),
            task_type="coding",
        )

        print(f"\n🚀 S16-1: 等待广播认领 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status != "pending", (
            f"广播认领未生效!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"广播任务被错误拦截: {result}"

        assignee = result.get("assignee")
        print(f" 认领Agent: {assignee}")
        assert assignee, f"任务已离开pending但assignee为空: {result}"

        # Any non-pending, non-blocked outcome passes; only "done" counts as
        # a full success in the log output.
        if status == "done":
            print(" ✅ 广播认领执行成功")
        else:
            print(f" ⚠️ 广播认领后状态: {status}")

    def test_s162_broadcast_observation_comment(self):
        """S16-2: the agent is expected to leave at least one comment on the task."""
        pid = self._create_project("S16-2", agents=["simayi-challenger"])
        tid = self._create_task(
            pid,
            title="E2E Prompt v3.0:观察型任务",
            description=(
                "这是一个编码任务,但 assignee 是司马懿。\n"
                "按照 Prompt v3.0 三级响应:\n"
                "- 如果你认为应该由其他人执行,请写 observation comment\n"
                "- 不需要实际执行编码\n"
                "- 标记 done 即可\n"
                "这是E2E测试,验证广播三级响应。"
            ),
            assignee="simayi-challenger",
            task_type="coding",
        )

        print(f"\n🚀 S16-2: 等待广播认领+Agent响应 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status != "pending", "任务未被调度"

        db_path = _get_db_path(pid)
        if db_path.exists():
            conn = sqlite3.connect(str(db_path))
            try:
                comments = conn.execute(
                    "SELECT author, comment_type, body FROM comments "
                    "WHERE task_id=? ORDER BY id DESC LIMIT 5",
                    (tid,),
                ).fetchall()
                print(f" Comments ({len(comments)}):")
                for author, comment_type, body in comments:
                    # body may be NULL in the database; slicing None would
                    # raise TypeError and hide the real assertion below.
                    print(f" [{author}] {comment_type}: {(body or '')[:80]}...")
                assert len(comments) > 0, (
                    "Agent 未写任何 comment,Prompt v3.0 三级响应可能未生效"
                )
            finally:
                conn.close()
        else:
            # Best-effort check: a missing DB is reported but does not fail.
            print(f" ⚠️ DB not found at {db_path}; comment verification skipped")

        print(" ✅ Prompt v3.0 广播响应验证完成")

    def test_s163_broadcast_claim_by_matching_agent(self):
        """S16-3: a task assigned to the matching agent is claimed and executed."""
        pid = self._create_project("S16-3", agents=["zhangfei-dev"])
        tid = self._create_task(
            pid,
            title="E2E Prompt v3.0:认领型任务",
            description=(
                "请执行 echo claim-test 并标记done。\n"
                "这是E2E测试,验证正确 assignee 的任务被认领执行。\n"
                "不需要做其他事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )

        print(f"\n🚀 S16-3: 等待正确Agent认领 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")

        assert status != "pending", "任务未被认领"
        assert status != "blocked", "任务被错误拦截"

        print(" ✅ 正确 assignee 认领执行验证通过")
|
||
|
||
|
||
# ===================================================================
|
||
# S17: 暂停→恢复
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS17PauseResume:
    """S17: push a task to working/review, pause it, resume it, check ``resumed_from``."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S17", agents=None) -> str:
        """Create a throw-away project and register it for teardown.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid, "name": f"{name_prefix}-{pid}", "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task; ``kwargs`` override or extend the body.

        Returns:
            The task id (``kwargs["id"]`` when given, otherwise generated).
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def _set_status(self, pid, tid, status, agent):
        """POST a status transition and assert that the API reports ``ok``."""
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )
        assert resp.json().get("ok"), (
            f"Transition to {status} by {agent} failed: {resp.text}"
        )
        return resp

    def _get_task(self, pid, tid):
        """Fetch the task and return its decoded JSON body."""
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        )
        return resp.json()

    def test_s171_pause_resume_resumed_from(self):
        """S17-1: working -> paused -> working, verifying ``resumed_from``."""
        pid = self._create_project("S17-1")
        tid = self._create_task(
            pid,
            title="E2E暂停恢复测试",
            description="测试暂停恢复功能",
            assignee="zhangfei-dev",
        )

        # Push by hand: claimed -> working.
        self._set_status(pid, tid, "claimed", "zhangfei-dev")
        self._set_status(pid, tid, "working", "zhangfei-dev")

        # Pause.
        self._set_status(pid, tid, "paused", "test")

        # The paused task must remember the state it was paused from.
        task = self._get_task(pid, tid)
        resumed_from = task.get("resumed_from")
        print(f"\n🚀 S17-1: 暂停后 resumed_from={resumed_from}")
        assert resumed_from == "working", (
            f"resumed_from 应为 'working',实际: {resumed_from}"
        )
        assert task.get("status") == "paused"

        # Resume back to working.
        self._set_status(pid, tid, "working", "zhangfei-dev")

        task2 = self._get_task(pid, tid)
        print(f" 恢复后 status={task2.get('status')}")
        assert task2.get("status") == "working"

        print(" ✅ 暂停恢复流程正确")

    def test_s172_review_pause_resume(self):
        """S17-2: review -> paused -> review."""
        pid = self._create_project("S17-2")
        tid = self._create_task(
            pid,
            title="E2E Review暂停恢复",
            assignee="simayi-challenger",
        )

        # Every setup transition is checked so that a rejected step is
        # reported here and not as a confusing resumed_from mismatch later.
        for state in ("claimed", "working", "review"):
            self._set_status(pid, tid, state, "simayi-challenger")

        # Pause.
        self._set_status(pid, tid, "paused", "test")

        task = self._get_task(pid, tid)
        assert task.get("resumed_from") == "review"

        # Resume back to review.
        self._set_status(pid, tid, "review", "simayi-challenger")

        task2 = self._get_task(pid, tid)
        assert task2.get("status") == "review"

        print("\n ✅ Review暂停恢复流程正确 (resumed_from=review)")
|
||
|
||
|
||
# ===================================================================
|
||
# S18: cancelled → 重新启动
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS18CancelledRestart:
    """S18: cancelled -> pending (restart) -> scheduled again by the daemon."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S18", agents=None) -> str:
        """Create a throw-away project, remember it for teardown, return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": agents or ["zhangfei-dev", "simayi-challenger"]},
        }
        resp = http_requests.post(
            f"{API_BASE}/api/projects", json=payload, timeout=10,
        )
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task; ``kwargs`` override or extend the body."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        payload = {"id": tid, "status": "pending", "priority": 5}
        payload.update(kwargs)
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=payload, timeout=10,
        )
        assert resp.status_code == 200
        return tid

    def _post_status(self, pid, tid, status, agent):
        """POST a status transition and return the raw response."""
        return http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )

    def _fetch_task(self, pid, tid):
        """GET the task and return its decoded JSON body."""
        return http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        ).json()

    def test_s181_cancelled_to_pending_restart(self):
        """S18-1: cancelled -> pending, then wait for the task to be scheduled."""
        pid = self._create_project("S18-1")
        tid = self._create_task(
            pid,
            title="E2E取消重启任务:echo restart",
            description=(
                "请执行 echo restart 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )

        # Cancel the task by hand and confirm the state stuck.
        cancel_resp = self._post_status(pid, tid, "cancelled", "test")
        assert cancel_resp.json().get("ok")
        assert self._fetch_task(pid, tid).get("status") == "cancelled"

        # Restart: back to pending, with the assignee cleared.
        restart_resp = self._post_status(pid, tid, "pending", "test")
        assert restart_resp.json().get("ok")
        restarted = self._fetch_task(pid, tid)
        assert restarted.get("status") == "pending"
        assert restarted.get("assignee") in (None, "")

        print(f"\n🚀 S18-1: 等待重新调度执行 (pid={pid}, tid={tid})")
        final = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = final.get("status")
        print(f" 重启后最终状态: {status}")

        assert status != "pending", (
            f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )

        # Anything other than "done" is reported but does not fail the test.
        if status != "done":
            print(f" ⚠️ 重启后状态: {status}")
        else:
            print(" ✅ 取消重启流程正确")
|
||
|
||
|
||
# ===================================================================
|
||
# S19: Retry Chain(E97 claimed timeout + E10c retry chain 合并)
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS19RetryChain:
    """S19: claimed-timeout recovery and the failed -> pending retry chain."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S19", agents=None) -> str:
        """Create a throw-away project and register it for teardown.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid, "name": f"{name_prefix}-{pid}", "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task; ``kwargs`` override or extend the body.

        Returns:
            The task id (``kwargs["id"]`` when given, otherwise generated).
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def _set_status(self, pid, tid, status, agent):
        """POST a status transition and assert that the API reports ``ok``."""
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )
        assert resp.json().get("ok"), (
            f"Transition to {status} by {agent} failed: {resp.text}"
        )
        return resp

    def _get_task(self, pid, tid):
        """Fetch the task and return its decoded JSON body."""
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        )
        return resp.json()

    def test_s191_claimed_timeout_to_pending(self):
        """S19-1: a stale claimed task leaves ``claimed`` and its assignee is cleared."""
        # Function-scope import: the module header only imports datetime/timedelta.
        from datetime import timezone

        pid = self._create_project("S19-1")
        tid = self._create_task(
            pid,
            title="E2E超时测试任务",
            description="测试claimed超时处理",
            assignee="zhangfei-dev",
        )

        # Push to claimed by hand.
        self._set_status(pid, tid, "claimed", "zhangfei-dev")

        # Backdate claimed_at by two hours directly in the DB.
        # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
        # stripped of tzinfo produces the same naive-UTC ISO string.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        two_hours_ago = (utc_now - timedelta(hours=2)).isoformat()
        _patch_db_claimed_at(pid, tid, two_hours_ago)
        print(f"\n🚀 S19-1: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})")

        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "escalated"),
        )
        status = result.get("status")
        print(f" 超时后状态: {status}")

        assert status != "claimed", (
            f"超时处理未生效!任务 {tid} 在 {MAX_WAIT_DISPATCH}s 后仍为 claimed"
        )

        assignee = result.get("assignee")
        print(f" assignee: {assignee}")
        assert assignee is None or assignee == "", (
            f"超时重置后assignee应清空,实际: {assignee}"
        )

        print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)")

    def test_s192_failed_to_pending_retry(self):
        """S19-2: simulate a failure by hand, retry, then wait for scheduling."""
        pid = self._create_project("S19-2")
        tid = self._create_task(
            pid,
            title="E2E重试任务:echo retry",
            description=(
                "请执行 echo retry 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )

        # Push to failed by hand (simulating an agent failure). Every step is
        # checked so a rejected transition is reported where it happens.
        for state in ("claimed", "working", "failed"):
            self._set_status(pid, tid, state, "zhangfei-dev")

        assert self._get_task(pid, tid).get("status") == "failed"
        print("\n🚀 S19-2: 任务已标记failed,准备重试")

        # Manual retry -> pending.
        self._set_status(pid, tid, "pending", "test")

        task2 = self._get_task(pid, tid)
        assert task2.get("status") == "pending"
        assert task2.get("assignee") is None or task2.get("assignee") == ""
        retry_count = task2.get("retry_count", 0) or 0
        print(f" retry_count: {retry_count}")

        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重试后最终状态: {status}")

        assert status != "pending", (
            f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )

        # Anything other than "done" is reported but does not fail the test.
        if status == "done":
            print(" ✅ 失败重试链正确")
        else:
            print(f" ⚠️ 重试后状态: {status}")
|
||
|
||
|
||
# ===================================================================
|
||
# S20: 完整生命周期(广播认领版)
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS20FullLifecycle:
    """S20: unassigned task -> broadcast claim -> claimed -> working -> review -> done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip when the daemon is not ready; archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for project_id in self._projects:
            _cleanup_project(project_id)

    def _create_project(self, name_prefix="S20", agents=None) -> str:
        """Create a throw-away project, remember it for teardown, return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": agents or ["zhangfei-dev", "simayi-challenger"]},
        }
        resp = http_requests.post(
            f"{API_BASE}/api/projects", json=payload, timeout=10,
        )
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task; ``kwargs`` override or extend the body."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        payload = {"id": tid, "status": "pending", "priority": 5}
        payload.update(kwargs)
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=payload, timeout=10,
        )
        assert resp.status_code == 200
        return tid

    def _post_status(self, pid, tid, status, agent):
        """POST a status transition and return the raw response."""
        return http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )

    def _fetch_task(self, pid, tid):
        """GET the task and return its decoded JSON body."""
        return http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        ).json()

    def test_s201_full_lifecycle_with_review(self):
        """S20-1: create -> broadcast -> claim -> execute -> review -> done."""
        pid = self._create_project("S20-1")

        # Step 1: coding task with no assignee (broadcast claim).
        code_tid = self._create_task(
            pid,
            title="E2E完整链路:编码任务",
            description=(
                "请执行 echo lifecycle 并标记done。"
                "这是E2E完整生命周期测试,不需要做其他事。"
            ),
            task_type="coding",
        )

        print(f"\n🚀 S20-1: 等待编码任务广播认领 (pid={pid}, tid={code_tid})")
        outcome = _poll_task(
            pid, code_tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        code_status = outcome.get("status")
        print(f" 编码任务最终状态: {code_status}")

        assert code_status != "pending", "编码任务未被认领"

        # Events are only inspected when the task finished and the events
        # endpoint answered with 200.
        if code_status == "done":
            events_resp = http_requests.get(
                f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events",
                timeout=10,
            )
            if events_resp.status_code == 200:
                payload = events_resp.json()
                event_types = [
                    item.get("event_type") for item in payload.get("events", [])
                ]
                print(f" Events: {event_types}")
                saw_transition = any(
                    marker in str(kind)
                    for kind in event_types
                    for marker in ("claimed", "started")
                )
                assert saw_transition, (
                    f"缺少状态变化事件: {event_types}"
                )

        # Step 2: review task driven through its states by hand.
        review_tid = self._create_task(
            pid,
            title="E2E完整链路:review任务",
            description="测试review状态",
            assignee="simayi-challenger",
        )

        for state in ("claimed", "working", "review", "done"):
            step_resp = self._post_status(
                pid, review_tid, state, "simayi-challenger",
            )
            assert step_resp.json().get("ok")

        assert self._fetch_task(pid, review_tid).get("status") == "done"
        print(" Review任务手动生命周期: ✅")

        # Step 3: done -> cancelled.
        cancel_resp = self._post_status(pid, review_tid, "cancelled", "test")
        assert cancel_resp.json().get("ok")
        assert self._fetch_task(pid, review_tid).get("status") == "cancelled"
        print(" done→cancelled: ✅")

        print(" ✅ S20-1 完整生命周期测试通过")
|
||
|
||
|
||
# ===================================================================
|
||
# S21: 缓存头验证
|
||
# ===================================================================
|
||
|
||
@skip_no_integration
class TestS21CacheHeaders:
    """S21: verify the Cache-Control headers of the served frontend."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip the test when the daemon is not ready."""
        _check_environment()

    def _asset_cache_control(self, pattern, missing_msg):
        """Fetch the first asset referenced by the index page.

        Args:
            pattern: Regex with one capture group for the asset path in the HTML.
            missing_msg: Skip reason used when the HTML references no such asset.

        Returns:
            ``(asset_path, cache_control)`` where ``cache_control`` is the raw
            header value, or ``""`` when the header is absent.
        """
        html_resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if html_resp.status_code != 200:
            pytest.skip("Frontend not available")

        matches = re.findall(pattern, html_resp.text)
        if not matches:
            pytest.skip(missing_msg)

        asset_path = matches[0]
        resp = http_requests.get(f"{API_BASE}{asset_path}", timeout=10)
        # An asset referenced by the page must be served; otherwise the header
        # assertions below would be checking an error response.
        assert resp.status_code == 200, (
            f"Asset {asset_path} not served: HTTP {resp.status_code}"
        )
        return asset_path, resp.headers.get("cache-control", "")

    def test_s211_html_no_cache(self):
        """S21-1: the HTML page must be served as no-cache/no-store."""
        resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if resp.status_code != 200:
            pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}")

        cache_control = resp.headers.get("cache-control", "")
        print(f"\n🚀 S21-1: HTML Cache-Control: {cache_control}")
        # Directive names are case-insensitive, so compare in lower case.
        directives = cache_control.lower()
        assert "no-cache" in directives or "no-store" in directives, (
            f"HTML 应为 no-cache/no-store,实际: {cache_control}"
        )

    def test_s212_js_immutable(self):
        """S21-2: JS assets must be immutable with a one-year max-age."""
        js_path, cache_control = self._asset_cache_control(
            r'src="(/assets/[^"]+\.js)"', "No JS files found in HTML",
        )
        print(f" S21-2: JS ({js_path}) Cache-Control: {cache_control}")
        directives = cache_control.lower()
        assert "immutable" in directives, (
            f"JS 应含 immutable,实际: {cache_control}"
        )
        assert "31536000" in directives, (
            f"JS max-age 应为 31536000,实际: {cache_control}"
        )

    def test_s213_css_immutable(self):
        """S21-3: CSS assets must be immutable with a one-year max-age."""
        css_path, cache_control = self._asset_cache_control(
            r'href="(/assets/[^"]+\.css)"', "No CSS files found in HTML",
        )
        print(f" S21-3: CSS ({css_path}) Cache-Control: {cache_control}")
        directives = cache_control.lower()
        assert "immutable" in directives, (
            f"CSS 应含 immutable,实际: {cache_control}"
        )
        # Mirrors the JS check: the scenario promises a long cache lifetime
        # for CSS as well, not just the immutable flag.
        assert "31536000" in directives, (
            f"CSS max-age 应为 31536000,实际: {cache_control}"
        )

        print(" ✅ 缓存头验证通过")
|