auto-sync: 2026-05-17 06:29:53

This commit is contained in:
cfdaily
2026-05-17 06:29:53 +08:00
parent c3a40e9f07
commit bff52614bc
+5 -25
View File
@@ -193,28 +193,8 @@ class TestBlackboardAPI:
# ===================================================================
class TestSSE:
def test_sse_endpoint_returns_event_stream(self, client):
"""SSE 端点返回 text/event-stream"""
# TestClient 的 .get() 会等 streaming 完成才返回
# 在 async generator 里 subscribe() 需要运行中的 event loop
# 这里只测端点可达性,用后台线程读取
import threading
result = {}
def fetch():
try:
resp = client.get("/api/events")
result['status'] = resp.status_code
result['content_type'] = resp.headers.get('content-type', '')
result['body'] = resp.text[:200]
except Exception as e:
result['error'] = str(e)
t = threading.Thread(target=fetch, daemon=True)
t.start()
t.join(timeout=5.0)
if 'error' in result:
pytest.skip(f"SSE test needs async server: {result['error']}")
elif 'status' in result:
assert result['status'] == 200
def test_sse_endpoint_registered(self):
"""SSE 路由已注册(详细测试见 test_sse.py"""
from src.main import app
routes = [r.path for r in app.routes]
assert "/api/events" in routes