feat: add download locking and flood wait handling in TelegramAdapter

This commit is contained in:
shiyu
2026-05-06 23:00:10 +08:00
parent 93d5e5e313
commit bd24d7eeeb

View File

@@ -6,7 +6,7 @@ import os
import struct
import time
from models import StorageAdapter
from telethon import TelegramClient, utils
from telethon import TelegramClient, errors, utils
from telethon.crypto import AuthKey
from telethon.sessions import StringSession
from telethon.tl import types
@@ -90,6 +90,7 @@ class TelegramAdapter:
self._client: TelegramClient | None = None
self._client_lock = asyncio.Lock()
self._download_lock = asyncio.Lock()
self._message_cache: Dict[int, Tuple[float, object]] = {}
@staticmethod
@@ -229,6 +230,19 @@ class TelegramAdapter:
def _get_message_media(message):
return message.document or message.video or message.photo
@staticmethod
def _flood_wait_http_exception(exc: errors.FloodWaitError):
from fastapi import HTTPException
seconds = int(getattr(exc, "seconds", 0) or 0)
if seconds > 0:
return HTTPException(
status_code=429,
detail=f"Telegram 请求过于频繁,请等待 {seconds} 秒后重试",
headers={"Retry-After": str(seconds)},
)
return HTTPException(status_code=429, detail="Telegram 请求过于频繁,请稍后重试")
@staticmethod
def _get_message_file_size(message, media) -> int:
file_meta = message.file
@@ -310,7 +324,7 @@ class TelegramAdapter:
"size": size,
"mtime": int(message.date.timestamp()),
"type": "file",
"has_thumbnail": self._message_has_thumbnail(message),
"has_thumbnail": False,
})
finally:
if client.is_connected():
@@ -349,8 +363,13 @@ class TelegramAdapter:
if not message or not self._get_message_media(message):
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
file_bytes = await client.download_media(message, file=bytes)
return file_bytes
try:
async with self._download_lock:
file_bytes = await client.download_media(message, file=bytes)
return file_bytes
except errors.FloodWaitError as exc:
await self._disconnect_shared_client()
raise self._flood_wait_http_exception(exc)
async def read_file_range(self, root: str, rel: str, start: int, end: Optional[int] = None) -> bytes:
from fastapi import HTTPException
@@ -379,22 +398,27 @@ class TelegramAdapter:
limit = end - start + 1
data = bytearray()
async for chunk in client.iter_download(
media,
offset=start,
request_size=self._download_chunk_size,
chunk_size=self._download_chunk_size,
file_size=file_size or None,
):
if not chunk:
continue
need = limit - len(data)
if need <= 0:
break
data.extend(chunk[:need])
if len(data) >= limit:
break
return bytes(data)
try:
async with self._download_lock:
async for chunk in client.iter_download(
media,
offset=start,
request_size=self._download_chunk_size,
chunk_size=self._download_chunk_size,
file_size=file_size or None,
):
if not chunk:
continue
need = limit - len(data)
if need <= 0:
break
data.extend(chunk[:need])
if len(data) >= limit:
break
return bytes(data)
except errors.FloodWaitError as exc:
await self._disconnect_shared_client()
raise self._flood_wait_http_exception(exc)
async def write_file(self, root: str, rel: str, data: bytes):
"""将字节数据作为文件上传"""
@@ -515,7 +539,8 @@ class TelegramAdapter:
if embedded and isinstance(thumb, types.PhotoStrippedSize):
return utils.stripped_photo_to_jpg(bytes(embedded))
result = await client.download_media(message, bytes, thumb=thumb)
async with self._download_lock:
result = await client.download_media(message, bytes, thumb=thumb)
if isinstance(result, (bytes, bytearray)):
return bytes(result)
return None
@@ -602,29 +627,56 @@ class TelegramAdapter:
headers["Content-Length"] = str(end - start + 1)
async def iterator():
downloaded = 0
try:
limit = end - start + 1
downloaded = 0
async for chunk in client.iter_download(
media,
offset=start,
request_size=self._download_chunk_size,
chunk_size=self._download_chunk_size,
file_size=file_size,
):
if downloaded + len(chunk) > limit:
yield chunk[:limit - downloaded]
break
yield chunk
downloaded += len(chunk)
if downloaded >= limit:
break
async with self._download_lock:
async for chunk in client.iter_download(
media,
offset=start,
request_size=self._download_chunk_size,
chunk_size=self._download_chunk_size,
file_size=file_size,
):
if not chunk:
continue
remaining = limit - downloaded
if remaining <= 0:
break
data = chunk[:remaining]
downloaded += len(data)
yield data
if downloaded >= limit:
break
except errors.FloodWaitError as exc:
await self._disconnect_shared_client()
if downloaded == 0:
raise self._flood_wait_http_exception(exc)
seconds = int(getattr(exc, "seconds", 0) or 0)
print(f"Telegram streaming stopped by FloodWait after partial response, wait={seconds}s")
return
except Exception:
await self._disconnect_shared_client()
raise
return StreamingResponse(iterator(), status_code=status, headers=headers)
agen = iterator()
try:
first_chunk = await agen.__anext__()
except StopAsyncIteration:
first_chunk = b""
except HTTPException:
raise
async def response_iterator():
try:
if first_chunk:
yield first_chunk
async for chunk in agen:
yield chunk
finally:
await agen.aclose()
return StreamingResponse(response_iterator(), status_code=status, headers=headers)
except HTTPException:
raise
@@ -654,7 +706,7 @@ class TelegramAdapter:
"size": size,
"mtime": int(message.date.timestamp()),
"type": "file",
"has_thumbnail": self._message_has_thumbnail(message),
"has_thumbnail": False,
}
def ADAPTER_FACTORY(rec: StorageAdapter) -> TelegramAdapter: