diff --git a/app/api/endpoints/search.py b/app/api/endpoints/search.py index f6ea2491..b88c3fbf 100644 --- a/app/api/endpoints/search.py +++ b/app/api/endpoints/search.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import List, Any, Optional, AsyncIterator @@ -17,6 +18,9 @@ from app.schemas.types import MediaType, ChainEventType router = APIRouter() +_SSE_APPEND_FLUSH_INTERVAL = 1 +_SSE_APPEND_MAX_ITEMS = 48 + def _parse_site_list(sites: Optional[str]) -> Optional[List[int]]: """ @@ -32,14 +36,97 @@ def _sse_event(data: dict) -> str: return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" +def _merge_append_event(pending_event: Optional[dict], event: dict) -> dict: + """ + 合并短时间内连续到达的 append 事件,降低前端刷新频率。 + """ + items = list(event.get("items") or []) + if not pending_event: + merged_event = dict(event) + merged_event["items"] = items + return merged_event + + merged_event = dict(pending_event) + merged_event.update({ + key: value + for key, value in event.items() + if key != "items" + }) + merged_event["type"] = "append" + merged_event["items"] = [*(pending_event.get("items") or []), *items] + return merged_event + + +async def _iter_batched_search_events(event_source: AsyncIterator[dict]) -> AsyncIterator[dict]: + """ + 对搜索流事件做轻量批处理,避免站点结果集中返回时产生过密 SSE。 + """ + iterator = event_source.__aiter__() + pending_append_event: Optional[dict] = None + next_event_task: Optional[asyncio.Task] = None + + try: + while True: + if next_event_task is None: + next_event_task = asyncio.create_task(anext(iterator)) + + timeout = _SSE_APPEND_FLUSH_INTERVAL if pending_append_event else None + done, _ = await asyncio.wait({next_event_task}, timeout=timeout) + + if not done: + if pending_append_event: + yield pending_append_event + pending_append_event = None + continue + + try: + event = next_event_task.result() + except StopAsyncIteration: + next_event_task = None + break + finally: + if next_event_task and next_event_task.done(): + next_event_task = None + + if event.get("type") == "append": + pending_append_event = _merge_append_event(pending_append_event, event) + if len(pending_append_event.get("items") or []) >= _SSE_APPEND_MAX_ITEMS: + yield pending_append_event + pending_append_event = None + continue + + if pending_append_event: + yield pending_append_event + pending_append_event = None + + yield event + finally: + if next_event_task and not next_event_task.done(): + next_event_task.cancel() + await asyncio.gather(next_event_task, return_exceptions=True) + + if pending_append_event: + yield pending_append_event + + async def _stream_search_events(request: Request, event_source: AsyncIterator[dict]): """ 输出搜索SSE事件 """ try: - async for event in event_source: + has_sent_final_replace = False + async for event in _iter_batched_search_events(event_source): if await request.is_disconnected(): break + # 精确搜索会先发送 replace,再发送 done。done 再带整包 items 只会重复占用带宽和前端内存。 + if event.get("type") == "replace" and event.get("items"): + has_sent_final_replace = True + elif event.get("type") == "done" and has_sent_final_replace and event.get("stage") == "done" and event.get("items"): + event = { + key: value + for key, value in event.items() + if key != "items" + } yield _sse_event(event) except Exception as err: logger.error(f"渐进式搜索出错:{err}", exc_info=True)