auto-sync: 2026-06-05 11:03:30

This commit is contained in:
cfdaily
2026-06-05 11:03:30 +08:00
parent e9c9aaddfe
commit 6a649aba07
30 changed files with 602 additions and 1276 deletions
View File
File diff suppressed because it is too large Load Diff
+799
View File
@@ -0,0 +1,799 @@
import pytest
pytestmark = pytest.mark.e2e
"""v3.1 端到端测试 — 新增场景覆盖
覆盖 v3.1 新增功能:
E9-4 广播认领:无 assignee → 广播 → Agent 认领 → done
E9-5 状态机:暂停 → 恢复 (resumed_from)
E9-6 状态机:cancelled → 重新启动 → done
E9-7 超时处理:claimed 超时 → pending (assignee 清空)
E9-8 缓存头:HTML no-cache + JS/CSS immutable
E10c 失败重试链:failed → pending → 广播 → done
E10d 完整生命周期:pending → claimed → working → review → done
需要 RUN_INTEGRATION=1 + 生产 daemon 运行。
"""
import json
import os
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
import pytest
import requests as http_requests
# ── 路径设置 ──
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.utils import get_data_root
# ── 常量 ──
API_BASE = "http://localhost:8083"
POLL_INTERVAL = 5 # 轮询间隔秒
MAX_WAIT_DISPATCH = 120 # 等待调度超时(~4个tick,给 tick 时序留余量)
MAX_WAIT_AGENT = 300 # 等待 Agent 完成超时
E2E_PREFIX = "e2e-v31-"
DATA_ROOT = get_data_root()
# ── 工具函数 ──
def _check_environment():
    """Skip the calling test unless the production daemon is ready.

    Queries ``{API_BASE}/api/daemon/status`` and requires the payload to
    report ``status == "running"`` together with a truthy
    ``ticker_running`` flag.

    Returns:
        The decoded status payload when the daemon is ready.
    """
    status_url = f"{API_BASE}/api/daemon/status"
    try:
        payload = http_requests.get(status_url, timeout=5).json()
        ready = payload.get("status") == "running" and payload.get("ticker_running")
        if ready:
            return payload
        # pytest.skip signals via an exception that is not an ``Exception``
        # subclass, so the handler below does not intercept it.
        pytest.skip(f"Daemon not ready: {payload}")
    except Exception as exc:
        pytest.skip(f"Production API not available at {API_BASE}: {exc}")
def _cleanup_project(pid: str):
    """Archive a test project as best-effort teardown.

    Failures never propagate (teardown must not mask the test outcome),
    but they are reported on stdout so leaked projects on the daemon can
    be noticed and cleaned up by hand.

    Args:
        pid: Identifier of the project to archive.
    """
    try:
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/archive", timeout=5,
        )
    except Exception as exc:
        print(f"[cleanup] could not archive project {pid}: {exc}")
        return
    if resp.status_code != 200:
        print(f"[cleanup] archive of project {pid} returned HTTP {resp.status_code}")
def _create_project(project_list: list, name_prefix: str = "E9",
                    agents: list = None) -> str:
    """Create a test project through the API and register it for teardown.

    Args:
        project_list: List that receives the new project id so the caller's
            fixture can archive it afterwards.
        name_prefix: Prefix used when building the project name.
        agents: Agent ids for the project config; a falsy value selects the
            default pair of agents.

    Returns:
        The generated project id.
    """
    pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
    payload = {
        "id": pid,
        "name": f"{name_prefix}-{pid}",
        "config": {"agents": agents or ["zhangfei-dev", "simayi-challenger"]},
    }
    resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
    assert resp.status_code == 200, f"Create project failed: {resp.text}"
    project_list.append(pid)
    return pid
def _create_task(pid: str, **kwargs) -> str:
    """Create a test task in project ``pid`` and return its id.

    Args:
        pid: Project that receives the task.
        **kwargs: Extra task fields. ``id`` (if truthy) is used as the task
            id; the remaining keys are sent as-is and may override the
            default ``status`` / ``priority`` values.

    Returns:
        The task id that was sent to the API.
    """
    tid = kwargs.pop("id", None)
    if not tid:
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
    payload = {"id": tid, "status": "pending", "priority": 5}
    payload.update(kwargs)
    resp = http_requests.post(
        f"{API_BASE}/api/projects/{pid}/tasks", json=payload, timeout=10,
    )
    assert resp.status_code == 200, f"Create task failed: {resp.text}"
    return tid
def _get_task(pid: str, tid: str) -> Dict[str, Any]:
    """Fetch a task's details from the API and return the decoded JSON.

    Fails the test (assertion) when the API does not answer with HTTP 200.
    """
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    response = http_requests.get(task_url, timeout=10)
    assert response.status_code == 200, f"Get task failed: {response.text}"
    return response.json()
def _update_status(pid: str, tid: str, status: str,
                   agent: str = "test") -> Dict:
    """Manually push a task to ``status`` on behalf of ``agent``.

    The HTTP status code is not inspected here; the decoded JSON body is
    returned so callers can check its ``ok`` field themselves.
    """
    status_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status"
    payload = {"status": status, "agent": agent}
    response = http_requests.post(status_url, json=payload, timeout=10)
    return response.json()
