From a2638f077cb1c1a200c0cb537f51e1bf27a3cb93 Mon Sep 17 00:00:00 2001 From: shiyu Date: Sat, 30 Aug 2025 11:16:35 +0800 Subject: [PATCH] feat: Add Telegram storage adapter implementation --- services/adapters/telegram.py | 245 ++++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 services/adapters/telegram.py diff --git a/services/adapters/telegram.py b/services/adapters/telegram.py new file mode 100644 index 0000000..3ee3c24 --- /dev/null +++ b/services/adapters/telegram.py @@ -0,0 +1,245 @@ +from __future__ import annotations +from typing import List, Dict, Tuple, AsyncIterator +from models import StorageAdapter +from telethon import TelegramClient +from telethon.sessions import StringSession +import socks + +# 适配器类型标识 +ADAPTER_TYPE = "Telegram" + +# 适配器配置项定义 +CONFIG_SCHEMA = [ + {"key": "api_id", "label": "API ID", "type": "string", "required": True, "help_text": "从 my.telegram.org 获取"}, + {"key": "api_hash", "label": "API Hash", "type": "password", "required": True, "help_text": "从 my.telegram.org 获取"}, + {"key": "session_string", "label": "Session String", "type": "password", "required": True, "help_text": "通过 generate_session.py 生成"}, + {"key": "chat_id", "label": "Chat ID", "type": "string", "required": True, "placeholder": "频道/群组的ID或用户名, 例如: -100123456789 或 'channel_username'"}, + {"key": "proxy_protocol", "label": "代理协议", "type": "string", "required": False, "placeholder": "例如: socks5, http"}, + {"key": "proxy_host", "label": "代理主机", "type": "string", "required": False, "placeholder": "例如: 127.0.0.1"}, + {"key": "proxy_port", "label": "代理端口", "type": "number", "required": False, "placeholder": "例如: 1080"}, +] + +class TelegramAdapter: + """Telegram 存储适配器 (只读, 使用用户 Session)""" + + def __init__(self, record: StorageAdapter): + self.record = record + cfg = record.config + self.api_id = int(cfg.get("api_id")) + self.api_hash = cfg.get("api_hash") + self.session_string = cfg.get("session_string") + self.chat_id_str = cfg.get("chat_id") + + # 代理设置 + self.proxy_protocol = cfg.get("proxy_protocol") + self.proxy_host = cfg.get("proxy_host") + self.proxy_port = cfg.get("proxy_port") + + self.proxy = None + if self.proxy_protocol and self.proxy_host and self.proxy_port: + proto_map = { + "socks5": socks.SOCKS5, + "http": socks.HTTP, + } + proxy_type = proto_map.get(self.proxy_protocol.lower()) + if proxy_type: + self.proxy = (proxy_type, self.proxy_host, int(self.proxy_port)) + + try: + self.chat_id = int(self.chat_id_str) + except (ValueError, TypeError): + self.chat_id = self.chat_id_str + + if not all([self.api_id, self.api_hash, self.session_string, self.chat_id]): + raise ValueError("Telegram 适配器需要 api_id, api_hash, session_string 和 chat_id") + + def _get_client(self) -> TelegramClient: + """创建一个新的 TelegramClient 实例""" + return TelegramClient(StringSession(self.session_string), self.api_id, self.api_hash, proxy=self.proxy) + + def get_effective_root(self, sub_path: str | None) -> str: + return "" + + async def list_dir(self, root: str, rel: str, page_num: int = 1, page_size: int = 50) -> Tuple[List[Dict], int]: + if rel: + return [], 0 + + client = self._get_client() + entries = [] + try: + await client.connect() + messages = await client.get_messages(self.chat_id, limit=50) + for message in messages: + print(message) + if message and (message.document or message.video): + media = message.document or message.video + filename = None + if hasattr(media, 'attributes'): + for attr in media.attributes: + if hasattr(attr, 'file_name') and attr.file_name: + filename = attr.file_name + break + + if not filename: + if message.text and '.' in message.text: + if len(message.text) < 256 and '\n' not in message.text: + filename = message.text + + if not filename: + filename = "Unknown" + + entries.append({ + "name": f"{message.id}_{filename}", + "is_dir": False, + "size": media.size, + "mtime": int(message.date.timestamp()), + "type": "file", + }) + finally: + if client.is_connected(): + await client.disconnect() + + return entries, len(entries) + + async def read_file(self, root: str, rel: str) -> bytes: + try: + message_id_str, _ = rel.split('_', 1) + message_id = int(message_id_str) + except (ValueError, IndexError): + raise FileNotFoundError(f"无效的文件路径格式: {rel}") + + client = self._get_client() + try: + await client.connect() + message = await client.get_messages(self.chat_id, ids=message_id) + if not message or not (message.document or message.video): + raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件") + + file_bytes = await client.download_media(message, file=bytes) + return file_bytes + finally: + if client.is_connected(): + await client.disconnect() + + async def write_file(self, root: str, rel: str, data: bytes): + raise NotImplementedError("Telegram 适配器是只读的,不支持写入文件。") + + async def write_file_stream(self, root: str, rel: str, data_iter: AsyncIterator[bytes]): + raise NotImplementedError("Telegram 适配器是只读的,不支持流式写入文件。") + + async def mkdir(self, root: str, rel: str): + raise NotImplementedError("Telegram 适配器是只读的,不支持创建目录。") + + async def delete(self, root: str, rel: str): + raise NotImplementedError("Telegram 适配器是只读的,不支持删除。") + + async def move(self, root: str, src_rel: str, dst_rel: str): + raise NotImplementedError("Telegram 适配器是只读的,不支持移动。") + + async def rename(self, root: str, src_rel: str, dst_rel: str): + raise NotImplementedError("Telegram 适配器是只读的,不支持重命名。") + + async def copy(self, root: str, src_rel: str, dst_rel: str, overwrite: bool = False): + raise NotImplementedError("Telegram 适配器是只读的,不支持复制。") + + async def stream_file(self, root: str, rel: str, range_header: str | None): + from fastapi.responses import StreamingResponse + from fastapi import HTTPException + + try: + message_id_str, _ = rel.split('_', 1) + message_id = int(message_id_str) + except (ValueError, IndexError): + raise HTTPException(status_code=400, detail=f"无效的文件路径格式: {rel}") + + client = self._get_client() + + try: + await client.connect() + message = await client.get_messages(self.chat_id, ids=message_id) + if not message or not (message.document or message.video): + raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件") + + media = message.document or message.video + file_size = media.size + + start = 0 + end = file_size - 1 + status = 200 + + headers = { + "Accept-Ranges": "bytes", + "Content-Type": media.mime_type or "application/octet-stream", + "Content-Length": str(file_size), + } + + if range_header: + try: + range_val = range_header.strip().partition("=")[2] + s, _, e = range_val.partition("-") + start = int(s) if s else 0 + end = int(e) if e else file_size - 1 + if start >= file_size or end >= file_size or start > end: + raise HTTPException(status_code=416, detail="Requested Range Not Satisfiable") + status = 206 + headers["Content-Length"] = str(end - start + 1) + headers["Content-Range"] = f"bytes {start}-{end}/{file_size}" + except ValueError: + raise HTTPException(status_code=400, detail="Invalid Range header") + + async def iterator(): + try: + limit = end - start + 1 + downloaded = 0 + + async for chunk in client.iter_download(media, offset=start): + if downloaded + len(chunk) > limit: + yield chunk[:limit - downloaded] + break + yield chunk + downloaded += len(chunk) + if downloaded >= limit: + break + finally: + if client.is_connected(): + await client.disconnect() + + return StreamingResponse(iterator(), status_code=status, headers=headers) + + except FileNotFoundError as e: + if client.is_connected(): + await client.disconnect() + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + if client.is_connected(): + await client.disconnect() + raise HTTPException(status_code=500, detail=f"Streaming failed: {str(e)}") + + async def stat_file(self, root: str, rel: str): + try: + message_id_str, filename = rel.split('_', 1) + message_id = int(message_id_str) + except (ValueError, IndexError): + raise FileNotFoundError(f"无效的文件路径格式: {rel}") + + client = self._get_client() + try: + await client.connect() + message = await client.get_messages(self.chat_id, ids=message_id) + if not message or not (message.document or message.video): + raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件") + + media = message.document or message.video + return { + "name": rel, + "is_dir": False, + "size": media.size, + "mtime": int(message.date.timestamp()), + "type": "file", + } + finally: + if client.is_connected(): + await client.disconnect() + +def ADAPTER_FACTORY(rec: StorageAdapter) -> TelegramAdapter: + return TelegramAdapter(rec) \ No newline at end of file