813 lines
30 KiB
Python
813 lines
30 KiB
Python
import pytest
|
||
|
||
pytestmark = pytest.mark.e2e
|
||
|
||
skip_no_integration = pytest.mark.skipif(
|
||
not __import__("os").environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run E2E tests against real daemon",
|
||
)
|
||
|
||
"""v3.1 端到端测试 — 新增场景覆盖
|
||
|
||
覆盖 v3.1 新增功能:
|
||
E9-4 广播认领:无 assignee → 广播 → Agent 认领 → done
|
||
E9-5 状态机:暂停 → 恢复 (resumed_from)
|
||
E9-6 状态机:cancelled → 重新启动 → done
|
||
E9-7 超时处理:claimed 超时 → pending (assignee 清空)
|
||
E9-8 缓存头:HTML no-cache + JS/CSS immutable
|
||
E10c 失败重试链:failed → pending → 广播 → done
|
||
E10d 完整生命周期:pending → claimed → working → review → done
|
||
|
||
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sqlite3
|
||
import sys
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Optional
|
||
|
||
import pytest
|
||
import requests as http_requests
|
||
|
||
# ── 路径设置 ──
|
||
|
||
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
|
||
sys.path.insert(0, str(DEPLOY_DIR))
|
||
|
||
from src.utils import get_data_root
|
||
|
||
# ── 常量 ──
|
||
|
||
API_BASE = "http://localhost:8083"
|
||
POLL_INTERVAL = 5 # 轮询间隔秒
|
||
MAX_WAIT_DISPATCH = 120 # 等待调度超时(~4个tick,给 tick 时序留余量)
|
||
MAX_WAIT_AGENT = 300 # 等待 Agent 完成超时
|
||
E2E_PREFIX = "e2e-v31-"
|
||
DATA_ROOT = get_data_root()
|
||
|
||
|
||
# ── 工具函数 ──
|
||
|
||
def _check_environment():
|
||
"""环境前置检查:daemon 运行 + ticker 活跃 + 8083 可达"""
|
||
try:
|
||
resp = http_requests.get(f"{API_BASE}/api/daemon/status", timeout=5)
|
||
data = resp.json()
|
||
if data.get("status") != "running" or not data.get("ticker_running"):
|
||
pytest.skip(f"Daemon not ready: {data}")
|
||
return data
|
||
except Exception as e:
|
||
pytest.skip(f"Production API not available at {API_BASE}: {e}")
|
||
|
||
|
||
def _cleanup_project(pid: str):
|
||
"""清理测试项目"""
|
||
try:
|
||
http_requests.post(f"{API_BASE}/api/projects/{pid}/archive", timeout=5)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def _create_project(project_list: list, name_prefix: str = "E9",
|
||
agents: list = None) -> str:
|
||
"""创建测试项目,自动注册到 project_list 用于 teardown"""
|
||
pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
|
||
config = {"agents": agents or ["zhangfei-dev", "simayi-challenger"]}
|
||
resp = http_requests.post(f"{API_BASE}/api/projects", json={
|
||
"id": pid,
|
||
"name": f"{name_prefix}-{pid}",
|
||
"config": config,
|
||
}, timeout=10)
|
||
assert resp.status_code == 200, f"Create project failed: {resp.text}"
|
||
project_list.append(pid)
|
||
return pid
|
||
|
||
|
||
def _create_task(pid: str, **kwargs) -> str:
|
||
"""创建测试任务"""
|
||
tid = kwargs.pop("id", None) or f"e2e-task-{uuid.uuid4().hex[:8]}"
|
||
body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
|
||
resp = http_requests.post(
|
||
f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
|
||
)
|
||
assert resp.status_code == 200, f"Create task failed: {resp.text}"
|
||
return tid
|
||
|
||
|
||
def _get_task(pid: str, tid: str) -> Dict[str, Any]:
|
||
"""获取任务详情"""
|
||
resp = http_requests.get(
|
||
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
|
||
)
|
||
assert resp.status_code == 200, f"Get task failed: {resp.text}"
|
||
return resp.json()
|
||
|
||
|
||
def _update_status(pid: str, tid: str, status: str,
|
||
agent: str = "test") -> Dict:
|
||
"""手动更新任务状态"""
|
||
resp = http_requests.post(
|
||
f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
|
||
json={"status": status, "agent": agent}, timeout=10,
|
||
)
|
||
return resp.json()
|
||
|
||
|
||
def _poll_task(pid: str, tid: str, timeout: int,
|
||
terminal_states: tuple = None) -> Dict[str, Any]:
|
||
"""轮询任务状态直到终态或超时"""
|
||
terminal = terminal_states or ("done", "failed", "cancelled")
|
||
deadline = time.time() + timeout
|
||
last_status = None
|
||
while time.time() < deadline:
|
||
try:
|
||
resp = http_requests.get(
|
||
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
|
||
)
|
||
if resp.status_code == 200:
|
||
data = resp.json()
|
||
last_status = data.get("status")
|
||
if last_status in terminal:
|
||
return data
|
||
except Exception:
|
||
pass
|
||
time.sleep(POLL_INTERVAL)
|
||
# 超时返回最后状态
|
||
try:
|
||
resp = http_requests.get(
|
||
f"{API_BASE}/api/projects/{pid}/tasks/{tid}", timeout=10,
|
||
)
|
||
return resp.json() if resp.status_code == 200 else {"status": "unknown"}
|
||
except Exception:
|
||
return {"status": "unknown"}
|
||
|
||
|
||
def _get_db_path(pid: str) -> Path:
|
||
"""获取项目的 blackboard.db 路径"""
|
||
return DATA_ROOT / pid / "blackboard.db"
|
||
|
||
|
||
def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str):
|
||
"""直接操作 DB 设置 claimed_at 时间戳(模拟超时)"""
|
||
db_path = _get_db_path(pid)
|
||
assert db_path.exists(), f"DB not found: {db_path}"
|
||
conn = sqlite3.connect(str(db_path))
|
||
try:
|
||
conn.execute(
|
||
"UPDATE tasks SET claimed_at=? WHERE id=?",
|
||
(claimed_at, tid),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ===================================================================
|
||
# E9-4: 广播认领
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE94BroadcastClaim:
|
||
"""E9-4: 无 assignee 任务 → 广播认领 → Agent 执行 → done"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_broadcast_claim(self):
|
||
"""创建不指定 assignee 的任务,等待广播认领并执行完成"""
|
||
pid = _create_project(self._projects, "E9-4",
|
||
agents=["zhangfei-dev", "simayi-challenger"])
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E广播认领任务:echo broadcast",
|
||
description=(
|
||
"这是一个E2E测试的广播认领任务。\n"
|
||
"请执行 echo broadcast 并标记done。\n"
|
||
"这是E2E自动化测试,不需要做其他事。"
|
||
),
|
||
task_type="coding",
|
||
# 不指定 assignee → 触发广播认领
|
||
)
|
||
|
||
print(f"\n🚀 E9-4: 等待广播认领 (pid={pid}, tid={tid})")
|
||
result = _poll_task(
|
||
pid, tid, timeout=MAX_WAIT_AGENT,
|
||
terminal_states=("done", "failed", "cancelled", "blocked"),
|
||
)
|
||
status = result.get("status")
|
||
print(f" 最终状态: {status}")
|
||
|
||
# 必须被认领(不是 pending)
|
||
assert status != "pending", (
|
||
f"广播认领未生效!任务 {tid} 在 {MAX_WAIT_AGENT}s 后仍为 pending。"
|
||
f"\n请检查:1) Ticker广播 2) Agent spawn 3) _get_idle_agents()"
|
||
)
|
||
# 不能被拦截
|
||
assert status != "blocked", f"广播任务被错误拦截: {result}"
|
||
|
||
# 验证 assignee 已设置
|
||
assignee = result.get("assignee")
|
||
print(f" 认领Agent: {assignee}")
|
||
assert assignee, f"任务已离开pending但assignee为空: {result}"
|
||
|
||
if status == "done":
|
||
print(f" ✅ 广播认领执行成功")
|
||
else:
|
||
print(f" ⚠️ 广播认领后状态: {status}")
|
||
|
||
|
||
# ===================================================================
|
||
# E9-5: 暂停→恢复 (resumed_from)
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE95PauseResume:
|
||
"""E9-5: 手动推状态到 working → paused → 恢复 → 验证 resumed_from"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_pause_resume_resumed_from(self):
|
||
"""working → paused → 恢复 working,验证 resumed_from 字段"""
|
||
pid = _create_project(self._projects, "E9-5")
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E暂停恢复测试",
|
||
description="测试暂停恢复功能",
|
||
assignee="zhangfei-dev",
|
||
)
|
||
|
||
# 手动推到 claimed → working
|
||
r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev")
|
||
assert r1.get("ok"), f"claimed失败: {r1}"
|
||
r2 = _update_status(pid, tid, "working", agent="zhangfei-dev")
|
||
assert r2.get("ok"), f"working失败: {r2}"
|
||
|
||
# 暂停
|
||
r3 = _update_status(pid, tid, "paused", agent="test")
|
||
assert r3.get("ok"), f"paused失败: {r3}"
|
||
|
||
# 验证 resumed_from == "working"
|
||
task = _get_task(pid, tid)
|
||
resumed_from = task.get("resumed_from")
|
||
print(f"\n🚀 E9-5: 暂停后 resumed_from={resumed_from}")
|
||
assert resumed_from == "working", (
|
||
f"resumed_from 应为 'working',实际: {resumed_from}"
|
||
)
|
||
assert task.get("status") == "paused"
|
||
|
||
# 恢复到 working
|
||
r4 = _update_status(pid, tid, "working", agent="zhangfei-dev")
|
||
assert r4.get("ok"), f"恢复working失败: {r4}"
|
||
|
||
# 验证恢复后状态
|
||
task2 = _get_task(pid, tid)
|
||
print(f" 恢复后 status={task2.get('status')}")
|
||
assert task2.get("status") == "working", (
|
||
f"恢复后状态应为 working,实际: {task2.get('status')}"
|
||
)
|
||
|
||
print(f" ✅ 暂停恢复流程正确")
|
||
|
||
def test_review_pause_resume(self):
|
||
"""review → paused → 恢复 review"""
|
||
pid = _create_project(self._projects, "E9-5b")
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E Review暂停恢复",
|
||
assignee="simayi-challenger",
|
||
)
|
||
|
||
_update_status(pid, tid, "claimed", agent="simayi-challenger")
|
||
_update_status(pid, tid, "working", agent="simayi-challenger")
|
||
_update_status(pid, tid, "review", agent="simayi-challenger")
|
||
|
||
# 暂停
|
||
r = _update_status(pid, tid, "paused", agent="test")
|
||
assert r.get("ok"), f"paused失败: {r}"
|
||
|
||
task = _get_task(pid, tid)
|
||
assert task.get("resumed_from") == "review", (
|
||
f"resumed_from 应为 'review',实际: {task.get('resumed_from')}"
|
||
)
|
||
|
||
# 恢复到 review
|
||
r2 = _update_status(pid, tid, "review", agent="simayi-challenger")
|
||
assert r2.get("ok"), f"恢复review失败: {r2}"
|
||
|
||
task2 = _get_task(pid, tid)
|
||
assert task2.get("status") == "review"
|
||
|
||
print(f"\n ✅ Review暂停恢复流程正确 (resumed_from=review)")
|
||
|
||
|
||
# ===================================================================
|
||
# E9-6: cancelled → 重新启动
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE96CancelledRestart:
|
||
"""E9-6: cancelled → pending(重新启动)→ Agent 执行 → done"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_cancelled_to_pending_restart(self):
|
||
"""cancelled → pending → 等待调度执行"""
|
||
pid = _create_project(self._projects, "E9-6")
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E取消重启任务:echo restart",
|
||
description=(
|
||
"请执行 echo restart 并标记done。"
|
||
"这是E2E测试,不需要做其他事。"
|
||
),
|
||
assignee="zhangfei-dev",
|
||
)
|
||
|
||
# 手动推到 cancelled
|
||
r1 = _update_status(pid, tid, "cancelled")
|
||
assert r1.get("ok"), f"cancelled失败: {r1}"
|
||
task = _get_task(pid, tid)
|
||
assert task.get("status") == "cancelled"
|
||
|
||
# 重新启动 → pending
|
||
r2 = _update_status(pid, tid, "pending")
|
||
assert r2.get("ok"), f"pending重启失败: {r2}"
|
||
task2 = _get_task(pid, tid)
|
||
assert task2.get("status") == "pending"
|
||
# assignee 应被清空(v3.1: pending时清空assignee)
|
||
assert task2.get("assignee") is None or task2.get("assignee") == "", (
|
||
f"重新启动后assignee应清空,实际: {task2.get('assignee')}"
|
||
)
|
||
|
||
print(f"\n🚀 E9-6: 等待重新调度执行 (pid={pid}, tid={tid})")
|
||
# 等待调度执行
|
||
result = _poll_task(
|
||
pid, tid, timeout=MAX_WAIT_AGENT,
|
||
terminal_states=("done", "failed", "cancelled", "blocked"),
|
||
)
|
||
status = result.get("status")
|
||
print(f" 重启后最终状态: {status}")
|
||
|
||
assert status != "pending", (
|
||
f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
|
||
)
|
||
|
||
if status == "done":
|
||
print(f" ✅ 取消重启流程正确")
|
||
else:
|
||
print(f" ⚠️ 重启后状态: {status}")
|
||
|
||
|
||
# ===================================================================
|
||
# E9-7: claimed 超时 → pending (assignee 清空)
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE97ClaimedTimeout:
|
||
"""E9-7: claimed 超时 → pending (assignee 清空)"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_claimed_timeout_to_pending(self):
|
||
"""claimed 任务超时 → ticker 重置为 pending → assignee 清空"""
|
||
pid = _create_project(self._projects, "E9-7")
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E超时测试任务",
|
||
description="测试claimed超时处理",
|
||
assignee="zhangfei-dev",
|
||
)
|
||
|
||
# 手动推到 claimed
|
||
r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev")
|
||
assert r1.get("ok"), f"claimed失败: {r1}"
|
||
|
||
# 验证 claimed
|
||
task = _get_task(pid, tid)
|
||
assert task.get("status") == "claimed"
|
||
|
||
# 直接操作 DB:把 claimed_at 设为 2 小时前(模拟超时)
|
||
two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat()
|
||
_patch_db_claimed_at(pid, tid, two_hours_ago)
|
||
print(f"\n🚀 E9-7: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})")
|
||
|
||
# 等待 ticker 处理(1-2 个 tick)
|
||
# poll 直到状态不是 claimed(变为 pending 或 escalated)
|
||
result = _poll_task(
|
||
pid, tid, timeout=MAX_WAIT_DISPATCH,
|
||
terminal_states=("pending", "escalated"),
|
||
)
|
||
status = result.get("status")
|
||
print(f" 超时后状态: {status}")
|
||
|
||
# 应该回到 pending(或 escalated 如果 retry_count >= 3)
|
||
assert status != "claimed", (
|
||
f"超时处理未生效!任务 {tid} 在 {MAX_WAIT_DISPATCH}s 后仍为 claimed"
|
||
)
|
||
|
||
# assignee 应被清空
|
||
assignee = result.get("assignee")
|
||
print(f" assignee: {assignee}")
|
||
assert assignee is None or assignee == "", (
|
||
f"超时重置后assignee应清空,实际: {assignee}"
|
||
)
|
||
|
||
print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)")
|
||
|
||
|
||
# ===================================================================
|
||
# E9-8: 缓存头验证
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE98CacheHeaders:
|
||
"""E9-8: 验证 CachedStaticFiles 缓存头"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
|
||
def test_html_no_cache(self):
|
||
"""HTML 页面应为 no-cache"""
|
||
resp = http_requests.get(f"{API_BASE}/", timeout=10)
|
||
if resp.status_code != 200:
|
||
pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}")
|
||
|
||
cache_control = resp.headers.get("cache-control", "")
|
||
print(f"\n🚀 E9-8a: HTML Cache-Control: {cache_control}")
|
||
assert "no-cache" in cache_control or "no-store" in cache_control, (
|
||
f"HTML 应为 no-cache/no-store,实际: {cache_control}"
|
||
)
|
||
|
||
def test_js_immutable(self):
|
||
"""JS 文件应为 immutable + 长缓存"""
|
||
# 先获取 HTML 找到 JS 文件路径
|
||
html_resp = http_requests.get(f"{API_BASE}/", timeout=10)
|
||
if html_resp.status_code != 200:
|
||
pytest.skip(f"Frontend not available")
|
||
|
||
import re
|
||
js_matches = re.findall(r'src="(/assets/[^"]+\.js)"', html_resp.text)
|
||
if not js_matches:
|
||
pytest.skip("No JS files found in HTML")
|
||
|
||
js_path = js_matches[0]
|
||
resp = http_requests.get(f"{API_BASE}{js_path}", timeout=10)
|
||
cache_control = resp.headers.get("cache-control", "")
|
||
print(f" E9-8b: JS ({js_path}) Cache-Control: {cache_control}")
|
||
assert "immutable" in cache_control, (
|
||
f"JS 应含 immutable,实际: {cache_control}"
|
||
)
|
||
assert "31536000" in cache_control, (
|
||
f"JS max-age 应为 31536000,实际: {cache_control}"
|
||
)
|
||
|
||
def test_css_immutable(self):
|
||
"""CSS 文件应为 immutable + 长缓存"""
|
||
html_resp = http_requests.get(f"{API_BASE}/", timeout=10)
|
||
if html_resp.status_code != 200:
|
||
pytest.skip("Frontend not available")
|
||
|
||
import re
|
||
css_matches = re.findall(r'href="(/assets/[^"]+\.css)"', html_resp.text)
|
||
if not css_matches:
|
||
pytest.skip("No CSS files found in HTML")
|
||
|
||
css_path = css_matches[0]
|
||
resp = http_requests.get(f"{API_BASE}{css_path}", timeout=10)
|
||
cache_control = resp.headers.get("cache-control", "")
|
||
print(f" E9-8c: CSS ({css_path}) Cache-Control: {cache_control}")
|
||
assert "immutable" in cache_control, (
|
||
f"CSS 应含 immutable,实际: {cache_control}"
|
||
)
|
||
|
||
print(f" ✅ 缓存头验证通过")
|
||
|
||
|
||
# ===================================================================
|
||
# E10c: 失败重试链
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE10cRetryChain:
|
||
"""E10c: failed → pending(手动重试)→ 广播 → 认领 → done"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_failed_to_pending_retry(self):
|
||
"""手动模拟失败 → 重试 → 等待调度完成"""
|
||
pid = _create_project(self._projects, "E10c",
|
||
agents=["zhangfei-dev", "simayi-challenger"])
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E重试任务:echo retry",
|
||
description=(
|
||
"请执行 echo retry 并标记done。"
|
||
"这是E2E测试,不需要做其他事。"
|
||
),
|
||
assignee="zhangfei-dev",
|
||
)
|
||
|
||
# 手动推到 failed(模拟 Agent 执行失败)
|
||
_update_status(pid, tid, "claimed", agent="zhangfei-dev")
|
||
_update_status(pid, tid, "working", agent="zhangfei-dev")
|
||
r_fail = _update_status(pid, tid, "failed", agent="zhangfei-dev")
|
||
assert r_fail.get("ok"), f"failed失败: {r_fail}"
|
||
|
||
task = _get_task(pid, tid)
|
||
assert task.get("status") == "failed"
|
||
print(f"\n🚀 E10c: 任务已标记failed,准备重试")
|
||
|
||
# 手动重试 → pending
|
||
r_retry = _update_status(pid, tid, "pending")
|
||
assert r_retry.get("ok"), f"重试pending失败: {r_retry}"
|
||
|
||
task2 = _get_task(pid, tid)
|
||
assert task2.get("status") == "pending"
|
||
# assignee 应被清空
|
||
assert task2.get("assignee") is None or task2.get("assignee") == "", (
|
||
f"重试后assignee应清空,实际: {task2.get('assignee')}"
|
||
)
|
||
# retry_count 应递增
|
||
retry_count = task2.get("retry_count", 0) or 0
|
||
print(f" retry_count: {retry_count}")
|
||
|
||
# 等待重新调度执行
|
||
result = _poll_task(
|
||
pid, tid, timeout=MAX_WAIT_AGENT,
|
||
terminal_states=("done", "failed", "cancelled", "blocked"),
|
||
)
|
||
status = result.get("status")
|
||
print(f" 重试后最终状态: {status}")
|
||
|
||
assert status != "pending", (
|
||
f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
|
||
)
|
||
|
||
if status == "done":
|
||
print(f" ✅ 失败重试链正确")
|
||
else:
|
||
print(f" ⚠️ 重试后状态: {status}")
|
||
|
||
|
||
# ===================================================================
|
||
# E10d: 完整生命周期(广播认领版)
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE10dFullLifecycle:
|
||
"""E10d: 无 assignee → 广播认领 → claimed → working → review → done
|
||
|
||
验证完整状态转换链 + events 记录。
|
||
"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_full_lifecycle_with_review(self):
|
||
"""完整生命周期:创建 → 广播 → 认领 → 执行 → review → done"""
|
||
pid = _create_project(self._projects, "E10d",
|
||
agents=["zhangfei-dev", "simayi-challenger"])
|
||
|
||
# 第一步:编码任务(张飞执行)
|
||
code_tid = _create_task(
|
||
pid,
|
||
title="E2E完整链路:编码任务",
|
||
description=(
|
||
"请执行 echo lifecycle 并标记done。"
|
||
"这是E2E完整生命周期测试,不需要做其他事。"
|
||
),
|
||
task_type="coding",
|
||
# 不指定 assignee → 广播认领
|
||
)
|
||
|
||
print(f"\n🚀 E10d: 等待编码任务广播认领 (pid={pid}, tid={code_tid})")
|
||
result = _poll_task(
|
||
pid, code_tid, timeout=MAX_WAIT_AGENT,
|
||
terminal_states=("done", "failed", "cancelled", "blocked"),
|
||
)
|
||
code_status = result.get("status")
|
||
print(f" 编码任务最终状态: {code_status}")
|
||
|
||
assert code_status != "pending", "编码任务未被认领"
|
||
|
||
if code_status == "done":
|
||
# 验证 events 记录存在
|
||
events_resp = http_requests.get(
|
||
f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events",
|
||
timeout=10,
|
||
)
|
||
if events_resp.status_code == 200:
|
||
events = events_resp.json()
|
||
event_types = [e.get("event_type") for e in events.get("events", [])]
|
||
print(f" Events: {event_types}")
|
||
# 应该有状态变化事件
|
||
assert any("claimed" in str(e) or "started" in str(e)
|
||
for e in event_types), (
|
||
f"缺少状态变化事件: {event_types}"
|
||
)
|
||
|
||
# 第二步:review 任务(不依赖 Agent 执行,手动推)
|
||
review_tid = _create_task(
|
||
pid,
|
||
title="E2E完整链路:review任务",
|
||
description="测试review状态",
|
||
assignee="simayi-challenger",
|
||
)
|
||
|
||
# 手动推完整生命周期
|
||
transitions = ["claimed", "working", "review", "done"]
|
||
for s in transitions:
|
||
r = _update_status(pid, review_tid, s, agent="simayi-challenger")
|
||
assert r.get("ok"), f"{s}失败: {r}"
|
||
|
||
task = _get_task(pid, review_tid)
|
||
assert task.get("status") == "done"
|
||
print(f" Review任务手动生命周期: ✅")
|
||
|
||
# 第三步:验证 done → cancelled(取消已完成任务)
|
||
r_cancel = _update_status(pid, review_tid, "cancelled")
|
||
assert r_cancel.get("ok"), f"done→cancelled失败: {r_cancel}"
|
||
task3 = _get_task(pid, review_tid)
|
||
assert task3.get("status") == "cancelled"
|
||
print(f" done→cancelled: ✅")
|
||
|
||
print(f" ✅ E10d 完整生命周期测试通过")
|
||
|
||
|
||
# ===================================================================
|
||
# E15: Prompt v3.0 广播三级响应 E2E
|
||
# ===================================================================
|
||
|
||
@pytest.mark.integration
|
||
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
|
||
reason="Set RUN_INTEGRATION=1 to run real agent tests")
|
||
@skip_no_integration
|
||
class TestE15PromptV3Broadcast:
|
||
"""E15: Prompt v3.0 广播认领三级响应 E2E
|
||
|
||
创建一个 assignee 不匹配任何已注册 Agent 的任务,
|
||
验证广播后 Agent 写了 observation comment(而非静默 NO_REPLY)。
|
||
"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_env(self):
|
||
_check_environment()
|
||
self._projects = []
|
||
yield
|
||
for pid in self._projects:
|
||
_cleanup_project(pid)
|
||
|
||
def test_broadcast_observation_comment(self):
|
||
"""广播任务 → Agent 写 observation comment
|
||
|
||
Prompt v3.0 的 _build_claim_prompt 三级响应:
|
||
- 匹配 → claim
|
||
- 不匹配但能帮忙 → observation comment
|
||
- 不匹配且帮不上 → NO_REPLY(静默)
|
||
|
||
创建一个 assignee=simayi-challenger 但 task_type=coding 的任务,
|
||
司马懿收到后应写 observation comment(挑战者视角),而不是执行。
|
||
"""
|
||
pid = _create_project(self._projects, "E15",
|
||
agents=["simayi-challenger"])
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E Prompt v3.0:观察型任务",
|
||
description=(
|
||
"这是一个编码任务,但 assignee 是司马懿。\n"
|
||
"按照 Prompt v3.0 三级响应:\n"
|
||
"- 如果你认为应该由其他人执行,请写 observation comment\n"
|
||
"- 不需要实际执行编码\n"
|
||
"- 标记 done 即可\n"
|
||
"这是E2E测试,验证广播三级响应。"
|
||
),
|
||
assignee="simayi-challenger",
|
||
task_type="coding",
|
||
)
|
||
|
||
print(f"\n🚀 E15: 等待广播认领+Agent响应 (pid={pid}, tid={tid})")
|
||
result = _poll_task(
|
||
pid, tid, timeout=MAX_WAIT_AGENT,
|
||
terminal_states=("done", "failed", "cancelled", "blocked"),
|
||
)
|
||
status = result.get("status")
|
||
print(f" 最终状态: {status}")
|
||
|
||
assert status != "pending", "任务未被调度"
|
||
|
||
# 检查是否有 comment(Agent 响应的证据)
|
||
db_path = _get_db_path(pid)
|
||
if db_path.exists():
|
||
import sqlite3 as sq3
|
||
conn = sq3.connect(str(db_path))
|
||
try:
|
||
comments = conn.execute(
|
||
"SELECT author, comment_type, body FROM comments "
|
||
"WHERE task_id=? ORDER BY id DESC LIMIT 5",
|
||
(tid,),
|
||
).fetchall()
|
||
print(f" Comments ({len(comments)}):")
|
||
for c in comments:
|
||
print(f" [{c[0]}] {c[1]}: {c[2][:80]}...")
|
||
# 应该有至少一个 comment(Agent 的响应)
|
||
assert len(comments) > 0, (
|
||
f"Agent 未写任何 comment,Prompt v3.0 三级响应可能未生效"
|
||
)
|
||
finally:
|
||
conn.close()
|
||
|
||
print(f" ✅ Prompt v3.0 广播响应验证完成")
|
||
|
||
def test_broadcast_claim_by_matching_agent(self):
|
||
"""广播任务 → 匹配 Agent 执行 claim → done
|
||
|
||
对比测试:正确 assignee 的任务应被认领并执行。
|
||
"""
|
||
pid = _create_project(self._projects, "E15b",
|
||
agents=["zhangfei-dev"])
|
||
tid = _create_task(
|
||
pid,
|
||
title="E2E Prompt v3.0:认领型任务",
|
||
description=(
|
||
"请执行 echo claim-test 并标记done。\n"
|
||
"这是E2E测试,验证正确 assignee 的任务被认领执行。\n"
|
||
"不需要做其他事。"
|
||
),
|
||
assignee="zhangfei-dev",
|
||
task_type="coding",
|
||
)
|
||
|
||
print(f"\n🚀 E15b: 等待正确Agent认领 (pid={pid}, tid={tid})")
|
||
result = _poll_task(
|
||
pid, tid, timeout=MAX_WAIT_AGENT,
|
||
terminal_states=("done", "failed", "cancelled", "blocked"),
|
||
)
|
||
status = result.get("status")
|
||
print(f" 最终状态: {status}")
|
||
|
||
assert status != "pending", "任务未被认领"
|
||
assert status != "blocked", "任务被错误拦截"
|
||
|
||
print(f" ✅ 正确 assignee 认领执行验证通过")
|