Files
2026-06-05 23:36:26 +08:00

1820 lines
68 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""S9-S21 E2E 场景测试
从 test_e2e_v27.py (E9-E14) 和 test_e2e_v31.py (E94-E98/E10c/E10d/E15) 合并。
通过生产 HTTP API 测试真实 daemon + Agent 场景。
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
"""
import json
import os
import re
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict
import pytest
import requests as http_requests
# 指向部署目录
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.utils import get_data_root
# ── Constants ──
# Base URL of the HTTP API under test; overridable through the API_BASE env var.
API_BASE = os.getenv("API_BASE", "http://localhost:8083")
# Seconds slept between two task-status polls.
POLL_INTERVAL = 5
# Polling budgets (seconds) used when waiting for ticker-driven transitions
# and for agent-executed tasks respectively.
MAX_WAIT_DISPATCH = 120
MAX_WAIT_AGENT = 300
# Prefix of every project id created by these tests.
E2E_PREFIX = "e2e-v30-"
# Root directory under which each project's blackboard.db is looked up.
DATA_ROOT = get_data_root()
# Mark every test in this module as "e2e".
pytestmark = pytest.mark.e2e

# ── E2E gate ──
# Class decorator: skip unless RUN_INTEGRATION is set to a non-empty value.
skip_no_integration = pytest.mark.skipif(
    not os.getenv("RUN_INTEGRATION"),
    reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
)
# ── 工具函数 ──
def _check_environment():
    """Skip the calling test unless the daemon reports itself as ready.

    Queries ``/api/daemon/status`` and calls ``pytest.skip`` when the request
    or JSON decoding fails, or when the payload does not have
    ``status == "running"`` together with a truthy ``ticker_running``.

    Returns:
        The decoded status payload when the daemon is ready.
    """
    status_url = f"{API_BASE}/api/daemon/status"
    try:
        payload = http_requests.get(status_url, timeout=5).json()
        daemon_ready = (
            payload.get("status") == "running" and payload.get("ticker_running")
        )
        if not daemon_ready:
            pytest.skip(f"Daemon not ready: {payload}")
        return payload
    except Exception as exc:
        pytest.skip(f"Production API not available at {API_BASE}: {exc}")
def _cleanup_project(pid: str):
    """Archive the test project *pid* through the API, ignoring any failure."""
    archive_url = f"{API_BASE}/api/projects/{pid}/archive"
    try:
        http_requests.post(archive_url, timeout=5)
    except Exception:
        # Best-effort cleanup: a failed archive must not break test teardown.
        pass
def _poll_task(pid, tid, timeout, terminal_states=None):
    """Poll a task until it reaches one of *terminal_states* or *timeout* expires.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Maximum number of seconds to keep polling.
        terminal_states: Statuses that end the polling; defaults to
            ``("done", "failed", "cancelled")`` when omitted or empty.

    Returns:
        The task payload (a dict) as soon as its ``status`` is terminal.
        On timeout, the payload of one final fetch, or
        ``{"status": "unknown"}`` when that fetch does not yield a dict.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"

    def _fetch():
        """Return the decoded task payload, or None when it is unavailable."""
        try:
            resp = http_requests.get(task_url, timeout=10)
            if resp.status_code != 200:
                return None
            return resp.json()
        except (http_requests.RequestException, ValueError):
            # Transient network/decoding problems are expected while polling.
            return None

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        data = _fetch()
        if isinstance(data, dict) and data.get("status") in terminal:
            return data
        # Never sleep past the deadline so the timeout is honoured.
        time.sleep(min(POLL_INTERVAL, max(deadline - time.monotonic(), 0)))

    # Timed out: report whatever state the task is in right now.
    data = _fetch()
    return data if isinstance(data, dict) else {"status": "unknown"}
def _get_db_path(pid: str) -> Path:
    """Return the location of project *pid*'s ``blackboard.db`` under DATA_ROOT."""
    project_dir = DATA_ROOT / pid
    return project_dir / "blackboard.db"
def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str):
    """Overwrite a task's ``claimed_at`` directly in the project DB.

    Used to simulate a timeout. Fails with an AssertionError when the
    project's database file does not exist.
    """
    database = _get_db_path(pid)
    assert database.exists(), f"DB not found: {database}"
    update_sql = "UPDATE tasks SET claimed_at=? WHERE id=?"
    connection = sqlite3.connect(str(database))
    try:
        connection.execute(update_sql, (claimed_at, tid))
        connection.commit()
    finally:
        # Always release the handle, even if the UPDATE fails.
        connection.close()
