mirror of
https://github.com/DrizzleTime/Foxel.git
synced 2026-05-07 05:02:42 +08:00
feat: enhance Telegram adapter to support parsing legacy session_string and fetching thumbnails
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
from typing import List, Dict, Tuple, AsyncIterator
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
from models import StorageAdapter
|
||||
from telethon import TelegramClient
|
||||
from telethon.crypto import AuthKey
|
||||
from telethon.sessions import StringSession
|
||||
from telethon.tl import types
|
||||
import socks
|
||||
|
||||
# 适配器类型标识
|
||||
@@ -54,9 +58,93 @@ class TelegramAdapter:
|
||||
if not all([self.api_id, self.api_hash, self.session_string, self.chat_id]):
|
||||
raise ValueError("Telegram 适配器需要 api_id, api_hash, session_string 和 chat_id")
|
||||
|
||||
@staticmethod
|
||||
def _parse_legacy_session_string(value: str) -> StringSession:
|
||||
"""
|
||||
兼容旧版 session_string 格式:
|
||||
- version(1B char) + base64(data)
|
||||
- data: dc_id(1B) + ip_len(2B) + ip(ASCII, ip_len bytes) + port(2B) + auth_key(256B)
|
||||
"""
|
||||
s = (value or "").strip()
|
||||
if not s:
|
||||
raise ValueError("session_string 为空")
|
||||
|
||||
body = s[1:] if s.startswith("1") else s
|
||||
raw = base64.urlsafe_b64decode(body)
|
||||
if len(raw) < 1 + 2 + 2 + 256:
|
||||
raise ValueError("legacy session 数据长度不足")
|
||||
|
||||
dc_id = raw[0]
|
||||
ip_len = struct.unpack(">H", raw[1:3])[0]
|
||||
expected_len = 1 + 2 + ip_len + 2 + 256
|
||||
if len(raw) != expected_len:
|
||||
raise ValueError("legacy session 数据长度不匹配")
|
||||
|
||||
ip_start = 3
|
||||
ip_end = ip_start + ip_len
|
||||
ip = raw[ip_start:ip_end].decode("utf-8")
|
||||
port = struct.unpack(">H", raw[ip_end : ip_end + 2])[0]
|
||||
key = raw[ip_end + 2 : ip_end + 2 + 256]
|
||||
|
||||
sess = StringSession()
|
||||
sess.set_dc(dc_id, ip, port)
|
||||
sess.auth_key = AuthKey(key)
|
||||
return sess
|
||||
|
||||
@staticmethod
|
||||
def _pick_photo_thumb(thumbs: list | None):
|
||||
if not thumbs:
|
||||
return None
|
||||
|
||||
cached = []
|
||||
others = []
|
||||
for t in thumbs:
|
||||
if isinstance(t, (types.PhotoCachedSize, types.PhotoStrippedSize)):
|
||||
cached.append(t)
|
||||
elif isinstance(t, (types.PhotoSize, types.PhotoSizeProgressive)):
|
||||
if not isinstance(t, types.PhotoSizeEmpty):
|
||||
others.append(t)
|
||||
|
||||
if cached:
|
||||
cached.sort(key=lambda x: len(getattr(x, "bytes", b"") or b""))
|
||||
return cached[-1]
|
||||
|
||||
if others:
|
||||
def _sz(x):
|
||||
if isinstance(x, types.PhotoSizeProgressive):
|
||||
return max(x.sizes or [0])
|
||||
return int(getattr(x, "size", 0) or 0)
|
||||
|
||||
others.sort(key=_sz)
|
||||
return others[-1]
|
||||
|
||||
return None
|
||||
|
||||
def _build_session(self) -> StringSession:
|
||||
s = (self.session_string or "").strip()
|
||||
if not s:
|
||||
raise ValueError("Telegram 适配器 session_string 为空")
|
||||
|
||||
try:
|
||||
return StringSession(s)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 少数工具可能去掉了 version 前缀,这里做一次兼容
|
||||
if not s.startswith("1"):
|
||||
try:
|
||||
return StringSession("1" + s)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
return self._parse_legacy_session_string(s)
|
||||
except Exception as exc:
|
||||
raise ValueError("Telegram session_string 无效,请使用 Telethon StringSession 重新生成") from exc
|
||||
|
||||
def _get_client(self) -> TelegramClient:
|
||||
"""创建一个新的 TelegramClient 实例"""
|
||||
return TelegramClient(StringSession(self.session_string), self.api_id, self.api_hash, proxy=self.proxy)
|
||||
return TelegramClient(self._build_session(), self.api_id, self.api_hash, proxy=self.proxy)
|
||||
|
||||
def get_effective_root(self, sub_path: str | None) -> str:
|
||||
return ""
|
||||
@@ -198,6 +286,41 @@ class TelegramAdapter:
|
||||
async def mkdir(self, root: str, rel: str):
|
||||
raise NotImplementedError("Telegram 适配器不支持创建目录。")
|
||||
|
||||
async def get_thumbnail(self, root: str, rel: str, size: str = "medium"):
|
||||
try:
|
||||
message_id_str, _ = rel.split('_', 1)
|
||||
message_id = int(message_id_str)
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
|
||||
client = self._get_client()
|
||||
try:
|
||||
await client.connect()
|
||||
message = await client.get_messages(self.chat_id, ids=message_id)
|
||||
if not message:
|
||||
return None
|
||||
|
||||
doc = message.document or message.video
|
||||
thumbs = None
|
||||
if doc and getattr(doc, "thumbs", None):
|
||||
thumbs = list(doc.thumbs or [])
|
||||
elif message.photo and getattr(message.photo, "sizes", None):
|
||||
thumbs = list(message.photo.sizes or [])
|
||||
|
||||
thumb = self._pick_photo_thumb(thumbs)
|
||||
if not thumb:
|
||||
return None
|
||||
|
||||
result = await client.download_media(message, bytes, thumb=thumb)
|
||||
if isinstance(result, (bytes, bytearray)):
|
||||
return bytes(result)
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
finally:
|
||||
if client.is_connected():
|
||||
await client.disconnect()
|
||||
|
||||
async def delete(self, root: str, rel: str):
|
||||
"""删除一个文件 (即一条消息)"""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user