def _poll_task(pid: str, tid: str, timeout: int,
               terminal_states: tuple = None) -> Dict[str, Any]:
    """Poll a task until it reaches one of ``terminal_states`` or times out.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Maximum number of seconds to keep polling.
        terminal_states: Statuses that end the polling; defaults to
            ``("done", "failed", "cancelled")`` when falsy.

    Returns:
        The task payload as soon as its status is terminal. On timeout, the
        result of one final fetch, or ``{"status": "unknown"}`` when that
        fetch fails or is not answered with HTTP 200.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"

    def _fetch() -> Optional[Dict[str, Any]]:
        """Return the task payload, or None on any transport/decoding error."""
        try:
            resp = http_requests.get(task_url, timeout=10)
            return resp.json() if resp.status_code == 200 else None
        except Exception:
            # Transient errors are expected while the daemon is busy; keep polling.
            return None

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        data = _fetch()
        if isinstance(data, dict) and data.get("status") in terminal:
            return data
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(POLL_INTERVAL, remaining))

    # Timed out: report the latest observable state.
    final = _fetch()
    return final if final is not None else {"status": "unknown"}
def _get_db_path(pid: str) -> Path:
    """Return the location of ``blackboard.db`` for project ``pid`` under DATA_ROOT."""
    project_dir = DATA_ROOT / pid
    return project_dir / "blackboard.db"
def _patch_db_claimed_at(pid: str, tid: str, claimed_at: str):
    """Overwrite a task's ``claimed_at`` directly in the project DB.

    Used to simulate a claim timeout. Asserts that the project's database
    file exists before touching it.

    Args:
        pid: Project id (selects the database file).
        tid: Task id whose row is updated.
        claimed_at: Timestamp string written to the ``claimed_at`` column.
    """
    db_file = _get_db_path(pid)
    assert db_file.exists(), f"DB not found: {db_file}"
    connection = sqlite3.connect(str(db_file))
    try:
        # The connection context commits on success and rolls back on error.
        with connection:
            connection.execute(
                "UPDATE tasks SET claimed_at=? WHERE id=?",
                (claimed_at, tid),
            )
    finally:
        connection.close()
# ===================================================================
# E9-4: 广播认领
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE94BroadcastClaim:
    """E9-4: task without assignee -> broadcast claim -> agent executes -> done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for created_pid in self._projects:
            _cleanup_project(created_pid)

    def test_broadcast_claim(self):
        """Create a task with no assignee and wait for it to be claimed and run."""
        pid = _create_project(self._projects, "E9-4",
                              agents=["zhangfei-dev", "simayi-challenger"])
        task_description = (
            "这是一个E2E测试的广播认领任务。\n"
            "请执行 echo broadcast 并标记done。\n"
            "这是E2E自动化测试,不需要做其他事。"
        )
        # No assignee is given, which is what triggers the broadcast claim.
        tid = _create_task(
            pid,
            title="E2E广播认领任务:echo broadcast",
            description=task_description,
            task_type="coding",
        )
        print(f"\n🚀 E9-4: 等待广播认领 (pid={pid}, tid={tid})")

        outcome = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = outcome.get("status")
        print(f" 最终状态: {status}")

        # The task must have been claimed (i.e. it is no longer pending).
        assert status != "pending", (
            f"广播认领未生效!任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending。"
            f"\n请检查:1) Ticker广播 2) Agent spawn 3) _get_idle_agents()"
        )
        # The task must not have been blocked.
        assert status != "blocked", f"广播任务被错误拦截: {outcome}"

        # The claiming agent must be recorded as assignee.
        claimer = outcome.get("assignee")
        print(f" 认领Agent: {claimer}")
        assert claimer, f"任务已离开pending但assignee为空: {outcome}"

        summary = (" ✅ 广播认领执行成功" if status == "done"
                   else f" ⚠️ 广播认领后状态: {status}")
        print(summary)
