Files
sanguo_moziplus_v2/tests/e2e/test_e2e_v27.py
T
cfdaily b90b7b37c7
CI / lint (pull_request) Successful in 8s
CI / test (pull_request) Successful in 8s
CI / notify-on-failure (pull_request) Successful in 1s
fix(test): e2e test 在 collection 阶段跳过(不 import 安装目录)
根因: test_e2e_v27.py 的 skipif 只标记了函数级别,pytest collection 阶段
仍会 import 该文件,触发 sys.path.insert 指向安装目录的 spawner.py。
如果安装目录有 merge conflict 残留,整个 test job crash。

修复: 将 skipif 加入 pytestmark 级别,collection 阶段即跳过。
2026-06-10 07:52:41 +08:00

1636 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""v2.7 end-to-end tests — full chain against the real environment.

Covers: project management → Task CRUD → SubTask → stage progress →
status aggregation → dependency chain → timeouts → Mail → real agent dispatch.
"""
import os

import pytest

# Opt-in switch, read once so the mark and the module-level skip always agree.
_RUN_INTEGRATION = bool(os.environ.get("RUN_INTEGRATION"))
_SKIP_REASON = "Set RUN_INTEGRATION=1 to run E2E tests against real daemon"

# Kept for the class-level ``@skip_no_integration`` decorators used below.
skip_no_integration = pytest.mark.skipif(
    not _RUN_INTEGRATION,
    reason=_SKIP_REASON,
)
pytestmark = [pytest.mark.e2e, skip_no_integration]

# Marks (including ``pytestmark``) are only evaluated after pytest has
# imported this module, so on their own they cannot stop the module-level
# ``sys.path`` manipulation and ``src.*`` imports further down from running
# during collection. Raising a module-level skip here aborts the import
# before any of that code executes.
if not _RUN_INTEGRATION:
    pytest.skip(_SKIP_REASON, allow_module_level=True)
import asyncio
import json
import os
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
# Point at the deployment directory: it is pushed to the front of sys.path so
# the `src.*` imports below are looked up there first.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.main import app
from src.blackboard.db import init_db, get_connection, VALID_TRANSITIONS, VALID_STATUSES
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.utils import get_data_root
# Data root resolved once at import time; per-project databases are addressed
# below as DATA_ROOT / <project id> / "blackboard.db".
DATA_ROOT = get_data_root()
# ── Fixtures ──
@pytest.fixture(scope="module")
def client():
    """Module-scoped ``TestClient`` wrapping the imported FastAPI ``app``."""
    test_client = TestClient(app)
    return test_client
@pytest.fixture(scope="module")
def data_root():
    """Module-scoped data root, exactly as returned by ``get_data_root()``."""
    root = get_data_root()
    return root
def _pid() -> str:
"""生成唯一测试项目ID"""
return f"e2e-v27-{uuid.uuid4().hex[:8]}"
def _tid() -> str:
"""生成唯一任务ID"""
return f"e2e-task-{uuid.uuid4().hex[:8]}"
# ===================================================================
# E1: Project management
# ===================================================================
@skip_no_integration
class TestE1ProjectManagement:
    """E1: project creation, listing and archiving."""

    def test_e11_create_and_get_project(self, client):
        """Create a project through the API, then fetch it back by id."""
        pid = _pid()
        resp = client.post("/api/projects", json={
            "id": pid,
            "name": f"E2E测试-{pid}",
            "description": "自动测试项目",
        })
        assert resp.status_code == 200
        data = resp.json()
        # The create response is accepted with the id under either key.
        assert data.get("project_id") == pid or data.get("id") == pid
        # Read it back.
        resp = client.get(f"/api/projects/{pid}")
        assert resp.status_code == 200
        assert resp.json().get("id") == pid

    def test_e12_list_projects(self, client):
        """The listing endpoint returns a ``projects`` mapping."""
        resp = client.get("/api/projects")
        assert resp.status_code == 200
        data = resp.json()
        assert "projects" in data
        assert isinstance(data["projects"], dict)

    def test_e13_archive_project(self, client):
        """Archiving a project flips its status to ``archived``."""
        pid = _pid()
        client.post("/api/projects", json={"id": pid, "name": f"Archive-{pid}"})
        resp = client.post(f"/api/projects/{pid}/archive")
        assert resp.status_code == 200
        # Verify the resulting status.
        resp = client.get(f"/api/projects/{pid}")
        assert resp.json()["status"] == "archived"

    def test_e14_create_project_auto_discover(self, data_root):
        """Create a directory holding a blackboard.db and verify it is listed
        after being registered with ``source="auto_discovered"``."""
        import shutil

        pid = _pid()
        project_dir = data_root / pid
        try:
            project_dir.mkdir(parents=True, exist_ok=True)
            init_db(project_dir / "blackboard.db")
            # Register the project.
            registry = ProjectRegistry(data_root)
            registry.create_project(pid, f"Discover-{pid}", source="auto_discovered")
            projects = registry.list_projects()
            assert pid in projects
        finally:
            # Clean up even when an assertion above fails, so a failing run
            # does not leave a stray project directory under the data root.
            shutil.rmtree(project_dir, ignore_errors=True)
# ===================================================================
# E2: Task CRUD + state machine
# ===================================================================
@skip_no_integration
class TestE2TaskCRUD:
    """E2: task creation, lookup and status transitions."""

    @pytest.fixture(autouse=True)
    def setup_project(self, client):
        """Create a fresh project before every test and remember its id."""
        self.pid = _pid()
        project_payload = {"id": self.pid, "name": f"E2-{self.pid}"}
        client.post("/api/projects", json=project_payload)

    def test_e21_create_task(self, client):
        """A task created with an explicit id reports that id back."""
        task_id = _tid()
        task_payload = {
            "id": task_id,
            "title": "测试任务",
            "description": "E2E测试",
            "status": "pending",
        }
        resp = client.post(f"/api/projects/{self.pid}/tasks", json=task_payload)
        assert resp.status_code == 200
        body = resp.json()
        assert body.get("task_id") == task_id or body.get("id") == task_id
        self._tid = task_id

    def test_e22_get_task_expand(self, client):
        """``?expand=all`` returns the task plus comments and/or outputs."""
        task_id = _tid()
        tasks_url = f"/api/projects/{self.pid}/tasks"
        client.post(tasks_url, json={
            "id": task_id, "title": "Expand测试", "status": "pending",
        })
        resp = client.get(f"{tasks_url}/{task_id}?expand=all")
        assert resp.status_code == 200
        data = resp.json()
        assert data.get("id") == task_id
        assert "comments" in data or "outputs" in data

    def test_e23_valid_transitions(self, client):
        """pending → claimed → working → review → done"""
        task_id = _tid()
        tasks_url = f"/api/projects/{self.pid}/tasks"
        client.post(tasks_url, json={
            "id": task_id, "title": "状态机测试", "status": "pending",
        })
        status_url = f"{tasks_url}/{task_id}/status"
        # Every hop is performed by the same agent.
        for new_status in ("claimed", "working", "review", "done"):
            resp = client.post(
                status_url,
                json={"status": new_status, "agent": "zhangfei-dev"},
            )
            assert resp.status_code == 200, f"Failed at → {new_status}: {resp.text}"

    def test_e24_invalid_transition_rejected(self, client):
        """pending → done must be rejected."""
        task_id = _tid()
        tasks_url = f"/api/projects/{self.pid}/tasks"
        client.post(tasks_url, json={
            "id": task_id, "title": "非法转换", "status": "pending",
        })
        resp = client.post(
            f"{tasks_url}/{task_id}/status",
            json={"status": "done", "agent": "test"},
        )
        assert resp.status_code == 409
        rejection = resp.text
        assert "invalid_transition" in rejection or "Cannot transition" in rejection

    def test_e25_list_tasks_filter(self, client):
        """Filter the task listing by status."""
        tasks_url = f"/api/projects/{self.pid}/tasks"
        # Two pending tasks, so the filtered listing must contain at least two.
        for _ in range(2):
            client.post(tasks_url, json={
                "id": _tid(), "title": "Filter", "status": "pending",
            })
        resp = client.get(f"{tasks_url}?status=pending")
        assert resp.status_code == 200
        payload = resp.json()
        tasks = payload.get("tasks", payload)
        assert len(tasks) >= 2
# ===================================================================
# E3: SubTask parent/child relationship
# ===================================================================
@skip_no_integration
class TestE3SubTask:
    """E3: parent/child task relationship."""

    # Stage ids shared by the parent's stages_json and the three children.
    _STAGES = ("setup", "run", "verify")

    @pytest.fixture(autouse=True)
    def setup(self, client):
        """Create a project, one parent task with three stages, and one child
        task per stage."""
        self.pid = _pid()
        client.post("/api/projects", json={
            "id": self.pid, "name": f"E3-{self.pid}",
        })
        tasks_url = f"/api/projects/{self.pid}/tasks"
        self.parent_id = "e3-parent-001"
        stage_defs = [
            {"id": stage, "label": stage.capitalize()} for stage in self._STAGES
        ]
        client.post(tasks_url, json={
            "id": self.parent_id,
            "title": "父任务",
            "status": "pending",
            "stages_json": json.dumps(stage_defs),
        })
        # Create the 3 child tasks.
        self.child_ids = []
        for index, stage in enumerate(self._STAGES):
            child_id = f"e3-child-{index}"
            self.child_ids.append(child_id)
            client.post(tasks_url, json={
                "id": child_id,
                "title": f"子任务-{stage}",
                "status": "pending",
                "parent_task": self.parent_id,
                "stage": stage,
            })

    def test_e31_list_subtasks(self, client):
        """Filtering by ``parent_task`` returns exactly the three children."""
        resp = client.get(f"/api/projects/{self.pid}/tasks?parent_task={self.parent_id}")
        assert resp.status_code == 200
        payload = resp.json()
        tasks = payload.get("tasks", payload)
        assert len(tasks) == 3
        listed_ids = {task["id"] for task in tasks}
        assert set(self.child_ids) == listed_ids

    def test_e32_top_level_excludes_children(self, data_root):
        """``top_level_tasks`` contains the parent but none of the children."""
        queries = Queries(data_root / self.pid / "blackboard.db")
        top_ids = {task.id for task in queries.top_level_tasks()}
        assert self.parent_id in top_ids
        for child_id in self.child_ids:
            assert child_id not in top_ids

    def test_e33_child_stage_field(self, data_root):
        """Each child keeps the stage it was created with."""
        board = Blackboard(data_root / self.pid / "blackboard.db")
        for index, stage in enumerate(self._STAGES):
            task = board.get_task(f"e3-child-{index}")
            assert task is not None
            assert task.stage == stage
# ===================================================================
# E4: Stage progress
# ===================================================================
@skip_no_integration
class TestE4StageProgress:
    """E4: stages_json + per-stage grouping statistics."""

    # Stage ids shared by the parent's stages_json and its children.
    _STAGES = ("data", "code", "test")

    @pytest.fixture(autouse=True)
    def setup(self, client):
        """Create a project, a staged parent task, and one child per stage."""
        self.pid = _pid()
        client.post("/api/projects", json={
            "id": self.pid, "name": f"E4-{self.pid}",
        })
        tasks_url = f"/api/projects/{self.pid}/tasks"
        self.parent_id = "e4-parent-001"
        stage_defs = [
            {"id": stage, "label": stage.capitalize()} for stage in self._STAGES
        ]
        client.post(tasks_url, json={
            "id": self.parent_id,
            "title": "Stage父任务",
            "status": "pending",
            "stages_json": json.dumps(stage_defs),
        })
        # One child task per stage.
        for index, stage in enumerate(self._STAGES):
            client.post(tasks_url, json={
                "id": f"e4-child-{index}",
                "title": f"Stage-{stage}",
                "status": "pending",
                "parent_task": self.parent_id,
                "stage": stage,
            })

    def test_e41_progress_endpoint(self, client):
        """The progress endpoint reports all three stages."""
        progress_url = f"/api/projects/{self.pid}/tasks/{self.parent_id}/progress"
        resp = client.get(progress_url)
        assert resp.status_code == 200
        data = resp.json()
        assert "stages" in data
        assert len(data["stages"]) == 3

    def test_e42_progress_counts(self, data_root):
        """Mark the data stage done and verify the progress counters."""
        db_path = data_root / self.pid / "blackboard.db"
        board = Blackboard(db_path)
        # Walk the data child through the state machine to done.
        for step in ("claimed", "working", "review", "done"):
            board.update_task_status("e4-child-0", step, agent="test")
        progress = Queries(db_path).parent_task_progress(self.parent_id)
        by_stage = {entry["id"]: entry for entry in progress["stages"]}
        assert by_stage["data"]["total"] == 1
        assert by_stage["data"]["done"] == 1
        assert by_stage["code"]["total"] == 1
        assert by_stage["code"]["done"] == 0

    def test_e43_empty_stages_progress(self, client):
        """Progress of a task that has no stages_json."""
        task_id = _tid()
        client.post(f"/api/projects/{self.pid}/tasks", json={
            "id": task_id, "title": "无Stage", "status": "pending",
        })
        queries = Queries(get_data_root() / self.pid / "blackboard.db")
        progress = queries.parent_task_progress(task_id)
        # No children: only requires that something (empty or a basic
        # structure) is returned.
        assert progress is not None
# ===================================================================
# E5: Parent task status aggregation
# ===================================================================
@skip_no_integration
class TestE5ParentAggregation:
    """E5: aggregation logic of ``compute_parent_status``."""

    # Sequence of status updates applied to a freshly created (pending) child
    # to reach each target status.
    _STATUS_PATHS = {
        "pending": (),
        "claimed": ("claimed",),
        "working": ("claimed", "working"),
        "review": ("claimed", "working", "review"),
        "done": ("claimed", "working", "review", "done"),
        "failed": ("claimed", "working", "failed"),
        "blocked": ("claimed", "working", "blocked"),
        "cancelled": ("cancelled",),
    }

    @pytest.fixture(autouse=True)
    def setup(self, data_root):
        """Initialise a fresh project database and create the parent task."""
        self.pid = _pid()
        self.db_path = data_root / self.pid / "blackboard.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        init_db(self.db_path)
        self.bb = Blackboard(self.db_path)
        self.q = Queries(self.db_path)
        # Create the parent task.
        self.parent_id = "e5-parent"
        parent = Task(id=self.parent_id, title="聚合父任务", status="pending")
        self.bb.create_task(parent)

    def _create_child(self, cid: str, status: str = "pending", stage: str = None):
        """Create a child of the parent and walk it to ``status``.

        The child is always created as pending; the state machine is then
        walked step by step. A status without a known path leaves the child
        pending.
        """
        child = Task(
            id=cid, title=f"子-{cid}", status="pending",
            parent_task=self.parent_id, stage=stage,
        )
        self.bb.create_task(child)
        for step in self._STATUS_PATHS.get(status, ()):
            self.bb.update_task_status(cid, step, agent="test")

    def test_e51_all_done_parent_done(self):
        """All children done → parent done."""
        for child_id in ("c1", "c2"):
            self._create_child(child_id, "done")
        assert self.q.compute_parent_status(self.parent_id) == "done"

    def test_e52_has_review_parent_review(self):
        """One child in review → parent review."""
        self._create_child("c3", "done")
        self._create_child("c4", "review")
        assert self.q.compute_parent_status(self.parent_id) == "review"

    def test_e53_has_working_parent_working(self):
        """One child working → parent working."""
        self._create_child("c5", "done")
        self._create_child("c6", "working")
        assert self.q.compute_parent_status(self.parent_id) == "working"

    def test_e54_all_pending_parent_pending(self):
        """All children pending → parent pending."""
        for child_id in ("c7", "c8"):
            self._create_child(child_id, "pending")
        assert self.q.compute_parent_status(self.parent_id) == "pending"

    def test_e55_cancelled_excluded(self):
        """Cancelled children do not take part in the aggregation."""
        self._create_child("c9", "done")
        self._create_child("c10", "cancelled")
        # Only c9 counts → done.
        assert self.q.compute_parent_status(self.parent_id) == "done"

    def test_e56_cancelled_parent_not_overridden(self):
        """A cancelled parent is not overwritten by the aggregation."""
        # First bring every child to done.
        self._create_child("c11", "done")
        # Force the parent to cancelled directly in the database.
        connection = get_connection(self.db_path)
        try:
            connection.execute(
                "UPDATE tasks SET status='cancelled' WHERE id=?",
                (self.parent_id,),
            )
            connection.commit()
        finally:
            connection.close()
        # _refresh_parent_statuses is expected to skip cancelled parents.
        Ticker(registry=MagicMock())._refresh_parent_statuses(self.db_path)
        # The parent must still be cancelled.
        parent = self.bb.get_task(self.parent_id)
        assert parent.status == "cancelled"
# ===================================================================
# E6: Dependency chain
# ===================================================================
@skip_no_integration
class TestE6DependencyChain:
    """E6: ``depends_on`` dependency advancement."""

    @pytest.fixture(autouse=True)
    def setup(self, data_root):
        """Initialise a fresh project database for each test."""
        self.pid = _pid()
        self.db_path = data_root / self.pid / "blackboard.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        init_db(self.db_path)
        self.bb = Blackboard(self.db_path)

    def _add_blocked(self, task_id, title, dependency):
        """Create a blocked task whose depends_on lists one dependency id."""
        self.bb.create_task(Task(
            id=task_id, title=title, status="blocked",
            depends_on=json.dumps([dependency]),
        ))

    def _finish(self, task_id):
        """Walk a pending task through claimed → working → review → done."""
        for step in ("claimed", "working", "review", "done"):
            self.bb.update_task_status(task_id, step, agent="test")

    def test_e61_dep_advance(self):
        """A done → B moves from blocked to pending."""
        self.bb.create_task(Task(id="dep-a", title="A", status="pending"))
        self._add_blocked("dep-b", "B", "dep-a")
        # Complete A.
        self._finish("dep-a")
        # Run the dependency-advance step by hand.
        advanced = Ticker(registry=MagicMock())._advance_dependencies(self.db_path)
        assert "dep-b" in advanced
        assert self.bb.get_task("dep-b").status == "pending"

    def test_e62_dep_not_done_stays_blocked(self):
        """A not finished → B stays blocked."""
        self.bb.create_task(Task(id="dep-c", title="C", status="pending"))
        self._add_blocked("dep-d", "D", "dep-c")
        advanced = Ticker(registry=MagicMock())._advance_dependencies(self.db_path)
        assert "dep-d" not in advanced
        assert self.bb.get_task("dep-d").status == "blocked"

    def test_e63_chain_a_b_c(self):
        """A → B → C multi-level dependency."""
        self.bb.create_task(Task(id="chain-a", title="A", status="pending"))
        self._add_blocked("chain-b", "B", "chain-a")
        self._add_blocked("chain-c", "C", "chain-b")
        # Complete A.
        self._finish("chain-a")
        ticker = Ticker(registry=MagicMock())
        first_pass = ticker._advance_dependencies(self.db_path)
        assert "chain-b" in first_pass
        # B is pending now; complete B as well.
        self._finish("chain-b")
        second_pass = ticker._advance_dependencies(self.db_path)
        assert "chain-c" in second_pass
# ===================================================================
# E7: Timeout reclaim
# ===================================================================
@skip_no_integration
class TestE7Timeout:
    """E7: reclaiming claimed/working tasks that timed out."""

    @pytest.fixture(autouse=True)
    def setup(self, data_root):
        """Initialise a fresh project database for each test."""
        self.pid = _pid()
        self.db_path = data_root / self.pid / "blackboard.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        init_db(self.db_path)
        self.bb = Blackboard(self.db_path)

    def _write(self, sql, params):
        """Execute one statement against the project DB, commit, and close."""
        connection = get_connection(self.db_path)
        try:
            connection.execute(sql, params)
            connection.commit()
        finally:
            connection.close()

    def _ticker(self):
        """Ticker with a 5 minute claim timeout and a 30 minute task timeout."""
        ticker = Ticker(registry=MagicMock())
        ticker.claim_timeout_minutes = 5.0
        ticker.default_task_timeout_minutes = 30.0
        return ticker

    def test_e71_claimed_timeout_to_pending(self):
        """claimed for longer than claim_timeout → pending."""
        self.bb.create_task(Task(id="to-001", title="超时测试", status="pending"))
        self.bb.update_task_status("to-001", "claimed", agent="test-agent")
        # Backdate claimed_at to 10 minutes ago.
        self._write(
            "UPDATE tasks SET claimed_at=datetime('now','-10 minutes') WHERE id=?",
            ("to-001",),
        )
        reclaimed = self._ticker()._check_timeouts(self.db_path)
        assert "to-001" in reclaimed
        assert self.bb.get_task("to-001").status == "pending"

    def test_e72_working_timeout_to_failed(self):
        """working for longer than task_timeout → failed."""
        self.bb.create_task(Task(id="to-002", title="工作超时", status="pending"))
        for step in ("claimed", "working"):
            self.bb.update_task_status("to-002", step, agent="test-agent")
        # Backdate started_at to 60 minutes ago.
        self._write(
            "UPDATE tasks SET started_at=datetime('now','-60 minutes') WHERE id=?",
            ("to-002",),
        )
        reclaimed = self._ticker()._check_timeouts(self.db_path)
        assert "to-002" in reclaimed
        assert self.bb.get_task("to-002").status == "failed"
# ===================================================================
# E8: Mail tab — 6 endpoints
# ===================================================================
@skip_no_integration
class TestE8MailTab:
    """E8: Mail end to end."""

    @pytest.fixture(autouse=True)
    def setup(self, client):
        """Start every test with an empty list of sent mail ids."""
        self.mail_ids = []

    def _send_mail(self, client, **kwargs):
        """POST a mail, assert it was accepted, and return its mail id.

        ``from`` is a Python keyword, so callers pass ``from_``; it is renamed
        to ``from`` before the payload is sent.
        """
        payload = dict(kwargs)
        if "from_" in payload:
            payload["from"] = payload.pop("from_")
        resp = client.post("/api/mail", json=payload)
        assert resp.status_code == 200
        data = resp.json()
        assert data["ok"] is True
        mail_id = data["mail_id"]
        self.mail_ids.append(mail_id)
        return mail_id

    def test_e81_send_inform_auto_done(self, client):
        """inform mail — created as pending by the API layer; the ticker is
        what completes it automatically."""
        mid = self._send_mail(
            client,
            title="通知测试",
            text="这是一条通知",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="inform",
        )
        data = client.get(f"/api/mail/{mid}").json()
        # The API layer creates the mail as pending: after the A1-A10
        # defensive rework, inform mails are no longer marked done right away.
        # The actual done transition is handled by the ticker's mail
        # hallucination gate as a fallback.
        assert data["status"] in ("pending", "done"), (
            f"inform mail status should be pending or done, got {data['status']}"
        )

    def test_e82_send_task_assign_pending(self, client):
        """task-assign mails stay pending."""
        mid = self._send_mail(
            client,
            title="任务分配",
            text="请完成此任务",
            from_="pangtong-fujunshi",
            to="zhangfei-dev",
            type="task-assign",
        )
        data = client.get(f"/api/mail/{mid}").json()
        assert data["status"] == "pending"

    def test_e83_list_with_filters(self, client):
        """Listing + filters."""
        self._send_mail(
            client,
            title="筛选测试1",
            text="body",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="text",
        )
        self._send_mail(
            client,
            title="筛选测试2",
            text="body",
            from_="zhangfei-dev",
            to="simayi-challenger",
            type="text",
        )
        # Filter by sender.
        by_sender = client.get("/api/mail?from_agent=pangtong-fujunshi").json()["mails"]
        assert any(mail["title"] == "筛选测试1" for mail in by_sender)
        # Filter by recipient.
        by_recipient = client.get("/api/mail?to_agent=simayi-challenger").json()["mails"]
        assert len(by_recipient) >= 2

    def test_e84_mail_detail_with_comments(self, client):
        """Detail view + comments."""
        from src.api.mail_routes import _db_path

        mid = self._send_mail(
            client,
            title="详情测试",
            text="正文内容",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="text",
        )
        # Add a comment by writing straight to the mail database.
        writer = get_connection(_db_path())
        try:
            writer.execute(
                "INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)",
                (mid, "simayi-challenger", "general", "收到,正在处理"),
            )
            writer.commit()
        finally:
            writer.close()
        data = client.get(f"/api/mail/{mid}").json()
        assert data["title"] == "详情测试"
        # Verify the comment was stored by querying the DB directly, which
        # sidesteps the card_id compatibility issue in Comment.from_row.
        reader = get_connection(_db_path())
        try:
            row = reader.execute(
                "SELECT COUNT(*) as cnt FROM comments WHERE task_id=?", (mid,)
            ).fetchone()
            assert row["cnt"] == 1
        finally:
            reader.close()

    def test_e85_mark_read(self, client):
        """Mark as read."""
        mid = self._send_mail(
            client,
            title="已读测试",
            text="body",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="text",
        )
        mail_url = f"/api/mail/{mid}"
        # Initially unread.
        assert client.get(mail_url).json()["is_read"] is False
        # Mark as read.
        resp = client.patch(mail_url, json={"is_read": True})
        assert resp.status_code == 200
        # Verify.
        assert client.get(mail_url).json()["is_read"] is True

    def test_e86_mark_executed(self, client):
        """Mark as executed (walking the full status chain)."""
        from src.api.mail_routes import _db_path as mail_db

        mid = self._send_mail(
            client,
            title="执行测试",
            text="body",
            from_="pangtong-fujunshi",
            to="zhangfei-dev",
            type="task-assign",
        )
        # Walk the status chain up to review first, then mark executed.
        board = Blackboard(mail_db())
        for step in ("claimed", "working", "review"):
            board.update_task_status(mid, step, agent="zhangfei-dev")
        mail_url = f"/api/mail/{mid}"
        resp = client.patch(mail_url, json={"mark_executed": True})
        assert resp.status_code == 200
        data = client.get(mail_url).json()
        assert data["is_read"] is True
        assert data["status"] == "done"

    def test_e87_summary_and_agents(self, client):
        """Summary statistics + agent list."""
        self._send_mail(
            client,
            title="统计测试",
            text="body",
            from_="zhaoyun-data",
            to="guanyu-dev",
            type="inform",
        )
        summary = client.get("/api/mail/summary").json()
        assert "total" in summary
        assert "unread" in summary
        assert summary["total"] > 0
        agents = client.get("/api/mail/agents/list").json()
        assert "agents" in agents
        assert isinstance(agents["agents"], list)
# ===================================================================
# E9: Real agent dispatch (production environment, full chain)
# ===================================================================
import requests as http_requests
# Base URL of the live HTTP API that the E9+ tests talk to.
API_BASE = "http://localhost:8083"
POLL_INTERVAL = 5 # seconds between status polls
MAX_WAIT_DISPATCH = 60 # timeout (seconds) waiting for dispatch (2 ticks)
MAX_WAIT_AGENT = 300 # timeout (seconds) waiting for the agent to finish
# Prefix of the project ids created by the E9+ tests.
E2E_PREFIX = "e2e-v30-"
def _check_environment():
    """Environment pre-check: daemon running + ticker active + API reachable.

    Returns the daemon status payload when everything is ready; otherwise the
    calling test is skipped via ``pytest.skip``.
    """
    status_url = f"{API_BASE}/api/daemon/status"
    try:
        data = http_requests.get(status_url, timeout=5).json()
        daemon_ready = data.get("status") == "running" and data.get("ticker_running")
        if not daemon_ready:
            pytest.skip(f"Daemon not ready: {data}")
        return data
    except Exception as e:
        pytest.skip(f"Production API not available at {API_BASE}: {e}")
def _cleanup_project(pid: str):
    """Clean up a test project by archiving it; failures are ignored."""
    archive_url = f"{API_BASE}/api/projects/{pid}/archive"
    try:
        http_requests.post(archive_url, timeout=5)
    except Exception:
        # Best-effort teardown: a failed archive must not fail the test.
        return
def _poll_task(pid, tid, timeout, terminal_states=None):
    """Poll a task's status until it reaches a terminal state or times out.

    Args:
        pid: Project id.
        tid: Task id.
        timeout: Maximum time to keep polling, in seconds.
        terminal_states: Statuses that end the polling; defaults to
            ``("done", "failed", "cancelled")`` when omitted or empty.

    Returns:
        The task JSON as soon as its status is terminal. On timeout, the
        result of one final fetch, or ``{"status": "unknown"}`` when that
        fetch fails or does not answer 200.
    """
    terminal = terminal_states or ("done", "failed", "cancelled")
    task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            resp = http_requests.get(task_url, timeout=10)
            if resp.status_code == 200:
                data = resp.json()
                if data.get("status") in terminal:
                    return data
        except Exception:
            # Deliberately best-effort: a transient request/parse error just
            # means "try again on the next poll".
            pass
        # Never sleep past the deadline, so the overall wait honours `timeout`.
        remaining = deadline - time.monotonic()
        time.sleep(max(0.0, min(POLL_INTERVAL, remaining)))
    # Timed out: report the last observable state.
    try:
        resp = http_requests.get(task_url, timeout=10)
        return resp.json() if resp.status_code == 200 else {"status": "unknown"}
    except Exception:
        return {"status": "unknown"}
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
@skip_no_integration
class TestE9RealAgentDispatch:
    """E9: real agent dispatch tests.

    Tasks are created through the production HTTP API and left to the
    production Ticker to dispatch; a real agent is spawned to execute them and
    the status is never pushed forward by hand.
    Requires the environment variable RUN_INTEGRATION=1.
    """

    # Statuses that end polling for tasks expected to be executed by an agent.
    _AGENT_TERMINAL = ("done", "failed", "cancelled", "blocked")

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project on teardown."""
        _check_environment()
        self._projects = []
        yield
        # Teardown: clean up all test projects.
        for created in self._projects:
            _cleanup_project(created)

    def _create_project(self, name_prefix="E9") -> str:
        """Create a project with a random id and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(f"{API_BASE}/api/projects", json=payload, timeout=10)
        assert resp.status_code == 200, f"Create project failed: {resp.text}"
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending, priority-5 task; ``kwargs`` override the defaults."""
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        body = {"id": tid, "status": "pending", "priority": 5, **kwargs}
        tasks_url = f"{API_BASE}/api/projects/{pid}/tasks"
        resp = http_requests.post(tasks_url, json=body, timeout=10)
        assert resp.status_code == 200, f"Create task failed: {resp.text}"
        return tid

    def test_e91_simple_task_agent_execute(self):
        """E9-1: simple task → dispatched by the production Ticker → executed
        by a real agent → status flows to a terminal state on its own."""
        instructions = (
            "请执行以下操作:\n"
            "1. 运行命令 echo hello\n"
            "2. 把输出结果写入黑板\n"
            "这是E2E自动化测试,完成后请标记done。\n"
            "重要:不需要做其他任何事。"
        )
        pid = self._create_project("E9-1")
        tid = self._create_task(
            pid,
            title="E2E简单任务:echo hello",
            description=instructions,
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 E9-1: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=self._AGENT_TERMINAL,
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # Assertion 1: must no longer be pending (proves it was dispatched).
        assert status != "pending", (
            f"Agent未被调度!任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending。"
            f"\n请检查:1) Ticker是否运行 2) Agent是否可spawn 3) pm2 logs sanguo-moziplus-v2"
        )
        # Assertion 2: must not be blocked (guardrails should not intercept an
        # ordinary task).
        assert status != "blocked", f"普通任务被错误拦截: {result}"
        # Success is preferred; a failure is only reported with its details.
        if status == "failed":
            print(f" ⚠️ Agent执行失败,detail: {result}")
        else:
            print(f" ✅ Agent执行成功")
        # NOTE(review): indentation was lost in the reviewed source; this
        # summary line is assumed to be unconditional — confirm.
        print(f" 性能: pending → {status}")

    def test_e92_review_task_dispatch(self):
        """E9-2: review task → routed to a can_review agent → really executed."""
        instructions = (
            "这是一个E2E测试的review任务。\n"
            "请简单回复:\"审查通过,E2E测试正常。\"\n"
            "然后标记done即可。不需要做其他事。"
        )
        pid = self._create_project("E9-2")
        tid = self._create_task(
            pid,
            title="E2E Review:审查测试",
            description=instructions,
            assignee="simayi-challenger",
            task_type="review",
        )
        print(f"\n🚀 E9-2: 等待review调度 + Agent执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=self._AGENT_TERMINAL,
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "pending", (
            f"Review Agent未被调度!任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending。"
        )
        assert status != "blocked", f"Review任务被错误拦截: {result}"
        if status == "failed":
            print(f" ⚠️ Review Agent执行失败")
        else:
            print(f" ✅ Review Agent执行成功")

    def test_e93_guardrail_block(self):
        """E9-3: live-trading task → intercepted by guardrails → status=blocked."""
        pid = self._create_project("E9-3")
        tid = self._create_task(
            pid,
            title="执行实盘买入SH600000",
            description="用真金白银买入浦发银行1000股",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 E9-3: 等待Guardrails拦截 (pid={pid}, tid={tid})")
        # Guardrails are checked when the Ticker dispatches, so wait one tick.
        # claimed/working also end the polling so a missed interception is
        # caught immediately.
        stop_states = self._AGENT_TERMINAL + ("claimed", "working")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=stop_states,
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # A live-trading task must be intercepted; it may never reach
        # claimed/working/done.
        assert status not in ("claimed", "working", "done"), (
            f"Guardrails未拦截实盘任务!状态: {status}"
        )
        print(f" ✅ Guardrails拦截生效,状态: {status}")
# ===================================================================
# E10: Full-chain integration (production environment)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
@skip_no_integration
class TestE10FullChain:
    """E10: project → parent/child tasks → production Ticker → aggregation →
    dependencies → Mail → frontend API.

    Everything goes through the production HTTP API, is advanced by the real
    Ticker, and is executed by real agents.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then archive every created project on teardown."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="E10") -> str:
        """Create a project with a random id and register it for cleanup."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev", "simayi-challenger"]},
        }, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def test_e10a_logic_chain(self):
        """E10a: parent/child tasks + dependency advance + status aggregation
        + Mail (does not depend on an agent finishing)."""
        pid = self._create_project("E10a")
        # NOTE(review): the setup POSTs below do not check their responses; a
        # rejected request would only surface later as a poll timeout.
        # 1. Create the parent task (with stages).
        parent_id = f"{pid}-parent"
        http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
            "id": parent_id,
            "title": "E10a全链路父任务",
            "status": "pending",
            "stages_json": json.dumps([
                {"id": "setup", "label": "Setup"},
                {"id": "execute", "label": "Execute"},
                {"id": "verify", "label": "Verify"},
            ]),
        }, timeout=10)
        # 2. Create the 3 child tasks.
        child_ids = []
        for i, stage in enumerate(["setup", "execute", "verify"]):
            cid = f"{pid}-child-{i}"
            child_ids.append(cid)
            http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
                "id": cid,
                "title": f"子任务-{stage}",
                "status": "pending",
                "parent_task": parent_id,
                "stage": stage,
            }, timeout=10)
        # 3. Create the dependent task (to be blocked on the setup child).
        dep_id = f"{pid}-dep"
        http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
            "id": dep_id,
            "title": "依赖任务",
            "depends_on": json.dumps([child_ids[0]]),
        }, timeout=10)
        # 4. Drive the dependent task to blocked first (this must happen
        #    before its dependency completes).
        for status in ["claimed", "working", "blocked"]:
            http_requests.post(
                f"{API_BASE}/api/projects/{pid}/tasks/{dep_id}/status",
                json={"status": status, "agent": "test"}, timeout=10,
            )
        # 5. Then drive the setup child to done (this is what makes the
        #    dependency eligible to advance).
        for status in ["claimed", "working", "review", "done"]:
            http_requests.post(
                f"{API_BASE}/api/projects/{pid}/tasks/{child_ids[0]}/status",
                json={"status": status, "agent": "test"}, timeout=10,
            )
        # 6. Wait for the Ticker to advance the dependency (1-2 ticks).
        print(f"\n🚀 E10a: 等待依赖推进 (pid={pid})")
        dep_result = _poll_task(
            pid, dep_id, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("pending", "claimed", "working", "done"),
        )
        dep_status = dep_result.get("status")
        print(f" 依赖任务状态: blocked → {dep_status}")
        # blocked is expected to have been advanced to pending.
        assert dep_status != "blocked", (
            f"依赖推进未生效!{dep_id}{MAX_WAIT_DISPATCH}s 后仍为 blocked"
        )
        # 7. Send a mail.
        http_requests.post(f"{API_BASE}/api/mail", json={
            "title": f"E10a全链路通知-{pid}",
            "text": f"项目 {pid} setup阶段完成",
            "from": "simayi-challenger",
            "to": "pangtong-fujunshi",
            "type": "inform",
        }, timeout=10)
        # 8. Verify the mail.
        resp = http_requests.get(
            f"{API_BASE}/api/mail?from_agent=simayi-challenger", timeout=10
        )
        mails = resp.json()["mails"]
        assert any(m["title"].startswith("E10a全链路") for m in mails), \
            f"Mail未找到,当前mails: {[m['title'] for m in mails]}"
        # 9. Verify stage progress (only checked when the endpoint answers 200).
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}/progress", timeout=10
        )
        if resp.status_code == 200:
            progress = resp.json()
            stages = {s["id"]: s for s in progress.get("stages", [])}
            assert stages["setup"]["done"] == 1, "setup stage should be done"
            assert stages["execute"]["done"] == 0
        # 10. Verify the aggregated parent status.
        parent_resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}", timeout=10
        )
        parent_data = parent_resp.json()
        print(f" 父任务状态: {parent_data.get('status')}")
        # One done + two pending children → expected to be pending or working.
        # The message uses .get() so that a payload without "status" produces
        # a readable assertion failure instead of a KeyError raised while the
        # message is being formatted.
        assert parent_data.get("status") in ("pending", "working"), \
            f"父任务状态异常: {parent_data.get('status')}"
        print(f" ✅ E10a 全链路测试通过")

    def test_e10b_agent_full_chain(self):
        """E10b: real-agent full chain — create a child task → agent executes
        it → the parent status is aggregated automatically."""
        pid = self._create_project("E10b")
        # 1. Create the parent task.
        parent_id = f"{pid}-parent"
        http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
            "id": parent_id,
            "title": "E10b Agent全链路父任务",
            "status": "pending",
            "stages_json": json.dumps([
                {"id": "step1", "label": "Step 1"},
                {"id": "step2", "label": "Step 2"},
            ]),
        }, timeout=10)
        # 2. Create the child task (executed by a real agent).
        child_id = f"{pid}-child-0"
        http_requests.post(f"{API_BASE}/api/projects/{pid}/tasks", json={
            "id": child_id,
            "title": "E2E子任务:echo test",
            "description": (
                "请执行 echo test 并标记done。"
                "这是E2E测试,不需要做其他事。"
            ),
            "status": "pending",
            "parent_task": parent_id,
            "stage": "step1",
            "assignee": "zhangfei-dev",
            "task_type": "coding",
        }, timeout=10)
        # 3. Wait for the agent to finish.
        print(f"\n🚀 E10b: 等待Agent执行子任务 (pid={pid}, tid={child_id})")
        result = _poll_task(
            pid, child_id, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        child_status = result.get("status")
        print(f" 子任务最终状态: {child_status}")
        assert child_status != "pending", (
            f"子任务未被调度!{MAX_WAIT_AGENT}s后仍为pending"
        )
        # 4. Poll the parent until its status changes (waiting for the Ticker
        #    to aggregate).
        parent_result = _poll_task(
            pid, parent_id, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("working", "review", "done", "failed"),
        )
        parent_data = parent_result
        print(f" 父任务状态: {parent_data.get('status')}")
        # 5. Report stage progress (informational only; nothing is asserted).
        resp = http_requests.get(
            f"{API_BASE}/api/projects/{pid}/tasks/{parent_id}/progress", timeout=10
        )
        if resp.status_code == 200:
            progress = resp.json()
            stages = {s["id"]: s for s in progress.get("stages", [])}
            print(f" Stage进度: step1.done={stages.get('step1', {}).get('done', '?')}")
        print(f" ✅ E10b 真实Agent全链路完成")
# ===================================================================
# E11: Acquire-First 真实 Agent E2E#07.1
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
@skip_no_integration
class TestE11AcquireFirstE2E:
    """E11: #07.1 Acquire-First (phases 1-4) E2E against a real agent.

    Per the original notes, these tests exercise the real daemon dispatch
    path (counter acquire + session check + spawn) and verify it through
    task status and the ``routing_decisions`` table of the project DB.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then clean up all created projects."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="E11") -> str:
        """Create a project via the API and register it for cleanup.

        Args:
            name_prefix: Prefix used in the project's display name.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _create_task(self, pid, **kwargs) -> str:
        """Create a pending task (priority 5 by default) and return its id.

        Args:
            pid: Project id the task belongs to.
            **kwargs: Extra task fields; they override the defaults.  A
                falsy or missing ``id`` is replaced by a generated one.

        Returns:
            The id that was actually posted.
        """
        tid = kwargs.get("id") or f"e2e-task-{uuid.uuid4().hex[:8]}"
        # "id" goes last so the resolved id always wins over a falsy
        # kwargs["id"]; otherwise the posted id and returned id could differ.
        body = {"status": "pending", "priority": 5, **kwargs, "id": tid}
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks", json=body, timeout=10,
        )
        assert resp.status_code == 200
        return tid

    def _routing_rows(self, pid, sql, params):
        """Run a query against ``routing_decisions`` in the project DB.

        Args:
            pid: Project id; the DB is ``DATA_ROOT / pid / "blackboard.db"``.
            sql: SELECT statement to execute.
            params: Parameters bound to ``sql``.

        Returns:
            A list of ``sqlite3.Row`` objects, or ``None`` when the DB file
            or the ``routing_decisions`` table does not exist (a warning is
            printed in the latter case).
        """
        db_path = DATA_ROOT / pid / "blackboard.db"
        if not db_path.exists():
            return None
        import sqlite3 as sq3
        conn = sq3.connect(str(db_path))
        conn.row_factory = sq3.Row
        try:
            table_found = conn.execute(
                "SELECT 1 FROM sqlite_master "
                "WHERE type='table' AND name='routing_decisions'"
            ).fetchone() is not None
            if not table_found:
                print(" ⚠️ routing_decisions 表不存在,跳过审计验证")
                return None
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def test_e11a_dispatch_with_routing_audit(self):
        """E11a: dispatch a task and audit its ``routing_decisions`` row.

        Asserts the task leaves ``pending``.  If a routing row exists for
        the task, its ``selected_agent`` must be ``zhangfei-dev`` and its
        ``outcome`` must be ``dispatched``.  A missing DB, table or row only
        produces a warning.
        """
        pid = self._create_project("E11a")
        tid = self._create_task(
            pid,
            title="E2E Acquire-Firstecho hello",
            description=(
                "请执行 echo hello-world 并标记done。\n"
                "这是E2E测试 #07.1 Acquire-First,不需要做其他事。"
            ),
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 E11a: 等待调度 + Agent执行 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        # The task must have been dispatched, i.e. no longer pending.
        assert status != "pending", (
            f"任务 {tid}{MAX_WAIT_AGENT}s 后仍为 pending,调度未生效"
        )

        # Audit the most recent routing decision, if the table is available.
        rows = self._routing_rows(
            pid,
            "SELECT * FROM routing_decisions WHERE task_id=? ORDER BY id DESC LIMIT 1",
            (tid,),
        )
        if rows:
            row = rows[0]
            print(f" routing: mode={row['mode']} agent={row['selected_agent']} outcome={row['outcome']}")
            assert row["selected_agent"] == "zhangfei-dev"
            assert row["outcome"] == "dispatched"
        elif rows is not None:
            print(f" ⚠️ routing_decisions 无记录 for {tid},调度可能未走 routing 路径")
        print(" ✅ Acquire-First 调度审计验证通过")

    def test_e11b_concurrent_dispatch_counter_block(self):
        """E11b: two tasks for the same agent, created back to back.

        Per the original notes, the second task is expected to be skipped
        (counter full or session busy) until the first finishes.  The test
        asserts that task 1 leaves ``pending`` and that task 2 has at least
        one routing record; the individual outcomes are only printed.
        """
        pid = self._create_project("E11b")
        # Two tasks with the same assignee.
        tid1 = self._create_task(
            pid,
            title="E2E Counter-Block 任务1echo first",
            description="请执行 echo first 并标记done。E2E测试,不需要做其他事。",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        tid2 = self._create_task(
            pid,
            title="E2E Counter-Block 任务2echo second",
            description="请执行 echo second 并标记done。E2E测试,不需要做其他事。",
            assignee="zhangfei-dev",
            task_type="coding",
        )
        print(f"\n🚀 E11b: 等待两个任务调度 (pid={pid})")

        # Wait for the first task.
        result1 = _poll_task(
            pid, tid1, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status1 = result1.get("status")
        print(f" 任务1状态: {status1}")
        assert status1 != "pending", "任务1未被调度"

        # Wait for the second task (it may be skipped first and re-dispatched
        # once the first one completes).
        result2 = _poll_task(
            pid, tid2, timeout=MAX_WAIT_AGENT,
            terminal_states=("done", "failed", "cancelled", "blocked"),
        )
        status2 = result2.get("status")
        print(f" 任务2状态: {status2}")

        # Task 2 should have routing records: a skip and/or the final
        # dispatch.  The audit is skipped when the DB or table is missing,
        # as in E11a, instead of crashing on a missing table.
        rows = self._routing_rows(
            pid,
            "SELECT outcome, detail FROM routing_decisions WHERE task_id=? ORDER BY id",
            (tid2,),
        )
        if rows is not None:
            outcomes = [r["outcome"] for r in rows]
            print(f" 任务2 routing outcomes: {outcomes}")
            assert len(rows) > 0, "任务2无 routing 记录"
        print(" ✅ Counter block 并发调度验证完成")
# ===================================================================
# E12: unified _check_timeouts + crash_limit E2E (#07.2)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
@skip_no_integration
class TestE12TimeoutsUnifiedE2E:
    """E12: #07.2 unified ``_check_timeouts``, crash limit, ``updated_at`` fallback.

    Timeout scenarios are simulated by writing directly to the project's
    SQLite DB; the tests then wait for the running daemon to react.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then clean up all created projects."""
        _check_environment()
        self._projects = []
        yield
        for pid in self._projects:
            _cleanup_project(pid)

    def _create_project(self, name_prefix="E12") -> str:
        """Create a project via the API and register it for cleanup.

        Args:
            name_prefix: Prefix used in the project's display name.

        Returns:
            The generated project id.
        """
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        resp = http_requests.post(f"{API_BASE}/api/projects", json={
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }, timeout=10)
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def _create_working_task(self, pid, title) -> str:
        """Create a task and move it pending -> claimed -> working.

        Every request is checked for a 2xx response, so a failed setup
        cannot be mistaken for the behaviour under test (e.g. a task that
        never reached ``working`` trivially satisfying
        ``status != "working"``).

        Args:
            pid: Project id the task belongs to.
            title: Task title.

        Returns:
            The generated task id.
        """
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        resp = http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": title, "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        assert 200 <= resp.status_code < 300, (
            f"task creation failed: HTTP {resp.status_code}"
        )
        for state in ("claimed", "working"):
            resp = http_requests.post(
                f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status",
                json={"status": state, "agent": "zhangfei-dev"}, timeout=10,
            )
            assert 200 <= resp.status_code < 300, (
                f"transition to {state!r} failed: HTTP {resp.status_code}"
            )
        return tid

    def test_e12a_crash_limit_marks_failed(self):
        """E12a: three crashed attempts -> the task must end up ``failed``.

        Writes three ``crashed`` rows into ``task_attempts`` (started 25,
        20 and 15 minutes ago), then polls until the task reaches a
        terminal state and asserts that state is ``failed``.
        """
        pid = self._create_project("E12a")
        tid = self._create_working_task(pid, "Crash Limit 测试")

        # Insert three crashed attempts directly into the project DB.
        db_path = DATA_ROOT / pid / "blackboard.db"
        import sqlite3 as sq3
        conn = sq3.connect(str(db_path))
        try:
            for attempt_no, minutes_ago in enumerate((25, 20, 15), start=1):
                conn.execute(
                    "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                    "VALUES (?, ?, ?, ?, datetime('now', ?))",
                    (tid, attempt_no, "zhangfei-dev", "crashed", f"-{minutes_ago} minutes"),
                )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 E12a: 3次crash已写入,等待ticker处理 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status == "failed", (
            f"3次crash后任务应为failed,实际: {status}"
        )
        print(" ✅ crash_limit 统一检查验证通过")

    def test_e12b_updated_at_fallback_reclaim(self):
        """E12b: working task without ``started_at`` is reclaimed by timeout.

        Original note (#07.3 ACT-1): after a PM2 restart, orphaned mail
        tasks only have ``updated_at``, which ``_check_timeouts`` uses as a
        fallback.  The test clears ``started_at`` and ``claimed_at``,
        backdates ``updated_at`` by 60 minutes, and asserts the task does
        not remain ``working``.
        """
        pid = self._create_project("E12b")
        tid = self._create_working_task(pid, "updated_at Fallback 测试")

        # Clear started_at/claimed_at and backdate updated_at by 60 minutes.
        db_path = DATA_ROOT / pid / "blackboard.db"
        import sqlite3 as sq3
        conn = sq3.connect(str(db_path))
        try:
            conn.execute(
                "UPDATE tasks SET started_at=NULL, claimed_at=NULL, "
                "updated_at=datetime('now','-60 minutes') WHERE id=?",
                (tid,),
            )
            conn.commit()
        finally:
            conn.close()

        print(f"\n🚀 E12b: updated_at 设为60min前,等待ticker回收 (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status != "working", (
            "超时任务应被回收,实际仍为 working"
        )
        print(" ✅ updated_at fallback 回收验证通过")
# ===================================================================
# E13: compact_hanging must not be marked failed, E2E (#07.3 ACT-2)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
@skip_no_integration
class TestE13CompactHangingE2E:
    """E13: a ``compact_hanging`` attempt must leave the task ``working``.

    A ``compact_hanging`` attempt row is written straight into the project
    DB; the test then checks that the task has not been marked failed.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then clean up all created projects."""
        _check_environment()
        self._projects = []
        yield
        for created in self._projects:
            _cleanup_project(created)

    def _create_project(self, name_prefix="E13") -> str:
        """Create a project via the API, register it for cleanup, return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(
            f"{API_BASE}/api/projects", json=payload, timeout=10,
        )
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def test_e13a_compact_hanging_keeps_working(self):
        """E13a: a fresh ``compact_hanging`` attempt must not fail the task.

        Records a ``compact_hanging`` attempt (not a crash) for a working
        task whose ``started_at`` is 5 minutes old, sleeps 45 seconds
        (original note: one 30s tick plus buffer) and asserts that the task
        is still ``working``.
        """
        pid = self._create_project("E13a")
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"
        task_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}"

        # Create the task, then walk it through claimed -> working.
        http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": "Compact Hanging 测试", "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        for next_state in ("claimed", "working"):
            http_requests.post(
                f"{task_url}/status",
                json={"status": next_state, "agent": "zhangfei-dev"}, timeout=10,
            )

        # Record a compact_hanging attempt stamped "now" and set a recent
        # started_at so the task should not count as timed out.
        import sqlite3 as sq3
        db = sq3.connect(str(DATA_ROOT / pid / "blackboard.db"))
        try:
            db.execute(
                "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                "VALUES (?, ?, ?, ?, datetime('now'))",
                (tid, 1, "zhangfei-dev", "compact_hanging"),
            )
            db.execute(
                "UPDATE tasks SET started_at=datetime('now','-5 minutes') WHERE id=?",
                (tid,),
            )
            db.commit()
        finally:
            db.close()

        print("\n🚀 E13a: compact_hanging attempt 已写入,等一个 tick 验证不被标 failed")
        # Wait for one tick (30s + buffer, per the original note).
        time.sleep(45)

        # The task must still be working.
        resp = http_requests.get(task_url, timeout=10)
        assert resp.status_code == 200
        status = resp.json().get("status")
        print(f" 一个 tick 后状态: {status}")
        assert status == "working", (
            f"compact_hanging 后任务应保持 working,实际: {status}"
        )
        print(" ✅ compact_hanging 不标 failed 验证通过")
# ===================================================================
# E14: _rollback_current_agent E2E (#07.2)
# ===================================================================
@pytest.mark.integration
@pytest.mark.skipif(not os.environ.get("RUN_INTEGRATION"),
                    reason="Set RUN_INTEGRATION=1 to run real agent tests")
@skip_no_integration
class TestE14RollbackE2E:
    """E14: crash-limit handling for a task with ``current_agent`` set.

    Crashes are simulated through direct DB writes.  The test verifies the
    resulting ``failed`` status and the last status-change event; it does
    not assert anything about ``current_agent`` afterwards.
    """

    @pytest.fixture(autouse=True)
    def setup_env(self):
        """Check the environment, then clean up all created projects."""
        _check_environment()
        self._projects = []
        yield
        for created in self._projects:
            _cleanup_project(created)

    def _create_project(self, name_prefix="E14") -> str:
        """Create a project via the API, register it for cleanup, return its id."""
        pid = f"{E2E_PREFIX}{uuid.uuid4().hex[:6]}"
        payload = {
            "id": pid,
            "name": f"{name_prefix}-{pid}",
            "config": {"agents": ["zhangfei-dev"]},
        }
        resp = http_requests.post(
            f"{API_BASE}/api/projects", json=payload, timeout=10,
        )
        assert resp.status_code == 200
        self._projects.append(pid)
        return pid

    def test_e14a_rollback_on_crash(self):
        """E14a: three crashes -> task ``failed`` with a crash-related event.

        Sets ``current_agent`` and inserts three ``crashed`` attempts, polls
        until a terminal state and asserts it is ``failed``.  If a
        ``status_change`` event exists, its detail must mention "crash".

        Original author's note: ``_check_timeouts`` does not call
        ``_rollback_current_agent`` (rollback only happens in the
        ``_task_on_complete`` callback), so ``current_agent`` keeping its
        value here is by design and is not checked.
        """
        import sqlite3 as sq3

        pid = self._create_project("E14a")
        tid = f"e2e-task-{uuid.uuid4().hex[:8]}"

        # Create the task, then walk it through claimed -> working.
        http_requests.post(
            f"{API_BASE}/api/projects/{pid}/tasks",
            json={"id": tid, "title": "Rollback 测试", "status": "pending",
                  "assignee": "zhangfei-dev"},
            timeout=10,
        )
        status_url = f"{API_BASE}/api/projects/{pid}/tasks/{tid}/status"
        for next_state in ("claimed", "working"):
            http_requests.post(
                status_url,
                json={"status": next_state, "agent": "zhangfei-dev"}, timeout=10,
            )

        # Set current_agent and record three crashed attempts
        # (started 25, 20 and 15 minutes ago).
        db_path = DATA_ROOT / pid / "blackboard.db"
        writer = sq3.connect(str(db_path))
        try:
            writer.execute(
                "UPDATE tasks SET current_agent='zhangfei-dev' WHERE id=?",
                (tid,),
            )
            for attempt_no, minutes_ago in enumerate((25, 20, 15), start=1):
                writer.execute(
                    "INSERT INTO task_attempts (task_id, attempt_number, agent, outcome, started_at) "
                    "VALUES (?, ?, ?, ?, datetime('now', ?))",
                    (tid, attempt_no, "zhangfei-dev", "crashed", f"-{minutes_ago} minutes"),
                )
            writer.commit()
        finally:
            writer.close()

        print(f"\n🚀 E14a: 3次crash + current_agent 已设置,等待ticker (pid={pid}, tid={tid})")
        result = _poll_task(
            pid, tid, timeout=MAX_WAIT_DISPATCH,
            terminal_states=("failed", "done", "cancelled"),
        )
        status = result.get("status")
        print(f" 最终状态: {status}")
        assert status == "failed", f"应为 failed,实际: {status}"

        # Check that the most recent status-change event mentions the crash.
        reader = sq3.connect(str(db_path))
        reader.row_factory = sq3.Row
        try:
            last_event = reader.execute(
                "SELECT detail FROM events WHERE task_id=? AND event_type='status_change' ORDER BY id DESC LIMIT 1",
                (tid,),
            ).fetchone()
            if last_event is None:
                # No event row is tolerated (original note: some versions
                # do not write events).
                print(" ⚠️ 无 event 记录,跳过 detail 验证")
            else:
                detail = last_event["detail"] or ""
                print(f" last event detail: {detail[:100]}")
                assert "crash" in detail.lower(), f"event detail 应含 crash,实际: {detail}"
        finally:
            reader.close()
        print(" ✅ crash_limit 标 failed 验证通过")