From bd24d7eeeba4858ca92385888fc383a5b18fd2b2 Mon Sep 17 00:00:00 2001 From: shiyu Date: Wed, 6 May 2026 23:00:10 +0800 Subject: [PATCH] feat: add download locking and flood wait handling in TelegramAdapter --- domain/adapters/providers/telegram.py | 130 ++++++++++++++++++-------- 1 file changed, 91 insertions(+), 39 deletions(-) diff --git a/domain/adapters/providers/telegram.py b/domain/adapters/providers/telegram.py index 06a2e31..8f4b445 100644 --- a/domain/adapters/providers/telegram.py +++ b/domain/adapters/providers/telegram.py @@ -6,7 +6,7 @@ import os import struct import time from models import StorageAdapter -from telethon import TelegramClient, utils +from telethon import TelegramClient, errors, utils from telethon.crypto import AuthKey from telethon.sessions import StringSession from telethon.tl import types @@ -90,6 +90,7 @@ class TelegramAdapter: self._client: TelegramClient | None = None self._client_lock = asyncio.Lock() + self._download_lock = asyncio.Lock() self._message_cache: Dict[int, Tuple[float, object]] = {} @staticmethod @@ -229,6 +230,19 @@ class TelegramAdapter: def _get_message_media(message): return message.document or message.video or message.photo + @staticmethod + def _flood_wait_http_exception(exc: errors.FloodWaitError): + from fastapi import HTTPException + + seconds = int(getattr(exc, "seconds", 0) or 0) + if seconds > 0: + return HTTPException( + status_code=429, + detail=f"Telegram 请求过于频繁,请等待 {seconds} 秒后重试", + headers={"Retry-After": str(seconds)}, + ) + return HTTPException(status_code=429, detail="Telegram 请求过于频繁,请稍后重试") + @staticmethod def _get_message_file_size(message, media) -> int: file_meta = message.file @@ -310,7 +324,7 @@ class TelegramAdapter: "size": size, "mtime": int(message.date.timestamp()), "type": "file", - "has_thumbnail": self._message_has_thumbnail(message), + "has_thumbnail": False, }) finally: if client.is_connected(): @@ -349,8 +363,13 @@ class TelegramAdapter: if not message or not self._get_message_media(message): raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件") - file_bytes = await client.download_media(message, file=bytes) - return file_bytes + try: + async with self._download_lock: + file_bytes = await client.download_media(message, file=bytes) + return file_bytes + except errors.FloodWaitError as exc: + await self._disconnect_shared_client() + raise self._flood_wait_http_exception(exc) async def read_file_range(self, root: str, rel: str, start: int, end: Optional[int] = None) -> bytes: from fastapi import HTTPException @@ -379,22 +398,27 @@ class TelegramAdapter: limit = end - start + 1 data = bytearray() - async for chunk in client.iter_download( - media, - offset=start, - request_size=self._download_chunk_size, - chunk_size=self._download_chunk_size, - file_size=file_size or None, - ): - if not chunk: - continue - need = limit - len(data) - if need <= 0: - break - data.extend(chunk[:need]) - if len(data) >= limit: - break - return bytes(data) + try: + async with self._download_lock: + async for chunk in client.iter_download( + media, + offset=start, + request_size=self._download_chunk_size, + chunk_size=self._download_chunk_size, + file_size=file_size or None, + ): + if not chunk: + continue + need = limit - len(data) + if need <= 0: + break + data.extend(chunk[:need]) + if len(data) >= limit: + break + return bytes(data) + except errors.FloodWaitError as exc: + await self._disconnect_shared_client() + raise self._flood_wait_http_exception(exc) async def write_file(self, root: str, rel: str, data: bytes): """将字节数据作为文件上传""" @@ -515,7 +539,8 @@ class TelegramAdapter: if embedded and isinstance(thumb, types.PhotoStrippedSize): return utils.stripped_photo_to_jpg(bytes(embedded)) - result = await client.download_media(message, bytes, thumb=thumb) + async with self._download_lock: + result = await client.download_media(message, bytes, thumb=thumb) if isinstance(result, (bytes, bytearray)): return bytes(result) return None @@ -602,29 +627,56 @@ class TelegramAdapter: headers["Content-Length"] = str(end - start + 1) async def iterator(): + downloaded = 0 try: limit = end - start + 1 - downloaded = 0 - - async for chunk in client.iter_download( - media, - offset=start, - request_size=self._download_chunk_size, - chunk_size=self._download_chunk_size, - file_size=file_size, - ): - if downloaded + len(chunk) > limit: - yield chunk[:limit - downloaded] - break - yield chunk - downloaded += len(chunk) - if downloaded >= limit: - break + async with self._download_lock: + async for chunk in client.iter_download( + media, + offset=start, + request_size=self._download_chunk_size, + chunk_size=self._download_chunk_size, + file_size=file_size, + ): + if not chunk: + continue + remaining = limit - downloaded + if remaining <= 0: + break + data = chunk[:remaining] + downloaded += len(data) + yield data + if downloaded >= limit: + break + except errors.FloodWaitError as exc: + await self._disconnect_shared_client() + if downloaded == 0: + raise self._flood_wait_http_exception(exc) + seconds = int(getattr(exc, "seconds", 0) or 0) + print(f"Telegram streaming stopped by FloodWait after partial response, wait={seconds}s") + return except Exception: await self._disconnect_shared_client() raise - return StreamingResponse(iterator(), status_code=status, headers=headers) + agen = iterator() + try: + first_chunk = await agen.__anext__() + except StopAsyncIteration: + first_chunk = b"" + except HTTPException: + raise + + async def response_iterator(): + try: + if first_chunk: + yield first_chunk + async for chunk in agen: + yield chunk + finally: + await agen.aclose() + + return StreamingResponse(response_iterator(), status_code=status, headers=headers) except HTTPException: raise @@ -654,7 +706,7 @@ class TelegramAdapter: "size": size, "mtime": int(message.date.timestamp()), "type": "file", - "has_thumbnail": self._message_has_thumbnail(message), + "has_thumbnail": False, } def ADAPTER_FACTORY(rec: StorageAdapter) -> TelegramAdapter: