fix: optimize SSE event streaming with batched processing

This commit is contained in:
jxxghp
2026-05-09 13:23:05 +08:00
parent 893b8eba86
commit 45d47d32f8

View File

@@ -1,3 +1,4 @@
import asyncio
import json
from typing import List, Any, Optional, AsyncIterator
@@ -17,6 +18,9 @@ from app.schemas.types import MediaType, ChainEventType
router = APIRouter()
_SSE_APPEND_FLUSH_INTERVAL = 1
_SSE_APPEND_MAX_ITEMS = 48
def _parse_site_list(sites: Optional[str]) -> Optional[List[int]]:
"""
@@ -32,14 +36,97 @@ def _sse_event(data: dict) -> str:
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
def _merge_append_event(pending_event: Optional[dict], event: dict) -> dict:
"""
合并短时间内连续到达的 append 事件,降低前端刷新频率。
"""
items = list(event.get("items") or [])
if not pending_event:
merged_event = dict(event)
merged_event["items"] = items
return merged_event
merged_event = dict(pending_event)
merged_event.update({
key: value
for key, value in event.items()
if key != "items"
})
merged_event["type"] = "append"
merged_event["items"] = [*(pending_event.get("items") or []), *items]
return merged_event
async def _iter_batched_search_events(event_source: AsyncIterator[dict]) -> AsyncIterator[dict]:
"""
对搜索流事件做轻量批处理,避免站点结果集中返回时产生过密 SSE。
"""
iterator = event_source.__aiter__()
pending_append_event: Optional[dict] = None
next_event_task: Optional[asyncio.Task] = None
try:
while True:
if next_event_task is None:
next_event_task = asyncio.create_task(anext(iterator))
timeout = _SSE_APPEND_FLUSH_INTERVAL if pending_append_event else None
done, _ = await asyncio.wait({next_event_task}, timeout=timeout)
if not done:
if pending_append_event:
yield pending_append_event
pending_append_event = None
continue
try:
event = next_event_task.result()
except StopAsyncIteration:
next_event_task = None
break
finally:
if next_event_task and next_event_task.done():
next_event_task = None
if event.get("type") == "append":
pending_append_event = _merge_append_event(pending_append_event, event)
if len(pending_append_event.get("items") or []) >= _SSE_APPEND_MAX_ITEMS:
yield pending_append_event
pending_append_event = None
continue
if pending_append_event:
yield pending_append_event
pending_append_event = None
yield event
finally:
if next_event_task and not next_event_task.done():
next_event_task.cancel()
await asyncio.gather(next_event_task, return_exceptions=True)
if pending_append_event:
yield pending_append_event
async def _stream_search_events(request: Request, event_source: AsyncIterator[dict]):
"""
输出搜索SSE事件
"""
try:
async for event in event_source:
has_sent_final_replace = False
async for event in _iter_batched_search_events(event_source):
if await request.is_disconnected():
break
# 精确搜索会先发送 replace再发送 done。done 再带整包 items 只会重复占用带宽和前端内存。
if event.get("type") == "replace" and event.get("items"):
has_sent_final_replace = True
elif event.get("type") == "done" and has_sent_final_replace and event.get("stage") == "done" and event.get("items"):
event = {
key: value
for key, value in event.items()
if key != "items"
}
yield _sse_event(event)
except Exception as err:
logger.error(f"渐进式搜索出错:{err}", exc_info=True)