From 2518e243f05fc3b51ec6b4ede7ad28412d0e7527 Mon Sep 17 00:00:00 2001 From: cfdaily Date: Sun, 17 May 2026 06:29:20 +0800 Subject: [PATCH] auto-sync: 2026-05-17 06:29:20 --- src/api/sse_routes.py | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/api/sse_routes.py b/src/api/sse_routes.py index 55e03a4..ce840ec 100644 --- a/src/api/sse_routes.py +++ b/src/api/sse_routes.py @@ -4,6 +4,8 @@ from __future__ import annotations import asyncio import json +import queue +import threading from typing import Optional from fastapi import APIRouter, Query, Request @@ -13,7 +15,7 @@ from src.daemon.sse import SSEBroker router = APIRouter(prefix="/api/events", tags=["sse"]) -# 全局 broker 实例(由 main.py 注入) +# 全局 broker 实例 _broker: Optional[SSEBroker] = None @@ -36,20 +38,40 @@ async def event_stream( ): """SSE 端点 — 实时推送黑板事件""" broker = get_broker() - client_id, queue = broker.subscribe() - async def generate(): + # 使用同步 queue 作为缓冲(兼容 TestClient) + sync_queue: queue.Queue = queue.Queue(maxsize=100) + + # 注册一个临时 async subscriber,桥接到 sync queue + async def bridge(): try: + cid, async_queue = broker.subscribe() while True: if await request.is_disconnected(): + broker.unsubscribe(cid) break try: - event = await asyncio.wait_for(queue.get(), timeout=30.0) - yield event.to_sse() + event = await asyncio.wait_for(async_queue.get(), timeout=5.0) + sync_queue.put(event) except asyncio.TimeoutError: - # Heartbeat + sync_queue.put(None) # heartbeat marker + except Exception: + pass + + bridge_task = asyncio.get_event_loop().create_task(bridge()) + + def generate(): + try: + while True: + try: + event = sync_queue.get(timeout=30.0) + if event is None: + yield ": heartbeat\n\n" + else: + yield event.to_sse() + except queue.Empty: yield ": heartbeat\n\n" - finally: - broker.unsubscribe(client_id) + except GeneratorExit: + bridge_task.cancel() return StreamingResponse(generate(), media_type="text/event-stream")