"""S1-S8 API 集成测试 从 test_e2e_v27.py 的 TestE1-E8 迁移,改用 tmp_path 隔离。 使用 TestClient 直接测试 API 端点,无需 daemon。 覆盖:项目管理 → Task CRUD → SubTask → Stage 进度 → 父状态聚合 → 依赖链 → 超时 → Mail """ import json import os import sys import uuid from pathlib import Path from typing import Any, Dict import pytest from unittest.mock import MagicMock from fastapi.testclient import TestClient # 指向部署目录 DEPLOY_DIR = Path.home() / ".sanguo_projects" / "sanguo_moziplus_v2" sys.path.insert(0, str(DEPLOY_DIR)) from src.main import app from src.blackboard.db import init_db, get_connection, VALID_TRANSITIONS, VALID_STATUSES from src.blackboard.models import Task from src.blackboard.operations import Blackboard from src.blackboard.queries import Queries from src.blackboard.registry import ProjectRegistry from src.daemon.ticker import Ticker from src.utils import get_data_root pytestmark = pytest.mark.integration # ── Fixtures ── @pytest.fixture(scope="module") def isolated_client(): """模块级隔离 TestClient:monkeypatch get_data_root → tmp_path""" import tempfile tmp = Path(tempfile.mkdtemp(prefix="s1s8-")) tmp.mkdir(parents=True, exist_ok=True) import src.utils as utils original = utils.get_data_root utils.get_data_root = lambda: tmp client = TestClient(app) yield client, tmp utils.get_data_root = original import shutil shutil.rmtree(tmp, ignore_errors=True) @pytest.fixture(scope="module") def client(isolated_client): return isolated_client[0] @pytest.fixture(scope="module") def data_root(isolated_client): return isolated_client[1] def _pid() -> str: """生成唯一测试项目ID""" return f"test-s1s8-{uuid.uuid4().hex[:8]}" def _tid() -> str: """生成唯一任务ID""" return f"test-s1s8-task-{uuid.uuid4().hex[:8]}" # =================================================================== # S1: 项目管理 # =================================================================== class TestS1ProjectManagement: """S1: 项目创建、列表、归档""" def test_s11_create_and_get_project(self, client): pid = _pid() resp = client.post("/api/projects", json={ "id": pid, "name": f"集成测试-{pid}", "description": "自动测试项目", }) assert resp.status_code == 200 data = resp.json() assert data.get("project_id") == pid or data.get("id") == pid # 获取 resp = client.get(f"/api/projects/{pid}") assert resp.status_code == 200 assert resp.json().get("id") == pid def test_s12_list_projects(self, client): resp = client.get("/api/projects") assert resp.status_code == 200 data = resp.json() assert "projects" in data assert isinstance(data["projects"], dict) def test_s13_archive_project(self, client): pid = _pid() client.post("/api/projects", json={"id": pid, "name": f"Archive-{pid}"}) resp = client.post(f"/api/projects/{pid}/archive") assert resp.status_code == 200 # 验证状态 resp = client.get(f"/api/projects/{pid}") assert resp.json()["status"] == "archived" def test_s14_create_project_auto_discover(self, data_root): """创建含 blackboard.db 的目录,验证自动发现""" pid = _pid() project_dir = data_root / pid project_dir.mkdir(parents=True, exist_ok=True) init_db(project_dir / "blackboard.db") # 注册 registry = ProjectRegistry(data_root) registry.create_project(pid, f"Discover-{pid}", source="auto_discovered") projects = registry.list_projects() assert pid in projects # 清理 import shutil shutil.rmtree(project_dir, ignore_errors=True) # =================================================================== # S2: Task CRUD + 状态机 # =================================================================== class TestS2TaskCRUD: """S2: Task 创建、查询、状态转换""" @pytest.fixture(autouse=True) def setup_project(self, client): self.pid = _pid() client.post("/api/projects", json={ "id": self.pid, "name": f"S2-{self.pid}", }) def test_s21_create_task(self, client): tid = _tid() resp = client.post(f"/api/projects/{self.pid}/tasks", json={ "id": tid, "title": "测试任务", "description": "集成测试", "status": "pending", }) assert resp.status_code == 200 assert resp.json().get("task_id") == tid or resp.json().get("id") == tid self._tid = tid def test_s22_get_task_expand(self, client): tid = _tid() client.post(f"/api/projects/{self.pid}/tasks", json={ "id": tid, "title": "Expand测试", "status": "pending", }) resp = client.get(f"/api/projects/{self.pid}/tasks/{tid}?expand=all") assert resp.status_code == 200 data = resp.json() assert data.get("id") == tid assert "comments" in data or "outputs" in data def test_s23_valid_transitions(self, client): """pending → claimed → working → review → done""" tid = _tid() client.post(f"/api/projects/{self.pid}/tasks", json={ "id": tid, "title": "状态机测试", "status": "pending", }) transitions = [ ("claimed", {"agent": "zhangfei-dev"}), ("working", {"agent": "zhangfei-dev"}), ("review", {"agent": "zhangfei-dev"}), ("done", {"agent": "zhangfei-dev"}), ] for new_status, body in transitions: resp = client.post( f"/api/projects/{self.pid}/tasks/{tid}/status", json={"status": new_status, **body}, ) assert resp.status_code == 200, f"Failed at → {new_status}: {resp.text}" def test_s24_invalid_transition_rejected(self, client): """pending → done 应被拒绝""" tid = _tid() client.post(f"/api/projects/{self.pid}/tasks", json={ "id": tid, "title": "非法转换", "status": "pending", }) resp = client.post( f"/api/projects/{self.pid}/tasks/{tid}/status", json={"status": "done", "agent": "test"}, ) assert resp.status_code == 409 assert "invalid_transition" in resp.text or "Cannot transition" in resp.text def test_s25_list_tasks_filter(self, client): """按状态筛选""" for st in ["pending", "pending"]: client.post(f"/api/projects/{self.pid}/tasks", json={ "id": _tid(), "title": "Filter", "status": st, }) resp = client.get(f"/api/projects/{self.pid}/tasks?status=pending") assert resp.status_code == 200 tasks = resp.json().get("tasks", resp.json()) assert len(tasks) >= 2 # =================================================================== # S3: SubTask 父子关系 # =================================================================== class TestS3SubTask: """S3: 父子 Task 关系""" @pytest.fixture(autouse=True) def setup(self, client): self.pid = _pid() client.post("/api/projects", json={ "id": self.pid, "name": f"S3-{self.pid}", }) self.parent_id = "s3-parent-001" client.post(f"/api/projects/{self.pid}/tasks", json={ "id": self.parent_id, "title": "父任务", "status": "pending", "stages_json": json.dumps([{"id": "setup", "label": "Setup"}, {"id": "run", "label": "Run"}, {"id": "verify", "label": "Verify"}]), }) # 创建 3 个子 Task self.child_ids = [] for i, stage in enumerate(["setup", "run", "verify"]): cid = f"s3-child-{i}" self.child_ids.append(cid) client.post(f"/api/projects/{self.pid}/tasks", json={ "id": cid, "title": f"子任务-{stage}", "status": "pending", "parent_task": self.parent_id, "stage": stage, }) def test_s31_list_subtasks(self, client): resp = client.get(f"/api/projects/{self.pid}/tasks?parent_task={self.parent_id}") assert resp.status_code == 200 tasks = resp.json().get("tasks", resp.json()) assert len(tasks) == 3 ids = {t["id"] for t in tasks} assert set(self.child_ids) == ids def test_s32_top_level_excludes_children(self, data_root): db_path = data_root / self.pid / "blackboard.db" q = Queries(db_path) top = q.top_level_tasks() top_ids = {t.id for t in top} assert self.parent_id in top_ids for cid in self.child_ids: assert cid not in top_ids def test_s33_child_stage_field(self, data_root): db_path = data_root / self.pid / "blackboard.db" bb = Blackboard(db_path) for i, stage in enumerate(["setup", "run", "verify"]): t = bb.get_task(f"s3-child-{i}") assert t is not None assert t.stage == stage # =================================================================== # S4: Stage 进度 # =================================================================== class TestS4StageProgress: """S4: stages_json + stage 分组统计""" @pytest.fixture(autouse=True) def setup(self, client): self.pid = _pid() client.post("/api/projects", json={ "id": self.pid, "name": f"S4-{self.pid}", }) self.parent_id = "s4-parent-001" client.post(f"/api/projects/{self.pid}/tasks", json={ "id": self.parent_id, "title": "Stage父任务", "status": "pending", "stages_json": json.dumps([{"id": "data", "label": "Data"}, {"id": "code", "label": "Code"}, {"id": "test", "label": "Test"}]), }) # 每个stage一个子任务 for i, stage in enumerate(["data", "code", "test"]): client.post(f"/api/projects/{self.pid}/tasks", json={ "id": f"s4-child-{i}", "title": f"Stage-{stage}", "status": "pending", "parent_task": self.parent_id, "stage": stage, }) def test_s41_progress_endpoint(self, client): resp = client.get(f"/api/projects/{self.pid}/tasks/{self.parent_id}/progress") assert resp.status_code == 200 data = resp.json() assert "stages" in data assert len(data["stages"]) == 3 def test_s42_progress_counts(self, data_root): """把 data stage 标记 done,验证进度""" db_path = data_root / self.pid / "blackboard.db" bb = Blackboard(db_path) # data child → done bb.update_task_status("s4-child-0", "claimed", agent="test") bb.update_task_status("s4-child-0", "working", agent="test") bb.update_task_status("s4-child-0", "review", agent="test") bb.update_task_status("s4-child-0", "done", agent="test") q = Queries(db_path) progress = q.parent_task_progress(self.parent_id) stages = {s["id"]: s for s in progress["stages"]} assert stages["data"]["total"] == 1 assert stages["data"]["done"] == 1 assert stages["code"]["total"] == 1 assert stages["code"]["done"] == 0 def test_s43_empty_stages_progress(self, client, data_root): """无 stages_json 的 Task 进度""" tid = _tid() client.post(f"/api/projects/{self.pid}/tasks", json={ "id": tid, "title": "无Stage", "status": "pending", }) db_path = data_root / self.pid / "blackboard.db" q = Queries(db_path) progress = q.parent_task_progress(tid) # 无子任务,应返回空或基本结构 assert progress is not None # =================================================================== # S5: 父 Task 状态聚合 # =================================================================== class TestS5ParentAggregation: """S5: compute_parent_status 聚合逻辑""" @pytest.fixture(autouse=True) def setup(self, data_root): self.pid = _pid() db_path = data_root / self.pid / "blackboard.db" db_path.parent.mkdir(parents=True, exist_ok=True) init_db(db_path) self.db_path = db_path self.bb = Blackboard(db_path) self.q = Queries(db_path) # 创建父任务 self.parent_id = "s5-parent" self.bb.create_task(Task( id=self.parent_id, title="聚合父任务", status="pending", )) def _create_child(self, cid: str, status: str = "pending", stage: str = None): self.bb.create_task(Task( id=cid, title=f"子-{cid}", status="pending", parent_task=self.parent_id, stage=stage, )) if status == "pending": return # Walk state machine to target status path_map = { "claimed": ["claimed"], "working": ["claimed", "working"], "review": ["claimed", "working", "review"], "done": ["claimed", "working", "review", "done"], "failed": ["claimed", "working", "failed"], "blocked": ["claimed", "working", "blocked"], "cancelled": ["cancelled"], } for s in path_map.get(status, []): self.bb.update_task_status(cid, s, agent="test") def test_s51_all_done_parent_done(self): self._create_child("c1", "done") self._create_child("c2", "done") result = self.q.compute_parent_status(self.parent_id) assert result == "done" def test_s52_has_review_parent_review(self): self._create_child("c3", "done") self._create_child("c4", "review") result = self.q.compute_parent_status(self.parent_id) assert result == "review" def test_s53_has_working_parent_working(self): self._create_child("c5", "done") self._create_child("c6", "working") result = self.q.compute_parent_status(self.parent_id) assert result == "working" def test_s54_all_pending_parent_pending(self): self._create_child("c7", "pending") self._create_child("c8", "pending") result = self.q.compute_parent_status(self.parent_id) assert result == "pending" def test_s55_cancelled_excluded(self): """cancelled 子 Task 不参与聚合""" self._create_child("c9", "done") self._create_child("c10", "cancelled") # 只有 c9 有效 → done result = self.q.compute_parent_status(self.parent_id) assert result == "done" def test_s56_cancelled_parent_not_overridden(self): """cancelled 的父 Task 不被聚合覆盖""" # 先让所有子任务 done self._create_child("c11", "done") # 手动把父任务设为 cancelled conn = get_connection(self.db_path) try: conn.execute("UPDATE tasks SET status='cancelled' WHERE id=?", (self.parent_id,)) conn.commit() finally: conn.close() # _refresh_parent_statuses 应跳过 cancelled 父任务 ticker = Ticker(registry=MagicMock()) ticker._refresh_parent_statuses(self.db_path) # 验证父任务仍是 cancelled t = self.bb.get_task(self.parent_id) assert t.status == "cancelled" # =================================================================== # S6: 依赖链 # =================================================================== class TestS6DependencyChain: """S6: depends_on 依赖推进""" @pytest.fixture(autouse=True) def setup(self, data_root): self.pid = _pid() db_path = data_root / self.pid / "blackboard.db" db_path.parent.mkdir(parents=True, exist_ok=True) init_db(db_path) self.db_path = db_path self.bb = Blackboard(db_path) def test_s61_dep_advance(self): """A done → B 从 blocked → pending""" self.bb.create_task(Task(id="dep-a", title="A", status="pending")) self.bb.create_task(Task(id="dep-b", title="B", status="blocked", depends_on=json.dumps(["dep-a"]))) # 完成 A for s in ["claimed", "working", "review", "done"]: self.bb.update_task_status("dep-a", s, agent="test") # 手动 tick 依赖推进 ticker = Ticker(registry=MagicMock()) advanced = ticker._advance_dependencies(self.db_path) assert "dep-b" in advanced t = self.bb.get_task("dep-b") assert t.status == "pending" def test_s62_dep_not_done_stays_blocked(self): """A 未完成 → B 保持 blocked""" self.bb.create_task(Task(id="dep-c", title="C", status="pending")) self.bb.create_task(Task(id="dep-d", title="D", status="blocked", depends_on=json.dumps(["dep-c"]))) ticker = Ticker(registry=MagicMock()) advanced = ticker._advance_dependencies(self.db_path) assert "dep-d" not in advanced assert self.bb.get_task("dep-d").status == "blocked" def test_s63_chain_a_b_c(self): """A → B → C 多层依赖""" self.bb.create_task(Task(id="chain-a", title="A", status="pending")) self.bb.create_task(Task(id="chain-b", title="B", status="blocked", depends_on=json.dumps(["chain-a"]))) self.bb.create_task(Task(id="chain-c", title="C", status="blocked", depends_on=json.dumps(["chain-b"]))) # 完成 A for s in ["claimed", "working", "review", "done"]: self.bb.update_task_status("chain-a", s, agent="test") ticker = Ticker(registry=MagicMock()) advanced1 = ticker._advance_dependencies(self.db_path) assert "chain-b" in advanced1 # B 现在 pending,完成 B for s in ["claimed", "working", "review", "done"]: self.bb.update_task_status("chain-b", s, agent="test") advanced2 = ticker._advance_dependencies(self.db_path) assert "chain-c" in advanced2 # =================================================================== # S7: 超时回收 # =================================================================== class TestS7Timeout: """S7: claimed/working 超时回收""" @pytest.fixture(autouse=True) def setup(self, data_root): self.pid = _pid() db_path = data_root / self.pid / "blackboard.db" db_path.parent.mkdir(parents=True, exist_ok=True) init_db(db_path) self.db_path = db_path self.bb = Blackboard(db_path) def test_s71_claimed_timeout_to_pending(self): """claimed 超过 claim_timeout → pending""" self.bb.create_task(Task(id="to-001", title="超时测试", status="pending")) self.bb.update_task_status("to-001", "claimed", agent="test-agent") # 手动把 claimed_at 设为 10 分钟前 conn = get_connection(self.db_path) try: conn.execute( "UPDATE tasks SET claimed_at=datetime('now','-10 minutes') WHERE id=?", ("to-001",) ) conn.commit() finally: conn.close() ticker = Ticker(registry=MagicMock()) ticker.claim_timeout_minutes = 5.0 ticker.default_task_timeout_minutes = 30.0 reclaimed = ticker._check_timeouts(self.db_path) assert "to-001" in reclaimed assert self.bb.get_task("to-001").status == "pending" def test_s72_working_timeout_to_failed(self): """working 超过 task_timeout → failed""" self.bb.create_task(Task(id="to-002", title="工作超时", status="pending")) self.bb.update_task_status("to-002", "claimed", agent="test-agent") self.bb.update_task_status("to-002", "working", agent="test-agent") # 手动把 started_at 设为 60 分钟前 conn = get_connection(self.db_path) try: conn.execute( "UPDATE tasks SET started_at=datetime('now','-60 minutes') WHERE id=?", ("to-002",) ) conn.commit() finally: conn.close() ticker = Ticker(registry=MagicMock()) ticker.claim_timeout_minutes = 5.0 ticker.default_task_timeout_minutes = 30.0 reclaimed = ticker._check_timeouts(self.db_path) assert "to-002" in reclaimed assert self.bb.get_task("to-002").status == "failed" # =================================================================== # S8: Mail Tab 6 端点 # =================================================================== class TestS8MailTab: """S8: Mail 端到端""" @pytest.fixture(autouse=True) def setup(self, client): self.mail_ids = [] def _send_mail(self, client, **kwargs): # Fix: 'from' is a Python keyword, callers use 'from_' if 'from_' in kwargs: kwargs['from'] = kwargs.pop('from_') resp = client.post("/api/mail", json=kwargs) assert resp.status_code == 200 data = resp.json() assert data["ok"] is True mid = data["mail_id"] self.mail_ids.append(mid) return mid def test_s81_send_inform_auto_done(self, client): """inform 类型 mail — API 层创建为 pending,由 ticker 处理自动完成""" mid = self._send_mail(client, title="通知测试", text="这是一条通知", from_="pangtong-fujunshi", to="simayi-challenger", type="inform", ) resp = client.get(f"/api/mail/{mid}") data = resp.json() # API 层创建为 pending(A1-A10 防御改造后 inform 不再自动标 done) # 实际 done 由 ticker 的 mail 幻觉门控兜底处理 assert data["status"] in ("pending", "done"), ( f"inform mail status should be pending or done, got {data['status']}" ) def test_s82_send_task_assign_pending(self, client): """task-assign 类型保持 pending""" mid = self._send_mail(client, title="任务分配", text="请完成此任务", from_="pangtong-fujunshi", to="zhangfei-dev", type="task-assign", ) resp = client.get(f"/api/mail/{mid}") data = resp.json() assert data["status"] == "pending" def test_s83_list_with_filters(self, client): """列表 + 筛选""" self._send_mail(client, title="筛选测试1", text="body", from_="pangtong-fujunshi", to="simayi-challenger", type="text", ) self._send_mail(client, title="筛选测试2", text="body", from_="zhangfei-dev", to="simayi-challenger", type="text", ) # 按 from 筛选 resp = client.get("/api/mail?from_agent=pangtong-fujunshi") mails = resp.json()["mails"] assert any(m["title"] == "筛选测试1" for m in mails) # 按 to 筛选 resp = client.get("/api/mail?to_agent=simayi-challenger") mails = resp.json()["mails"] assert len(mails) >= 2 def test_s84_mail_detail_with_comments(self, client): """详情 + 评论""" mid = self._send_mail(client, title="详情测试", text="正文内容", from_="pangtong-fujunshi", to="simayi-challenger", type="text", ) # 添加评论(通过 blackboard 直接写) from src.api.mail_routes import _db_path conn = get_connection(_db_path()) try: conn.execute( "INSERT INTO comments (task_id, author, comment_type, body) VALUES (?, ?, ?, ?)", (mid, "simayi-challenger", "general", "收到,正在处理"), ) conn.commit() finally: conn.close() resp = client.get(f"/api/mail/{mid}") data = resp.json() assert data["title"] == "详情测试" # 验证评论已写入(通过直接查 DB,绕过 Comment.from_row 的 card_id 兼容问题) from src.api.mail_routes import _db_path as mail_db conn = get_connection(mail_db()) try: row = conn.execute("SELECT COUNT(*) as cnt FROM comments WHERE task_id=?", (mid,)).fetchone() assert row["cnt"] == 1 finally: conn.close() def test_s85_mark_read(self, client): """标记已读""" mid = self._send_mail(client, title="已读测试", text="body", from_="pangtong-fujunshi", to="simayi-challenger", type="text", ) # 确认初始未读 resp = client.get(f"/api/mail/{mid}") assert resp.json()["is_read"] is False # 标记已读 resp = client.patch(f"/api/mail/{mid}", json={"is_read": True}) assert resp.status_code == 200 # 验证 resp = client.get(f"/api/mail/{mid}") assert resp.json()["is_read"] is True def test_s86_mark_executed(self, client): """标记已执行(走完整状态链)""" mid = self._send_mail(client, title="执行测试", text="body", from_="pangtong-fujunshi", to="zhangfei-dev", type="task-assign", ) # 先走状态链到 review,再标记 executed from src.api.mail_routes import _db_path as mail_db bb = Blackboard(mail_db()) for s in ["claimed", "working", "review"]: bb.update_task_status(mid, s, agent="zhangfei-dev") resp = client.patch(f"/api/mail/{mid}", json={"mark_executed": True}) assert resp.status_code == 200 resp = client.get(f"/api/mail/{mid}") data = resp.json() assert data["is_read"] is True assert data["status"] == "done" def test_s87_summary_and_agents(self, client): """统计 + Agent 列表""" self._send_mail(client, title="统计测试", text="body", from_="zhaoyun-data", to="guanyu-dev", type="inform", ) resp = client.get("/api/mail/summary") data = resp.json() assert "total" in data assert "unread" in data assert data["total"] > 0 resp = client.get("/api/mail/agents/list") data = resp.json() assert "agents" in data assert isinstance(data["agents"], list)