auto-sync: 2026-05-18 16:08:38

This commit is contained in:
cfdaily
2026-05-18 16:08:38 +08:00
parent 7617119e2f
commit a7c3950855
+472
View File
@@ -0,0 +1,472 @@
"""v2.7 端到端测试 — 全链路真实环境
覆盖:项目管理 → Task CRUD → SubTask → Stage进度 → 状态聚合 → 依赖链 → 超时 → Mail → 真实Agent调度
"""
import asyncio
import json
import os
import sys
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import pytest
from fastapi.testclient import TestClient
# 指向部署目录
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.main import app
from src.blackboard.db import init_db, get_connection, VALID_TRANSITIONS, VALID_STATUSES
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.utils import get_data_root
# ── Fixtures ──
@pytest.fixture(scope="module")
def client():
return TestClient(app)
@pytest.fixture(scope="module")
def data_root():
return get_data_root()
def _pid() -> str:
"""生成唯一测试项目ID"""
return f"e2e-v27-{uuid.uuid4().hex[:8]}"
def _tid() -> str:
"""生成唯一任务ID"""
return f"e2e-task-{uuid.uuid4().hex[:8]}"
# ===================================================================
# E1: 项目管理
# ===================================================================
class TestE1ProjectManagement:
"""E1: 项目创建、列表、归档"""
def test_e11_create_project(self, client):
pid = _pid()
resp = client.post("/api/projects", json={
"id": pid,
"name": f"E2E测试-{pid}",
"description": "自动测试项目",
})
assert resp.status_code == 200
data = resp.json()
assert data["id"] == pid
# 清理
self._pid = pid
def test_e12_list_projects(self, client):
resp = client.get("/api/projects")
assert resp.status_code == 200
data = resp.json()
assert "projects" in data
assert isinstance(data["projects"], dict)
def test_e13_get_project(self, client):
pid = getattr(self, "_pid", None)
if not pid:
pytest.skip("No project created")
resp = client.get(f"/api/projects/{pid}")
assert resp.status_code == 200
assert resp.json()["id"] == pid
def test_e14_archive_project(self, client):
pid = getattr(self, "_pid", None)
if not pid:
pytest.skip("No project created")
resp = client.post(f"/api/projects/{pid}/archive")
assert resp.status_code == 200
# 验证状态
resp = client.get(f"/api/projects/{pid}")
assert resp.json()["status"] == "archived"
def test_e15_create_project_auto_discover(self, data_root):
"""创建含 blackboard.db 的目录,验证自动发现"""
pid = _pid()
project_dir = data_root / pid
project_dir.mkdir(parents=True, exist_ok=True)
init_db(project_dir / "blackboard.db")
# 注册
registry = ProjectRegistry(data_root)
registry.create_project(pid, f"Discover-{pid}", source="auto_discovered")
projects = registry.list_projects()
assert pid in projects
# 清理
import shutil
shutil.rmtree(project_dir, ignore_errors=True)
# ===================================================================
# E2: Task CRUD + 状态机
# ===================================================================
class TestE2TaskCRUD:
"""E2: Task 创建、查询、状态转换"""
@pytest.fixture(autouse=True)
def setup_project(self, client):
self.pid = _pid()
client.post("/api/projects", json={
"id": self.pid, "name": f"E2-{self.pid}",
})
def test_e21_create_task(self, client):
tid = _tid()
resp = client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid,
"title": "测试任务",
"description": "E2E测试",
"status": "pending",
})
assert resp.status_code == 200
assert resp.json()["id"] == tid
self._tid = tid
def test_e22_get_task_expand(self, client):
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "Expand测试", "status": "pending",
})
resp = client.get(f"/api/projects/{self.pid}/tasks/{tid}?expand=all")
assert resp.status_code == 200
data = resp.json()
assert data["id"] == tid
assert "comments" in data
assert "outputs" in data
def test_e23_valid_transitions(self, client):
"""pending → claimed → working → review → done"""
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "状态机测试", "status": "pending",
})
transitions = [
("claimed", {"agent": "zhangfei-dev"}),
("working", {"agent": "zhangfei-dev"}),
("review", {"agent": "zhangfei-dev"}),
("done", {"agent": "zhangfei-dev"}),
]
for new_status, body in transitions:
resp = client.post(
f"/api/projects/{self.pid}/tasks/{tid}/status",
json={"status": new_status, **body},
)
assert resp.status_code == 200, f"Failed at → {new_status}: {resp.text}"
def test_e24_invalid_transition_rejected(self, client):
"""pending → done 应被拒绝"""
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "非法转换", "status": "pending",
})
resp = client.post(
f"/api/projects/{self.pid}/tasks/{tid}/status",
json={"status": "done", "agent": "test"},
)
assert resp.status_code == 409
assert "invalid_transition" in resp.text or "Cannot transition" in resp.text
def test_e25_list_tasks_filter(self, client):
"""按状态筛选"""
for st in ["pending", "pending"]:
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": _tid(), "title": "Filter", "status": st,
})
resp = client.get(f"/api/projects/{self.pid}/tasks?status=pending")
assert resp.status_code == 200
assert len(resp.json()) >= 2
# ===================================================================
# E3: SubTask 父子关系
# ===================================================================
class TestE3SubTask:
"""E3: 父子 Task 关系"""
@pytest.fixture(autouse=True)
def setup(self, client):
self.pid = _pid()
client.post("/api/projects", json={
"id": self.pid, "name": f"E3-{self.pid}",
})
self.parent_id = "e3-parent-001"
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": self.parent_id,
"title": "父任务",
"status": "pending",
"stages_json": json.dumps(["setup", "run", "verify"]),
})
# 创建 3 个子 Task
self.child_ids = []
for i, stage in enumerate(["setup", "run", "verify"]):
cid = f"e3-child-{i}"
self.child_ids.append(cid)
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": cid,
"title": f"子任务-{stage}",
"status": "pending",
"parent_task": self.parent_id,
"stage": stage,
})
def test_e31_list_subtasks(self, client):
resp = client.get(f"/api/projects/{self.pid}/tasks?parent_task={self.parent_id}")
assert resp.status_code == 200
tasks = resp.json()
assert len(tasks) == 3
ids = {t["id"] for t in tasks}
assert set(self.child_ids) == ids
def test_e32_top_level_excludes_children(self, data_root):
db_path = data_root / self.pid / "blackboard.db"
q = Queries(db_path)
top = q.top_level_tasks()
top_ids = {t.id for t in top}
assert self.parent_id in top_ids
for cid in self.child_ids:
assert cid not in top_ids
def test_e33_child_stage_field(self, data_root):
db_path = data_root / self.pid / "blackboard.db"
bb = Blackboard(db_path)
for i, stage in enumerate(["setup", "run", "verify"]):
t = bb.get_task(f"e3-child-{i}")
assert t is not None
assert t.stage == stage
# ===================================================================
# E4: Stage 进度
# ===================================================================
class TestE4StageProgress:
"""E4: stages_json + stage 分组统计"""
@pytest.fixture(autouse=True)
def setup(self, client):
self.pid = _pid()
client.post("/api/projects", json={
"id": self.pid, "name": f"E4-{self.pid}",
})
self.parent_id = "e4-parent-001"
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": self.parent_id,
"title": "Stage父任务",
"status": "pending",
"stages_json": json.dumps(["data", "code", "test"]),
})
# 每个stage一个子任务
for i, stage in enumerate(["data", "code", "test"]):
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": f"e4-child-{i}",
"title": f"Stage-{stage}",
"status": "pending",
"parent_task": self.parent_id,
"stage": stage,
})
def test_e41_progress_endpoint(self, client):
resp = client.get(f"/api/projects/{self.pid}/tasks/{self.parent_id}/progress")
assert resp.status_code == 200
data = resp.json()
assert "stages" in data
assert len(data["stages"]) == 3
def test_e42_progress_counts(self, data_root):
"""把 data stage 标记 done,验证进度"""
db_path = data_root / self.pid / "blackboard.db"
bb = Blackboard(db_path)
# data child → done
bb.update_task_status("e4-child-0", "claimed", agent="test")
bb.update_task_status("e4-child-0", "working", agent="test")
bb.update_task_status("e4-child-0", "review", agent="test")
bb.update_task_status("e4-child-0", "done", agent="test")
q = Queries(db_path)
progress = q.parent_task_progress(self.parent_id)
stages = {s["stage"]: s for s in progress["stages"]}
assert stages["data"]["total"] == 1
assert stages["data"]["done"] == 1
assert stages["code"]["total"] == 1
assert stages["code"]["done"] == 0
def test_e43_empty_stages_progress(self, client):
"""无 stages_json 的 Task 进度"""
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "无Stage", "status": "pending",
})
db_path = get_data_root() / self.pid / "blackboard.db"
q = Queries(db_path)
progress = q.parent_task_progress(tid)
# 无子任务,应返回空或基本结构
assert progress is not None
# ===================================================================
# E5: 父 Task 状态聚合
# ===================================================================
class TestE5ParentAggregation:
"""E5: compute_parent_status 聚合逻辑"""
@pytest.fixture(autouse=True)
def setup(self, data_root):
self.pid = _pid()
db_path = data_root / self.pid / "blackboard.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
init_db(db_path)
self.db_path = db_path
self.bb = Blackboard(db_path)
self.q = Queries(db_path)
# 创建父任务
self.parent_id = "e5-parent"
self.bb.create_task(Task(
id=self.parent_id, title="聚合父任务", status="pending",
))
def _create_child(self, cid: str, status: str = "pending", stage: str = None):
self.bb.create_task(Task(
id=cid, title=f"子-{cid}", status=status,
parent_task=self.parent_id, stage=stage,
))
# 如果不是 pending,需要走合法转换
if status != "pending":
for s in ["claimed", "working"]:
if s in VALID_TRANSITIONS.get("pending", set()):
self.bb.update_task_status(cid, s, agent="test")
if status == "review":
self.bb.update_task_status(cid, "review", agent="test")
elif status == "done":
self.bb.update_task_status(cid, "review", agent="test")
self.bb.update_task_status(cid, "done", agent="test")
elif status == "failed":
self.bb.update_task_status(cid, "failed", agent="test")
elif status == "cancelled":
self.bb.update_task_status(cid, "cancelled", agent="test")
def test_e51_all_done_parent_done(self):
self._create_child("c1", "done")
self._create_child("c2", "done")
result = self.q.compute_parent_status(self.parent_id)
assert result == "done"
def test_e52_has_review_parent_review(self):
self._create_child("c3", "done")
self._create_child("c4", "review")
result = self.q.compute_parent_status(self.parent_id)
assert result == "review"
def test_e53_has_working_parent_working(self):
self._create_child("c5", "done")
self._create_child("c6", "claimed")
self.bb.update_task_status("c6", "working", agent="test")
result = self.q.compute_parent_status(self.parent_id)
assert result == "working"
def test_e54_all_pending_parent_pending(self):
self._create_child("c7", "pending")
self._create_child("c8", "pending")
result = self.q.compute_parent_status(self.parent_id)
assert result == "pending"
def test_e55_cancelled_excluded(self):
"""cancelled 子 Task 不参与聚合"""
self._create_child("c9", "done")
self._create_child("c10", "cancelled")
# 只有 c9 有效 → done
result = self.q.compute_parent_status(self.parent_id)
assert result == "done"
def test_e56_cancelled_parent_not_overridden(self):
"""cancelled 的父 Task 不被聚合覆盖"""
# 先让所有子任务 done
self._create_child("c11", "done")
# 手动把父任务设为 cancelled
conn = get_connection(self.db_path)
try:
conn.execute("UPDATE tasks SET status='cancelled' WHERE id=?", (self.parent_id,))
conn.commit()
finally:
conn.close()
# _refresh_parent_statuses 应跳过 cancelled 父任务
ticker = Ticker.__new__(Ticker)
ticker._refresh_parent_statuses(self.db_path)
# 验证父任务仍是 cancelled
t = self.bb.get_task(self.parent_id)
assert t.status == "cancelled"
# ===================================================================
# E6: 依赖链
# ===================================================================
class TestE6DependencyChain:
"""E6: depends_on 依赖推进"""
@pytest.fixture(autouse=True)
def setup(self, data_root):
self.pid = _pid()
db_path = data_root / self.pid / "blackboard.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
init_db(db_path)
self.db_path = db_path
self.bb = Blackboard(db_path)
def test_e61_dep_advance(self):
"""A done → B 从 blocked → pending"""
self.bb.create_task(Task(id="dep-a", title="A", status="pending"))
self.bb.create_task(Task(id="dep-b", title="B", status="blocked", depends_on="dep-a"))
# 完成 A
for s in ["claimed", "working", "review", "done"]:
self.bb.update_task_status("dep-a", s, agent="test")
# 手动 tick 依赖推进
ticker = Ticker.__new__(Ticker)
advanced = ticker._advance_dependencies(self.db_path)
assert "dep-b" in advanced
t = self.bb.get_task("dep-b")
assert t.status == "pending"
def test_e62_dep_not_done_stays_blocked(self):
"""A 未完成 → B 保持 blocked"""
self.bb.create_task(Task(id="dep-c", title="C", status="pending"))
self.bb.create_task(Task(id="dep-d", title="D", status="blocked", depends_on="dep-c"))
ticker = Ticker.__new__(Ticker)
advanced = ticker._advance_dependencies(self.db_path)
assert "dep-d" not in advanced
assert self.bb.get_task("dep-d").status == "blocked"
def test_e63_chain_a_b_c(self):
"""A → B → C 多层依赖"""
self.bb.create_task(Task(id="chain-a", title="A", status="pending"))
self.bb.create_task(Task(id="chain-b", title="B", status="blocked", depends_on="chain-a"))
self.bb.create_task(Task(id="chain-c", title="C", status="blocked", depends_on="chain-b"))
# 完成 A
for s in ["claimed", "working", "review", "done"]:
self.bb.update_task_status("chain-a", s, agent="test")
ticker = Ticker.__new__(Ticker)
advanced1 = ticker._advance_dependencies(self.db_path)
assert "chain-b" in advanced1
# B 现在 pending,完成 B
for s in ["claimed", "working", "review", "done"]:
self.bb.update_task_status("chain-b", s, agent="test")
advanced2 = ticker._advance_dependencies(self.db_path)
assert "chain-c" in advanced2