Files
2026-06-05 23:22:03 +08:00

722 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""S1-S8 API 集成测试
从 test_e2e_v27.py 的 TestE1-E8 迁移,改用 tmp_path 隔离。
使用 TestClient 直接测试 API 端点,无需 daemon。
覆盖:项目管理 → Task CRUD → SubTask → Stage 进度 → 父状态聚合 → 依赖链 → 超时 → Mail
"""
import json
import os
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
from unittest.mock import MagicMock

import pytest
from fastapi.testclient import TestClient
# Point at the deployment directory: it is pushed to the front of sys.path so
# that the `src.*` imports below are looked up there first.
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.main import app
from src.blackboard.db import init_db, get_connection, VALID_TRANSITIONS, VALID_STATUSES
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.utils import get_data_root
# Module-level pytest mark: tags every test in this module as `integration`.
pytestmark = pytest.mark.integration
# ── Fixtures ──
@pytest.fixture(scope="module")
def isolated_client():
    """Yield a module-scoped ``(TestClient, data_root)`` pair on a temp dir.

    ``src.utils.get_data_root`` is replaced by a callable returning a fresh
    temporary directory for the lifetime of the module and is restored
    afterwards; the directory is then removed.

    NOTE(review): only code that looks up ``get_data_root`` through the
    ``src.utils`` module attribute at call time sees the replacement. Names
    bound earlier with ``from src.utils import get_data_root`` (as this file
    itself does) keep the original function -- confirm the application
    resolves the data root through the module.

    Yields:
        tuple: ``(client, tmp)`` where ``client`` is a ``TestClient`` built
        from ``app`` and ``tmp`` is the ``Path`` of the temporary data root.
    """
    import shutil
    import tempfile

    import src.utils as utils

    # mkdtemp already creates the directory, so no extra mkdir is needed.
    tmp = Path(tempfile.mkdtemp(prefix="s1s8-"))
    original = utils.get_data_root
    utils.get_data_root = lambda: tmp
    try:
        yield TestClient(app), tmp
    finally:
        # Also runs when building the client fails, so neither the patched
        # function nor the temporary directory is leaked.
        utils.get_data_root = original
        shutil.rmtree(tmp, ignore_errors=True)
@pytest.fixture(scope="module")
def client(isolated_client):
    """Expose the ``TestClient`` half of the ``isolated_client`` pair."""
    test_client, _root = isolated_client
    return test_client
@pytest.fixture(scope="module")
def data_root(isolated_client):
    """Expose the temporary data-root path half of the ``isolated_client`` pair."""
    _test_client, root = isolated_client
    return root
def _pid() -> str:
"""生成唯一测试项目ID"""
return f"test-s1s8-{uuid.uuid4().hex[:8]}"
def _tid() -> str:
"""生成唯一任务ID"""
return f"test-s1s8-task-{uuid.uuid4().hex[:8]}"
# ===================================================================
# S1: 项目管理
# ===================================================================
class TestS1ProjectManagement:
    """S1: project creation, listing and archiving."""

    def test_s11_create_and_get_project(self, client):
        """Create a project through the API and fetch it back by id."""
        pid = _pid()
        resp = client.post("/api/projects", json={
            "id": pid,
            "name": f"集成测试-{pid}",
            "description": "自动测试项目",
        })
        assert resp.status_code == 200
        data = resp.json()
        # The created id is accepted under either response key.
        assert data.get("project_id") == pid or data.get("id") == pid

        # Fetch it back.
        resp = client.get(f"/api/projects/{pid}")
        assert resp.status_code == 200
        assert resp.json().get("id") == pid

    def test_s12_list_projects(self, client):
        """The project listing returns a ``projects`` mapping."""
        resp = client.get("/api/projects")
        assert resp.status_code == 200
        data = resp.json()
        assert "projects" in data
        assert isinstance(data["projects"], dict)

    def test_s13_archive_project(self, client):
        """Archiving a project switches its status to ``archived``."""
        pid = _pid()
        created = client.post(
            "/api/projects", json={"id": pid, "name": f"Archive-{pid}"}
        )
        # Fail here rather than on the archive call if the setup step broke.
        assert created.status_code == 200, created.text

        resp = client.post(f"/api/projects/{pid}/archive")
        assert resp.status_code == 200

        # Verify the stored status.
        resp = client.get(f"/api/projects/{pid}")
        assert resp.json()["status"] == "archived"

    def test_s14_create_project_auto_discover(self, data_root):
        """Register a directory that already holds a blackboard.db and list it.

        NOTE(review): the project is registered explicitly with
        ``source="auto_discovered"``; no discovery scan is triggered here.
        """
        import shutil

        pid = _pid()
        project_dir = data_root / pid
        project_dir.mkdir(parents=True, exist_ok=True)
        try:
            init_db(project_dir / "blackboard.db")

            registry = ProjectRegistry(data_root)
            registry.create_project(
                pid, f"Discover-{pid}", source="auto_discovered"
            )
            assert pid in registry.list_projects()
        finally:
            # Remove the directory even when an assertion above fails.
            shutil.rmtree(project_dir, ignore_errors=True)
# ===================================================================
# S2: Task CRUD + 状态机
# ===================================================================
class TestS2TaskCRUD:
    """S2: task creation, lookup and status transitions."""

    @pytest.fixture(autouse=True)
    def setup_project(self, client):
        """Create a fresh project per test and store its id in ``self.pid``."""
        self.pid = _pid()
        resp = client.post("/api/projects", json={
            "id": self.pid, "name": f"S2-{self.pid}",
        })
        # Surface a broken setup immediately instead of in a later request.
        assert resp.status_code == 200, resp.text

    def _create_pending_task(self, client, title):
        """POST a pending task with a fresh id, check it was accepted, return the id."""
        tid = _tid()
        resp = client.post(f"/api/projects/{self.pid}/tasks", json={
            "id": tid, "title": title, "status": "pending",
        })
        assert resp.status_code == 200, resp.text
        return tid

    def test_s21_create_task(self, client):
        """Creating a task echoes its id under ``task_id`` or ``id``."""
        tid = _tid()
        resp = client.post(f"/api/projects/{self.pid}/tasks", json={
            "id": tid,
            "title": "测试任务",
            "description": "集成测试",
            "status": "pending",
        })
        assert resp.status_code == 200
        body = resp.json()
        assert body.get("task_id") == tid or body.get("id") == tid

    def test_s22_get_task_expand(self, client):
        """``?expand=all`` returns the task with comments and/or outputs."""
        tid = self._create_pending_task(client, "Expand测试")
        resp = client.get(f"/api/projects/{self.pid}/tasks/{tid}?expand=all")
        assert resp.status_code == 200
        data = resp.json()
        assert data.get("id") == tid
        assert "comments" in data or "outputs" in data

    def test_s23_valid_transitions(self, client):
        """pending -> claimed -> working -> review -> done is accepted."""
        tid = self._create_pending_task(client, "状态机测试")
        for new_status in ("claimed", "working", "review", "done"):
            resp = client.post(
                f"/api/projects/{self.pid}/tasks/{tid}/status",
                json={"status": new_status, "agent": "zhangfei-dev"},
            )
            assert resp.status_code == 200, f"Failed at → {new_status}: {resp.text}"

    def test_s24_invalid_transition_rejected(self, client):
        """pending -> done must be rejected with HTTP 409."""
        tid = self._create_pending_task(client, "非法转换")
        resp = client.post(
            f"/api/projects/{self.pid}/tasks/{tid}/status",
            json={"status": "done", "agent": "test"},
        )
        assert resp.status_code == 409
        assert "invalid_transition" in resp.text or "Cannot transition" in resp.text

    def test_s25_list_tasks_filter(self, client):
        """Filtering by status returns at least the two pending tasks created."""
        for _ in range(2):
            self._create_pending_task(client, "Filter")
        resp = client.get(f"/api/projects/{self.pid}/tasks?status=pending")
        assert resp.status_code == 200
        payload = resp.json()
        # Accept both a {"tasks": [...]} envelope and a bare list; calling
        # .get() on a list would raise AttributeError.
        tasks = payload.get("tasks", payload) if isinstance(payload, dict) else payload
        assert len(tasks) >= 2
# ===================================================================
# S3: SubTask 父子关系
# ===================================================================
class TestS3SubTask:
    """S3: parent/child task relationships."""

    # Stage ids used for the three children, in creation order; they match
    # the ids listed in the parent's stages_json.
    STAGES = ("setup", "run", "verify")

    @pytest.fixture(autouse=True)
    def setup(self, client):
        """Create a project, one parent task with three stages, and one child per stage."""
        self.pid = _pid()
        resp = client.post("/api/projects", json={
            "id": self.pid, "name": f"S3-{self.pid}",
        })
        assert resp.status_code == 200, resp.text

        self.parent_id = "s3-parent-001"
        resp = client.post(f"/api/projects/{self.pid}/tasks", json={
            "id": self.parent_id,
            "title": "父任务",
            "status": "pending",
            "stages_json": json.dumps([{"id": "setup", "label": "Setup"}, {"id": "run", "label": "Run"}, {"id": "verify", "label": "Verify"}]),
        })
        assert resp.status_code == 200, resp.text

        # Create the three child tasks.
        self.child_ids = []
        for i, stage in enumerate(self.STAGES):
            cid = f"s3-child-{i}"
            self.child_ids.append(cid)
            resp = client.post(f"/api/projects/{self.pid}/tasks", json={
                "id": cid,
                "title": f"子任务-{stage}",
                "status": "pending",
                "parent_task": self.parent_id,
                "stage": stage,
            })
            assert resp.status_code == 200, resp.text

    def test_s31_list_subtasks(self, client):
        """Filtering by ``parent_task`` returns exactly the three children."""
        resp = client.get(f"/api/projects/{self.pid}/tasks?parent_task={self.parent_id}")
        assert resp.status_code == 200
        payload = resp.json()
        # Accept both a {"tasks": [...]} envelope and a bare list; calling
        # .get() on a list would raise AttributeError.
        tasks = payload.get("tasks", payload) if isinstance(payload, dict) else payload
        assert len(tasks) == 3
        ids = {t["id"] for t in tasks}
        assert set(self.child_ids) == ids

    def test_s32_top_level_excludes_children(self, data_root):
        """``top_level_tasks`` lists the parent but none of its children."""
        db_path = data_root / self.pid / "blackboard.db"
        top_ids = {t.id for t in Queries(db_path).top_level_tasks()}
        assert self.parent_id in top_ids
        for cid in self.child_ids:
            assert cid not in top_ids

    def test_s33_child_stage_field(self, data_root):
        """Each child keeps the ``stage`` it was created with."""
        db_path = data_root / self.pid / "blackboard.db"
        bb = Blackboard(db_path)
        # child_ids was filled in STAGES order by the setup fixture.
        for cid, stage in zip(self.child_ids, self.STAGES):
            t = bb.get_task(cid)
            assert t is not None
            assert t.stage == stage
# ===================================================================
# S4: Stage 进度
# ===================================================================
class TestS4StageProgress:
    """S4: ``stages_json`` and per-stage progress statistics."""

    # Stage ids used for the children, in creation order; they match the ids
    # listed in the parent's stages_json.
    STAGES = ("data", "code", "test")

    @pytest.fixture(autouse=True)
    def setup(self, client):
        """Create a project, a parent task with three stages, and one child per stage."""
        self.pid = _pid()
        resp = client.post("/api/projects", json={
            "id": self.pid, "name": f"S4-{self.pid}",
        })
        assert resp.status_code == 200, resp.text

        self.parent_id = "s4-parent-001"
        resp = client.post(f"/api/projects/{self.pid}/tasks", json={
            "id": self.parent_id,
            "title": "Stage父任务",
            "status": "pending",
            "stages_json": json.dumps([{"id": "data", "label": "Data"}, {"id": "code", "label": "Code"}, {"id": "test", "label": "Test"}]),
        })
        assert resp.status_code == 200, resp.text

        # One child task per stage.
        self.child_ids = []
        for i, stage in enumerate(self.STAGES):
            cid = f"s4-child-{i}"
            self.child_ids.append(cid)
            resp = client.post(f"/api/projects/{self.pid}/tasks", json={
                "id": cid,
                "title": f"Stage-{stage}",
                "status": "pending",
                "parent_task": self.parent_id,
                "stage": stage,
            })
            assert resp.status_code == 200, resp.text

    def test_s41_progress_endpoint(self, client):
        """The progress endpoint reports one entry per declared stage."""
        resp = client.get(f"/api/projects/{self.pid}/tasks/{self.parent_id}/progress")
        assert resp.status_code == 200
        data = resp.json()
        assert "stages" in data
        assert len(data["stages"]) == 3

    def test_s42_progress_counts(self, data_root):
        """Mark the ``data`` stage child done and check the per-stage counters."""
        db_path = data_root / self.pid / "blackboard.db"
        bb = Blackboard(db_path)
        # Walk the "data" child (first one created) to done, one status at a time.
        data_child = self.child_ids[0]
        for status in ("claimed", "working", "review", "done"):
            bb.update_task_status(data_child, status, agent="test")

        progress = Queries(db_path).parent_task_progress(self.parent_id)
        stages = {s["id"]: s for s in progress["stages"]}
        assert stages["data"]["total"] == 1
        assert stages["data"]["done"] == 1
        assert stages["code"]["total"] == 1
        assert stages["code"]["done"] == 0

    def test_s43_empty_stages_progress(self, client, data_root):
        """Progress of a task created without ``stages_json`` is still not None."""
        tid = _tid()
        resp = client.post(f"/api/projects/{self.pid}/tasks", json={
            "id": tid, "title": "无Stage", "status": "pending",
        })
        assert resp.status_code == 200, resp.text

        db_path = data_root / self.pid / "blackboard.db"
        progress = Queries(db_path).parent_task_progress(tid)
        # No children: only require that some (possibly empty) structure comes back.
        assert progress is not None
# ===================================================================
# S5: 父 Task 状态聚合
# ===================================================================
class TestS5ParentAggregation:
    """S5: aggregation logic of ``compute_parent_status``."""

    # Status updates applied, in order, to take a freshly created pending
    # child to each supported target status.
    _STATUS_PATHS = {
        "pending": [],
        "claimed": ["claimed"],
        "working": ["claimed", "working"],
        "review": ["claimed", "working", "review"],
        "done": ["claimed", "working", "review", "done"],
        "failed": ["claimed", "working", "failed"],
        "blocked": ["claimed", "working", "blocked"],
        "cancelled": ["cancelled"],
    }

    @pytest.fixture(autouse=True)
    def setup(self, data_root):
        """Initialise a fresh blackboard DB holding a single pending parent task."""
        self.pid = _pid()
        db_path = data_root / self.pid / "blackboard.db"
        db_path.parent.mkdir(parents=True, exist_ok=True)
        init_db(db_path)
        self.db_path = db_path
        self.bb = Blackboard(db_path)
        self.q = Queries(db_path)
        # Create the parent task.
        self.parent_id = "s5-parent"
        self.bb.create_task(Task(
            id=self.parent_id, title="聚合父任务", status="pending",
        ))

    def _create_child(self, cid: str, status: str = "pending", stage: Optional[str] = None):
        """Create a child of the parent task and walk it to ``status``.

        Args:
            cid: Id of the child task to create.
            status: Target status; must be a key of ``_STATUS_PATHS``.
            stage: Optional stage value stored on the child.

        Raises:
            ValueError: If ``status`` has no known path. Raised before the
                task is created, so a typo cannot silently yield a pending child.
        """
        if status not in self._STATUS_PATHS:
            raise ValueError(f"unsupported target status: {status!r}")
        self.bb.create_task(Task(
            id=cid, title=f"子-{cid}", status="pending",
            parent_task=self.parent_id, stage=stage,
        ))
        # Walk the state machine one update at a time up to the target status.
        for step in self._STATUS_PATHS[status]:
            self.bb.update_task_status(cid, step, agent="test")

    def test_s51_all_done_parent_done(self):
        """All children done -> parent aggregates to done."""
        self._create_child("c1", "done")
        self._create_child("c2", "done")
        result = self.q.compute_parent_status(self.parent_id)
        assert result == "done"

    def test_s52_has_review_parent_review(self):
        """One done and one review child -> parent aggregates to review."""
        self._create_child("c3", "done")
        self._create_child("c4", "review")
        result = self.q.compute_parent_status(self.parent_id)
        assert result == "review"

    def test_s53_has_working_parent_working(self):
        """One done and one working child -> parent aggregates to working."""
        self._create_child("c5", "done")
        self._create_child("c6", "working")
        result = self.q.compute_parent_status(self.parent_id)
        assert result == "working"

    def test_s54_all_pending_parent_pending(self):
        """All children pending -> parent aggregates to pending."""
        self._create_child("c7", "pending")
        self._create_child("c8", "pending")
        result = self.q.compute_parent_status(self.parent_id)
        assert result == "pending"

    def test_s55_cancelled_excluded(self):
        """Cancelled children do not take part in the aggregation."""
        self._create_child("c9", "done")
        self._create_child("c10", "cancelled")
        # Only c9 counts, so the parent is expected to be done.
        result = self.q.compute_parent_status(self.parent_id)
        assert result == "done"

    def test_s56_cancelled_parent_not_overridden(self):
        """A cancelled parent keeps its status after a parent-status refresh."""
        # Start with a child that is done.
        self._create_child("c11", "done")
        # Force the parent to cancelled directly in the database.
        conn = get_connection(self.db_path)
        try:
            conn.execute("UPDATE tasks SET status='cancelled' WHERE id=?", (self.parent_id,))
            conn.commit()
        finally:
            conn.close()
        # _refresh_parent_statuses is expected to skip cancelled parents.
        ticker = Ticker(registry=MagicMock())
        ticker._refresh_parent_statuses(self.db_path)
        # The parent must still be cancelled.
        t = self.bb.get_task(self.parent_id)
        assert t.status == "cancelled"
# ===================================================================
# S6: 依赖链
# ===================================================================
class TestS6DependencyChain:
    """S6: ``depends_on`` dependency advancement."""

    @pytest.fixture(autouse=True)
    def setup(self, data_root):
        """Initialise a fresh blackboard DB for each test."""
        self.pid = _pid()
        self.db_path = data_root / self.pid / "blackboard.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        init_db(self.db_path)
        self.bb = Blackboard(self.db_path)

    def _add_task(self, task_id, title, status, deps=None):
        """Create a task; ``deps`` (if given) is stored JSON-encoded in ``depends_on``."""
        if deps is None:
            self.bb.create_task(Task(id=task_id, title=title, status=status))
        else:
            self.bb.create_task(Task(id=task_id, title=title, status=status, depends_on=json.dumps(deps)))

    def _complete(self, task_id):
        """Walk a pending task through claimed, working, review to done."""
        for step in ("claimed", "working", "review", "done"):
            self.bb.update_task_status(task_id, step, agent="test")

    def test_s61_dep_advance(self):
        """Once A is done, B moves from blocked to pending."""
        self._add_task("dep-a", "A", "pending")
        self._add_task("dep-b", "B", "blocked", deps=["dep-a"])
        self._complete("dep-a")
        # Run the dependency-advance step by hand.
        advanced = Ticker(registry=MagicMock())._advance_dependencies(self.db_path)
        assert "dep-b" in advanced
        task_b = self.bb.get_task("dep-b")
        assert task_b.status == "pending"

    def test_s62_dep_not_done_stays_blocked(self):
        """While A is unfinished, B stays blocked."""
        self._add_task("dep-c", "C", "pending")
        self._add_task("dep-d", "D", "blocked", deps=["dep-c"])
        advanced = Ticker(registry=MagicMock())._advance_dependencies(self.db_path)
        assert "dep-d" not in advanced
        assert self.bb.get_task("dep-d").status == "blocked"

    def test_s63_chain_a_b_c(self):
        """A -> B -> C: each completion advances the next link of the chain."""
        self._add_task("chain-a", "A", "pending")
        self._add_task("chain-b", "B", "blocked", deps=["chain-a"])
        self._add_task("chain-c", "C", "blocked", deps=["chain-b"])
        ticker = Ticker(registry=MagicMock())

        self._complete("chain-a")
        first_pass = ticker._advance_dependencies(self.db_path)
        assert "chain-b" in first_pass

        # B is pending now; finish it as well.
        self._complete("chain-b")
        second_pass = ticker._advance_dependencies(self.db_path)
        assert "chain-c" in second_pass
# ===================================================================
# S7: 超时回收
# ===================================================================
class TestS7Timeout:
    """S7: reclaiming tasks stuck in claimed/working past their timeout."""

    @pytest.fixture(autouse=True)
    def setup(self, data_root):
        """Initialise a fresh blackboard DB for each test."""
        self.pid = _pid()
        self.db_path = data_root / self.pid / "blackboard.db"
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        init_db(self.db_path)
        self.bb = Blackboard(self.db_path)

    def _execute(self, sql, params):
        """Run one statement against the test DB, commit, and always close the connection."""
        conn = get_connection(self.db_path)
        try:
            conn.execute(sql, params)
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _make_ticker():
        """Build a Ticker with a mocked registry, 5-minute claim and 30-minute task timeouts."""
        ticker = Ticker(registry=MagicMock())
        ticker.claim_timeout_minutes = 5.0
        ticker.default_task_timeout_minutes = 30.0
        return ticker

    def test_s71_claimed_timeout_to_pending(self):
        """A task claimed longer than the claim timeout goes back to pending."""
        self.bb.create_task(Task(id="to-001", title="超时测试", status="pending"))
        self.bb.update_task_status("to-001", "claimed", agent="test-agent")
        # Backdate claimed_at to 10 minutes ago.
        self._execute(
            "UPDATE tasks SET claimed_at=datetime('now','-10 minutes') WHERE id=?",
            ("to-001",),
        )
        reclaimed = self._make_ticker()._check_timeouts(self.db_path)
        assert "to-001" in reclaimed
        assert self.bb.get_task("to-001").status == "pending"

    def test_s72_working_timeout_to_failed(self):
        """A task working longer than the task timeout becomes failed."""
        self.bb.create_task(Task(id="to-002", title="工作超时", status="pending"))
        for step in ("claimed", "working"):
            self.bb.update_task_status("to-002", step, agent="test-agent")
        # Backdate started_at to 60 minutes ago.
        self._execute(
            "UPDATE tasks SET started_at=datetime('now','-60 minutes') WHERE id=?",
            ("to-002",),
        )
        reclaimed = self._make_ticker()._check_timeouts(self.db_path)
        assert "to-002" in reclaimed
        assert self.bb.get_task("to-002").status == "failed"
# ===================================================================
# S8: Mail Tab 6 端点
# ===================================================================
class TestS8MailTab:
    """S8: end-to-end checks of the mail endpoints."""

    @pytest.fixture(autouse=True)
    def setup(self, client):
        """Start each test with an empty list of sent mail ids."""
        # Ids are recorded by _send_mail; nothing in this class reads them back.
        self.mail_ids = []

    @staticmethod
    def _mail_db_path():
        """Return the mail database path as reported by the mail routes module."""
        # Imported lazily and in one place instead of once per test under
        # different aliases.
        from src.api.mail_routes import _db_path
        return _db_path()

    def _send_mail(self, client, **kwargs):
        """POST a mail built from ``kwargs`` and return its id.

        ``from`` is a Python keyword, so callers pass ``from_``; it is sent
        under the JSON key ``from``.
        """
        payload = dict(kwargs)
        if "from_" in payload:
            payload["from"] = payload.pop("from_")
        resp = client.post("/api/mail", json=payload)
        assert resp.status_code == 200
        data = resp.json()
        assert data["ok"] is True
        mid = data["mail_id"]
        self.mail_ids.append(mid)
        return mid

    def _get_mail(self, client, mid):
        """GET one mail, check the request succeeded, and return the decoded JSON."""
        resp = client.get(f"/api/mail/{mid}")
        assert resp.status_code == 200, resp.text
        return resp.json()

    def test_s81_send_inform_auto_done(self, client):
        """An inform mail is created pending; completion is left to the ticker."""
        mid = self._send_mail(
            client,
            title="通知测试",
            text="这是一条通知",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="inform",
        )
        data = self._get_mail(client, mid)
        # The API layer creates the mail as pending: after the A1-A10
        # defensive rework, inform mail is no longer auto-marked done.
        # The actual move to done is handled by the ticker's mail
        # hallucination gate as a fallback, so both states are accepted.
        assert data["status"] in ("pending", "done"), (
            f"inform mail status should be pending or done, got {data['status']}"
        )

    def test_s82_send_task_assign_pending(self, client):
        """A task-assign mail stays pending."""
        mid = self._send_mail(
            client,
            title="任务分配",
            text="请完成此任务",
            from_="pangtong-fujunshi",
            to="zhangfei-dev",
            type="task-assign",
        )
        assert self._get_mail(client, mid)["status"] == "pending"

    def test_s83_list_with_filters(self, client):
        """Listing supports filtering by sender and by recipient."""
        self._send_mail(
            client,
            title="筛选测试1",
            text="body",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="text",
        )
        self._send_mail(
            client,
            title="筛选测试2",
            text="body",
            from_="zhangfei-dev",
            to="simayi-challenger",
            type="text",
        )
        # Filter by sender.
        resp = client.get("/api/mail?from_agent=pangtong-fujunshi")
        assert resp.status_code == 200, resp.text
        mails = resp.json()["mails"]
        assert any(m["title"] == "筛选测试1" for m in mails)
        # Filter by recipient.
        resp = client.get("/api/mail?to_agent=simayi-challenger")
        assert resp.status_code == 200, resp.text
        mails = resp.json()["mails"]
        assert len(mails) >= 2

    def test_s84_mail_detail_with_comments(self, client):
        """Mail detail is readable after a comment row is attached to it."""
        mid = self._send_mail(
            client,
            title="详情测试",
            text="正文内容",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="text",
        )
        db_path = self._mail_db_path()

        # Add a comment by writing to the database directly.
        conn = get_connection(db_path)
        try:
            conn.execute(
                "INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)",
                (mid, "simayi-challenger", "general", "收到,正在处理"),
            )
            conn.commit()
        finally:
            conn.close()

        data = self._get_mail(client, mid)
        assert data["title"] == "详情测试"

        # Check the comment through a direct DB query, which sidesteps the
        # card_id compatibility issue in Comment.from_row.
        conn = get_connection(db_path)
        try:
            row = conn.execute("SELECT COUNT(*) as cnt FROM comments WHERE task_id=?", (mid,)).fetchone()
            assert row["cnt"] == 1
        finally:
            conn.close()

    def test_s85_mark_read(self, client):
        """PATCH ``is_read`` flips a mail from unread to read."""
        mid = self._send_mail(
            client,
            title="已读测试",
            text="body",
            from_="pangtong-fujunshi",
            to="simayi-challenger",
            type="text",
        )
        # Initially unread.
        assert self._get_mail(client, mid)["is_read"] is False
        # Mark as read.
        resp = client.patch(f"/api/mail/{mid}", json={"is_read": True})
        assert resp.status_code == 200
        # Verify.
        assert self._get_mail(client, mid)["is_read"] is True

    def test_s86_mark_executed(self, client):
        """``mark_executed`` on a mail in review makes it read and done."""
        mid = self._send_mail(
            client,
            title="执行测试",
            text="body",
            from_="pangtong-fujunshi",
            to="zhangfei-dev",
            type="task-assign",
        )
        # Walk the status chain up to review before marking it executed.
        bb = Blackboard(self._mail_db_path())
        for step in ("claimed", "working", "review"):
            bb.update_task_status(mid, step, agent="zhangfei-dev")
        resp = client.patch(f"/api/mail/{mid}", json={"mark_executed": True})
        assert resp.status_code == 200
        data = self._get_mail(client, mid)
        assert data["is_read"] is True
        assert data["status"] == "done"

    def test_s87_summary_and_agents(self, client):
        """The summary exposes total/unread counts and the agent list is a list."""
        self._send_mail(
            client,
            title="统计测试",
            text="body",
            from_="zhaoyun-data",
            to="guanyu-dev",
            type="inform",
        )
        resp = client.get("/api/mail/summary")
        assert resp.status_code == 200, resp.text
        data = resp.json()
        assert "total" in data
        assert "unread" in data
        assert data["total"] > 0

        resp = client.get("/api/mail/agents/list")
        assert resp.status_code == 200, resp.text
        data = resp.json()
        assert "agents" in data
        assert isinstance(data["agents"], list)