"""F5 tests: API layer."""

import json
import os
import threading
from pathlib import Path

import pytest
from fastapi.testclient import TestClient

from src.blackboard.operations import Blackboard
from src.blackboard.models import Task
from src.blackboard.registry import ProjectRegistry
from src.main import app


@pytest.fixture
def project_env(tmp_path, monkeypatch):
    """Create a temporary project environment.

    Points ``BLACKBOARD_ROOT`` at a fresh directory under ``tmp_path``,
    registers the project ``test-proj`` (with agent ``agent1``) and stores one
    task ``t1`` in its blackboard database.

    The environment variable is set through ``monkeypatch`` so that any value
    it had before the test is restored on teardown, instead of being deleted
    unconditionally.

    Returns:
        The root directory that contains the test project.
    """
    project_root = tmp_path / "projects"
    project_root.mkdir()
    monkeypatch.setenv("BLACKBOARD_ROOT", str(project_root))

    # Create a test project with registry + DB
    reg = ProjectRegistry(project_root)
    reg.create_project("test-proj", "Test Project", agents=["agent1"])
    bb = Blackboard(project_root / "test-proj" / "blackboard.db")
    bb.create_task(Task(id="t1", title="Existing Task", task_type="coding"))

    return project_root


@pytest.fixture
def client():
    """Return a ``TestClient`` bound to the application under test."""
    return TestClient(app)


# ===================================================================
# Daemon API
# ===================================================================

class TestDaemonAPI:
    """Tests for the ``/api/daemon`` endpoints."""

    def test_status(self, client):
        """GET /api/daemon/status answers 200 and reports status "running"."""
        resp = client.get("/api/daemon/status")
        assert resp.status_code == 200
        assert resp.json()["status"] == "running"

    def test_manual_tick(self, client):
        """POST /api/daemon/tick answers 200."""
        resp = client.post("/api/daemon/tick")
        assert resp.status_code == 200


# ===================================================================
# Project API
# ===================================================================

class TestProjectAPI:
    """Tests for the ``/api/projects`` endpoints."""

    def test_list_projects(self, client, project_env):
        """The project created by the fixture appears in the project list."""
        resp = client.get("/api/projects")
        assert resp.status_code == 200
        assert "test-proj" in resp.json()["projects"]

    def test_create_project(self, client, tmp_path, monkeypatch):
        """Creating a project under an empty root answers 200 with a truthy "ok"."""
        root = tmp_path / "new_root"
        root.mkdir()
        # monkeypatch restores the previous value (or absence) on teardown,
        # even when an assertion below fails.
        monkeypatch.setenv("BLACKBOARD_ROOT", str(root))

        resp = client.post("/api/projects", json={
            "id": "new-proj",
            "name": "New",
            "agents": ["a1"],
            "description": "test",
        })
        assert resp.status_code == 200
        assert resp.json()["ok"]

    def test_create_duplicate(self, client, project_env):
        """Re-using the id of an existing project answers 409."""
        resp = client.post("/api/projects", json={
            "id": "test-proj",
            "name": "Dup",
        })
        assert resp.status_code == 409

    def test_get_project(self, client, project_env):
        """Fetching the fixture project returns its name."""
        resp = client.get("/api/projects/test-proj")
        assert resp.status_code == 200
        assert resp.json()["name"] == "Test Project"

    def test_get_nonexistent(self, client, project_env):
        """Fetching an unknown project id answers 404."""
        resp = client.get("/api/projects/nope")
        assert resp.status_code == 404


# ===================================================================
# Blackboard API
# ===================================================================

