Compare commits

..

1 Commits

Author SHA1 Message Date
时雨
c157f1573b feat: add recent files backend APIs 2026-05-06 21:20:13 +08:00
5 changed files with 164 additions and 255 deletions

View File

@@ -1,12 +1,10 @@
from typing import List, Dict, Tuple, AsyncIterator, Optional
import asyncio
import base64
import io
import os
import struct
import time
from models import StorageAdapter
from telethon import TelegramClient, errors, utils
from telethon import TelegramClient, utils
from telethon.crypto import AuthKey
from telethon.sessions import StringSession
from telethon.tl import types
@@ -53,9 +51,6 @@ CONFIG_SCHEMA = [
class TelegramAdapter:
"""Telegram 存储适配器 (使用用户 Session)"""
native_video_thumbnail_only = True
_message_cache_ttl = 300
_message_cache_limit = 200
_download_chunk_size = 512 * 1024
def __init__(self, record: StorageAdapter):
self.record = record
@@ -88,12 +83,6 @@ class TelegramAdapter:
if not all([self.api_id, self.api_hash, self.session_string, self.chat_id]):
raise ValueError("Telegram 适配器需要 api_id, api_hash, session_string 和 chat_id")
self._client: TelegramClient | None = None
self._client_lock = asyncio.Lock()
self._download_lock = asyncio.Lock()
self._active_stream_message_id: int | None = None
self._message_cache: Dict[int, Tuple[float, object]] = {}
@staticmethod
def _parse_legacy_session_string(value: str) -> StringSession:
"""
@@ -195,80 +184,6 @@ class TelegramAdapter:
"""创建一个新的 TelegramClient 实例"""
return TelegramClient(self._build_session(), self.api_id, self.api_hash, proxy=self.proxy)
async def _get_connected_client(self) -> TelegramClient:
async with self._client_lock:
if self._client is None:
self._client = self._get_client()
if not self._client.is_connected():
await self._client.connect()
return self._client
async def _disconnect_shared_client(self):
if self._client and self._client.is_connected():
await self._client.disconnect()
def _clear_message_cache(self):
self._message_cache.clear()
async def _get_cached_message(self, message_id: int):
now = time.monotonic()
cached = self._message_cache.get(message_id)
if cached and cached[0] > now:
return cached[1]
client = await self._get_connected_client()
message = await client.get_messages(self.chat_id, ids=message_id)
if message:
if len(self._message_cache) >= self._message_cache_limit:
oldest_key = min(self._message_cache, key=lambda k: self._message_cache[k][0])
self._message_cache.pop(oldest_key, None)
self._message_cache[message_id] = (now + self._message_cache_ttl, message)
else:
self._message_cache.pop(message_id, None)
return message
@staticmethod
def _get_message_media(message):
return message.document or message.video or message.photo
@staticmethod
def _flood_wait_http_exception(exc: errors.FloodWaitError):
from fastapi import HTTPException
seconds = int(getattr(exc, "seconds", 0) or 0)
if seconds > 0:
return HTTPException(
status_code=429,
detail=f"Telegram 请求过于频繁,请等待 {seconds} 秒后重试",
headers={"Retry-After": str(seconds)},
)
return HTTPException(status_code=429, detail="Telegram 请求过于频繁,请稍后重试")
@staticmethod
def _get_message_file_size(message, media) -> int:
file_meta = message.file
size = file_meta.size if file_meta and file_meta.size is not None else None
if size is None:
if hasattr(media, "size") and media.size is not None:
size = media.size
elif message.photo and getattr(message.photo, "sizes", None):
photo_size = message.photo.sizes[-1]
size = getattr(photo_size, "size", 0) or 0
else:
size = 0
return int(size or 0)
@staticmethod
def _get_message_mime_type(message, media) -> str:
file_meta = message.file
if file_meta and getattr(file_meta, "mime_type", None):
return file_meta.mime_type
if hasattr(media, "mime_type") and media.mime_type:
return media.mime_type
if message.photo:
return "image/jpeg"
return "application/octet-stream"
@staticmethod
def _parse_message_id(rel: str) -> int:
try:
@@ -325,7 +240,7 @@ class TelegramAdapter:
"size": size,
"mtime": int(message.date.timestamp()),
"type": "file",
"has_thumbnail": False,
"has_thumbnail": self._message_has_thumbnail(message),
})
finally:
if client.is_connected():
@@ -359,67 +274,62 @@ class TelegramAdapter:
async def read_file(self, root: str, rel: str) -> bytes:
message_id = self._parse_message_id(rel)
client = await self._get_connected_client()
message = await self._get_cached_message(message_id)
if not message or not self._get_message_media(message):
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
client = self._get_client()
try:
async with self._download_lock:
file_bytes = await client.download_media(message, file=bytes)
return file_bytes
except errors.FloodWaitError as exc:
await self._disconnect_shared_client()
raise self._flood_wait_http_exception(exc)
await client.connect()
message = await client.get_messages(self.chat_id, ids=message_id)
if not message or not (message.document or message.video or message.photo):
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
file_bytes = await client.download_media(message, file=bytes)
return file_bytes
finally:
if client.is_connected():
await client.disconnect()
async def read_file_range(self, root: str, rel: str, start: int, end: Optional[int] = None) -> bytes:
from fastapi import HTTPException
message_id = self._parse_message_id(rel)
client = await self._get_connected_client()
message = await self._get_cached_message(message_id)
if not message:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
media = self._get_message_media(message)
if not media:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
file_size = self._get_message_file_size(message, media)
if file_size > 0:
if start >= file_size:
raise HTTPException(status_code=416, detail="Requested Range Not Satisfiable")
if end is None or end >= file_size:
end = file_size - 1
elif end is None:
end = start
if end < start:
raise HTTPException(status_code=416, detail="Requested Range Not Satisfiable")
limit = end - start + 1
data = bytearray()
client = self._get_client()
try:
async with self._download_lock:
async for chunk in client.iter_download(
media,
offset=start,
request_size=self._download_chunk_size,
chunk_size=self._download_chunk_size,
file_size=file_size or None,
):
if not chunk:
continue
need = limit - len(data)
if need <= 0:
break
data.extend(chunk[:need])
if len(data) >= limit:
break
await client.connect()
message = await client.get_messages(self.chat_id, ids=message_id)
if not message:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
media = message.document or message.video or message.photo
if not media:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
file_meta = message.file
file_size = file_meta.size if file_meta and file_meta.size is not None else getattr(media, "size", 0) or 0
if file_size > 0:
if start >= file_size:
raise HTTPException(status_code=416, detail="Requested Range Not Satisfiable")
if end is None or end >= file_size:
end = file_size - 1
elif end is None:
end = start
if end < start:
raise HTTPException(status_code=416, detail="Requested Range Not Satisfiable")
limit = end - start + 1
data = bytearray()
async for chunk in client.iter_download(media, offset=start):
if not chunk:
continue
need = limit - len(data)
if need <= 0:
break
data.extend(chunk[:need])
if len(data) >= limit:
break
return bytes(data)
except errors.FloodWaitError as exc:
await self._disconnect_shared_client()
raise self._flood_wait_http_exception(exc)
finally:
if client.is_connected():
await client.disconnect()
async def write_file(self, root: str, rel: str, data: bytes):
"""将字节数据作为文件上传"""
@@ -439,7 +349,6 @@ class TelegramAdapter:
stored_name = file_meta.name
if getattr(message, "id", None) is not None:
actual_rel = f"{message.id}_{stored_name}"
self._clear_message_cache()
return {"rel": actual_rel, "size": len(data)}
finally:
if client.is_connected():
@@ -469,7 +378,6 @@ class TelegramAdapter:
stored_name = file_meta.name
if getattr(message, "id", None) is not None:
actual_rel = f"{message.id}_{stored_name}"
self._clear_message_cache()
if file_meta and getattr(file_meta, "size", None):
size = int(file_meta.size)
return {"rel": actual_rel, "size": size}
@@ -505,7 +413,6 @@ class TelegramAdapter:
stored_name = file_meta.name
if getattr(message, "id", None) is not None:
actual_rel = f"{message.id}_{stored_name}"
self._clear_message_cache()
finally:
if os.path.exists(temp_path):
@@ -518,7 +425,38 @@ class TelegramAdapter:
raise NotImplementedError("Telegram 适配器不支持创建目录。")
async def get_thumbnail(self, root: str, rel: str, size: str = "medium"):
return None
try:
message_id_str, _ = rel.split('_', 1)
message_id = int(message_id_str)
except (ValueError, IndexError):
return None
client = self._get_client()
try:
await client.connect()
message = await client.get_messages(self.chat_id, ids=message_id)
if not message:
return None
thumb = self._pick_photo_thumb(self._get_message_thumbs(message))
if not thumb:
return None
embedded = getattr(thumb, "bytes", None)
if embedded and isinstance(thumb, types.PhotoCachedSize):
return bytes(embedded)
if embedded and isinstance(thumb, types.PhotoStrippedSize):
return utils.stripped_photo_to_jpg(bytes(embedded))
result = await client.download_media(message, bytes, thumb=thumb)
if isinstance(result, (bytes, bytearray)):
return bytes(result)
return None
except Exception:
return None
finally:
if client.is_connected():
await client.disconnect()
async def delete(self, root: str, rel: str):
"""删除一个文件 (即一条消息)"""
@@ -534,12 +472,9 @@ class TelegramAdapter:
result = await client.delete_messages(self.chat_id, [message_id])
if not result or not result[0].pts:
raise FileNotFoundError(f"{self.chat_id} 中删除消息 {message_id} 失败,可能消息不存在或无权限")
self._message_cache.pop(message_id, None)
finally:
if client.is_connected():
await client.disconnect()
if self._client is client:
self._client = None
async def move(self, root: str, src_rel: str, dst_rel: str):
raise NotImplementedError("Telegram 适配器不支持移动。")
@@ -559,17 +494,38 @@ class TelegramAdapter:
except FileNotFoundError:
raise HTTPException(status_code=400, detail=f"无效的文件路径格式: {rel}")
client = self._get_client()
try:
client = await self._get_connected_client()
message = await self._get_cached_message(message_id)
await client.connect()
message = await client.get_messages(self.chat_id, ids=message_id)
if not message:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
media = self._get_message_media(message)
media = message.document or message.video or message.photo
if not media:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
file_size = self._get_message_file_size(message, media)
mime_type = self._get_message_mime_type(message, media)
file_meta = message.file
file_size = file_meta.size if file_meta and file_meta.size is not None else None
if file_size is None:
if hasattr(media, "size") and media.size is not None:
file_size = media.size
elif message.photo and getattr(message.photo, "sizes", None):
photo_size = message.photo.sizes[-1]
file_size = getattr(photo_size, "size", 0) or 0
else:
file_size = 0
mime_type = None
if file_meta and getattr(file_meta, "mime_type", None):
mime_type = file_meta.mime_type
if not mime_type:
if hasattr(media, "mime_type") and media.mime_type:
mime_type = media.mime_type
elif message.photo:
mime_type = "image/jpeg"
else:
mime_type = "application/octet-stream"
start = 0
end = file_size - 1
@@ -582,6 +538,8 @@ class TelegramAdapter:
if file_size <= 0:
headers["Content-Length"] = "0"
if client.is_connected():
await client.disconnect()
return StreamingResponse(iter(()), status_code=status, headers=headers)
if range_header:
@@ -598,70 +556,37 @@ class TelegramAdapter:
raise HTTPException(status_code=400, detail="Invalid Range header")
headers["Content-Length"] = str(end - start + 1)
self._active_stream_message_id = message_id
async def iterator():
downloaded = 0
try:
limit = end - start + 1
if self._active_stream_message_id != message_id:
return
async with self._download_lock:
async for chunk in client.iter_download(
media,
offset=start,
request_size=self._download_chunk_size,
chunk_size=self._download_chunk_size,
file_size=file_size,
):
if self._active_stream_message_id != message_id:
return
if not chunk:
continue
remaining = limit - downloaded
if remaining <= 0:
break
data = chunk[:remaining]
downloaded += len(data)
yield data
if downloaded >= limit:
break
except errors.FloodWaitError as exc:
await self._disconnect_shared_client()
if downloaded == 0:
raise self._flood_wait_http_exception(exc)
seconds = int(getattr(exc, "seconds", 0) or 0)
print(f"Telegram streaming stopped by FloodWait after partial response, wait={seconds}s")
return
except Exception:
await self._disconnect_shared_client()
raise
agen = iterator()
try:
first_chunk = await agen.__anext__()
except StopAsyncIteration:
first_chunk = b""
except HTTPException:
raise
async def response_iterator():
try:
if first_chunk:
yield first_chunk
async for chunk in agen:
downloaded = 0
async for chunk in client.iter_download(media, offset=start):
if downloaded + len(chunk) > limit:
yield chunk[:limit - downloaded]
break
yield chunk
downloaded += len(chunk)
if downloaded >= limit:
break
finally:
await agen.aclose()
if client.is_connected():
await client.disconnect()
return StreamingResponse(response_iterator(), status_code=status, headers=headers)
return StreamingResponse(iterator(), status_code=status, headers=headers)
except HTTPException:
if client.is_connected():
await client.disconnect()
raise
except FileNotFoundError as e:
if client.is_connected():
await client.disconnect()
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
await self._disconnect_shared_client()
if client.is_connected():
await client.disconnect()
raise HTTPException(status_code=500, detail=f"Streaming failed: {str(e)}")
async def stat_file(self, root: str, rel: str):
@@ -671,21 +596,36 @@ class TelegramAdapter:
except (ValueError, IndexError):
raise FileNotFoundError(f"无效的文件路径格式: {rel}")
message = await self._get_cached_message(message_id)
media = self._get_message_media(message) if message else None
if not message or not media:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
client = self._get_client()
try:
await client.connect()
message = await client.get_messages(self.chat_id, ids=message_id)
media = message.document or message.video or message.photo
if not message or not media:
raise FileNotFoundError(f"在频道 {self.chat_id} 中未找到消息ID为 {message_id} 的文件")
size = self._get_message_file_size(message, media)
file_meta = message.file
size = file_meta.size if file_meta and file_meta.size is not None else None
if size is None:
if hasattr(media, "size") and media.size is not None:
size = media.size
elif message.photo and getattr(message.photo, "sizes", None):
photo_size = message.photo.sizes[-1]
size = getattr(photo_size, "size", 0) or 0
else:
size = 0
return {
"name": rel,
"is_dir": False,
"size": size,
"mtime": int(message.date.timestamp()),
"type": "file",
"has_thumbnail": False,
}
return {
"name": rel,
"is_dir": False,
"size": size,
"mtime": int(message.date.timestamp()),
"type": "file",
"has_thumbnail": self._message_has_thumbnail(message),
}
finally:
if client.is_connected():
await client.disconnect()
def ADAPTER_FACTORY(rec: StorageAdapter) -> TelegramAdapter:
return TelegramAdapter(rec)

View File

@@ -17,7 +17,7 @@ dependencies = [
"pymilvus[milvus-lite]>=2.6.5",
"pysocks>=1.7.1",
"python-dotenv>=1.2.2",
"python-multipart>=0.0.27",
"python-multipart>=0.0.26",
"qdrant-client>=1.16.2",
"setuptools<82",
"telethon>=1.42.0",

8
uv.lock generated
View File

@@ -475,7 +475,7 @@ requires-dist = [
{ name = "pymilvus", extras = ["milvus-lite"], specifier = ">=2.6.5" },
{ name = "pysocks", specifier = ">=1.7.1" },
{ name = "python-dotenv", specifier = ">=1.2.2" },
{ name = "python-multipart", specifier = ">=0.0.27" },
{ name = "python-multipart", specifier = ">=0.0.26" },
{ name = "qdrant-client", specifier = ">=1.16.2" },
{ name = "setuptools", specifier = "<82" },
{ name = "telethon", specifier = ">=1.42.0" },
@@ -1179,11 +1179,11 @@ wheels = [
[[package]]
name = "python-multipart"
version = "0.0.27"
version = "0.0.26"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/69/9b/f23807317a113dc36e74e75eb265a02dd1a4d9082abc3c1064acd22997c4/python_multipart-0.0.27.tar.gz", hash = "sha256:9870a6a8c5a20a5bf4f07c017bd1489006ff8836cff097b6933355ee2b49b602", size = 44043, upload-time = "2026-04-27T10:51:26.649Z" }
sdist = { url = "https://files.pythonhosted.org/packages/88/71/b145a380824a960ebd60e1014256dbb7d2253f2316ff2d73dfd8928ec2c3/python_multipart-0.0.26.tar.gz", hash = "sha256:08fadc45918cd615e26846437f50c5d6d23304da32c341f289a617127b081f17", size = 43501, upload-time = "2026-04-10T14:09:59.473Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/99/78/4126abcbdbd3c559d43e0db7f7b9173fc6befe45d39a2856cc0b8ec2a5a6/python_multipart-0.0.27-py3-none-any.whl", hash = "sha256:6fccfad17a27334bd0193681b369f476eda3409f17381a2d65aa7df3f7275645", size = 29254, upload-time = "2026-04-27T10:51:24.997Z" },
{ url = "https://files.pythonhosted.org/packages/9a/22/f1925cdda983ab66fc8ec6ec8014b959262747e58bdca26a4e3d1da29d56/python_multipart-0.0.26-py3-none-any.whl", hash = "sha256:c0b169f8c4484c13b0dcf2ef0ec3a4adb255c4b7d18d8e420477d2b1dd03f185", size = 28847, upload-time = "2026-04-10T14:09:58.131Z" },
]
[[package]]

View File

@@ -24,16 +24,6 @@ function getPluginStylePaths(plugin: PluginItem): string[] {
return styles.filter((s) => typeof s === 'string' && s.trim().length > 0);
}
function unloadPluginFrame(iframe: HTMLIFrameElement | null) {
if (!iframe) return;
try {
iframe.contentWindow?.postMessage({ type: 'foxel-plugin:unload' }, window.location.origin);
} catch {
void 0;
}
iframe.src = 'about:blank';
}
/**
* 插件宿主组件 - 文件打开模式
* 使用 iframe 隔离渲染与样式,避免插件污染宿主 DOM/CSS。
@@ -76,10 +66,7 @@ export const PluginAppHost: React.FC<PluginAppHostProps> = ({
};
window.addEventListener('message', onMessage);
return () => {
window.removeEventListener('message', onMessage);
unloadPluginFrame(iframeRef.current);
};
return () => window.removeEventListener('message', onMessage);
}, [plugin.key]);
return (
@@ -131,10 +118,7 @@ export const PluginAppOpenHost: React.FC<PluginAppOpenHostProps> = ({ plugin, on
};
window.addEventListener('message', onMessage);
return () => {
window.removeEventListener('message', onMessage);
unloadPluginFrame(iframeRef.current);
};
return () => window.removeEventListener('message', onMessage);
}, [plugin.key]);
return (

View File

@@ -364,27 +364,12 @@ async function main() {
await mountError();
const runCleanup = () => {
window.addEventListener('beforeunload', () => {
try {
cleanup?.();
} catch {
void 0;
}
cleanup = null;
};
window.addEventListener('message', (ev) => {
if (ev.origin !== window.location.origin) return;
if (ev.source !== window.parent) return;
const data = ev.data as any;
if (!data || typeof data !== 'object') return;
if (data.type !== 'foxel-plugin:unload') return;
runCleanup();
root.innerHTML = '';
});
window.addEventListener('beforeunload', () => {
runCleanup();
});
}