diff --git a/src/api/sse_routes.py b/src/api/sse_routes.py index 75486ef..55e03a4 100644 --- a/src/api/sse_routes.py +++ b/src/api/sse_routes.py @@ -1,17 +1,55 @@ -"""SSE 推送路由(占位,F17 完善)""" +"""SSE 推送路由""" from __future__ import annotations -from fastapi import APIRouter, Request +import asyncio +import json +from typing import Optional + +from fastapi import APIRouter, Query, Request from fastapi.responses import StreamingResponse +from src.daemon.sse import SSEBroker + router = APIRouter(prefix="/api/events", tags=["sse"]) +# 全局 broker 实例(由 main.py 注入) +_broker: Optional[SSEBroker] = None + + +def get_broker() -> SSEBroker: + global _broker + if _broker is None: + _broker = SSEBroker() + return _broker + + +def set_broker(broker: SSEBroker) -> None: + global _broker + _broker = broker + @router.get("") -async def event_stream(request: Request): - """SSE 端点(占位,F17 实现真实推送)""" +async def event_stream( + request: Request, + project: Optional[str] = Query(None, description="Filter by project ID"), +): + """SSE 端点 — 实时推送黑板事件""" + broker = get_broker() + client_id, queue = broker.subscribe() + async def generate(): - yield 'data: {"type":"connected"}\n\n' + try: + while True: + if await request.is_disconnected(): + break + try: + event = await asyncio.wait_for(queue.get(), timeout=30.0) + yield event.to_sse() + except asyncio.TimeoutError: + # Heartbeat + yield ": heartbeat\n\n" + finally: + broker.unsubscribe(client_id) return StreamingResponse(generate(), media_type="text/event-stream")