auto-sync: 2026-06-05 23:22:03

This commit is contained in:
cfdaily
2026-06-05 23:22:03 +08:00
parent de4f708f82
commit 9c455f8660
2 changed files with 726 additions and 1 deletions
+5 -1
View File
@@ -1,6 +1,10 @@
import os
import pytest
pytestmark = pytest.mark.e2e
pytestmark = [pytest.mark.e2e, pytest.mark.skipif(
not os.environ.get("RUN_INTEGRATION"),
reason="Set RUN_INTEGRATION=1 to run E2E tests",
)]
"""#01 四相循环 单元测试
+721
View File
@@ -0,0 +1,721 @@
"""S1-S8 API 集成测试
从 test_e2e_v27.py 的 TestE1-E8 迁移,改用 tmp_path 隔离。
使用 TestClient 直接测试 API 端点,无需 daemon。
覆盖:项目管理 → Task CRUD → SubTask → Stage 进度 → 父状态聚合 → 依赖链 → 超时 → Mail
"""
import json
import os
import sys
import uuid
from pathlib import Path
from typing import Any, Dict
import pytest
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
# 指向部署目录
DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2"
sys.path.insert(0, str(DEPLOY_DIR))
from src.main import app
from src.blackboard.db import init_db, get_connection, VALID_TRANSITIONS, VALID_STATUSES
from src.blackboard.models import Task
from src.blackboard.operations import Blackboard
from src.blackboard.queries import Queries
from src.blackboard.registry import ProjectRegistry
from src.daemon.ticker import Ticker
from src.utils import get_data_root
pytestmark = pytest.mark.integration
# ── Fixtures ──
@pytest.fixture(scope="module")
def isolated_client():
"""模块级隔离 TestClientmonkeypatch get_data_root → tmp_path"""
import tempfile
tmp = Path(tempfile.mkdtemp(prefix="s1s8-"))
tmp.mkdir(parents=True, exist_ok=True)
import src.utils as utils
original = utils.get_data_root
utils.get_data_root = lambda: tmp
client = TestClient(app)
yield client, tmp
utils.get_data_root = original
import shutil
shutil.rmtree(tmp, ignore_errors=True)
@pytest.fixture(scope="module")
def client(isolated_client):
return isolated_client[0]
@pytest.fixture(scope="module")
def data_root(isolated_client):
return isolated_client[1]
def _pid() -> str:
"""生成唯一测试项目ID"""
return f"test-s1s8-{uuid.uuid4().hex[:8]}"
def _tid() -> str:
"""生成唯一任务ID"""
return f"test-s1s8-task-{uuid.uuid4().hex[:8]}"
# ===================================================================
# S1: 项目管理
# ===================================================================
class TestS1ProjectManagement:
"""S1: 项目创建、列表、归档"""
def test_s11_create_and_get_project(self, client):
pid = _pid()
resp = client.post("/api/projects", json={
"id": pid,
"name": f"集成测试-{pid}",
"description": "自动测试项目",
})
assert resp.status_code == 200
data = resp.json()
assert data.get("project_id") == pid or data.get("id") == pid
# 获取
resp = client.get(f"/api/projects/{pid}")
assert resp.status_code == 200
assert resp.json().get("id") == pid
def test_s12_list_projects(self, client):
resp = client.get("/api/projects")
assert resp.status_code == 200
data = resp.json()
assert "projects" in data
assert isinstance(data["projects"], dict)
def test_s13_archive_project(self, client):
pid = _pid()
client.post("/api/projects", json={"id": pid, "name": f"Archive-{pid}"})
resp = client.post(f"/api/projects/{pid}/archive")
assert resp.status_code == 200
# 验证状态
resp = client.get(f"/api/projects/{pid}")
assert resp.json()["status"] == "archived"
def test_s14_create_project_auto_discover(self, data_root):
"""创建含 blackboard.db 的目录,验证自动发现"""
pid = _pid()
project_dir = data_root / pid
project_dir.mkdir(parents=True, exist_ok=True)
init_db(project_dir / "blackboard.db")
# 注册
registry = ProjectRegistry(data_root)
registry.create_project(pid, f"Discover-{pid}", source="auto_discovered")
projects = registry.list_projects()
assert pid in projects
# 清理
import shutil
shutil.rmtree(project_dir, ignore_errors=True)
# ===================================================================
# S2: Task CRUD + 状态机
# ===================================================================
class TestS2TaskCRUD:
"""S2: Task 创建、查询、状态转换"""
@pytest.fixture(autouse=True)
def setup_project(self, client):
self.pid = _pid()
client.post("/api/projects", json={
"id": self.pid, "name": f"S2-{self.pid}",
})
def test_s21_create_task(self, client):
tid = _tid()
resp = client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid,
"title": "测试任务",
"description": "集成测试",
"status": "pending",
})
assert resp.status_code == 200
assert resp.json().get("task_id") == tid or resp.json().get("id") == tid
self._tid = tid
def test_s22_get_task_expand(self, client):
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "Expand测试", "status": "pending",
})
resp = client.get(f"/api/projects/{self.pid}/tasks/{tid}?expand=all")
assert resp.status_code == 200
data = resp.json()
assert data.get("id") == tid
assert "comments" in data or "outputs" in data
def test_s23_valid_transitions(self, client):
"""pending → claimed → working → review → done"""
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "状态机测试", "status": "pending",
})
transitions = [
("claimed", {"agent": "zhangfei-dev"}),
("working", {"agent": "zhangfei-dev"}),
("review", {"agent": "zhangfei-dev"}),
("done", {"agent": "zhangfei-dev"}),
]
for new_status, body in transitions:
resp = client.post(
f"/api/projects/{self.pid}/tasks/{tid}/status",
json={"status": new_status, **body},
)
assert resp.status_code == 200, f"Failed at → {new_status}: {resp.text}"
def test_s24_invalid_transition_rejected(self, client):
"""pending → done 应被拒绝"""
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "非法转换", "status": "pending",
})
resp = client.post(
f"/api/projects/{self.pid}/tasks/{tid}/status",
json={"status": "done", "agent": "test"},
)
assert resp.status_code == 409
assert "invalid_transition" in resp.text or "Cannot transition" in resp.text
def test_s25_list_tasks_filter(self, client):
"""按状态筛选"""
for st in ["pending", "pending"]:
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": _tid(), "title": "Filter", "status": st,
})
resp = client.get(f"/api/projects/{self.pid}/tasks?status=pending")
assert resp.status_code == 200
tasks = resp.json().get("tasks", resp.json())
assert len(tasks) >= 2
# ===================================================================
# S3: SubTask 父子关系
# ===================================================================
class TestS3SubTask:
"""S3: 父子 Task 关系"""
@pytest.fixture(autouse=True)
def setup(self, client):
self.pid = _pid()
client.post("/api/projects", json={
"id": self.pid, "name": f"S3-{self.pid}",
})
self.parent_id = "s3-parent-001"
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": self.parent_id,
"title": "父任务",
"status": "pending",
"stages_json": json.dumps([{"id": "setup", "label": "Setup"}, {"id": "run", "label": "Run"}, {"id": "verify", "label": "Verify"}]),
})
# 创建 3 个子 Task
self.child_ids = []
for i, stage in enumerate(["setup", "run", "verify"]):
cid = f"s3-child-{i}"
self.child_ids.append(cid)
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": cid,
"title": f"子任务-{stage}",
"status": "pending",
"parent_task": self.parent_id,
"stage": stage,
})
def test_s31_list_subtasks(self, client):
resp = client.get(f"/api/projects/{self.pid}/tasks?parent_task={self.parent_id}")
assert resp.status_code == 200
tasks = resp.json().get("tasks", resp.json())
assert len(tasks) == 3
ids = {t["id"] for t in tasks}
assert set(self.child_ids) == ids
def test_s32_top_level_excludes_children(self, data_root):
db_path = data_root / self.pid / "blackboard.db"
q = Queries(db_path)
top = q.top_level_tasks()
top_ids = {t.id for t in top}
assert self.parent_id in top_ids
for cid in self.child_ids:
assert cid not in top_ids
def test_s33_child_stage_field(self, data_root):
db_path = data_root / self.pid / "blackboard.db"
bb = Blackboard(db_path)
for i, stage in enumerate(["setup", "run", "verify"]):
t = bb.get_task(f"s3-child-{i}")
assert t is not None
assert t.stage == stage
# ===================================================================
# S4: Stage 进度
# ===================================================================
class TestS4StageProgress:
"""S4: stages_json + stage 分组统计"""
@pytest.fixture(autouse=True)
def setup(self, client):
self.pid = _pid()
client.post("/api/projects", json={
"id": self.pid, "name": f"S4-{self.pid}",
})
self.parent_id = "s4-parent-001"
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": self.parent_id,
"title": "Stage父任务",
"status": "pending",
"stages_json": json.dumps([{"id": "data", "label": "Data"}, {"id": "code", "label": "Code"}, {"id": "test", "label": "Test"}]),
})
# 每个stage一个子任务
for i, stage in enumerate(["data", "code", "test"]):
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": f"s4-child-{i}",
"title": f"Stage-{stage}",
"status": "pending",
"parent_task": self.parent_id,
"stage": stage,
})
def test_s41_progress_endpoint(self, client):
resp = client.get(f"/api/projects/{self.pid}/tasks/{self.parent_id}/progress")
assert resp.status_code == 200
data = resp.json()
assert "stages" in data
assert len(data["stages"]) == 3
def test_s42_progress_counts(self, data_root):
"""把 data stage 标记 done,验证进度"""
db_path = data_root / self.pid / "blackboard.db"
bb = Blackboard(db_path)
# data child → done
bb.update_task_status("s4-child-0", "claimed", agent="test")
bb.update_task_status("s4-child-0", "working", agent="test")
bb.update_task_status("s4-child-0", "review", agent="test")
bb.update_task_status("s4-child-0", "done", agent="test")
q = Queries(db_path)
progress = q.parent_task_progress(self.parent_id)
stages = {s["id"]: s for s in progress["stages"]}
assert stages["data"]["total"] == 1
assert stages["data"]["done"] == 1
assert stages["code"]["total"] == 1
assert stages["code"]["done"] == 0
def test_s43_empty_stages_progress(self, client, data_root):
"""无 stages_json 的 Task 进度"""
tid = _tid()
client.post(f"/api/projects/{self.pid}/tasks", json={
"id": tid, "title": "无Stage", "status": "pending",
})
db_path = data_root / self.pid / "blackboard.db"
q = Queries(db_path)
progress = q.parent_task_progress(tid)
# 无子任务,应返回空或基本结构
assert progress is not None
# ===================================================================
# S5: 父 Task 状态聚合
# ===================================================================
class TestS5ParentAggregation:
"""S5: compute_parent_status 聚合逻辑"""
@pytest.fixture(autouse=True)
def setup(self, data_root):
self.pid = _pid()
db_path = data_root / self.pid / "blackboard.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
init_db(db_path)
self.db_path = db_path
self.bb = Blackboard(db_path)
self.q = Queries(db_path)
# 创建父任务
self.parent_id = "s5-parent"
self.bb.create_task(Task(
id=self.parent_id, title="聚合父任务", status="pending",
))
def _create_child(self, cid: str, status: str = "pending", stage: str = None):
self.bb.create_task(Task(
id=cid, title=f"子-{cid}", status="pending",
parent_task=self.parent_id, stage=stage,
))
if status == "pending":
return
# Walk state machine to target status
path_map = {
"claimed": ["claimed"],
"working": ["claimed", "working"],
"review": ["claimed", "working", "review"],
"done": ["claimed", "working", "review", "done"],
"failed": ["claimed", "working", "failed"],
"blocked": ["claimed", "working", "blocked"],
"cancelled": ["cancelled"],
}
for s in path_map.get(status, []):
self.bb.update_task_status(cid, s, agent="test")
def test_s51_all_done_parent_done(self):
self._create_child("c1", "done")
self._create_child("c2", "done")
result = self.q.compute_parent_status(self.parent_id)
assert result == "done"
def test_s52_has_review_parent_review(self):
self._create_child("c3", "done")
self._create_child("c4", "review")
result = self.q.compute_parent_status(self.parent_id)
assert result == "review"
def test_s53_has_working_parent_working(self):
self._create_child("c5", "done")
self._create_child("c6", "working")
result = self.q.compute_parent_status(self.parent_id)
assert result == "working"
def test_s54_all_pending_parent_pending(self):
self._create_child("c7", "pending")
self._create_child("c8", "pending")
result = self.q.compute_parent_status(self.parent_id)
assert result == "pending"
def test_s55_cancelled_excluded(self):
"""cancelled 子 Task 不参与聚合"""
self._create_child("c9", "done")
self._create_child("c10", "cancelled")
# 只有 c9 有效 → done
result = self.q.compute_parent_status(self.parent_id)
assert result == "done"
def test_s56_cancelled_parent_not_overridden(self):
"""cancelled 的父 Task 不被聚合覆盖"""
# 先让所有子任务 done
self._create_child("c11", "done")
# 手动把父任务设为 cancelled
conn = get_connection(self.db_path)
try:
conn.execute("UPDATE tasks SET status='cancelled' WHERE id=?", (self.parent_id,))
conn.commit()
finally:
conn.close()
# _refresh_parent_statuses 应跳过 cancelled 父任务
ticker = Ticker(registry=MagicMock())
ticker._refresh_parent_statuses(self.db_path)
# 验证父任务仍是 cancelled
t = self.bb.get_task(self.parent_id)
assert t.status == "cancelled"
# ===================================================================
# S6: 依赖链
# ===================================================================
class TestS6DependencyChain:
"""S6: depends_on 依赖推进"""
@pytest.fixture(autouse=True)
def setup(self, data_root):
self.pid = _pid()
db_path = data_root / self.pid / "blackboard.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
init_db(db_path)
self.db_path = db_path
self.bb = Blackboard(db_path)
def test_s61_dep_advance(self):
"""A done → B 从 blocked → pending"""
self.bb.create_task(Task(id="dep-a", title="A", status="pending"))
self.bb.create_task(Task(id="dep-b", title="B", status="blocked", depends_on=json.dumps(["dep-a"])))
# 完成 A
for s in ["claimed", "working", "review", "done"]:
self.bb.update_task_status("dep-a", s, agent="test")
# 手动 tick 依赖推进
ticker = Ticker(registry=MagicMock())
advanced = ticker._advance_dependencies(self.db_path)
assert "dep-b" in advanced
t = self.bb.get_task("dep-b")
assert t.status == "pending"
def test_s62_dep_not_done_stays_blocked(self):
"""A 未完成 → B 保持 blocked"""
self.bb.create_task(Task(id="dep-c", title="C", status="pending"))
self.bb.create_task(Task(id="dep-d", title="D", status="blocked", depends_on=json.dumps(["dep-c"])))
ticker = Ticker(registry=MagicMock())
advanced = ticker._advance_dependencies(self.db_path)
assert "dep-d" not in advanced
assert self.bb.get_task("dep-d").status == "blocked"
def test_s63_chain_a_b_c(self):
"""A → B → C 多层依赖"""
self.bb.create_task(Task(id="chain-a", title="A", status="pending"))
self.bb.create_task(Task(id="chain-b", title="B", status="blocked", depends_on=json.dumps(["chain-a"])))
self.bb.create_task(Task(id="chain-c", title="C", status="blocked", depends_on=json.dumps(["chain-b"])))
# 完成 A
for s in ["claimed", "working", "review", "done"]:
self.bb.update_task_status("chain-a", s, agent="test")
ticker = Ticker(registry=MagicMock())
advanced1 = ticker._advance_dependencies(self.db_path)
assert "chain-b" in advanced1
# B 现在 pending,完成 B
for s in ["claimed", "working", "review", "done"]:
self.bb.update_task_status("chain-b", s, agent="test")
advanced2 = ticker._advance_dependencies(self.db_path)
assert "chain-c" in advanced2
# ===================================================================
# S7: 超时回收
# ===================================================================
class TestS7Timeout:
"""S7: claimed/working 超时回收"""
@pytest.fixture(autouse=True)
def setup(self, data_root):
self.pid = _pid()
db_path = data_root / self.pid / "blackboard.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
init_db(db_path)
self.db_path = db_path
self.bb = Blackboard(db_path)
def test_s71_claimed_timeout_to_pending(self):
"""claimed 超过 claim_timeout → pending"""
self.bb.create_task(Task(id="to-001", title="超时测试", status="pending"))
self.bb.update_task_status("to-001", "claimed", agent="test-agent")
# 手动把 claimed_at 设为 10 分钟前
conn = get_connection(self.db_path)
try:
conn.execute(
"UPDATE tasks SET claimed_at=datetime('now','-10 minutes') WHERE id=?",
("to-001",)
)
conn.commit()
finally:
conn.close()
ticker = Ticker(registry=MagicMock())
ticker.claim_timeout_minutes = 5.0
ticker.default_task_timeout_minutes = 30.0
reclaimed = ticker._check_timeouts(self.db_path)
assert "to-001" in reclaimed
assert self.bb.get_task("to-001").status == "pending"
def test_s72_working_timeout_to_failed(self):
"""working 超过 task_timeout → failed"""
self.bb.create_task(Task(id="to-002", title="工作超时", status="pending"))
self.bb.update_task_status("to-002", "claimed", agent="test-agent")
self.bb.update_task_status("to-002", "working", agent="test-agent")
# 手动把 started_at 设为 60 分钟前
conn = get_connection(self.db_path)
try:
conn.execute(
"UPDATE tasks SET started_at=datetime('now','-60 minutes') WHERE id=?",
("to-002",)
)
conn.commit()
finally:
conn.close()
ticker = Ticker(registry=MagicMock())
ticker.claim_timeout_minutes = 5.0
ticker.default_task_timeout_minutes = 30.0
reclaimed = ticker._check_timeouts(self.db_path)
assert "to-002" in reclaimed
assert self.bb.get_task("to-002").status == "failed"
# ===================================================================
# S8: Mail Tab 6 端点
# ===================================================================
class TestS8MailTab:
"""S8: Mail 端到端"""
@pytest.fixture(autouse=True)
def setup(self, client):
self.mail_ids = []
def _send_mail(self, client, **kwargs):
# Fix: 'from' is a Python keyword, callers use 'from_'
if 'from_' in kwargs:
kwargs['from'] = kwargs.pop('from_')
resp = client.post("/api/mail", json=kwargs)
assert resp.status_code == 200
data = resp.json()
assert data["ok"] is True
mid = data["mail_id"]
self.mail_ids.append(mid)
return mid
def test_s81_send_inform_auto_done(self, client):
"""inform 类型 mail — API 层创建为 pending,由 ticker 处理自动完成"""
mid = self._send_mail(client,
title="通知测试",
text="这是一条通知",
from_="pangtong-fujunshi",
to="simayi-challenger",
type="inform",
)
resp = client.get(f"/api/mail/{mid}")
data = resp.json()
# API 层创建为 pendingA1-A10 防御改造后 inform 不再自动标 done
# 实际 done 由 ticker 的 mail 幻觉门控兜底处理
assert data["status"] in ("pending", "done"), (
f"inform mail status should be pending or done, got {data['status']}"
)
def test_s82_send_task_assign_pending(self, client):
"""task-assign 类型保持 pending"""
mid = self._send_mail(client,
title="任务分配",
text="请完成此任务",
from_="pangtong-fujunshi",
to="zhangfei-dev",
type="task-assign",
)
resp = client.get(f"/api/mail/{mid}")
data = resp.json()
assert data["status"] == "pending"
def test_s83_list_with_filters(self, client):
"""列表 + 筛选"""
self._send_mail(client,
title="筛选测试1",
text="body",
from_="pangtong-fujunshi",
to="simayi-challenger",
type="text",
)
self._send_mail(client,
title="筛选测试2",
text="body",
from_="zhangfei-dev",
to="simayi-challenger",
type="text",
)
# 按 from 筛选
resp = client.get("/api/mail?from_agent=pangtong-fujunshi")
mails = resp.json()["mails"]
assert any(m["title"] == "筛选测试1" for m in mails)
# 按 to 筛选
resp = client.get("/api/mail?to_agent=simayi-challenger")
mails = resp.json()["mails"]
assert len(mails) >= 2
def test_s84_mail_detail_with_comments(self, client):
"""详情 + 评论"""
mid = self._send_mail(client,
title="详情测试",
text="正文内容",
from_="pangtong-fujunshi",
to="simayi-challenger",
type="text",
)
# 添加评论(通过 blackboard 直接写)
from src.api.mail_routes import _db_path
conn = get_connection(_db_path())
try:
conn.execute(
"INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)",
(mid, "simayi-challenger", "general", "收到,正在处理"),
)
conn.commit()
finally:
conn.close()
resp = client.get(f"/api/mail/{mid}")
data = resp.json()
assert data["title"] == "详情测试"
# 验证评论已写入(通过直接查 DB,绕过 Comment.from_row 的 card_id 兼容问题)
from src.api.mail_routes import _db_path as mail_db
conn = get_connection(mail_db())
try:
row = conn.execute("SELECT COUNT(*) as cnt FROM comments WHERE task_id=?", (mid,)).fetchone()
assert row["cnt"] == 1
finally:
conn.close()
def test_s85_mark_read(self, client):
"""标记已读"""
mid = self._send_mail(client,
title="已读测试",
text="body",
from_="pangtong-fujunshi",
to="simayi-challenger",
type="text",
)
# 确认初始未读
resp = client.get(f"/api/mail/{mid}")
assert resp.json()["is_read"] is False
# 标记已读
resp = client.patch(f"/api/mail/{mid}", json={"is_read": True})
assert resp.status_code == 200
# 验证
resp = client.get(f"/api/mail/{mid}")
assert resp.json()["is_read"] is True
def test_s86_mark_executed(self, client):
"""标记已执行(走完整状态链)"""
mid = self._send_mail(client,
title="执行测试",
text="body",
from_="pangtong-fujunshi",
to="zhangfei-dev",
type="task-assign",
)
# 先走状态链到 review,再标记 executed
from src.api.mail_routes import _db_path as mail_db
bb = Blackboard(mail_db())
for s in ["claimed", "working", "review"]:
bb.update_task_status(mid, s, agent="zhangfei-dev")
resp = client.patch(f"/api/mail/{mid}", json={"mark_executed": True})
assert resp.status_code == 200
resp = client.get(f"/api/mail/{mid}")
data = resp.json()
assert data["is_read"] is True
assert data["status"] == "done"
def test_s87_summary_and_agents(self, client):
"""统计 + Agent 列表"""
self._send_mail(client,
title="统计测试",
text="body",
from_="zhaoyun-data",
to="guanyu-dev",
type="inform",
)
resp = client.get("/api/mail/summary")
data = resp.json()
assert "total" in data
assert "unread" in data
assert data["total"] > 0
resp = client.get("/api/mail/agents/list")
data = resp.json()
assert "agents" in data
assert isinstance(data["agents"], list)