# ===================================================================
# E9-5: 暂停→恢复 (resumed_from)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE95PauseResume:
    """E9-5: manually drive a task to working/review, pause it, resume it,
    and verify the ``resumed_from`` field reported by the API."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_pause_resume_resumed_from(self):
        """working -> paused -> working, checking ``resumed_from`` in between."""
        pid = _create_project(self._projects, "E9-5")
        tid = _create_task(
            pid,
            title="E2E暂停恢复测试",
            description="测试暂停恢复功能",
            assignee="zhangfei-dev",
        )
        # Manually push the task through claimed -> working.
        for state in ("claimed", "working"):
            resp = _update_status(pid, tid, state, agent="zhangfei-dev")
            assert resp.get("ok"), f"{state}失败: {resp}"

        # Pause.
        paused = _update_status(pid, tid, "paused", agent="test")
        assert paused.get("ok"), f"paused失败: {paused}"

        # resumed_from must remember the state the task was paused from.
        task = _get_task(pid, tid)
        resumed_from = task.get("resumed_from")
        print(f"\n🚀 E9-5: 暂停后 resumed_from={resumed_from}")
        assert resumed_from == "working", (
            f"resumed_from 应为 'working',实际: {resumed_from}"
        )
        assert task.get("status") == "paused"

        # Resume back to working.
        resumed = _update_status(pid, tid, "working", agent="zhangfei-dev")
        assert resumed.get("ok"), f"恢复working失败: {resumed}"

        # Verify the state after resuming.
        task2 = _get_task(pid, tid)
        print(f" 恢复后 status={task2.get('status')}")
        assert task2.get("status") == "working", (
            f"恢复后状态应为 working,实际: {task2.get('status')}"
        )
        print(" ✅ 暂停恢复流程正确")

    def test_review_pause_resume(self):
        """review -> paused -> review, checking ``resumed_from`` in between."""
        pid = _create_project(self._projects, "E9-5b")
        tid = _create_task(
            pid,
            title="E2E Review暂停恢复",
            assignee="simayi-challenger",
        )
        # Every setup transition is verified so that a rejected transition is
        # reported where it happens instead of as a confusing later failure.
        for state in ("claimed", "working", "review"):
            resp = _update_status(pid, tid, state, agent="simayi-challenger")
            assert resp.get("ok"), f"{state}失败: {resp}"

        # Pause.
        r = _update_status(pid, tid, "paused", agent="test")
        assert r.get("ok"), f"paused失败: {r}"
        task = _get_task(pid, tid)
        assert task.get("resumed_from") == "review", (
            f"resumed_from 应为 'review',实际: {task.get('resumed_from')}"
        )

        # Resume back to review.
        r2 = _update_status(pid, tid, "review", agent="simayi-challenger")
        assert r2.get("ok"), f"恢复review失败: {r2}"
        task2 = _get_task(pid, tid)
        assert task2.get("status") == "review"
        print("\n ✅ Review暂停恢复流程正确 (resumed_from=review)")
# ===================================================================
# E9-6: cancelled → 重新启动
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE96CancelledRestart:
    """E9-6: cancelled -> pending (restart) -> agent executes -> done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_cancelled_to_pending_restart(self):
        """cancelled -> pending, then wait for the task to be dispatched again."""
        pid = _create_project(self._projects, "E9-6")
        tid = _create_task(
            pid,
            title="E2E取消重启任务:echo restart",
            description=(
                "请执行 echo restart 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )
        # Manually push to cancelled.
        r1 = _update_status(pid, tid, "cancelled")
        assert r1.get("ok"), f"cancelled失败: {r1}"
        task = _get_task(pid, tid)
        assert task.get("status") == "cancelled"

        # Restart -> pending.
        r2 = _update_status(pid, tid, "pending")
        assert r2.get("ok"), f"pending重启失败: {r2}"
        task2 = _get_task(pid, tid)
        assert task2.get("status") == "pending"
        # The assignee is expected to be cleared when a task returns to pending (v3.1).
        assert task2.get("assignee") in (None, ""), (
            f"重新启动后assignee应清空,实际: {task2.get('assignee')}"
        )
        print(f"\n🚀 E9-6: 等待重新调度执行 (pid={pid}, tid={tid})")

        # Wait for the task to be scheduled and executed.
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重启后最终状态: {status}")
        # _poll_task falls back to status "unknown" when the task cannot be
        # fetched; that must not be mistaken for a successful dispatch.
        assert status != "unknown", (
            f"Task {tid} could not be fetched after polling: {result}"
        )
        assert status != "pending", (
            f"重启后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        if status == "done":
            print(" ✅ 取消重启流程正确")
        else:
            print(f" ⚠️ 重启后状态: {status}")
# ===================================================================
# E9-7: claimed 超时 → pending (assignee 清空)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE97ClaimedTimeout:
    """E9-7: a claimed task that times out goes back to pending with its assignee cleared."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_claimed_timeout_to_pending(self):
        """Back-date ``claimed_at`` and expect the ticker to reset the task."""
        pid = _create_project(self._projects, "E9-7")
        tid = _create_task(
            pid,
            title="E2E超时测试任务",
            description="测试claimed超时处理",
            assignee="zhangfei-dev",
        )
        # Manually push to claimed and verify it.
        r1 = _update_status(pid, tid, "claimed", agent="zhangfei-dev")
        assert r1.get("ok"), f"claimed失败: {r1}"
        task = _get_task(pid, tid)
        assert task.get("status") == "claimed"

        # Write claimed_at = two hours ago straight into the DB to simulate a timeout.
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is
        # kept because the stored value must stay a naive ISO string here.
        two_hours_ago = (datetime.utcnow() - timedelta(hours=2)).isoformat()
        _patch_db_claimed_at(pid, tid, two_hours_ago)
        print(f"\n🚀 E9-7: 已设claimed_at为2小时前,等待ticker处理 (pid={pid}, tid={tid})")

        # Poll until the ticker moves the task out of claimed
        # (to pending, or to escalated).
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "escalated"),
        )
        status = result.get("status")
        print(f" 超时后状态: {status}")

        # _poll_task reports "unknown" when the task cannot be fetched. Without
        # this check such a result would satisfy every assertion below.
        assert status != "unknown", (
            f"Task {tid} could not be fetched after polling: {result}"
        )
        assert status != "claimed", (
            f"超时处理未生效!任务 {tid}{MAX_WAIT_DISPATCH}s 后仍为 claimed"
        )
        assert status in ("pending", "escalated"), (
            f"Unexpected status after claim timeout: {status}"
        )

        # The assignee must have been cleared.
        assignee = result.get("assignee")
        print(f" assignee: {assignee}")
        assert assignee is None or assignee == "", (
            f"超时重置后assignee应清空,实际: {assignee}"
        )
        print(f" ✅ claimed超时处理正确 (status={status}, assignee cleared)")
# ===================================================================
# E9-8: 缓存头验证
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE98CacheHeaders:
    """E9-8: verify the Cache-Control headers of the served frontend."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Skip the test when the daemon is not available."""
        _check_environment()

    @staticmethod
    def _first_asset_cache_control(pattern: str, kind: str):
        """Locate the first asset of ``kind`` in the index HTML and fetch it.

        Args:
            pattern: Regex with one capture group yielding the asset path.
            kind: Human-readable asset kind ("JS" / "CSS") for skip messages.

        Returns:
            ``(asset_path, cache_control_header)``. The test is skipped when
            the frontend or a matching asset is not available.
        """
        import re

        html_resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if html_resp.status_code != 200:
            pytest.skip("Frontend not available")
        matches = re.findall(pattern, html_resp.text)
        if not matches:
            pytest.skip(f"No {kind} files found in HTML")
        asset_path = matches[0]
        resp = http_requests.get(f"{API_BASE}{asset_path}", timeout=10)
        # A missing asset would otherwise show up as a misleading header failure.
        assert resp.status_code == 200, (
            f"{kind} asset {asset_path} returned HTTP {resp.status_code}"
        )
        return asset_path, resp.headers.get("cache-control", "")

    def test_html_no_cache(self):
        """The HTML page must be served as no-cache / no-store."""
        resp = http_requests.get(f"{API_BASE}/", timeout=10)
        if resp.status_code != 200:
            pytest.skip(f"Frontend not served at {API_BASE}/: {resp.status_code}")
        cache_control = resp.headers.get("cache-control", "")
        print(f"\n🚀 E9-8a: HTML Cache-Control: {cache_control}")
        assert "no-cache" in cache_control or "no-store" in cache_control, (
            f"HTML 应为 no-cache/no-store,实际: {cache_control}"
        )

    def test_js_immutable(self):
        """JS files must be immutable with a long max-age."""
        js_path, cache_control = self._first_asset_cache_control(
            r'src="(/assets/[^"]+\.js)"', "JS",
        )
        print(f" E9-8b: JS ({js_path}) Cache-Control: {cache_control}")
        assert "immutable" in cache_control, (
            f"JS 应含 immutable,实际: {cache_control}"
        )
        assert "31536000" in cache_control, (
            f"JS max-age 应为 31536000,实际: {cache_control}"
        )

    def test_css_immutable(self):
        """CSS files must be served as immutable."""
        css_path, cache_control = self._first_asset_cache_control(
            r'href="(/assets/[^"]+\.css)"', "CSS",
        )
        print(f" E9-8c: CSS ({css_path}) Cache-Control: {cache_control}")
        assert "immutable" in cache_control, (
            f"CSS 应含 immutable,实际: {cache_control}"
        )
        print(" ✅ 缓存头验证通过")
# ===================================================================
# E10c: 失败重试链
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE10cRetryChain:
    """E10c: failed -> pending (manual retry) -> broadcast -> claim -> done."""

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_failed_to_pending_retry(self):
        """Simulate a failure manually, retry, and wait for re-dispatch."""
        pid = _create_project(self._projects, "E10c",
                              agents=["zhangfei-dev", "simayi-challenger"])
        tid = _create_task(
            pid,
            title="E2E重试任务:echo retry",
            description=(
                "请执行 echo retry 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
        )
        # Manually push to failed (simulating a failed agent run). The setup
        # transitions are checked so a rejected one is reported immediately.
        for state in ("claimed", "working"):
            resp = _update_status(pid, tid, state, agent="zhangfei-dev")
            assert resp.get("ok"), f"{state}失败: {resp}"
        r_fail = _update_status(pid, tid, "failed", agent="zhangfei-dev")
        assert r_fail.get("ok"), f"failed失败: {r_fail}"
        task = _get_task(pid, tid)
        assert task.get("status") == "failed"
        print("\n🚀 E10c: 任务已标记failed,准备重试")

        # Manual retry -> pending.
        r_retry = _update_status(pid, tid, "pending")
        assert r_retry.get("ok"), f"重试pending失败: {r_retry}"
        task2 = _get_task(pid, tid)
        assert task2.get("status") == "pending"
        # The assignee must have been cleared.
        assert task2.get("assignee") in (None, ""), (
            f"重试后assignee应清空,实际: {task2.get('assignee')}"
        )
        # retry_count is only reported here, not asserted.
        retry_count = task2.get("retry_count", 0) or 0
        print(f" retry_count: {retry_count}")

        # Wait for the task to be dispatched and executed again.
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 重试后最终状态: {status}")
        # _poll_task falls back to status "unknown" when the task cannot be
        # fetched; that must not be mistaken for a successful dispatch.
        assert status != "unknown", (
            f"Task {tid} could not be fetched after polling: {result}"
        )
        assert status != "pending", (
            f"重试后未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        if status == "done":
            print(" ✅ 失败重试链正确")
        else:
            print(f" ⚠️ 重试后状态: {status}")
# ===================================================================
# E10d: 完整生命周期(广播认领版)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE10dFullLifecycle:
    """E10d: no assignee -> broadcast claim -> claimed -> working -> review -> done.

    Verifies the full state-transition chain plus the recorded events.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_full_lifecycle_with_review(self):
        """Full lifecycle: create -> broadcast -> claim -> execute -> review -> done."""
        pid = _create_project(self._projects, "E10d",
                              agents=["zhangfei-dev", "simayi-challenger"])

        # Step 1: coding task, left without assignee so it is broadcast.
        code_tid = _create_task(
            pid,
            title="E2E完整链路:编码任务",
            description=(
                "请执行 echo lifecycle 并标记done。"
                "这是E2E完整生命周期测试,不需要做其他事。"
            ),
            task_type="coding",
        )
        print(f"\n🚀 E10d: 等待编码任务广播认领 (pid={pid}, tid={code_tid})")
        result = _poll_task(
            pid, code_tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        code_status = result.get("status")
        print(f" 编码任务最终状态: {code_status}")
        # _poll_task falls back to status "unknown" when the task cannot be
        # fetched; that must not be mistaken for a claimed task.
        assert code_status != "unknown", (
            f"Task {code_tid} could not be fetched after polling: {result}"
        )
        assert code_status != "pending", "编码任务未被认领"

        if code_status == "done":
            # Verify that events were recorded for the task.
            events_resp = http_requests.get(
                f"{API_BASE}/api/projects/{pid}/tasks/{code_tid}/events",
                timeout=10,
            )
            if events_resp.status_code == 200:
                events = events_resp.json()
                event_types = [e.get("event_type") for e in events.get("events", [])]
                print(f" Events: {event_types}")
                # At least one state-change event is expected.
                assert any("claimed" in str(e) or "started" in str(e)
                           for e in event_types), (
                    f"缺少状态变化事件: {event_types}"
                )

        # Step 2: review task, driven manually (does not depend on an agent).
        review_tid = _create_task(
            pid,
            title="E2E完整链路:review任务",
            description="测试review状态",
            assignee="simayi-challenger",
        )
        for s in ("claimed", "working", "review", "done"):
            r = _update_status(pid, review_tid, s, agent="simayi-challenger")
            assert r.get("ok"), f"{s}失败: {r}"
        task = _get_task(pid, review_tid)
        assert task.get("status") == "done"
        print(" Review任务手动生命周期: ✅")

        # Step 3: done -> cancelled (cancelling an already finished task).
        r_cancel = _update_status(pid, review_tid, "cancelled")
        assert r_cancel.get("ok"), f"done→cancelled失败: {r_cancel}"
        task3 = _get_task(pid, review_tid)
        assert task3.get("status") == "cancelled"
        print(" done→cancelled: ✅")
        print(" ✅ E10d 完整生命周期测试通过")
# ===================================================================
# E15: Prompt v3.0 广播三级响应 E2E
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
class TestE15PromptV3Broadcast:
    """E15: Prompt v3.0 three-level broadcast response, end to end.

    Creates tasks whose assignee/task type combination exercises the
    broadcast prompt and verifies that the agent left a comment instead
    of staying silent (NO_REPLY).
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project afterwards."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def test_broadcast_observation_comment(self):
        """Broadcast task -> agent writes an observation comment.

        Three-level response of Prompt v3.0 (``_build_claim_prompt``):
        - match -> claim
        - no match but able to help -> observation comment
        - no match and unable to help -> NO_REPLY (silent)

        The task is assigned to simayi-challenger but typed as coding, so
        the agent is expected to comment rather than execute it.
        """
        pid = _create_project(self._projects, "E15",
                              agents=["simayi-challenger"])
        tid = _create_task(
            pid,
            title="E2E Prompt v3.0:观察型任务",
            description=(
                "这是一个编码任务,但 assignee 是司马懿。\n"
                "按照 Prompt v3.0 三级响应:\n"
                "- 如果你认为应该由其他人执行,请写 observation comment\n"
                "- 不需要实际执行编码\n"
                "- 标记 done 即可\n"
                "这是E2E测试,验证广播三级响应。"
            ),
            assignee="simayi-challenger",
            task_type="coding",
        )
        print(f"\n🚀 E15: 等待广播认领+Agent响应 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # _poll_task falls back to status "unknown" when the task cannot be
        # fetched; that must not be mistaken for a dispatched task.
        assert status != "unknown", (
            f"Task {tid} could not be fetched after polling: {result}"
        )
        assert status != "pending", "任务未被调度"

        # Look for comments as evidence that the agent responded.
        db_path = _get_db_path(pid)
        if db_path.exists():
            conn = sqlite3.connect(str(db_path))
            try:
                comments = conn.execute(
                    "SELECT author, comment_type, body FROM comments "
                    "WHERE task_id=? ORDER BY id DESC LIMIT 5",
                    (tid,),
                ).fetchall()
                print(f" Comments ({len(comments)}):")
                for author, comment_type, body in comments:
                    # body may be NULL in the DB; slicing None would raise.
                    print(f" [{author}] {comment_type}: {(body or '')[:80]}...")
                # At least one comment (the agent's response) is expected.
                assert len(comments) > 0, (
                    f"Agent 未写任何 commentPrompt v3.0 三级响应可能未生效"
                )
            finally:
                conn.close()
        else:
            # Make the skipped verification visible instead of passing silently.
            print(f" [warn] DB not found at {db_path}; comment check skipped")
        print(" ✅ Prompt v3.0 广播响应验证完成")

    def test_broadcast_claim_by_matching_agent(self):
        """Broadcast task -> matching agent claims it -> done.

        Counterpart test: a task with the correct assignee should be
        claimed and executed.
        """
        pid = _create_project(self._projects, "E15b",
                              agents=["zhangfei-dev"])
        tid = _create_task(
            pid,
            title="E2E Prompt v3.0:认领型任务",
            description=(
                "请执行 echo claim-test 并标记done。\n"
                "这是E2E测试,验证正确 assignee 的任务被认领执行。\n"
                "不需要做其他事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 E15b: 等待正确Agent认领 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "unknown", (
            f"Task {tid} could not be fetched after polling: {result}"
        )
        assert status != "pending", "任务未被认领"
        assert status != "blocked", "任务被错误拦截"
        print(" ✅ 正确 assignee 认领执行验证通过")
+662
View File
@@ -0,0 +1,662 @@
import pytest
pytestmark = pytest.mark.e2e
"""#01 四相循环 单元测试
不依赖 daemon / Agent,纯逻辑验证。覆盖:
U1 mention_queue 写入与查询
U2 mention 重试上限
U3 一轮结束检测(全部终态)
U4 一轮结束 — 含 failed sub
U5 round_count 上限
U6 mention 按 agent 分组(mock spawner)
U7 subtask_summary 聚合
U8 increment_round_count
U9 mention prompt 构建
U10 review prompt 构建
"""
import asyncio
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ── 路径设置 ──
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
SRC_DIR = DEPLOY_DIR / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
if str(DEPLOY_DIR) not in sys.path:
sys.path.insert(0, str(DEPLOY_DIR))
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
# ── Fixtures ──
@pytest.fixture
def data_root(tmp_path):
    """Per-test projects directory located under pytest's ``tmp_path``."""
    projects_dir = tmp_path / "projects"
    return projects_dir
@pytest.fixture
def registry(data_root):
    """ProjectRegistry instance rooted at the per-test ``data_root``."""
    project_registry = ProjectRegistry(data_root)
    return project_registry
@pytest.fixture
def project_env(data_root, registry):
    """Create a project with its Blackboard and return ``(pid, db_path, bb)``."""
    pid = "test-proj"
    registry.create_project(pid, "Test Project", agents=["agent-a", "agent-b"])
    db_path = data_root / pid / "blackboard.db"
    return pid, db_path, Blackboard(db_path)
def _make_task(bb: Blackboard, tid: str, **kwargs) -> str:
    """Helper: create a task on the blackboard and return its id.

    ``kwargs`` override the default task fields (pending coding task
    assigned by "daemon").
    """
    fields = {
        "id": tid,
        "title": f"Task {tid}",
        "status": "pending",
        "assigned_by": "daemon",
        "task_type": "coding",
        **kwargs,
    }
    bb.create_task(Task(**fields))
    return tid
def _push_status(bb: Blackboard, tid: str, *statuses: str, agent: str = "agent-a"):
    """Helper: walk task ``tid`` through ``statuses`` in order, acting as ``agent``."""
    for next_state in statuses:
        bb.update_task_status(tid, next_state, agent=agent)
# ===================================================================
# U1: mention_queue 写入与查询
# ===================================================================
class TestU1MentionWriteQuery:
    """U1: writing mentions, de-duplication, and querying by state."""

    def test_record_and_query_mentions(self, project_env):
        """Two recorded mentions are both reported as pending."""
        bb = project_env[2]
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello @agent-a @agent-b",
                                    mentions=["agent-a", "agent-b"])
        written = bb.record_mentions(comment_id, "t1", ["agent-a", "agent-b"])
        assert written == 2

        pending = bb.get_pending_mentions()
        assert len(pending) == 2
        mentioned_agents = {row["mentioned_agent"] for row in pending}
        assert mentioned_agents == {"agent-a", "agent-b"}

    def test_dedup_same_comment_same_agent(self, project_env):
        """Recording the same (comment, agent) pair twice writes only once."""
        bb = project_env[2]
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(comment_id, "t1", ["agent-a"])
        second_write = bb.record_mentions(comment_id, "t1", ["agent-a"])
        assert second_write == 0  # already present, nothing new written
        assert len(bb.get_pending_mentions()) == 1

    def test_mark_notified_and_requery(self, project_env):
        """A mention marked as notified disappears from the pending list."""
        bb = project_env[2]
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello",
                                    mentions=["agent-a", "agent-b"])
        bb.record_mentions(comment_id, "t1", ["agent-a", "agent-b"])
        before = bb.get_pending_mentions()
        assert len(before) == 2

        # Mark one of them as notified.
        bb.mark_mention_notified(before[0]["id"])
        after = bb.get_pending_mentions()
        assert len(after) == 1

    def test_empty_mentions(self, project_env):
        """Recording an empty mention list writes nothing."""
        bb = project_env[2]
        _make_task(bb, "t1")
        written = bb.record_mentions(1, "t1", [])
        assert written == 0
# ===================================================================
# U2: mention 重试上限
# ===================================================================
class TestU2MentionRetryLimit:
    """U2: retry_count increments and the mention is hidden once the limit is hit."""

    def test_retry_and_limit(self, project_env):
        """A mention stays pending below ``max_retries`` and vanishes at it."""
        bb = project_env[2]
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(comment_id, "t1", ["agent-a"])
        first = bb.get_pending_mentions()[0]
        assert first["retry_count"] == 0
        mention_id = first["id"]

        # Four retries bring retry_count to 4.
        retries_done = 0
        while retries_done < 4:
            bb.mark_mention_retry(mention_id)
            retries_done += 1

        # Still visible, because 4 < 5.
        visible = bb.get_pending_mentions(max_retries=5)
        assert len(visible) == 1
        assert visible[0]["retry_count"] == 4

        # One more retry brings retry_count to 5.
        bb.mark_mention_retry(mention_id)

        # Limit reached: no longer returned.
        assert len(bb.get_pending_mentions(max_retries=5)) == 0

    def test_mark_failed(self, project_env):
        """A mention marked as failed is not reported as pending."""
        bb = project_env[2]
        _make_task(bb, "t1")
        comment_id = bb.add_comment("t1", "author1", "Hello", mentions=["agent-a"])
        bb.record_mentions(comment_id, "t1", ["agent-a"])
        target = bb.get_pending_mentions()[0]
        bb.mark_mention_failed(target["id"])

        # Failed mentions are excluded from the pending list.
        assert len(bb.get_pending_mentions()) == 0
# ===================================================================
# U3: 一轮结束检测(全部终态 → 触发 review)
# ===================================================================
class TestU3RoundComplete:
    """U3: when every sub-task of a parent is terminal, a review is triggered."""

    def test_all_subs_done_triggers_review(self, project_env):
        """All subs done -> the parent is reviewed and its round_count becomes 1."""
        pid, db_path, bb = project_env
        # Parent with three sub-tasks.
        _make_task(bb, "parent", title="Parent Task")
        subs = ("sub1", "sub2", "sub3")
        for sub in subs:
            _make_task(bb, sub, parent_task="parent")
        # Drive every sub-task to done.
        for sub in subs:
            _push_status(bb, sub, "claimed", "working", "review", "done")

        # The parent status has to be aggregated first; set it directly.
        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute("UPDATE tasks SET status='done' WHERE id='parent'")
            conn.commit()
        finally:
            # Close even when the UPDATE fails so the DB handle is not leaked.
            conn.close()

        # Mock spawner.
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=ProjectRegistry(Path("/tmp/fake")),
                        spawner=mock_spawner, dispatcher=MagicMock())
        # Mock _build_review_prompt (per the original note, its f-string has
        # compatibility problems on Python 3.9).
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert "parent" in reviewed

        # round_count must have been incremented.
        task = bb.get_task("parent")
        assert task.round_count == 1

        # The spawner must have been called exactly once, for the reviewer agent.
        mock_spawner.spawn_full_agent.assert_called_once()
        call_kwargs = mock_spawner.spawn_full_agent.call_args[1]
        assert call_kwargs["agent_id"] == "pangtong-fujunshi"

    def test_not_all_terminal_no_review(self, project_env):
        """A still-pending sub-task prevents the review."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        # sub2 stays pending.
        ticker = Ticker(registry=MagicMock(), spawner=MagicMock(),
                        dispatcher=MagicMock())

        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert reviewed == []
# ===================================================================
# U4: 一轮结束 — 含 failed sub
# ===================================================================
class TestU4RoundWithFailed:
    """U4: done and failed both count as terminal, so a review is triggered."""

    def test_mixed_done_failed_triggers_review(self, project_env):
        """One done + one failed sub-task -> the parent is reviewed."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _make_task(bb, "sub2", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        _push_status(bb, "sub2", "claimed", "working", "failed")

        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute("UPDATE tasks SET status='done' WHERE id='parent'")
            conn.commit()
        finally:
            # Close even when the UPDATE fails so the DB handle is not leaked.
            conn.close()

        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert "parent" in reviewed
# ===================================================================
# U5: round_count 上限
# ===================================================================
class TestU5RoundLimit:
    """U5: once round_count reaches the limit (5), no further review is triggered."""

    @staticmethod
    def _run_sql(db_path, statement: str) -> None:
        """Execute and commit ``statement``, always closing the connection."""
        from src.blackboard.db import get_connection
        conn = get_connection(db_path)
        try:
            conn.execute(statement)
            conn.commit()
        finally:
            # Close even when the statement fails so the DB handle is not leaked.
            conn.close()

    def test_at_limit_no_review(self, project_env):
        """round_count == 5 -> no review."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        # Force round_count to 5.
        self._run_sql(
            db_path,
            "UPDATE tasks SET status='done', round_count=5 WHERE id='parent'",
        )
        ticker = Ticker(registry=MagicMock(), spawner=MagicMock(),
                        dispatcher=MagicMock())

        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert reviewed == []  # round_count=5 >= 5: not triggered

    def test_below_limit_triggers(self, project_env):
        """round_count == 4 -> the fifth round is still triggered."""
        pid, db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        _make_task(bb, "sub1", parent_task="parent")
        _push_status(bb, "sub1", "claimed", "working", "review", "done")
        self._run_sql(
            db_path,
            "UPDATE tasks SET status='done', round_count=4 WHERE id='parent'",
        )
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")

        reviewed = asyncio.run(ticker._check_round_complete(db_path, pid))
        assert "parent" in reviewed  # round_count=4 < 5: round 5 is triggered
# ===================================================================
# U6: mention 按 agent 分组
# ===================================================================
class TestU6MentionGrouping:
    """U6: _process_mentions groups by agent; several mentions of the same
    agent are merged into a single spawn."""

    def test_grouped_by_agent(self, project_env):
        """Three mentions for two agents result in exactly two spawns."""
        pid, db_path, bb = project_env
        _make_task(bb, "t1")
        _make_task(bb, "t2")
        # comment1 mentions agent-a.
        cid1 = bb.add_comment("t1", "author1", "msg1", mentions=["agent-a"])
        bb.record_mentions(cid1, "t1", ["agent-a"])
        # comment2 mentions agent-a and agent-b.
        cid2 = bb.add_comment("t1", "author2", "msg2", mentions=["agent-a", "agent-b"])
        bb.record_mentions(cid2, "t1", ["agent-a", "agent-b"])

        # agent-a now has two mentions, agent-b has one.
        pending = bb.get_pending_mentions()
        assert len(pending) == 3

        # Mock spawner that records its calls.
        mock_spawner = MagicMock()
        mock_spawner.spawn_full_agent = AsyncMock(return_value="session-1")
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner,
                        dispatcher=MagicMock())

        processed = asyncio.run(ticker._process_mentions(db_path, pid))

        # One spawn per agent: agent-a once, agent-b once.
        assert mock_spawner.spawn_full_agent.call_count == 2
        assert set(processed) == {"agent-a", "agent-b"}

        # Every mention must have left the pending state.
        pending_after = bb.get_pending_mentions()
        assert len(pending_after) == 0
# ===================================================================
# U7: subtask_summary 聚合
# ===================================================================
class TestU7SubtaskSummary:
    """U7: get_subtasks_summary returns the correct per-status counts."""

    def test_mixed_statuses(self, project_env):
        """Two done, one failed and one untouched subtask are counted separately."""
        _pid, _db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        for sub_id in ("sub1", "sub2", "sub3", "sub4"):
            _make_task(bb, sub_id, parent_task="parent")
        for sub_id in ("sub1", "sub2"):
            _push_status(bb, sub_id, "claimed", "working", "review", "done")
        _push_status(bb, "sub3", "claimed", "working", "failed")
        # sub4 is left as created (still pending).

        summary = bb.get_subtasks_summary("parent")
        assert summary is not None
        # "other" is the bucket the pending subtask lands in.
        expected_counts = {"total": 4, "done": 2, "failed": 1, "other": 1}
        for key, count in expected_counts.items():
            assert summary[key] == count
        assert summary["all_terminal"] is False

    def test_all_terminal(self, project_env):
        """With one done and one failed subtask, all_terminal is True."""
        _pid, _db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        transitions = {
            "sub1": ("claimed", "working", "review", "done"),
            "sub2": ("claimed", "working", "failed"),
        }
        for sub_id in transitions:
            _make_task(bb, sub_id, parent_task="parent")
        for sub_id, path in transitions.items():
            _push_status(bb, sub_id, *path)

        summary = bb.get_subtasks_summary("parent")
        assert summary["all_terminal"] is True
        assert summary["done"] == 1
        assert summary["failed"] == 1

    def test_no_subs_returns_none(self, project_env):
        """A task without subtasks has no summary."""
        _pid, _db_path, bb = project_env
        _make_task(bb, "lonely", title="No subs")
        assert bb.get_subtasks_summary("lonely") is None

    def test_nonexistent_parent_returns_none(self, project_env):
        """An unknown parent id has no summary."""
        _pid, _db_path, bb = project_env
        assert bb.get_subtasks_summary("nonexistent") is None
# ===================================================================
# U8: increment_round_count
# ===================================================================
class TestU8IncrementRound:
    """U8: round_count is incremented and persisted."""

    def test_increment(self, project_env):
        """Each increment returns the new counter, and a re-read sees the last value."""
        _pid, _db_path, bb = project_env
        _make_task(bb, "parent", title="Parent")
        assert bb.get_task("parent").round_count == 0

        for expected_round in (1, 2):
            assert bb.increment_round_count("parent") == expected_round

        # Persistence check: fetch the task again and read the counter.
        assert bb.get_task("parent").round_count == 2
# ===================================================================
# U9: mention prompt 构建
# ===================================================================
class TestU9MentionPrompt:
    """U9: _build_mention_prompt output contains the key pieces of context."""

    def test_prompt_content(self, project_env):
        """The prompt names the agent, the task title, the project and the API port."""
        _, _, bb = project_env
        _make_task(bb, "t1", title="测试任务标题")
        ticker = Ticker(registry=MagicMock())
        ticker.spawner = MagicMock()
        ticker.spawner.api_host = "127.0.0.1"
        ticker.spawner.api_port = 8083
        task = bb.get_task("t1")
        mention_lines = ["- [agent-a] 这是一条 mention 消息"]
        prompt = ticker._build_mention_prompt(
            "agent-b", task, mention_lines, "test-proj")
        assert "agent-b" in prompt
        assert "测试任务标题" in prompt
        # The former third alternative, `"" in prompt`, is true for every
        # string and made this assertion unable to fail; it is dropped so the
        # check really requires a mention marker in the prompt.
        assert "mention" in prompt.lower() or "@" in prompt
        assert "test-proj" in prompt
        assert "8083" in prompt  # API port
# ===================================================================
# U10: review prompt 构建
# ===================================================================
class TestU10ReviewPrompt:
    """U10: input-contract checks for _build_review_prompt.

    Note: the f-string inside _build_review_prompt has a compatibility problem
    on Python 3.9, so these tests verify the call contract (the arguments)
    rather than the prompt text. Prompt content is tested in a Python 3.12+
    environment.
    """

    @staticmethod
    def _stubbed_ticker():
        """Build a Ticker whose _build_review_prompt is replaced by a MagicMock."""
        mock_spawner = MagicMock()
        mock_spawner.api_host = "127.0.0.1"
        mock_spawner.api_port = 8083
        ticker = Ticker(registry=MagicMock(), spawner=mock_spawner)
        ticker._build_review_prompt = MagicMock(return_value="Review prompt mock")
        return ticker

    def test_review_prompt_basic(self, project_env):
        """Positional and keyword arguments reach the (stubbed) prompt builder."""
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal Task",
                   description="这是目标描述")
        parent_task = bb.get_task("parent")
        ticker = self._stubbed_ticker()
        summary = {"done": 2, "failed": 1, "cancelled": 0, "total": 3}
        prompt = ticker._build_review_prompt(
            parent_task, summary, [], [], 1, project_id="test-proj")
        # Verify the call contract (arguments were passed through correctly).
        call_args = ticker._build_review_prompt.call_args
        assert call_args[0][0].id == "parent"  # parent_task
        assert call_args[0][1] == summary  # summary
        assert call_args[0][4] == 1  # round_num
        assert call_args[1]["project_id"] == "test-proj"
        assert prompt == "Review prompt mock"

    def test_review_prompt_with_failures(self, project_env):
        """A summary containing failures is passed through unchanged."""
        _, _, bb = project_env
        _make_task(bb, "parent", title="Goal",
                   description="目标")
        parent_task = bb.get_task("parent")
        ticker = self._stubbed_ticker()
        summary = {"done": 1, "failed": 2, "cancelled": 0, "total": 3}
        # Only the recorded call is inspected, so the return value is not kept.
        ticker._build_review_prompt(
            parent_task, summary, [], [], 2, project_id="proj-1")
        # Verify the summary with failures was passed correctly.
        call_args = ticker._build_review_prompt.call_args
        assert call_args[0][1]["failed"] == 2
        assert call_args[0][4] == 2  # round_num
class TestU11ReviewingState:
    """The intermediate ``reviewing`` state guards against duplicate triggering."""

    # Status chain used to drive a task to "done" in these tests.
    _TO_DONE = ("claimed", "working", "review", "done")

    @staticmethod
    def _advance(bb, task_id, *statuses, agent="test"):
        """Apply each status in order to task_id via bb.update_task_status."""
        for status in statuses:
            bb.update_task_status(task_id, status, agent=agent)

    def test_reviewing_skipped_in_round_check(self, project_env):
        """A parent in the reviewing state does not trigger round-complete detection."""
        _, _, bb = project_env
        parent = _make_task(bb, "parent-1")
        _make_task(bb, "s1", parent_task=parent)
        # Subtask reaches done.
        self._advance(bb, "s1", *self._TO_DONE)
        # Parent reaches done, then is moved to reviewing.
        self._advance(bb, "parent-1", *self._TO_DONE)
        bb.update_task_status("parent-1", "reviewing", agent="daemon")
        # The summary must report the reviewing state.
        summary = bb.get_subtasks_summary("parent-1")
        assert summary is not None
        assert summary["parent_status"] == "reviewing"
        # reviewing is not in ("done", "failed"), so _check_round_complete
        # should skip this parent.
        assert summary["parent_status"] not in ("done", "failed")

    def test_reviewing_not_overwritten_by_aggregation(self, project_env):
        """reviewing is a MANUAL_STATUS and is not overwritten by compute_parent_status."""
        _, _, bb = project_env
        parent = _make_task(bb, "parent-2")
        _make_task(bb, "s1", parent_task=parent)
        # Subtask reaches done.
        self._advance(bb, "s1", *self._TO_DONE)
        # Parent reaches done, then is moved to reviewing.
        self._advance(bb, "parent-2", *self._TO_DONE)
        bb.update_task_status("parent-2", "reviewing", agent="daemon")
        # compute_parent_status should return reviewing (no overwrite).
        from src.blackboard.queries import Queries

        q = Queries(bb.db_path)
        computed = q.compute_parent_status("parent-2")
        assert computed == "reviewing"

    def test_reviewing_to_done_transition(self, project_env):
        """reviewing -> done is a legal transition (on GOAL_ACHIEVED)."""
        _, _, bb = project_env
        _make_task(bb, "parent-3")
        self._advance(bb, "parent-3", *self._TO_DONE)
        bb.update_task_status("parent-3", "reviewing", agent="daemon")
        # reviewing -> done (after GOAL_ACHIEVED).
        result = bb.update_task_status("parent-3", "done", agent="daemon")
        assert result is True
        task = bb.get_task("parent-3")
        assert task.status == "done"

    def test_reviewing_to_working_transition(self, project_env):
        """reviewing -> working is a legal transition (continuing with the next round)."""
        _, _, bb = project_env
        _make_task(bb, "parent-4")
        self._advance(bb, "parent-4", *self._TO_DONE)
        bb.update_task_status("parent-4", "reviewing", agent="daemon")
        # reviewing -> working (continue with the next round).
        result = bb.update_task_status("parent-4", "working", agent="daemon")
        assert result is True
        task = bb.get_task("parent-4")
        assert task.status == "working"