class TestBlackboardAPI:
    """Tests for the per-project blackboard endpoints (tasks and related records)."""

    def test_list_tasks(self, client, project_env):
        """Listing tasks returns exactly the one task created by the fixture."""
        resp = client.get("/api/projects/test-proj/tasks")
        assert resp.status_code == 200
        assert len(resp.json()["tasks"]) == 1

    def test_get_task(self, client, project_env):
        """Fetching task ``t1`` returns its title."""
        resp = client.get("/api/projects/test-proj/tasks/t1")
        assert resp.status_code == 200
        assert resp.json()["title"] == "Existing Task"

    def test_get_task_404(self, client, project_env):
        """Fetching an unknown task id answers 404."""
        resp = client.get("/api/projects/test-proj/tasks/nope")
        assert resp.status_code == 404

    def test_create_task(self, client, project_env):
        """Creating a task answers 200 and echoes the new task id."""
        resp = client.post("/api/projects/test-proj/tasks", json={
            "id": "t2",
            "title": "New Task",
            "task_type": "review",
        })
        assert resp.status_code == 200
        assert resp.json()["task_id"] == "t2"

    def test_claim_task(self, client, project_env):
        """Claiming task ``t1`` as ``agent1`` answers 200."""
        resp = client.post(
            "/api/projects/test-proj/tasks/t1/claim",
            json={"agent": "agent1"},
        )
        assert resp.status_code == 200

    def test_update_status(self, client, project_env):
        """After claiming, moving the task to "working" answers 200."""
        # Claim first
        client.post(
            "/api/projects/test-proj/tasks/t1/claim",
            json={"agent": "agent1"},
        )
        resp = client.post(
            "/api/projects/test-proj/tasks/t1/status",
            json={"status": "working", "agent": "agent1"},
        )
        assert resp.status_code == 200

    def test_invalid_status_transition(self, client, project_env):
        """Setting "done" on a task that was not claimed in this test answers 409."""
        resp = client.post(
            "/api/projects/test-proj/tasks/t1/status",
            json={"status": "done"},
        )
        assert resp.status_code == 409

    def test_add_comment(self, client, project_env):
        """Posting a comment with mentions answers 200."""
        resp = client.post("/api/projects/test-proj/tasks/t1/comments", json={
            "author": "pangtong",
            "body": "Nice",
            "comment_type": "general",
            "mentions": ["agent1"],
        })
        assert resp.status_code == 200

    def test_get_comments(self, client, project_env):
        """A comment posted to the task is returned by the comments listing."""
        client.post("/api/projects/test-proj/tasks/t1/comments", json={
            "author": "a",
            "body": "Hello",
        })
        resp = client.get("/api/projects/test-proj/tasks/t1/comments")
        assert resp.status_code == 200
        assert len(resp.json()["comments"]) == 1

    def test_write_output(self, client, project_env):
        """Posting an output record answers 200."""
        resp = client.post("/api/projects/test-proj/tasks/t1/outputs", json={
            "agent": "agent1",
            "type": "code",
            "title": "main.py",
        })
        assert resp.status_code == 200

    def test_add_decision(self, client, project_env):
        """Posting a decision record answers 200."""
        resp = client.post("/api/projects/test-proj/tasks/t1/decisions", json={
            "decider": "pangtong",
            "decision": "Use X",
            "rationale": "Better",
        })
        assert resp.status_code == 200

    def test_add_observation(self, client, project_env):
        """Posting an observation record answers 200."""
        resp = client.post("/api/projects/test-proj/tasks/t1/observations", json={
            "observer": "simayi",
            "body": "Warning!",
            "severity": "warning",
        })
        assert resp.status_code == 200

    def test_add_review(self, client, project_env):
        """Posting a review record answers 200."""
        resp = client.post("/api/projects/test-proj/tasks/t1/reviews", json={
            "id": "rev-1",
            "reviewer": "simayi",
            "review_type": "output_review",
            "verdict": "approved",
            "summary": "LGTM",
            "confidence": 0.9,
        })
        assert resp.status_code == 200

    def test_get_events(self, client, project_env):
        """The project events endpoint answers 200 with an "events" key."""
        resp = client.get("/api/projects/test-proj/events")
        assert resp.status_code == 200
        assert "events" in resp.json()

    def test_summary(self, client, project_env):
        """The project summary answers 200 and includes a "pending" entry."""
        resp = client.get("/api/projects/test-proj/summary")
        assert resp.status_code == 200
        assert "pending" in resp.json()["summary"]


# ===================================================================
# SSE
# ===================================================================

class TestSSE:
    """Tests for the server-sent-events endpoint."""

    def test_sse_endpoint_returns_event_stream(self, client):
        """The SSE endpoint answers 200 with a text/event-stream content type.

        The test is skipped, rather than silently passing, when the request
        raises or does not complete within the timeout, because in those cases
        there is nothing to assert on.
        """
        # TestClient's .get() only returns once streaming has completed, and
        # subscribe() inside the async generator needs a running event loop.
        # So only endpoint reachability is checked here, with the request
        # issued from a background thread that we wait on for a bounded time.
        result = {}

        def fetch():
            try:
                resp = client.get("/api/events")
            except Exception as e:
                result["error"] = str(e)
            else:
                # Publish everything in one assignment so the main thread never
                # observes a half-filled result after a join timeout.
                result["response"] = (
                    resp.status_code,
                    resp.headers.get("content-type", ""),
                )

        t = threading.Thread(target=fetch, daemon=True)
        t.start()
        t.join(timeout=5.0)

        if "error" in result:
            pytest.skip(f"SSE test needs async server: {result['error']}")
        if "response" not in result:
            # The stream is still open; report that explicitly instead of
            # letting the test pass without having checked anything.
            pytest.skip("SSE request did not complete within 5s; nothing to assert")

        status, content_type = result["response"]
        assert status == 200
        assert "text/event-stream" in content_type