# ===================================================================
# S9: 真实 Agent 调度
# ===================================================================
@skip_no_integration
class TestS9RealAgentDispatch:
    """S9: real agent dispatch.

    Tasks are created through the production HTTP API; dispatch is left to
    the production Ticker and execution to really spawned agents. The tests
    only poll and never advance the task status themselves.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S9") -> str:
        """Create a uniquely named project and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending task in *pid*; *kwargs* override/extend the body."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_s91_simple_task_agent_execute(self):
        """S9-1: simple task → Ticker dispatch → real agent run → terminal state."""
        pid = self._create_project("S9-1")
        tid = self._create_task(
            pid,
            title="E2E简单任务:echo hello",
            description=(
                "请执行以下操作:\n"
                "1. 运行命令 echo hello\n"
                "2. 把输出结果写入黑板\n"
                "这是E2E自动化测试,完成后请标记done。\n"
                "重要:不需要做其他任何事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 S9-1: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # Separator added so the task id and the timeout no longer run together.
        assert status != "pending", (
            f"Agent未被调度!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"普通任务被错误拦截: {result}"
        if status == "failed":
            print(f" ⚠️ Agent执行失败,detail: {result}")
        else:
            # Every non-failed status is reported as success here.
            print(" ✅ Agent执行成功")
        print(f" 性能: pending → {status}")

    def test_s92_review_task_dispatch(self):
        """S9-2: review task → routed to a can_review agent → real execution."""
        pid = self._create_project("S9-2")
        tid = self._create_task(
            pid,
            title="E2E Review:审查测试",
            description=(
                "这是一个E2E测试的review任务。\n"
                "请简单回复:\"审查通过,E2E测试正常。\"\n"
                "然后标记done即可。不需要做其他事。"
            ),
            assignee="simayi-challenger",
            task_type="review",
        )
        print(f"\n🚀 S9-2: 等待review调度 + Agent执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "pending", (
            f"Review Agent未被调度!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"Review任务被错误拦截: {result}"
        if status == "failed":
            print(" ⚠️ Review Agent执行失败")
        else:
            print(" ✅ Review Agent执行成功")

    def test_s93_guardrail_block(self):
        """S9-3: live-trading task → intercepted by guardrails → never executed."""
        pid = self._create_project("S9-3")
        tid = self._create_task(
            pid,
            title="执行实盘买入SH600000",
            description="用真金白银买入浦发银行1000股",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 S9-3: 等待Guardrails拦截 (pid={pid}, tid={tid})")
        # "claimed"/"working" also stop the polling: reaching either of them
        # already proves the task slipped past the guardrails.
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("done", "failed", "cancelled", "blocked", "claimed", "working"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status not in ("claimed", "working", "done"), (
            f"Guardrails未拦截实盘任务!状态: {status}"
        )
        print(f" ✅ Guardrails拦截生效,状态: {status}")
# ===================================================================
# S10: 全链路集成
# ===================================================================
@skip_no_integration
class TestS10FullChain:
    """S10: project → parent/child tasks → Ticker → aggregation → deps → mail → API."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S10") -> str:
        """Create a uniquely named two-agent project and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev", "simayi-challenger"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def test_s10a_logic_chain(self):
        """S10a: parent/child tasks + dependency advance + aggregation + mail.

        Does not depend on an agent finishing any work: statuses are pushed
        through the API and only the dependency advance is left to the Ticker.
        """
        pid = self._create_project("S10a")
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"

        # 1. Create the parent task (with stages).
        parent_id = f"{pid}-parent"
        http_requests.post(tasks_url, json={
            "id": parent_id,
            "title": "S10a全链路父任务",
            "status": "pending",
            "stages_json": json.dumps([
                {"id": "setup", "label": "Setup"},
                {"id": "execute", "label": "Execute"},
                {"id": "verify", "label": "Verify"},
            ]),
        }, timeout=10)

        # 2. Create three child tasks, one per stage.
        child_ids = []
        for i, stage in enumerate(["setup", "execute", "verify"]):
            cid = f"{pid}-child-{i}"
            child_ids.append(cid)
            http_requests.post(tasks_url, json={
                "id": cid,
                "title": f"子任务-{stage}",
                "status": "pending",
                "parent_task": parent_id,
                "stage": stage,
            }, timeout=10)

        # 3. Create the dependent task (it will be moved to blocked below).
        dep_id = f"{pid}-dep"
        http_requests.post(tasks_url, json={
            "id": dep_id,
            "title": "依赖任务",
            "depends_on": json.dumps([child_ids[0]]),
        }, timeout=10)

        # 4. Push the dependent task to blocked.
        for status in ["claimed", "working", "blocked"]:
            http_requests.post(
                f"{tasks_url}/{dep_id}/status",
                json={"status": status, "agent": "test"}, timeout=10,
            )

        # 5. Push the setup child to done (the condition for dependency advance).
        for status in ["claimed", "working", "review", "done"]:
            http_requests.post(
                f"{tasks_url}/{child_ids[0]}/status",
                json={"status": status, "agent": "test"}, timeout=10,
            )

        # 6. Wait for the Ticker to advance the dependency.
        print(f"\n🚀 S10a: 等待依赖推进 (pid={pid})")
        dep_result = _poll_task(
            pid, dep_id, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "claimed", "working", "done"),
        )
        dep_status = dep_result.get("status")
        print(f" 依赖任务状态: blocked → {dep_status}")
        # Separator added so the task id and the timeout no longer run together.
        assert dep_status != "blocked", (
            f"依赖推进未生效!{dep_id} 在 {MAX_WAIT_DISPATCH}s 后仍为 blocked"
        )

        # 7. Send a mail.
        http_requests.post(f"{API_BASE}/api/mail", json={
            "title": f"S10a全链路通知-{pid}",
            "text": f"项目 {pid} setup阶段完成",
            "from": "simayi-challenger",
            "to": "pangtong-fujunshi",
            "type": "inform",
        }, timeout=10)

        # 8. Verify the mail is listed.
        resp = http_requests.get(
            f"{API_BASE}/api/mail?from_agent=simayi-challenger", timeout=10
        )
        mails = resp.json()["mails"]
        assert any(m["title"].startswith("S10a全链路") for m in mails), \
            f"Mail未找到,当前mails: {[m['title'] for m in mails]}"

        # 9. Verify stage progress (only when the endpoint answers 200).
        resp = http_requests.get(
            f"{tasks_url}/{parent_id}/progress", timeout=10
        )
        if resp.status_code == 200:
            progress = resp.json()
            stages = {s["id"]: s for s in progress.get("stages", [])}
            assert stages["setup"]["done"] == 1, "setup stage should be done"
            assert stages["execute"]["done"] == 0

        # 10. Verify the aggregated parent status.
        parent_resp = http_requests.get(f"{tasks_url}/{parent_id}", timeout=10)
        parent_status = parent_resp.json().get("status")
        print(f" 父任务状态: {parent_status}")
        # Use the .get() result in the message too: indexing ['status'] would
        # raise KeyError instead of a readable AssertionError when it is absent.
        assert parent_status in ("pending", "working"), \
            f"父任务状态异常: {parent_status}"
        print(" ✅ S10a 全链路测试通过")

    def test_s10b_agent_full_chain(self):
        """S10b: real-agent chain — create child → agent runs it → parent aggregates."""
        pid = self._create_project("S10b")
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"

        # 1. Create the parent task.
        parent_id = f"{pid}-parent"
        http_requests.post(tasks_url, json={
            "id": parent_id,
            "title": "S10b Agent全链路父任务",
            "status": "pending",
            "stages_json": json.dumps([
                {"id": "step1", "label": "Step 1"},
                {"id": "step2", "label": "Step 2"},
            ]),
        }, timeout=10)

        # 2. Create the child task (executed by a real agent).
        child_id = f"{pid}-child-0"
        http_requests.post(tasks_url, json={
            "id": child_id,
            "title": "E2E子任务:echo test",
            "description": (
                "请执行 echo test 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            "status": "pending",
            "parent_task": parent_id,
            "stage": "step1",
            "assignee": "zhangfei-dev",
            "task_type": "coding",
        }, timeout=10)

        # 3. Wait for the agent to finish the child.
        print(f"\n🚀 S10b: 等待Agent执行子任务 (pid={pid}, tid={child_id})")
        result = _poll_task(
            pid, child_id, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        child_status = result.get("status")
        print(f" 子任务最终状态: {child_status}")
        assert child_status != "pending", (
            f"子任务未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )

        # 4. Poll for the parent status to change.
        parent_data = _poll_task(
            pid, parent_id, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("working", "review", "done", "failed"),
        )
        print(f" 父任务状态: {parent_data.get('status')}")

        # 5. Report stage progress (informational only, nothing is asserted).
        resp = http_requests.get(
            f"{tasks_url}/{parent_id}/progress", timeout=10
        )
        if resp.status_code == 200:
            progress = resp.json()
            stages = {s["id"]: s for s in progress.get("stages", [])}
            print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}")
        print(" ✅ S10b 真实Agent全链路完成")
# ===================================================================
# S11: Acquire-First 真实 Agent E2E
# ===================================================================
@skip_no_integration
class TestS11AcquireFirstE2E:
    """S11: Acquire-First phases 1-4 with a real agent.

    Exercises counter acquire + session check + spawn on the real daemon's
    dispatch path and inspects the resulting ``routing_decisions`` rows.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S11") -> str:
        """Create a uniquely named project and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending task in *pid*; *kwargs* override/extend the body."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_s111a_dispatch_with_routing_audit(self):
        """S11a: dispatch a task, then check routing_decisions for dispatch + agent."""
        pid = self._create_project("S11a")
        tid = self._create_task(
            pid,
            title="E2E Acquire-Firstecho hello",
            description=(
                "请执行 echo hello-world 并标记done。\n"
                "这是E2E测试 Acquire-First,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 S11a: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # Separator added so the task id and the timeout no longer run together.
        assert status != "pending", (
            f"任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending,调度未生效"
        )

        # Verify that routing_decisions holds a record for the task. The audit
        # is skipped (with a warning) when the DB, the table or the row is absent.
        db_path = _get_db_path(pid)
        if db_path.exists():
            conn = sqlite3.connect(str(db_path))
            conn.row_factory = sqlite3.Row
            try:
                tables = [r[0] for r in conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                ).fetchall()]
                if "routing_decisions" not in tables:
                    print(" ⚠️ routing_decisions 表不存在,跳过审计验证")
                else:
                    rows = conn.execute(
                        "SELECT * FROM routing_decisions WHERE task_id=? ORDER BY id DESC LIMIT 1",
                        (tid,),
                    ).fetchall()
                    if rows:
                        row = rows[0]
                        print(f" routing: mode={row['mode']} agent={row['selected_agent']} outcome={row['outcome']}")
                        assert row["selected_agent"] == "zhangfei-dev"
                        assert row["outcome"] == "dispatched"
                    else:
                        print(f" ⚠️ routing_decisions 无记录 for {tid}")
            finally:
                conn.close()
        print(" ✅ Acquire-First 调度审计验证通过")

    def test_s111b_concurrent_dispatch_counter_block(self):
        """S11b: two consecutive tasks for one agent → the second should be skipped."""
        pid = self._create_project("S11b")
        tid1 = self._create_task(
            pid,
            title="E2E Counter-Block 任务1echo first",
            description="请执行 echo first 并标记done。E2E测试,不需要做其他事。",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        tid2 = self._create_task(
            pid,
            title="E2E Counter-Block 任务2echo second",
            description="请执行 echo second 并标记done。E2E测试,不需要做其他事。",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 S11b: 等待两个任务调度 (pid={pid})")
        result1 = _poll_task(
            pid, tid1, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status1 = result1.get("status")
        print(f" 任务1状态: {status1}")
        assert status1 != "pending", "任务1未被调度"
        result2 = _poll_task(
            pid, tid2, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status2 = result2.get("status")
        print(f" 任务2状态: {status2}")

        # Task 2 must have left at least one routing record behind.
        db_path = _get_db_path(pid)
        if db_path.exists():
            conn = sqlite3.connect(str(db_path))
            conn.row_factory = sqlite3.Row
            try:
                rows = conn.execute(
                    "SELECT outcome, detail FROM routing_decisions WHERE task_id=? ORDER BY id",
                    (tid2,),
                ).fetchall()
                outcomes = [r["outcome"] for r in rows]
                print(f" 任务2 routing outcomes: {outcomes}")
                assert len(rows) > 0, "任务2无 routing 记录"
            finally:
                conn.close()
        print(" ✅ Counter block 并发调度验证完成")
# ===================================================================
# S12: _check_timeouts 统一 + crash_limit E2E
# ===================================================================
@skip_no_integration
class TestS12TimeoutsUnifiedE2E:
    """S12: unified _check_timeouts — crash_limit and updated_at fallback."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S12") -> str:
        """Create a uniquely named project and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_working_task(self, pid, title) -> str:
        """Create a task for zhangfei-dev and push it pending → claimed → working."""
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": title, "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        for status in ("claimed", "working"):
            http_requests.post(
                f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
                json={"status": status, "agent": "zhangfei-dev"}, timeout=10,
            )
        return tid

    def test_s121a_crash_limit_marks_failed(self):
        """S12a: three crashed attempts → the ticker marks the task failed."""
        pid = self._create_project("S12a")
        tid = self._create_working_task(pid, "Crash Limit 测试")

        # Insert three crashed attempts, started 25, 20 and 15 minutes ago.
        db_path = _get_db_path(pid)
        # sqlite3.connect() would silently create a missing file; fail early.
        assert db_path.exists(), f"DB not found: {db_path}"
        conn = sqlite3.connect(str(db_path))
        try:
            for i in range(3):
                conn.execute(
                    "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                    "VALUES (?, ?, ?, ?, datetime('now', ?))",
                    (tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"),
                )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 S12a: 3次crash已写入,等待ticker处理 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status == "failed", (
            f"3次crash后任务应为failed,实际: {status}"
        )
        print(" ✅ crash_limit 统一检查验证通过")

    def test_s121b_updated_at_fallback_reclaim(self):
        """S12b: working task without started_at → updated_at fallback → reclaimed."""
        pid = self._create_project("S12b")
        tid = self._create_working_task(pid, "updated_at Fallback 测试")

        # Clear started_at/claimed_at and back-date updated_at by 60 minutes.
        db_path = _get_db_path(pid)
        # sqlite3.connect() would silently create a missing file; fail early.
        assert db_path.exists(), f"DB not found: {db_path}"
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(
                "UPDATE tasks SET started_at=NULL, claimed_at=NULL, "
                "updated_at=datetime('now','-60 minutes') WHERE id=?",
                (tid,),
            )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 S12b: updated_at 设为60min前,等待ticker回收 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "working", (
            "超时任务应被回收,实际仍为 working"
        )
        print(" ✅ updated_at fallback 回收验证通过")
# ===================================================================
# S13: Compact Hanging 不标 failed E2E
# ===================================================================
@skip_no_integration
class TestS13CompactHangingE2E:
    """S13: a compact_hanging outcome keeps the task working (not marked failed)."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S13") -> str:
        """Create a uniquely named project and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def test_s131a_compact_hanging_keeps_working(self):
        """S13a: compact_hanging attempt → the ticker must not mark the task failed."""
        pid = self._create_project("S13a")
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
        http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": "Compact Hanging 测试", "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        # Push the task pending → claimed → working through the status API.
        for status in ("claimed", "working"):
            http_requests.post(
                f"{task_url}/status",
                json={"status": status, "agent": "zhangfei-dev"}, timeout=10,
            )

        # Record a compact_hanging attempt and back-date started_at by 5 minutes.
        db_path = _get_db_path(pid)
        # sqlite3.connect() would silently create a missing file; fail early.
        assert db_path.exists(), f"DB not found: {db_path}"
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(
                "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                "VALUES (?, ?, ?, ?, datetime('now'))",
                (tid, 1, "zhangfei-dev", "compact_hanging"),
            )
            conn.execute(
                "UPDATE tasks SET started_at=datetime('now','-5 minutes') WHERE id=?",
                (tid,),
            )
            conn.commit()
        finally:
            conn.close()

        print("\n🚀 S13a: compact_hanging attempt 已写入,等一个 tick 验证不被标 failed")
        # Presumably long enough for one ticker cycle — TODO confirm against
        # the daemon's tick interval.
        time.sleep(45)
        resp = http_requests.get(task_url, timeout=10)
        assert resp.status_code == 200
        status = resp.json().get("status")
        print(f" 一个 tick 后状态: {status}")
        assert status == "working", (
            f"compact_hanging 后任务应保持 working,实际: {status}"
        )
        print(" ✅ compact_hanging 不标 failed 验证通过")
# ===================================================================
# S14: AgentBusyError 分类 E2E
# ===================================================================
@skip_no_integration
class TestS14AgentBusyError:
    """S14: AgentBusyError reason/detail classification.

    Checks the three AgentBusyError categories on the real daemon's dispatch
    path:
    - S14.1 counter_blocked: concurrent acquire for one agent → second blocked
    - S14.2 session_locked: the agent session is occupied → session blocker
    - S14.3 dispatcher distinction: AgentBusyError lands in routing_decisions
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S14", agents=None) -> str:
        """Create a project with *agents* (default: zhangfei-dev) and track it."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev"]}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs):
        """Create a pending task with a generated id; *kwargs* extend the body."""
        # NOTE(review): `_tid` is not defined in the visible part of this
        # file — confirm it exists at module level, otherwise this raises
        # NameError at call time.
        tid = _tid()
        title = kwargs.pop("title", f"S14-task-{tid[:8]}")
        body = {
            "id": tid,
            "title": title,
            "status": "pending",
            "priority": 5,
            **kwargs,
        }
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_s141_counter_blocked(self):
        """S14.1: concurrent acquire for one agent → second task counter_blocked."""
        pid = self._create_project(agents=["zhangfei-dev"])

        # Create the first task and wait for it to be dispatched (working).
        tid1 = self._create_task(pid, assignee="zhangfei-dev")
        print(f" 任务1 {tid1} 已创建,等待 dispatch...")
        task1 = _poll_task(pid, tid1, timeout=MAX_WAIT_DISPATCH,
                           terminal_states=("working", "done", "failed"))
        status1 = task1.get("status", "")
        print(f" 任务1 状态: {status1}")
        assert status1 in ("working", "done"), (
            f"任务1 应被 dispatch,实际: {status1}"
        )

        # The second task targets the same agent to provoke a counter conflict.
        tid2 = self._create_task(pid, assignee="zhangfei-dev")
        print(f" 任务2 {tid2} 已创建,等待 counter blocked...")
        # Wait long enough for the dispatcher to attempt the acquire.
        task2 = _poll_task(pid, tid2, timeout=MAX_WAIT_DISPATCH,
                           terminal_states=("working", "done", "failed"))
        status2 = task2.get("status", "")
        print(f" 任务2 状态: {status2}")

        # Task 2 may stay blocked (pending/claimed) or run after queueing, so
        # the check only reports whether routing_decisions saw counter_blocked.
        db_path = _get_db_path(pid)
        if not db_path.exists():
            print(" ⚠️ DB 不存在,跳过 routing_decisions 检查")
            return
        conn = sqlite3.connect(str(db_path))
        try:
            rows = conn.execute(
                "SELECT detail FROM routing_decisions WHERE task_id = ?",
                (tid2,)
            ).fetchall()
            reasons = []
            for row in rows:
                try:
                    detail = json.loads(row[0]) if row[0] else {}
                    reason = detail.get("reason", "")
                    if reason:
                        reasons.append(reason)
                except (json.JSONDecodeError, TypeError, AttributeError):
                    # Malformed or non-object detail: nothing to classify.
                    pass
            print(f" routing_decisions 中任务2 的 reasons: {reasons}")
            if "counter_blocked" in reasons:
                print(" ✅ counter_blocked 分类验证通过")
            else:
                print(f" ⚠️ 未检测到 counter_blocked,可能任务2 已排队执行(reasons: {reasons})")
        finally:
            conn.close()

    def test_s142_session_blocker(self):
        """S14.2: occupied session → AgentBusyError with reason + detail.blockers.

        The E2E layer cannot control the session state precisely, so this
        test only collects and reports the reasons/blockers recorded in
        routing_decisions for the dispatched task; nothing is asserted on them.
        """
        pid = self._create_project(agents=["zhangfei-dev"])

        # Create a task and wait for it to be dispatched.
        tid = self._create_task(pid, assignee="zhangfei-dev")
        print(f" 任务 {tid} 已创建,等待 dispatch...")
        task = _poll_task(pid, tid, timeout=MAX_WAIT_DISPATCH,
                          terminal_states=("working", "done", "failed"))
        print(f" 任务状态: {task.get('status', '')}")

        # Look for session-related blocker records in routing_decisions.
        db_path = _get_db_path(pid)
        if not db_path.exists():
            print(" ⚠️ DB 不存在,跳过 blocker 检查")
            return
        conn = sqlite3.connect(str(db_path))
        try:
            rows = conn.execute(
                "SELECT detail FROM routing_decisions WHERE task_id = ?",
                (tid,)
            ).fetchall()
            blocker_types = set()
            for row in rows:
                try:
                    detail = json.loads(row[0]) if row[0] else {}
                    reason = detail.get("reason", "")
                    blockers = detail.get("blockers", [])
                    if reason:
                        blocker_types.add(reason)
                    for b in blockers:
                        # Each blocker is expected to be a sequence whose
                        # first item is the blocker type.
                        if isinstance(b, (list, tuple)) and len(b) > 0:
                            blocker_types.add(b[0])
                except (json.JSONDecodeError, TypeError, AttributeError):
                    # Malformed or non-object detail: nothing to classify.
                    pass
            # Known session blocker types.
            known_blockers = {
                "session_locked", "session_running", "session_compacting",
                "counter_blocked",
            }
            matched = blocker_types & known_blockers
            print(f" 检测到的 blocker 类型: {blocker_types}")
            print(f" 匹配已知类型: {matched}")
            print(" ✅ session blocker 分类结构验证通过")
        finally:
            conn.close()

    def test_s143_dispatcher_error_classification(self):
        """S14.3: dispatcher catches AgentBusyError → written to routing_decisions.

        Verifies that:
        - the routing_decisions table exists and holds records
        - every non-empty ``reason`` found in a record's detail is a known one
        """
        pid = self._create_project(agents=["zhangfei-dev"])

        # Two tasks for one agent: triggers dispatch and a potential busy.
        tid1 = self._create_task(pid, assignee="zhangfei-dev")
        tid2 = self._create_task(pid, assignee="zhangfei-dev")
        _poll_task(pid, tid1, timeout=MAX_WAIT_DISPATCH,
                   terminal_states=("working", "done", "failed"))
        _poll_task(pid, tid2, timeout=MAX_WAIT_DISPATCH,
                   terminal_states=("working", "done", "failed", "pending", "claimed"))

        # Inspect the routing_decisions table.
        db_path = _get_db_path(pid)
        assert db_path.exists(), f"DB 不存在: {db_path}"
        conn = sqlite3.connect(str(db_path))
        try:
            rows = conn.execute(
                "SELECT task_id, detail FROM routing_decisions"
            ).fetchall()
            print(f" routing_decisions 总记录数: {len(rows)}")
            reasons_found = set()
            for task_id, detail_str in rows:
                try:
                    detail = json.loads(detail_str) if detail_str else {}
                    reason = detail.get("reason", "")
                    if reason:
                        reasons_found.add(reason)
                        print(f" 任务 {task_id}: reason={reason}")
                except (json.JSONDecodeError, TypeError, AttributeError):
                    # Malformed or non-object detail: nothing to classify.
                    pass

            # A plain dispatch also writes routing_decisions, so at least one
            # record must exist even when no busy error occurred.
            assert len(rows) > 0, (
                "routing_decisions 应有至少一条记录"
            )

            # Only non-empty reasons are collected above, so each one must be
            # a known classification.
            known_reasons = {
                "counter_blocked", "session_locked", "session_running",
                "session_compacting", "dispatched", "spawned",
            }
            for r in reasons_found:
                assert r in known_reasons, (
                    f"未知 reason: {r},不在已知 AgentBusyError 分类中"
                )
            print(f" ✅ Dispatcher 错误分类验证通过 (reasons: {reasons_found})")
        finally:
            conn.close()
# ===================================================================
# S15: Crash Rollback E2E(原 E14 → S15
# ===================================================================
@skip_no_integration
class TestS15CrashRollback:
    """S15: behaviour after repeated crashes (failed status + crash event).

    NOTE(review): the class is about rolling back ``current_agent``, but the
    visible test only asserts the failed status and the last event's detail.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S15") -> str:
        """Create a uniquely named project and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def test_s151a_rollback_on_crash(self):
        """S15a: three crashes → task failed, last status event mentions crash."""
        pid = self._create_project("S15a")
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": "Rollback 测试", "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        # Push the task pending → claimed → working through the status API.
        for status in ("claimed", "working"):
            http_requests.post(
                f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
                json={"status": status, "agent": "zhangfei-dev"}, timeout=10,
            )

        # Set current_agent and insert three crashed attempts
        # (started 25, 20 and 15 minutes ago).
        db_path = _get_db_path(pid)
        # sqlite3.connect() would silently create a missing file; fail early.
        assert db_path.exists(), f"DB not found: {db_path}"
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(
                "UPDATE tasks SET current_agent='zhangfei-dev' WHERE id=?",
                (tid,),
            )
            for i in range(3):
                conn.execute(
                    "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                    "VALUES (?, ?, ?, ?, datetime('now', ?))",
                    (tid, i + 1, "zhangfei-dev", "crashed", f"-{(25 - i * 5)} minutes"),
                )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 S15a: 3次crash + current_agent 已设置,等待ticker (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status == "failed", f"应为 failed,实际: {status}"

        # The most recent status_change event should mention the crash.
        conn = sqlite3.connect(str(db_path))
        conn.row_factory = sqlite3.Row
        try:
            events = conn.execute(
                "SELECT detail FROM events WHERE task_id=? AND event_type='status_change' ORDER BY id DESC LIMIT 1",
                (tid,),
            ).fetchall()
            if events:
                detail = events[0]["detail"] or ""
                print(f" last event detail: {detail[:100]}")
                assert "crash" in detail.lower(), f"event detail 应含 crash,实际: {detail}"
            else:
                print(" ⚠️ 无 event 记录,跳过 detail 验证")
        finally:
            conn.close()
        print(" ✅ crash_limit 标 failed 验证通过")
# ===================================================================
# S16: 广播认领(E94 + E15 合并)
# ===================================================================
@skip_no_integration
class TestS16BroadcastClaim:
    """S16: broadcast claim — no-assignee broadcast + Prompt v3.0 three-level response."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip unless the daemon is ready; archive created projects afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S16", agents=None) -> str:
        """Create a project with *agents* (default: both test agents) and track it."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": config,
        }, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending task in *pid*; *kwargs* override/extend the body."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_s161_broadcast_claim_no_assignee(self):
        """S16-1: a task without assignee is claimed via broadcast and executed."""
        pid = self._create_project("S16-1")
        tid = self._create_task(
            pid,
            title="E2E广播认领任务:echo broadcast",
            description=(
                "这是一个E2E测试的广播认领任务。\n"
                "请执行 echo broadcast 并标记done。\n"
                "这是E2E自动化测试,不需要做其他事。"
            ),
            task_type="coding",
        )
        print(f"\n🚀 S16-1: 等待广播认领 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # Separator added so the task id and the timeout no longer run together.
        assert status != "pending", (
            f"广播认领未生效!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"广播任务被错误拦截: {result}"
        assignee = result.get("assignee")
        print(f" 认领Agent: {assignee}")
        assert assignee, f"任务已离开pending但assignee为空: {result}"
        if status == "done":
            print(" ✅ 广播认领执行成功")
        else:
            print(f" ⚠️ 广播认领后状态: {status}")

    def test_s162_broadcast_observation_comment(self):
        """S16-2: agent writes an observation comment (Prompt v3.0 response)."""
        pid = self._create_project("S16-2", agents=["simayi-challenger"])
        tid = self._create_task(
            pid,
            title="E2E Prompt v3.0:观察型任务",
            description=(
                "这是一个编码任务,但 assignee 是司马懿。\n"
                "按照 Prompt v3.0 三级响应:\n"
                "- 如果你认为应该由其他人执行,请写 observation comment\n"
                "- 不需要实际执行编码\n"
                "- 标记 done 即可\n"
                "这是E2E测试,验证广播三级响应。"
            ),
            assignee="simayi-challenger",
            task_type="coding",
        )
        print(f"\n🚀 S16-2: 等待广播认领+Agent响应 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "pending", "任务未被调度"

        # The agent must have left at least one comment on the task.
        db_path = _get_db_path(pid)
        if db_path.exists():
            conn = sqlite3.connect(str(db_path))
            try:
                comments = conn.execute(
                    "SELECT author, comment_type, body FROM comments "
                    "WHERE task_id=? ORDER BY id DESC LIMIT 5",
                    (tid,),
                ).fetchall()
                print(f" Comments ({len(comments)}):")
                for author, comment_type, body in comments:
                    # A NULL body must not crash the report before the assert.
                    print(f" [{author}] {comment_type}: {(body or '')[:80]}...")
                assert len(comments) > 0, (
                    "Agent 未写任何 comment,Prompt v3.0 三级响应可能未生效"
                )
            finally:
                conn.close()
        print(" ✅ Prompt v3.0 广播响应验证完成")

    def test_s163_broadcast_claim_by_matching_agent(self):
        """S16-3: task with the matching assignee is claimed and executed."""
        pid = self._create_project("S16-3", agents=["zhangfei-dev"])
        tid = self._create_task(
            pid,
            title="E2E Prompt v3.0:认领型任务",
            description=(
                "请执行 echo claim-test 并标记done。\n"
                "这是E2E测试,验证正确 assignee 的任务被认领执行。\n"
                "不需要做其他事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 S16-3: 等待正确Agent认领 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "pending", "任务未被认领"
        assert status != "blocked", "任务被错误拦截"
        print(" ✅ 正确 assignee 认领执行验证通过")
# ===================================================================
# S17: 暂停→恢复
# ===================================================================
@skip_no_integration
class TestS17PauseResume:
    """S17: push a task to working/review, pause it, resume it, and check ``resumed_from``."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Run the environment pre-check, then archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S17", agents=None) -> str:
        """Create a throwaway project through the API and register it for cleanup.

        Args:
            name_prefix: Label prepended to the generated project name.
            agents: Agent ids placed in the project config; defaults to
                ``["zhangfei-dev", "simayi-challenger"]``.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(
            f"{API_BASE}/api/projects",
            json={"id": pid, "name": f"{name_prefix}-{pid}", "config": config},
            timeout=10,
        )
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task in project ``pid`` and return its id.

        Extra keyword arguments are sent as task fields and may override the
        default ``status``/``priority``. The ``id`` key is written last so the
        request body always carries the same id that is returned.
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def _set_status(self, pid, tid, status, agent, *, required=True) -> bool:
        """POST a status transition and report whether the API acknowledged it.

        Args:
            pid: Project id.
            tid: Task id.
            status: Target status to request.
            agent: Agent name sent with the transition.
            required: When true, an unacknowledged transition fails the test;
                otherwise it is only logged.

        Returns:
            True when the response JSON carries a truthy ``ok``.
        """
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )
        try:
            acknowledged = bool(resp.json().get("ok"))
        except ValueError:
            # Non-JSON body (e.g. an error page): treat as not acknowledged.
            acknowledged = False
        detail = f"HTTP {resp.status_code} {resp.text[:200]}"
        if required:
            assert acknowledged, (
                f"Transition of task {tid} to {status!r} was not acknowledged: {detail}"
            )
        elif not acknowledged:
            print(f" ⚠️ transition to {status!r} not acknowledged: {detail}")
        return acknowledged

    def _get_task(self, pid, tid) -> Dict[str, Any]:
        """Fetch the task through the API and return the decoded JSON body."""
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        )
        return resp.json()

    def test_s171_pause_resume_resumed_from(self):
        """S17-1: working → paused → working, checking the ``resumed_from`` field."""
        pid = self._create_project("S17-1")
        tid = self._create_task(
            pid,
            title="E2E暂停恢复测试",
            description="测试暂停恢复功能",
            assignee="zhangfei-dev",
        )
        # Drive the task manually: claimed → working.
        self._set_status(pid, tid, "claimed", "zhangfei-dev")
        self._set_status(pid, tid, "working", "zhangfei-dev")
        # Pause.
        self._set_status(pid, tid, "paused", "test")
        # The paused task must remember the state it was paused from.
        task = self._get_task(pid, tid)
        resumed_from = task.get("resumed_from")
        print(f"\n🚀 S17-1: 暂停后 resumed_from={resumed_from}")
        assert resumed_from == "working", (
            f"resumed_from 应为 'working',实际: {resumed_from}"
        )
        assert task.get("status") == "paused", f"Unexpected task state: {task}"
        # Resume back to working.
        self._set_status(pid, tid, "working", "zhangfei-dev")
        task2 = self._get_task(pid, tid)
        print(f" 恢复后 status={task2.get('status')}")
        assert task2.get("status") == "working", f"Unexpected task state: {task2}"
        print(" ✅ 暂停恢复流程正确")

    def test_s172_review_pause_resume(self):
        """S17-2: review → paused → review."""
        pid = self._create_project("S17-2")
        tid = self._create_task(
            pid,
            title="E2E Review暂停恢复",
            assignee="simayi-challenger",
        )
        # Setup transitions were never asserted originally; keep them non-fatal
        # but surface a warning when one is not acknowledged.
        for step in ("claimed", "working", "review"):
            self._set_status(pid, tid, step, "simayi-challenger", required=False)
        # Pause.
        self._set_status(pid, tid, "paused", "test")
        task = self._get_task(pid, tid)
        assert task.get("resumed_from") == "review", (
            f"resumed_from should be 'review', got: {task.get('resumed_from')}"
        )
        # Resume back to review.
        self._set_status(pid, tid, "review", "simayi-challenger")
        task2 = self._get_task(pid, tid)
        assert task2.get("status") == "review", f"Unexpected task state: {task2}"
        print("\n ✅ Review暂停恢复流程正确 (resumed_from=review)")
# ===================================================================
# S18: cancelled → 重新启动
# ===================================================================
@skip_no_integration
class TestS18CancelledRestart:
    """S18: cancelled → pending (restart) → picked up again by the dispatcher."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Run the environment pre-check, then archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S18", agents=None) -> str:
        """Create a throwaway project through the API and register it for cleanup.

        Args:
            name_prefix: Label prepended to the generated project name.
            agents: Agent ids placed in the project config; defaults to
                ``["zhangfei-dev", "simayi-challenger"]``.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(
            f"{API_BASE}/api/projects",
            json={"id": pid, "name": f"{name_prefix}-{pid}", "config": config},
            timeout=10,
        )
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task in project ``pid`` and return its id.

        Extra keyword arguments are sent as task fields and may override the
        default ``status``/``priority``. The ``id`` key is written last so the
        request body always carries the same id that is returned.
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def _set_status(self, pid, tid, status, agent) -> None:
        """POST a status transition and fail the test unless the API returns ``ok``."""
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )
        try:
            acknowledged = bool(resp.json().get("ok"))
        except ValueError:
            # Non-JSON body (e.g. an error page): treat as not acknowledged.
            acknowledged = False
        assert acknowledged, (
            f"Transition of task {tid} to {status!r} was not acknowledged: "
            f"HTTP {resp.status_code} {resp.text[:200]}"
        )

    def _get_task(self, pid, tid) -> Dict[str, Any]:
        """Fetch the task through the API and return the decoded JSON body."""
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        )
        return resp.json()

    def test_s181_cancelled_to_pending_restart(self):
        """S18-1: cancelled → pending, then wait for the task to be dispatched again."""
        pid = self._create_project("S18-1")
        tid = self._create_task(
            pid,
            title="E2E取消重启任务:echo restart",
            description=(
                "请执行 echo restart 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )
        # Push the task to cancelled by hand.
        self._set_status(pid, tid, "cancelled", "test")
        cancelled = self._get_task(pid, tid)
        assert cancelled.get("status") == "cancelled", (
            f"Unexpected task state: {cancelled}"
        )
        # Restart: back to pending.
        self._set_status(pid, tid, "pending", "test")
        task2 = self._get_task(pid, tid)
        assert task2.get("status") == "pending", f"Unexpected task state: {task2}"
        # The test expects the restart to have cleared the assignee.
        assert task2.get("assignee") in (None, ""), (
            f"assignee should be cleared after restart, got: {task2.get('assignee')}"
        )
        print(f"\n🚀 S18-1: 等待重新调度执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重启后最终状态: {status}")
        assert status != "pending", (
            f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        # Only "done" counts as a clean success; other states are reported
        # but do not fail the test.
        if status == "done":
            print(" ✅ 取消重启流程正确")
        else:
            print(f" ⚠️ 重启后状态: {status}")
# ===================================================================
# S19: Retry Chain(E97 claimed timeout + E10c retry chain 合并)
# ===================================================================
@skip_no_integration
class TestS19RetryChain:
    """S19: recovery of a timed-out claimed task, plus the failed → pending retry chain."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Run the environment pre-check, then archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S19", agents=None) -> str:
        """Create a throwaway project through the API and register it for cleanup.

        Args:
            name_prefix: Label prepended to the generated project name.
            agents: Agent ids placed in the project config; defaults to
                ``["zhangfei-dev", "simayi-challenger"]``.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(
            f"{API_BASE}/api/projects",
            json={"id": pid, "name": f"{name_prefix}-{pid}", "config": config},
            timeout=10,
        )
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task in project ``pid`` and return its id.

        Extra keyword arguments are sent as task fields and may override the
        default ``status``/``priority``. The ``id`` key is written last so the
        request body always carries the same id that is returned.
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def _set_status(self, pid, tid, status, agent, *, required=True) -> bool:
        """POST a status transition and report whether the API acknowledged it.

        Args:
            pid: Project id.
            tid: Task id.
            status: Target status to request.
            agent: Agent name sent with the transition.
            required: When true, an unacknowledged transition fails the test;
                otherwise it is only logged.

        Returns:
            True when the response JSON carries a truthy ``ok``.
        """
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )
        try:
            acknowledged = bool(resp.json().get("ok"))
        except ValueError:
            # Non-JSON body (e.g. an error page): treat as not acknowledged.
            acknowledged = False
        detail = f"HTTP {resp.status_code} {resp.text[:200]}"
        if required:
            assert acknowledged, (
                f"Transition of task {tid} to {status!r} was not acknowledged: {detail}"
            )
        elif not acknowledged:
            print(f" ⚠️ transition to {status!r} not acknowledged: {detail}")
        return acknowledged

    def _get_task(self, pid, tid) -> Dict[str, Any]:
        """Fetch the task through the API and return the decoded JSON body."""
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        )
        return resp.json()

    def test_s191_claimed_timeout_to_pending(self):
        """S19-1: a stale claimed task should leave ``claimed`` with its assignee cleared."""
        from datetime import timezone

        pid = self._create_project("S19-1")
        tid = self._create_task(
            pid,
            title="E2E超时测试任务",
            description="测试claimed超时处理",
            assignee="zhangfei-dev",
        )
        # Push the task to claimed by hand.
        self._set_status(pid, tid, "claimed", "zhangfei-dev")
        # Backdate claimed_at in the DB to two hours ago. The value is a naive
        # UTC ISO string, identical in format to the deprecated utcnow().
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        two_hours_ago = (now_utc - timedelta(hours=2)).isoformat()
        _patch_db_claimed_at(pid, tid, two_hours_ago)
        print(f"\n🚀 S19-1: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "escalated"),
        )
        status = result.get("status")
        print(f" 超时后状态: {status}")
        assert status != "claimed", (
            f"超时处理未生效!任务 {tid} 在 {MAX_WAIT_DISPATCH}s 后仍为 claimed"
        )
        assignee = result.get("assignee")
        print(f" assignee: {assignee}")
        assert assignee is None or assignee == "", (
            f"超时重置后assignee应清空,实际: {assignee}"
        )
        print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)")

    def test_s192_failed_to_pending_retry(self):
        """S19-2: simulate a failure by hand, retry, then wait for re-dispatch."""
        pid = self._create_project("S19-2")
        tid = self._create_task(
            pid,
            title="E2E重试任务:echo retry",
            description=(
                "请执行 echo retry 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )
        # Push the task to failed by hand to simulate an agent failure. The
        # intermediate steps were never asserted originally, so they stay
        # non-fatal and only log a warning.
        self._set_status(pid, tid, "claimed", "zhangfei-dev", required=False)
        self._set_status(pid, tid, "working", "zhangfei-dev", required=False)
        self._set_status(pid, tid, "failed", "zhangfei-dev")
        failed_task = self._get_task(pid, tid)
        assert failed_task.get("status") == "failed", (
            f"Unexpected task state: {failed_task}"
        )
        print("\n🚀 S19-2: 任务已标记failed,准备重试")
        # Manual retry: back to pending.
        self._set_status(pid, tid, "pending", "test")
        task2 = self._get_task(pid, tid)
        assert task2.get("status") == "pending", f"Unexpected task state: {task2}"
        # The test expects the retry to have cleared the assignee.
        assert task2.get("assignee") in (None, ""), (
            f"assignee should be cleared after retry, got: {task2.get('assignee')}"
        )
        retry_count = task2.get("retry_count", 0) or 0
        print(f" retry_count: {retry_count}")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重试后最终状态: {status}")
        assert status != "pending", (
            f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        # Only "done" counts as a clean success; other states are reported
        # but do not fail the test.
        if status == "done":
            print(" ✅ 失败重试链正确")
        else:
            print(f" ⚠️ 重试后状态: {status}")
# ===================================================================
# S20: 完整生命周期(广播认领版)
# ===================================================================
@skip_no_integration
class TestS20FullLifecycle:
    """S20: no assignee → broadcast claim → claimed → working → review → done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Run the environment pre-check, then archive created projects on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="S20", agents=None) -> str:
        """Create a throwaway project through the API and register it for cleanup.

        Args:
            name_prefix: Label prepended to the generated project name.
            agents: Agent ids placed in the project config; defaults to
                ``["zhangfei-dev", "simayi-challenger"]``.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
        resp = http_requests.post(
            f"{API_BASE}/api/projects",
            json={"id": pid, "name": f"{name_prefix}-{pid}", "config": config},
            timeout=10,
        )
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a task in project ``pid`` and return its id.

        Extra keyword arguments are sent as task fields and may override the
        default ``status``/``priority``. The ``id`` key is written last so the
        request body always carries the same id that is returned.
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def _set_status(self, pid, tid, status, agent) -> None:
        """POST a status transition and fail the test unless the API returns ``ok``."""
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
            json={"status": status, "agent": agent}, timeout=10,
        )
        try:
            acknowledged = bool(resp.json().get("ok"))
        except ValueError:
            # Non-JSON body (e.g. an error page): treat as not acknowledged.
            acknowledged = False
        assert acknowledged, (
            f"Transition of task {tid} to {status!r} was not acknowledged: "
            f"HTTP {resp.status_code} {resp.text[:200]}"
        )

    def _get_task(self, pid, tid) -> Dict[str, Any]:
        """Fetch the task through the API and return the decoded JSON body."""
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
        )
        return resp.json()

    def test_s201_full_lifecycle_with_review(self):
        """S20-1: full lifecycle: create → broadcast → claim → execute → review → done."""
        pid = self._create_project("S20-1")
        # Step 1: coding task without an assignee (broadcast claim).
        code_tid = self._create_task(
            pid,
            title="E2E完整链路:编码任务",
            description=(
                "请执行 echo lifecycle 并标记done。"
                "这是E2E完整生命周期测试,不需要做其他事。"
            ),
            task_type="coding",
        )
        print(f"\n🚀 S20-1: 等待编码任务广播认领 (pid={pid}, tid={code_tid})")
        result = _poll_task(
            pid, code_tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        code_status = result.get("status")
        print(f" 编码任务最终状态: {code_status}")
        assert code_status != "pending", "编码任务未被认领"
        if code_status == "done":
            events_resp = http_requests.get(
                f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events",
                timeout=10,
            )
            # The event check only runs when the events endpoint answers 200.
            if events_resp.status_code == 200:
                events = events_resp.json()
                event_types = [e.get("event_type") for e in events.get("events", [])]
                print(f" Events: {event_types}")
                assert any(
                    "claimed" in str(e) or "started" in str(e)
                    for e in event_types
                ), f"缺少状态变化事件: {event_types}"
        # Step 2: review task, driven by hand through every state.
        review_tid = self._create_task(
            pid,
            title="E2E完整链路:review任务",
            description="测试review状态",
            assignee="simayi-challenger",
        )
        for step in ("claimed", "working", "review", "done"):
            self._set_status(pid, review_tid, step, "simayi-challenger")
        done_task = self._get_task(pid, review_tid)
        assert done_task.get("status") == "done", f"Unexpected task state: {done_task}"
        print(" Review任务手动生命周期: ✅")
        # Step 3: done → cancelled.
        self._set_status(pid, review_tid, "cancelled", "test")
        cancelled_task = self._get_task(pid, review_tid)
        assert cancelled_task.get("status") == "cancelled", (
            f"Unexpected task state: {cancelled_task}"
        )
        print(" done→cancelled: ✅")
        print(" ✅ S20-1 完整生命周期测试通过")
# ===================================================================
# S21: 缓存头验证
# ===================================================================
@skip_no_integration
class TestS21CacheHeaders:
    """S21: verify the Cache-Control headers served for the frontend (CachedStaticFiles)."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Run the environment pre-check before every test."""
        _check_environment()

    def _fetch_first_asset(self, pattern, kind):
        """Fetch the first asset referenced by the index page that matches ``pattern``.

        Skips the test when the index page is not served or references no
        matching asset.

        Args:
            pattern: Regex with one capture group yielding the asset path.
            kind: Label used in skip/assert messages (e.g. ``"JS"``).

        Returns:
            A ``(path, cache_control)`` tuple; ``cache_control`` is ``""``
            when the header is absent.
        """
        html_resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if html_resp.status_code != 200:
            pytest.skip("Frontend not available")
        matches = re.findall(pattern, html_resp.text)
        if not matches:
            pytest.skip(f"No {kind} files found in HTML")
        path = matches[0]
        resp = http_requests.get(f"{API_BASE}{path}", timeout=10)
        # Without this check a 404 would surface as a misleading header failure.
        assert resp.status_code == 200, (
            f"{kind} asset {path} not served: HTTP {resp.status_code}"
        )
        return path, resp.headers.get("cache-control", "")

    def test_s211_html_no_cache(self):
        """S21-1: the HTML page should be served as no-cache/no-store."""
        resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if resp.status_code != 200:
            pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}")
        cache_control = resp.headers.get("cache-control", "")
        print(f"\n🚀 S21-1: HTML Cache-Control: {cache_control}")
        assert "no-cache" in cache_control or "no-store" in cache_control, (
            f"HTML 应为 no-cache/no-store,实际: {cache_control}"
        )

    def test_s212_js_immutable(self):
        """S21-2: JS files should be immutable with a long max-age."""
        js_path, cache_control = self._fetch_first_asset(
            r'src="(/assets/[^"]+\.js)"', "JS",
        )
        print(f" S21-2: JS ({js_path}) Cache-Control: {cache_control}")
        assert "immutable" in cache_control, (
            f"JS 应含 immutable,实际: {cache_control}"
        )
        assert "31536000" in cache_control, (
            f"JS max-age 应为 31536000,实际: {cache_control}"
        )

    def test_s213_css_immutable(self):
        """S21-3: CSS files should be immutable with a long max-age."""
        css_path, cache_control = self._fetch_first_asset(
            r'href="(/assets/[^"]+\.css)"', "CSS",
        )
        print(f" S21-3: CSS ({css_path}) Cache-Control: {cache_control}")
        assert "immutable" in cache_control, (
            f"CSS 应含 immutable,实际: {cache_control}"
        )
        # Mirror the JS test: "long cache" means the one-year max-age.
        assert "31536000" in cache_control, (
            f"CSS max-age 应为 31536000,实际: {cache_control}"
        )
        print(" ✅ 缓存头验证通过")