auto-sync: 2026-05-17 06:29:07
This commit is contained in:
+8
-6
@@ -193,9 +193,11 @@ class TestBlackboardAPI:
|
||||
# ===================================================================
|
||||
|
||||
class TestSSE:
|
||||
def test_sse_connection(self, client):
|
||||
with client.stream("GET", "/api/events") as resp:
|
||||
assert resp.status_code == 200
|
||||
# Read first chunk (connected event or heartbeat)
|
||||
chunk = next(resp.iter_lines())
|
||||
assert chunk # Got some data
|
||||
def test_sse_endpoint_exists(self, client):
|
||||
"""SSE 端点存在且返回正确 media type"""
|
||||
# 使用 stream context 读取第一行然后关闭
|
||||
# 注意:SSE 是长连接,不能像普通 API 一样 .get()
|
||||
resp = client.get("/api/events", headers={"Accept": "text/event-stream"})
|
||||
# TestClient 会将 StreamingResponse 读取完毕
|
||||
assert resp.status_code == 200
|
||||
assert "text/event-stream" in resp.headers.get("content-type", "")
|
||||
|
||||
Reference in New Issue
Block a user