Compare commits

...

45 Commits

Author SHA1 Message Date
jxxghp
73eba90f2f 更新 version.py 2025-06-24 10:34:42 +08:00
jxxghp
62e74f6fd1 fix 2025-06-24 08:19:10 +08:00
jxxghp
4375e48840 Merge pull request #4476 from Miralia/v2 2025-06-23 20:52:15 +08:00
Miralia
a1d6e94e90 feat(meta): 新增 WEB 平台来源识别并支持更多音视频格式。 2025-06-23 20:36:58 +08:00
jxxghp
1f44e13ff0 add reload logging 2025-06-23 10:14:22 +08:00
jxxghp
d2992f9ced fix plugin load 2025-06-23 09:31:56 +08:00
jxxghp
950337bccc fix plugin load 2025-06-23 08:19:22 +08:00
jxxghp
757c3be359 更新 version.py 2025-06-22 10:08:17 +08:00
jxxghp
269ab9adfc fix:删除消息能力 2025-06-22 10:04:21 +08:00
jxxghp
bd241a5164 feat:删除消息能力 2025-06-22 09:37:01 +08:00
jxxghp
3d92b57f24 fix 2025-06-22 09:04:03 +08:00
jxxghp
70d8cb3697 fix #4461 2025-06-22 08:51:29 +08:00
jxxghp
9e4ec5841c fix #4470 2025-06-22 08:47:43 +08:00
jxxghp
682f4fe608 fix message cache 2025-06-20 17:33:08 +08:00
jxxghp
ce8a077e07 优化按钮回调数据,简化为仅使用索引值 2025-06-19 15:54:07 +08:00
jxxghp
d5f63bcdb3 remove Commands DEV flag 2025-06-18 13:33:37 +08:00
jxxghp
5c3756fd1b v2.5.7-1 2025-06-17 20:02:45 +08:00
jxxghp
99939e1a3d fix 2025-06-17 19:42:16 +08:00
jxxghp
56742ace11 fix:带UA下载图片 2025-06-17 19:27:53 +08:00
jxxghp
742cb7a8da 更新 version.py 2025-06-17 18:56:47 +08:00
jxxghp
98327d1750 fix download message 2025-06-17 15:35:38 +08:00
jxxghp
b944306302 v2.5.7 2025-06-16 22:15:54 +08:00
jxxghp
02ab1d4111 fix settings 2025-06-16 21:29:57 +08:00
jxxghp
28552fb0ce 更新 transmission.py 2025-06-16 19:38:19 +08:00
jxxghp
bf52fcb2ec fix message 2025-06-16 11:45:26 +08:00
jxxghp
bab1f73480 修复:slack消息交互 2025-06-16 09:49:01 +08:00
jxxghp
c06001d921 feat:内建重启前主动备份插件 2025-06-16 08:57:21 +08:00
jxxghp
0fa49bb9c6 fix 消息定向发送时不检查消息类型匹配 2025-06-16 08:06:47 +08:00
jxxghp
bf23fe6ce2 更新 subscribe.py 2025-06-15 23:31:13 +08:00
jxxghp
7c6137b742 更新 download.py 2025-06-15 23:30:01 +08:00
jxxghp
3823a7c9b6 fix:消息发送范围 2025-06-15 23:18:07 +08:00
jxxghp
a944975be2 fix:交互消息立即发送 2025-06-15 23:06:25 +08:00
jxxghp
6da65d3b03 add MessageAction 2025-06-15 21:25:14 +08:00
jxxghp
0d938f2dca refactor:减少Alipan及115的Api调用 2025-06-15 20:41:32 +08:00
jxxghp
4fa9bb3c1f feat: 插件消息的事件回调 [PLUGIN]插件ID|内容 2025-06-15 19:47:04 +08:00
jxxghp
2f5b22a81f fix 2025-06-15 19:41:24 +08:00
jxxghp
fcd5ca3fda feat:Slack支持编辑消息 2025-06-15 19:28:05 +08:00
jxxghp
c18247f3b1 增强消息处理功能,支持编辑消息 2025-06-15 19:18:18 +08:00
jxxghp
f8fbfdbba7 优化消息处理逻辑 2025-06-15 18:40:36 +08:00
jxxghp
21addfb947 更新 message.py 2025-06-15 16:56:48 +08:00
jxxghp
8672bd12c4 fix bug 2025-06-15 16:31:09 +08:00
jxxghp
be8054e81e fix bug 2025-06-15 15:57:58 +08:00
jxxghp
82f46c6010 feat:回调消息路由给插件 2025-06-15 15:56:38 +08:00
jxxghp
95a827e8a2 feat:Telegram、Slack 支持按钮 2025-06-15 15:34:06 +08:00
jxxghp
c534e3dcb8 feat:未安装的插件,不加载模块 2025-06-15 09:55:20 +08:00
44 changed files with 1768 additions and 747 deletions

View File

@@ -22,7 +22,7 @@ from app.helper.service import ServiceConfigHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
WebhookEventInfo, TmdbEpisode, MediaPerson, FileItem, TransferDirectoryConf
from app.schemas.types import TorrentStatus, MediaType, MediaImageType, EventType
from app.schemas.types import TorrentStatus, MediaType, MediaImageType, EventType, MessageChannel
from app.utils.object import ObjectUtils
@@ -612,7 +612,8 @@ class ChainBase(metaclass=ABCMeta):
# 发送消息事件
self.eventmanager.send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype})
# 按原消息发送
self.messagequeue.send_message("post_message", message=message)
self.messagequeue.send_message("post_message", message=message,
immediately=True if message.userid else False)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
@@ -624,7 +625,8 @@ class ChainBase(metaclass=ABCMeta):
note_list = [media.to_dict() for media in medias]
self.messagehelper.put(message, role="user", note=note_list, title=message.title)
self.messageoper.add(**message.dict(), note=note_list)
return self.messagequeue.send_message("post_medias_message", message=message, medias=medias)
return self.messagequeue.send_message("post_medias_message", message=message, medias=medias,
immediately=True if message.userid else False)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
@@ -636,7 +638,21 @@ class ChainBase(metaclass=ABCMeta):
note_list = [torrent.torrent_info.to_dict() for torrent in torrents]
self.messagehelper.put(message, role="user", note=note_list, title=message.title)
self.messageoper.add(**message.dict(), note=note_list)
return self.messagequeue.send_message("post_torrents_message", message=message, torrents=torrents)
return self.messagequeue.send_message("post_torrents_message", message=message, torrents=torrents,
immediately=True if message.userid else False)
def delete_message(self, channel: MessageChannel, source: str,
message_id: Union[str, int], chat_id: Optional[Union[str, int]] = None) -> bool:
"""
删除消息
:param channel: 消息渠道
:param source: 消息源(指定特定的消息模块)
:param message_id: 消息ID
:param chat_id: 聊天ID如群组ID
:return: 删除是否成功
"""
return self.run_module("delete_message", channel=channel, source=source,
message_id=message_id, chat_id=chat_id)
def metadata_img(self, mediainfo: MediaInfo,
season: Optional[int] = None, episode: Optional[int] = None) -> Optional[dict]:

View File

@@ -324,10 +324,12 @@ class DownloadChain(ChainBase):
self.post_message(
Notification(
channel=channel,
source=source if channel else None,
mtype=NotificationType.Download,
ctype=ContentType.DownloadAdded,
image=_media.get_message_image(),
link=settings.MP_DOMAIN('/#/downloading'),
userid=userid,
username=username
),
meta=_meta,

View File

@@ -427,7 +427,7 @@ class MediaChain(ChainBase):
"""
try:
logger.info(f"正在下载图片:{_url} ...")
r = RequestUtils(proxies=settings.PROXY).get_res(url=_url)
r = RequestUtils(proxies=settings.PROXY, ua=settings.USER_AGENT).get_res(url=_url)
if r:
return r.content
else:
@@ -506,7 +506,9 @@ class MediaChain(ChainBase):
# 根据图片类型检查开关
if 'poster' in image_name.lower():
should_scrape = scraping_switchs.get('movie_poster', True)
elif 'backdrop' in image_name.lower() or 'fanart' in image_name.lower():
elif ('backdrop' in image_name.lower()
or 'fanart' in image_name.lower()
or 'background' in image_name.lower()):
should_scrape = scraping_switchs.get('movie_backdrop', True)
elif 'logo' in image_name.lower():
should_scrape = scraping_switchs.get('movie_logo', True)
@@ -700,7 +702,9 @@ class MediaChain(ChainBase):
# 根据电视剧图片类型检查开关
if 'poster' in image_name.lower():
should_scrape = scraping_switchs.get('tv_poster', True)
elif 'backdrop' in image_name.lower() or 'fanart' in image_name.lower():
elif ('backdrop' in image_name.lower()
or 'fanart' in image_name.lower()
or 'background' in image_name.lower()):
should_scrape = scraping_switchs.get('tv_backdrop', True)
elif 'banner' in image_name.lower():
should_scrape = scraping_switchs.get('tv_banner', True)

View File

@@ -1,6 +1,5 @@
import gc
import re
from typing import Any, Optional, Dict, Union
from typing import Any, Optional, Dict, Union, List
from app.chain import ChainBase
from app.chain.download import DownloadChain
@@ -14,6 +13,7 @@ from app.db.user_oper import UserOper
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import Notification, NotExistMediaInfo, CommingMessage
from app.schemas.message import ChannelCapabilityManager
from app.schemas.types import EventType, MessageChannel, MediaType
from app.utils.string import StringUtils
@@ -114,38 +114,55 @@ class MessageChain(ChainBase):
if not text:
logger.debug(f'未识别到消息内容::{body}{form}{args}')
return
# 获取原消息ID信息
original_message_id = info.message_id
original_chat_id = info.chat_id
# 处理消息
self.handle_message(channel=channel, source=source, userid=userid, username=username, text=text)
self.handle_message(channel=channel, source=source, userid=userid, username=username, text=text,
original_message_id=original_message_id, original_chat_id=original_chat_id)
def handle_message(self, channel: MessageChannel, source: str,
userid: Union[str, int], username: str, text: str) -> None:
userid: Union[str, int], username: str, text: str,
original_message_id: Optional[Union[str, int]] = None,
original_chat_id: Optional[str] = None) -> None:
"""
识别消息内容,执行操作
"""
# 申明全局变量
global _current_page, _current_meta, _current_media
# 加载缓存
user_cache: Dict[str, dict] = self.load_cache(self._cache_file) or {}
# 处理消息
logger.info(f'收到用户消息内容,用户:{userid},内容:{text}')
# 加载缓存
user_cache: Dict[str, dict] = self.load_cache(self._cache_file) or {}
# 保存消息
self.messagehelper.put(
CommingMessage(
userid=userid,
username=username,
if not text.startswith('CALLBACK:'):
self.messagehelper.put(
CommingMessage(
userid=userid,
username=username,
channel=channel,
source=source,
text=text
), role="user")
self.messageoper.add(
channel=channel,
source=source,
text=text
), role="user")
self.messageoper.add(
channel=channel,
source=source,
userid=username or userid,
text=text,
action=0
)
userid=username or userid,
text=text,
action=0
)
# 处理消息
if text.startswith('/'):
if text.startswith('CALLBACK:'):
# 处理按钮回调(适配支持回调的渠道)
if ChannelCapabilityManager.supports_callbacks(channel):
self._handle_callback(text=text, channel=channel, source=source,
userid=userid, username=username,
original_message_id=original_message_id, original_chat_id=original_chat_id)
else:
logger.warning(f"渠道 {channel.value} 不支持回调,但收到了回调消息:{text}")
elif text.startswith('/'):
# 执行命令
self.eventmanager.send_event(
EventType.CommandExcute,
@@ -254,6 +271,18 @@ class MessageChain(ChainBase):
"type": "Torrent",
"items": contexts
}
_current_page = 0
# 保存缓存
self.save_cache(user_cache, self._cache_file)
# 删除原消息
if (original_message_id and original_chat_id and
ChannelCapabilityManager.supports_deletion(channel)):
self.delete_message(
channel=channel,
source=source,
message_id=original_message_id,
chat_id=original_chat_id
)
# 发送种子数据
logger.info(f"搜索到 {len(contexts)} 条数据,开始发送选择消息 ...")
self.__post_torrents_message(channel=channel,
@@ -342,7 +371,9 @@ class MessageChain(ChainBase):
title=_current_media.title,
items=cache_list[start:end],
userid=userid,
total=len(cache_list))
total=len(cache_list),
original_message_id=original_message_id,
original_chat_id=original_chat_id)
else:
# 发送媒体数据
self.__post_medias_message(channel=channel,
@@ -350,7 +381,9 @@ class MessageChain(ChainBase):
title=_current_meta.name,
items=cache_list[start:end],
userid=userid,
total=len(cache_list))
total=len(cache_list),
original_message_id=original_message_id,
original_chat_id=original_chat_id)
elif text.lower() == "n":
# 下一页
@@ -380,13 +413,21 @@ class MessageChain(ChainBase):
self.__post_torrents_message(channel=channel,
source=source,
title=_current_media.title,
items=cache_list, userid=userid, total=total)
items=cache_list,
userid=userid,
total=total,
original_message_id=original_message_id,
original_chat_id=original_chat_id)
else:
# 发送媒体数据
self.__post_medias_message(channel=channel,
source=source,
title=_current_meta.name,
items=cache_list, userid=userid, total=total)
items=cache_list,
userid=userid,
total=total,
original_message_id=original_message_id,
original_chat_id=original_chat_id)
else:
# 搜索或订阅
@@ -435,10 +476,12 @@ class MessageChain(ChainBase):
logger.info(f"搜索到 {len(medias)} 条相关媒体信息")
# 记录当前状态
_current_meta = meta
# 保存缓存
user_cache[userid] = {
'type': action,
'items': medias
}
self.save_cache(user_cache, self._cache_file)
_current_page = 0
_current_media = None
# 发送媒体列表
@@ -459,14 +502,54 @@ class MessageChain(ChainBase):
}
)
# 保存缓存
self.save_cache(user_cache, self._cache_file)
def _handle_callback(self, text: str, channel: MessageChannel, source: str,
userid: Union[str, int], username: str,
original_message_id: Optional[Union[str, int]] = None,
original_chat_id: Optional[str] = None) -> None:
"""
处理按钮回调
"""
# 清理内存
user_cache.clear()
del user_cache
global _current_media
gc.collect()
# 提取回调数据
callback_data = text[9:] # 去掉 "CALLBACK:" 前缀
logger.info(f"处理按钮回调:{callback_data}")
# 插件消息的事件回调 [PLUGIN]插件ID|内容
if callback_data.startswith('[PLUGIN]'):
# 提取插件ID和内容
plugin_id, content = callback_data.split("|", 1)
# 广播给插件处理
self.eventmanager.send_event(
EventType.MessageAction,
{
"plugin_id": plugin_id.replace("[PLUGIN]", ""),
"text": content,
"userid": userid,
"channel": channel,
"source": source,
"original_message_id": original_message_id,
"original_chat_id": original_chat_id
}
)
return
# 解析系统回调数据
try:
page_text = callback_data.split("_", 1)[1]
self.handle_message(channel=channel, source=source, userid=userid, username=username,
text=page_text,
original_message_id=original_message_id, original_chat_id=original_chat_id)
except IndexError:
logger.error(f"回调数据格式错误:{callback_data}")
self.post_message(Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title="回调数据格式错误,请检查!"
))
def __auto_download(self, channel: MessageChannel, source: str, cache_list: list[Context],
userid: Union[str, int], username: str,
@@ -521,35 +604,185 @@ class MessageChain(ChainBase):
note=note)
def __post_medias_message(self, channel: MessageChannel, source: str,
title: str, items: list, userid: str, total: int):
title: str, items: list, userid: str, total: int,
original_message_id: Optional[Union[str, int]] = None,
original_chat_id: Optional[str] = None):
"""
发送媒体列表消息
"""
if total > self._page_size:
title = f"{title}】共找到{total}条相关信息请回复对应数字选择p: 上一页 n: 下一页)"
else:
title = f"{title}】共找到{total}条相关信息,请回复对应数字选择"
self.post_medias_message(Notification(
channel=channel,
source=source,
title=title,
userid=userid
), medias=items)
# 检查渠道是否支持按钮
supports_buttons = ChannelCapabilityManager.supports_buttons(channel)
def __post_torrents_message(self, channel: MessageChannel, source: str,
title: str, items: list,
userid: str, total: int):
"""
发送种子列表消息
"""
if total > self._page_size:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择 p: 上一页 n: 下一页)"
if supports_buttons:
# 支持按钮的渠道
if total > self._page_size:
title = f"{title}】共找到{total}条相关信息,请选择操作"
else:
title = f"{title}】共找到{total}条相关信息,请选择操作"
buttons = self._create_media_buttons(channel=channel, items=items, total=total)
else:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择)"
self.post_torrents_message(Notification(
# 不支持按钮的渠道,使用文本提示
if total > self._page_size:
title = f"{title}】共找到{total}条相关信息请回复对应数字选择p: 上一页 n: 下一页)"
else:
title = f"{title}】共找到{total}条相关信息,请回复对应数字选择"
buttons = None
notification = Notification(
channel=channel,
source=source,
title=title,
userid=userid,
link=settings.MP_DOMAIN('#/resource')
), torrents=items)
buttons=buttons,
original_message_id=original_message_id,
original_chat_id=original_chat_id
)
self.post_medias_message(notification, medias=items)
def _create_media_buttons(self, channel: MessageChannel, items: list, total: int) -> List[List[Dict]]:
"""
创建媒体选择按钮
"""
global _current_page
buttons = []
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
max_per_row = ChannelCapabilityManager.get_max_buttons_per_row(channel)
# 为每个媒体项创建选择按钮
current_row = []
for i in range(len(items)):
media = items[i]
if max_per_row == 1:
# 每行一个按钮,使用完整文本
button_text = f"{i + 1}. {media.title_year}"
if len(button_text) > max_text_length:
button_text = button_text[:max_text_length - 3] + "..."
buttons.append([{
"text": button_text,
"callback_data": f"select_{i + 1}"
}])
else:
# 多按钮一行的情况,使用简化文本
button_text = f"{i + 1}"
current_row.append({
"text": button_text,
"callback_data": f"select_{i + 1}"
})
# 如果当前行已满或者是最后一个按钮,添加到按钮列表
if len(current_row) == max_per_row or i == len(items) - 1:
buttons.append(current_row)
current_row = []
# 添加翻页按钮
if total > self._page_size:
page_buttons = []
if _current_page > 0:
page_buttons.append({"text": "⬅️ 上一页", "callback_data": "page_p"})
if (_current_page + 1) * self._page_size < total:
page_buttons.append({"text": "下一页 ➡️", "callback_data": "page_n"})
if page_buttons:
buttons.append(page_buttons)
return buttons
def __post_torrents_message(self, channel: MessageChannel, source: str,
title: str, items: list, userid: str, total: int,
original_message_id: Optional[Union[str, int]] = None,
original_chat_id: Optional[str] = None):
"""
发送种子列表消息
"""
# 检查渠道是否支持按钮
supports_buttons = ChannelCapabilityManager.supports_buttons(channel)
if supports_buttons:
# 支持按钮的渠道
if total > self._page_size:
title = f"{title}】共找到{total}条相关资源,请选择下载"
else:
title = f"{title}】共找到{total}条相关资源,请选择下载"
buttons = self._create_torrent_buttons(channel=channel, items=items, total=total)
else:
# 不支持按钮的渠道,使用文本提示
if total > self._page_size:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择 p: 上一页 n: 下一页)"
else:
title = f"{title}】共找到{total}条相关资源请回复对应数字下载0: 自动选择)"
buttons = None
notification = Notification(
channel=channel,
source=source,
title=title,
userid=userid,
link=settings.MP_DOMAIN('#/resource'),
buttons=buttons,
original_message_id=original_message_id,
original_chat_id=original_chat_id
)
self.post_torrents_message(notification, torrents=items)
def _create_torrent_buttons(self, channel: MessageChannel, items: list, total: int) -> List[List[Dict]]:
"""
创建种子下载按钮
"""
global _current_page
buttons = []
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
max_per_row = ChannelCapabilityManager.get_max_buttons_per_row(channel)
# 自动选择按钮
buttons.append([{"text": "🤖 自动选择下载", "callback_data": "download_0"}])
# 为每个种子项创建下载按钮
current_row = []
for i in range(len(items)):
context = items[i]
torrent = context.torrent_info
if max_per_row == 1:
# 每行一个按钮,使用完整文本
button_text = f"{i + 1}. {torrent.site_name} - {torrent.seeders}"
if len(button_text) > max_text_length:
button_text = button_text[:max_text_length - 3] + "..."
buttons.append([{
"text": button_text,
"callback_data": f"download_{i + 1}"
}])
else:
# 多按钮一行的情况,使用简化文本
button_text = f"{i + 1}"
current_row.append({
"text": button_text,
"callback_data": f"download_{i + 1}"
})
# 如果当前行已满或者是最后一个按钮,添加到按钮列表
if len(current_row) == max_per_row or i == len(items) - 1:
buttons.append(current_row)
current_row = []
# 添加翻页按钮
if total > self._page_size:
page_buttons = []
if _current_page > 0:
page_buttons.append({"text": "⬅️ 上一页", "callback_data": "page_p"})
if (_current_page + 1) * self._page_size < total:
page_buttons.append({"text": "下一页 ➡️", "callback_data": "page_n"})
if page_buttons:
buttons.append(page_buttons)
return buttons

View File

@@ -221,10 +221,13 @@ class SubscribeChain(ChainBase):
# 订阅成功按规则发送消息
self.post_message(
schemas.Notification(
channel=channel,
source=source,
mtype=NotificationType.Subscribe,
ctype=ContentType.SubscribeAdded,
image=mediainfo.get_message_image(),
link=link,
userid=userid,
username=username
),
meta=metainfo,
@@ -396,7 +399,6 @@ class SubscribeChain(ChainBase):
downloads, lefts = DownloadChain().batch_download(
contexts=matched_contexts,
no_exists=no_exists,
userid=subscribe.username,
username=subscribe.username,
save_path=subscribe.save_path,
downloader=subscribe.downloader,
@@ -786,7 +788,6 @@ class SubscribeChain(ChainBase):
logger.info(f'{mediainfo.title_year} 匹配完成,共匹配到{len(_match_context)}个资源')
downloads, lefts = DownloadChain().batch_download(contexts=_match_context,
no_exists=no_exists,
userid=subscribe.username,
username=subscribe.username,
save_path=subscribe.save_path,
downloader=subscribe.downloader,

View File

@@ -1,5 +1,6 @@
import json
import re
import shutil
from pathlib import Path
from typing import Union, Optional
@@ -42,11 +43,117 @@ class SystemChain(ChainBase):
"channel": channel.value,
"userid": userid
}, self._restart_file)
# 主动备份一次插件
self.backup_plugins()
# 设置停止标志,通知所有模块准备停止
global_vars.stop_system()
# 重启
SystemHelper.restart()
@staticmethod
def backup_plugins():
"""
备份插件到用户配置目录仅docker环境
"""
# 非docker环境不处理
if not SystemUtils.is_docker():
return
try:
# 使用绝对路径确保准确性
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
backup_dir = settings.CONFIG_PATH / "plugins_backup"
if not plugins_dir.exists():
logger.info("插件目录不存在,跳过备份")
return
# 确保备份目录存在
backup_dir.mkdir(parents=True, exist_ok=True)
# 需要排除的文件和目录
exclude_items = {"__init__.py", "__pycache__", ".DS_Store"}
# 遍历插件目录,备份除排除项外的所有内容
for item in plugins_dir.iterdir():
if item.name in exclude_items:
continue
target_path = backup_dir / item.name
# 如果是目录
if item.is_dir():
if target_path.exists():
continue
shutil.copytree(item, target_path)
logger.info(f"已备份插件目录: {item.name}")
# 如果是文件
elif item.is_file():
if target_path.exists():
continue
shutil.copy2(item, target_path)
logger.info(f"已备份插件文件: {item.name}")
logger.info(f"插件备份完成,备份位置: {backup_dir}")
except Exception as e:
logger.error(f"插件备份失败: {str(e)}")
@staticmethod
def restore_plugins():
"""
从备份恢复插件到app/plugins目录恢复完成后删除备份仅docker环境
"""
# 非docker环境不处理
if not SystemUtils.is_docker():
return
# 使用绝对路径确保准确性
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
backup_dir = settings.CONFIG_PATH / "plugins_backup"
if not backup_dir.exists():
logger.info("插件备份目录不存在,跳过恢复")
return
# 系统被重置才恢复插件
if SystemHelper().is_system_reset():
# 确保插件目录存在
plugins_dir.mkdir(parents=True, exist_ok=True)
# 遍历备份目录,恢复所有内容
restored_count = 0
for item in backup_dir.iterdir():
target_path = plugins_dir / item.name
try:
# 如果是目录,且目录内有内容
if item.is_dir() and any(item.iterdir()):
if target_path.exists():
shutil.rmtree(target_path)
shutil.copytree(item, target_path)
logger.info(f"已恢复插件目录: {item.name}")
restored_count += 1
# 如果是文件
elif item.is_file():
shutil.copy2(item, target_path)
logger.info(f"已恢复插件文件: {item.name}")
restored_count += 1
except Exception as e:
logger.error(f"恢复插件 {item.name} 时发生错误: {str(e)}")
continue
logger.info(f"插件恢复完成,共恢复 {restored_count} 个项目")
# 删除备份目录
try:
shutil.rmtree(backup_dir)
logger.info(f"已删除插件备份目录: {backup_dir}")
except Exception as e:
logger.warning(f"删除备份目录失败: {str(e)}")
def __get_version_message(self) -> str:
"""
获取版本信息文本

View File

@@ -9,7 +9,6 @@ from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.system import SystemChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.event import Event as ManagerEvent, eventmanager, Event
from app.core.plugin import PluginManager
from app.helper.message import MessageHelper
@@ -162,10 +161,6 @@ class Command(metaclass=Singleton):
"""
初始化菜单命令
"""
if settings.DEV:
logger.debug("Development mode active. Skipping command initialization.")
return
# 使用线程池提交后台任务,避免引起阻塞
ThreadHelper().submit(self.__init_commands_background, pid)

View File

@@ -124,6 +124,8 @@ class ConfigModel(BaseModel):
ALIPAN_APP_ID: str = "ac1bf04dc9fd4d9aaabb65b4a668d403"
# 元数据识别缓存过期时间(小时)
META_CACHE_EXPIRE: int = 0
# 电视剧动漫的分类genre_ids
ANIME_GENREIDS: List[int] = Field(default=[16])
# 用户认证站点
AUTH_SITE: str = ""
# 重启自动升级

View File

@@ -10,6 +10,7 @@ from app.core.meta.releasegroup import ReleaseGroupsMatcher
from app.schemas.types import MediaType
from app.utils.string import StringUtils
from app.utils.tokens import Tokens
from app.core.meta.streamingplatform import StreamingPlatforms
class MetaVideo(MetaBase):
@@ -31,7 +32,7 @@ class MetaVideo(MetaBase):
_part_re = r"(^PART[0-9ABI]{0,2}$|^CD[0-9]{0,2}$|^DVD[0-9]{0,2}$|^DISK[0-9]{0,2}$|^DISC[0-9]{0,2}$)"
_roman_numerals = r"^(?=[MDCLXVI])M*(C[MD]|D?C{0,3})(X[CL]|L?X{0,3})(I[XV]|V?I{0,3})$"
_source_re = r"^BLURAY$|^HDTV$|^UHDTV$|^HDDVD$|^WEBRIP$|^DVDRIP$|^BDRIP$|^BLU$|^WEB$|^BD$|^HDRip$|^REMUX$|^UHD$"
_effect_re = r"^SDR$|^HDR\d*$|^DOLBY$|^DOVI$|^DV$|^3D$|^REPACK$|^HLG$|^HDR10(\+|Plus)$"
_effect_re = r"^SDR$|^HDR\d*$|^DOLBY$|^DOVI$|^DV$|^3D$|^REPACK$|^HLG$|^HDR10(\+|Plus)$|^EDR$|^HQ$"
_resources_type_re = r"%s|%s" % (_source_re, _effect_re)
_name_no_begin_re = r"^[\[【].+?[\]】]"
_name_no_chinese_re = r".*版|.*字幕"
@@ -51,7 +52,7 @@ class MetaVideo(MetaBase):
_resources_pix_re = r"^[SBUHD]*(\d{3,4}[PI]+)|\d{3,4}X(\d{3,4})"
_resources_pix_re2 = r"(^[248]+K)"
_video_encode_re = r"^(H26[45])$|^(x26[45])$|^AVC$|^HEVC$|^VC\d?$|^MPEG\d?$|^Xvid$|^DivX$|^AV1$|^HDR\d*$|^AVS(\+|[23])$"
_audio_encode_re = r"^DTS\d?$|^DTSHD$|^DTSHDMA$|^Atmos$|^TrueHD\d?$|^AC3$|^\dAudios?$|^DDP\d?$|^DD\+\d?$|^DD\d?$|^LPCM\d?$|^AAC\d?$|^FLAC\d?$|^HD\d?$|^MA\d?$|^HR\d?$|^Opus\d?$|^Vorbis\d?$"
_audio_encode_re = r"^DTS\d?$|^DTSHD$|^DTSHDMA$|^Atmos$|^TrueHD\d?$|^AC3$|^\dAudios?$|^DDP\d?$|^DD\+\d?$|^DD\d?$|^LPCM\d?$|^AAC\d?$|^FLAC\d?$|^HD\d?$|^MA\d?$|^HR\d?$|^Opus\d?$|^Vorbis\d?$|^AV[3S]A$"
def __init__(self, title: str, subtitle: str = None, isfile: bool = False):
"""
@@ -66,6 +67,8 @@ class MetaVideo(MetaBase):
original_title = title
self._source = ""
self._effect = []
self.web_source = None
self._index = 0
# 判断是否纯数字命名
if isfile \
and title.isdigit() \
@@ -93,9 +96,12 @@ class MetaVideo(MetaBase):
# 拆分tokens
tokens = Tokens(title)
self.tokens = tokens
# 实例化StreamingPlatforms对象
streaming_platforms = StreamingPlatforms()
# 解析名称、年份、季、集、资源类型、分辨率等
token = tokens.get_next()
while token:
self._index += 1 # 更新当前处理的token索引
# Part
self.__init_part(token)
# 标题
@@ -116,6 +122,9 @@ class MetaVideo(MetaBase):
# 资源类型
if self._continue_flag:
self.__init_resource_type(token)
# 流媒体平台
if self._continue_flag:
self.__init_web_source(token, streaming_platforms)
# 视频编码
if self._continue_flag:
self.__init_video_encode(token)
@@ -131,6 +140,9 @@ class MetaVideo(MetaBase):
self.resource_effect = " ".join(self._effect)
if self._source:
self.resource_type = self._source.strip()
# 添加流媒体平台
if self.web_source:
self.resource_type = f"{self.web_source} {self.resource_type}"
# 提取原盘DIY
if self.resource_type and "BluRay" in self.resource_type:
if (self.subtitle and re.findall(r'D[Ii]Y', self.subtitle)) \
@@ -574,6 +586,57 @@ class MetaVideo(MetaBase):
self._effect.append(effect)
self._last_token = effect.upper()
def __init_web_source(self, token: str, streaming_platforms: StreamingPlatforms):
"""
识别流媒体平台
"""
if not self.name:
return
platform_name = None
query_range = 1
prev_token = None
prev_idx = self._index - 2
if 0 <= prev_idx < len(self.tokens.tokens):
prev_token = self.tokens.tokens[prev_idx]
next_token = self.tokens.peek()
if streaming_platforms.is_streaming_platform(token):
platform_name = streaming_platforms.get_streaming_platform_name(token)
else:
for adjacent_token, is_next in [(prev_token, False), (next_token, True)]:
if not adjacent_token or platform_name:
continue
for separator in [" ", "-"]:
if is_next:
combined_token = f"{token}{separator}{adjacent_token}"
else:
combined_token = f"{adjacent_token}{separator}{token}"
if streaming_platforms.is_streaming_platform(combined_token):
platform_name = streaming_platforms.get_streaming_platform_name(combined_token)
query_range = 2
if is_next:
self.tokens.get_next()
break
if not platform_name:
return
web_tokens = ["WEB", "DL", "WEBDL", "WEBRIP"]
match_start_idx = self._index - query_range
match_end_idx = self._index - 1
start_index = max(0, match_start_idx - query_range)
end_index = min(len(self.tokens.tokens), match_end_idx + 1 + query_range)
tokens_to_check = self.tokens.tokens[start_index:end_index]
if any(tok and tok.upper() in web_tokens for tok in tokens_to_check):
self.web_source = platform_name
self._continue_flag = False
def __init_video_encode(self, token: str):
"""
识别视频编码

View File

@@ -0,0 +1,104 @@
from typing import Optional, List, Tuple
from app.utils.singleton import Singleton
class StreamingPlatforms(metaclass=Singleton):
"""
流媒体平台简称与全称。
"""
STREAMING_PLATFORMS: List[Tuple[str, str]] = [
("AMZN", "Amazon"),
("NF", "Netflix"),
("ATVP", "Apple TV+"),
("iT", "iTunes"),
("DSNP", "Disney+"),
("HS", "Hotstar"),
("APPS", "Disney+ MENA"),
("PMTP", "Paramount+"),
("HMAX", "Max"),
("", "Max"),
("HULU", "Hulu"),
("MA", "Movies Anywhere"),
("BCORE", "Bravia Core"),
("MS", "Microsoft Store"),
("SHO", "Showtime"),
("STAN", "Stan"),
("PCOK", "Peacock"),
("SKST", "SkyShowtime"),
("NOW", "Now TV"),
("FXTL", "Foxtel Now"),
("BNGE", "Binge"),
("CRKL", "Crackle"),
("RKTN", "Rakuten TV"),
("ALL4", "All 4"),
("AS", "Adult Swim"),
("BRTB", "Brtb TV"),
("CNLP", "Canal+"),
("CRIT", "Criterion Channel"),
("DSCP", "Discovery+"),
("", "ESPN"),
("FOOD", "Food Network"),
("MUBI", "Mubi"),
("PLAY", "Google Play"),
("YT", "YouTube"),
("", "friDay"),
("", "KKTV"),
("", "ofiii"),
("", "LiTV"),
("", "MyVideo"),
("Hami", "Hami Video"),
("", "meWATCH"),
("CATCHPLAY", "CATCHPLAY+"),
("", "LINE TV"),
("VIU", "Viu"),
("IQ", ""),
("", "WeTV"),
("ABMA", "Abema"),
("ADN", ""),
("AT-X", ""),
("Baha", ""),
("BG", "B-Global"),
("CR", "Crunchyroll"),
("", "DMM"),
("FOD", ""),
("FUNi", "Funimation"),
("HIDI", "HIDIVE"),
("UNXT", "U-NEXT"),
]
def __init__(self):
"""初始化流媒体平台匹配器"""
self._lookup_cache = {}
self._build_cache()
def _build_cache(self) -> None:
"""
构建查询缓存。
"""
self._lookup_cache.clear()
for short_name, full_name in self.STREAMING_PLATFORMS:
canonical_name = full_name or short_name
if not canonical_name:
continue
aliases = {short_name, full_name}
for alias in aliases:
if alias:
self._lookup_cache[alias.upper()] = canonical_name
def get_streaming_platform_name(self, platform_code: str) -> Optional[str]:
"""
根据流媒体平台简称或全称获取标准名称。
"""
if platform_code is None:
return None
return self._lookup_cache.get(platform_code.upper())
def is_streaming_platform(self, name: str) -> bool:
"""
判断给定的字符串是否为已知的流媒体平台代码或名称。
"""
if name is None:
return False
return name.upper() in self._lookup_cache

View File

@@ -19,7 +19,6 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.db.plugindata_oper import PluginDataOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.module import ModuleHelper
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger
@@ -122,21 +121,10 @@ class PluginManager(metaclass=Singleton):
return False
return True
# 扫描插件目录
if pid:
# 加载指定插件
plugins = ModuleHelper.load_with_pre_filter(
"app.plugins",
filter_func=lambda name, obj: check_module(obj) and name == pid
)
else:
# 加载所有插件
plugins = ModuleHelper.load(
"app.plugins",
filter_func=lambda _, obj: check_module(obj)
)
# 已安装插件
installed_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 扫描插件目录,只加载符合条件的插件
plugins = self._load_selective_plugins(pid, installed_plugins, check_module)
# 排序
plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0)
for plugin in plugins:
@@ -152,11 +140,6 @@ class PluginManager(metaclass=Singleton):
continue
# 存储Class
self._plugins[plugin_id] = plugin
# 未安装的不加载
if plugin_id not in installed_plugins:
# 设置事件状态为不可用
eventmanager.disable_event_handler(plugin)
continue
# 生成实例
plugin_obj = plugin()
# 生效插件配置
@@ -201,7 +184,7 @@ class PluginManager(metaclass=Singleton):
logger.info(f"正在停止插件 {pid}...")
plugin_obj = self._running_plugins.get(pid)
if not plugin_obj:
logger.warning(f"插件 {pid} 不存在或未加载")
logger.debug(f"插件 {pid} 不存在或未加载")
return
plugins = {pid: plugin_obj}
else:
@@ -213,6 +196,7 @@ class PluginManager(metaclass=Singleton):
# 清空对像
if pid:
# 清空指定插件
self._plugins.pop(pid, None)
self._running_plugins.pop(pid, None)
else:
# 清空
@@ -220,6 +204,80 @@ class PluginManager(metaclass=Singleton):
self._running_plugins = {}
logger.info("插件停止完成")
@staticmethod
def _load_selective_plugins(pid: Optional[str], installed_plugins: List[str],
check_module_func: Callable) -> List[Any]:
"""
选择性加载插件只import符合条件的插件
:param pid: 指定插件ID为空则加载所有已安装插件
:param installed_plugins: 已安装插件列表
:param check_module_func: 模块检查函数
:return: 插件类列表
"""
import importlib
plugins = []
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
if not plugins_dir.exists():
logger.warning(f"插件目录不存在:{plugins_dir}")
return plugins
# 确定需要加载的插件目录名称列表
if pid:
# 加载指定插件
target_plugins = [pid.lower()]
else:
# 加载已安装插件
target_plugins = [plugin_id.lower() for plugin_id in installed_plugins]
if not target_plugins:
logger.debug("没有需要加载的插件")
return plugins
# 扫描plugins目录
_loaded_modules = set()
for plugin_dir in plugins_dir.iterdir():
if not plugin_dir.is_dir() or plugin_dir.name.startswith('_'):
continue
# 检查是否是需要加载的插件
if plugin_dir.name not in target_plugins:
logger.debug(f"跳过插件目录:{plugin_dir.name}(不在加载列表中)")
continue
# 检查__init__.py是否存在
init_file = plugin_dir / "__init__.py"
if not init_file.exists():
logger.debug(f"跳过插件目录:{plugin_dir.name}缺少__init__.py")
continue
try:
# 构建模块名
module_name = f"app.plugins.{plugin_dir.name}"
logger.debug(f"正在导入插件模块:{module_name}")
# 导入模块
module = importlib.import_module(module_name)
importlib.reload(module)
# 检查模块中的类
for name, obj in module.__dict__.items():
if name.startswith('_') or not isinstance(obj, type):
continue
if name in _loaded_modules:
continue
if check_module_func(obj):
_loaded_modules.add(name)
plugins.append(obj)
logger.debug(f"找到符合条件的插件类:{name}")
break
except Exception as err:
logger.error(f"加载插件 {plugin_dir.name} 失败:{str(err)} - {traceback.format_exc()}")
return plugins
@property
def running_plugins(self) -> Dict[str, Any]:
"""
@@ -247,6 +305,7 @@ class PluginManager(metaclass=Singleton):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in ['DEV', 'PLUGIN_AUTO_RELOAD']:
return
logger.info("配置变更,重新加载插件文件修改监测...")
self.reload_monitor()
def reload_monitor(self):
@@ -354,8 +413,7 @@ class PluginManager(metaclass=Singleton):
# 确定需要安装的插件
plugins_to_install = [
plugin for plugin in online_plugins
if plugin.id in install_plugins
and not self.is_plugin_exists(plugin.id, plugin.plugin_version)
if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id, plugin.plugin_version)
]
if not plugins_to_install:

View File

@@ -68,6 +68,7 @@ def enable_doh(enable: bool):
else:
socket.getaddrinfo = _orig_getaddrinfo
class DohHelper(metaclass=Singleton):
def __init__(self):
enable_doh(settings.DOH_ENABLE)

View File

@@ -241,7 +241,7 @@ class TemplateContextBuilder:
"total_size": StringUtils.str_filesize(transferinfo.total_size),
"err_msg": transferinfo.message,
}
self._context.update(ctx)
return self._context.update(ctx)
def _add_file_info(self, file_extension: Optional[str]):
"""
@@ -363,7 +363,7 @@ class TemplateHelper(metaclass=SingletonClass):
self.set_cache_context(rendered, context)
# 返回渲染结果
return rendered
return None
except Exception as e:
logger.error(f"模板处理失败: {str(e)}")
raise ValueError(f"模板处理失败: {str(e)}") from e
@@ -645,7 +645,8 @@ class MessageQueueManager(metaclass=SingletonClass):
"""
发送消息(立即发送或加入队列)
"""
if self._is_in_scheduled_time(datetime.now()):
immediately = kwargs.pop("immediately", False)
if immediately or self._is_in_scheduled_time(datetime.now()):
self._send(*args, **kwargs)
else:
self.queue.put({

View File

@@ -32,6 +32,7 @@ class SystemHelper:
if event_data.key not in ['DEBUG', 'LOG_LEVEL', 'LOG_MAX_FILE_SIZE', 'LOG_BACKUP_COUNT',
'LOG_FILE_FORMAT', 'LOG_CONSOLE_FORMAT']:
return
logger.info("配置变更,更新日志设置...")
logger.update_loggers()
@staticmethod
@@ -109,7 +110,6 @@ class SystemHelper:
try:
# 检查容器是否配置了自动重启策略
has_restart_policy = SystemHelper._check_restart_policy()
if has_restart_policy:
# 有重启策略,使用优雅退出方式
logger.info("检测到容器配置了自动重启策略,使用优雅重启方式...")
@@ -120,7 +120,6 @@ class SystemHelper:
# 没有重启策略使用Docker API强制重启
logger.info("容器未配置自动重启策略使用Docker API重启...")
return SystemHelper._docker_api_restart()
except Exception as err:
logger.error(f"重启失败: {str(err)}")
# 降级为Docker API重启
@@ -141,7 +140,6 @@ class SystemHelper:
# 重启容器
client.containers.get(container_id).restart()
return True, ""
except Exception as docker_err:
return False, f"重启时发生错误:{str(docker_err)}"

View File

@@ -112,7 +112,7 @@ class ServiceBase(Generic[TService, TConf], metaclass=ABCMeta):
# 通过服务类型或工厂函数来创建实例
if isinstance(service_type, type):
# 如果传入的是类类型,调用构造函数实例化
self._instances[conf.name] = service_type(**conf.config)
self._instances[conf.name] = service_type(name=conf.name, **conf.config)
else:
# 如果传入的是工厂函数,直接调用工厂函数
self._instances[conf.name] = service_type(conf)
@@ -210,8 +210,8 @@ class _MessageBase(ServiceBase[TService, NotificationConf]):
# 检查消息来源
if message.source and message.source != source:
return False
# 检查消息类型开关
if message.mtype:
# 不是定向发送时,检查消息类型开关
if not message.userid and message.mtype:
conf = self.get_config(source)
if conf:
switchs = conf.switchs or []

View File

@@ -29,6 +29,7 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更重新初始化Emby模块...")
self.init_module()
@staticmethod

View File

@@ -399,28 +399,30 @@ class FanartModule(_ModuleBase):
if not mediainfo.get_image(season_image):
mediainfo.set_image(season_image, image_obj.get('url'))
else:
# 其他图片优先环境变量指定语言再like最多
def pick_best_image(images):
def __pick_best_image(_images):
lang_env = settings.FANART_LANG
if lang_env:
langs = [lang.strip() for lang in lang_env.split(",") if lang.strip()]
for lang in langs:
lang_images = [img for img in images if img.get('lang') == lang]
lang_images = [img for img in _images if img.get('lang') == lang]
if lang_images:
lang_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return lang_images[0]
# 没设置或没找到,按原逻辑 zh、en、like最多
zh_images = [img for img in images if img.get('lang') == 'zh']
zh_images = [img for img in _images if img.get('lang') == 'zh']
if zh_images:
zh_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return zh_images[0]
en_images = [img for img in images if img.get('lang') == 'en']
en_images = [img for img in _images if img.get('lang') == 'en']
if en_images:
en_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return en_images[0]
images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return images[0]
image_obj = pick_best_image(images)
_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True)
return _images[0]
image_obj = __pick_best_image(images)
# 设置图片,没有图片才设置
if not mediainfo.get_image(image_name):
mediainfo.set_image(image_name, image_obj.get('url'))

View File

@@ -5,7 +5,7 @@ import secrets
import threading
import time
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union
import requests
from tqdm import tqdm
@@ -52,9 +52,6 @@ class AliPan(StorageBase, metaclass=Singleton):
# 基础url
base_url = "https://openapi.alipan.com"
# CID和路径缓存
_id_cache: Dict[str, Tuple[str, str]] = {}
def __init__(self):
super().__init__()
self.session = requests.Session()
@@ -279,61 +276,6 @@ class AliPan(StorageBase, metaclass=Singleton):
return ret_data.get(result_key)
return ret_data
def _path_to_id(self, drive_id: str, path: str) -> Tuple[str, str]:
"""
路径转drive_id, file_id带缓存机制
"""
# 根目录
if path == "/":
return drive_id, "root"
if len(path) > 1 and path.endswith("/"):
path = path[:-1]
# 检查缓存
if path in self._id_cache:
return self._id_cache[path]
# 逐级查找缓存
file_id = "root"
file_path = "/"
for p in Path(path).parents:
if str(p) in self._id_cache:
file_path = str(p)
file_id = self._id_cache[file_path]
break
# 计算相对路径
rel_path = Path(path).relative_to(file_path)
for part in Path(rel_path).parts:
find_part = False
next_marker = None
while True:
resp = self._request_api(
"POST",
"/adrive/v1.0/openFile/list",
json={
"drive_id": drive_id,
"limit": 100,
"marker": next_marker,
"parent_file_id": file_id,
}
)
if not resp:
break
for item in resp.get("items", []):
if item["name"] == part:
file_id = item["file_id"]
find_part = True
break
if find_part:
break
if len(resp.get("items")) < 100:
break
if not find_part:
raise FileNotFoundError(f"【阿里云盘】{path} 不存在")
if file_id == "root":
raise FileNotFoundError(f"【阿里云盘】{path} 不存在")
# 缓存路径
self._id_cache[path] = (drive_id, file_id)
return drive_id, file_id
def __get_fileitem(self, fileinfo: dict, parent: str = "/") -> schemas.FileItem:
"""
获取文件信息
@@ -427,9 +369,6 @@ class AliPan(StorageBase, metaclass=Singleton):
break
next_marker = resp.get("next_marker")
for item in resp.get("items", []):
# 更新缓存
path = f"{fileitem.path}{item.get('name')}"
self._id_cache[path] = (drive_id, item.get("file_id"))
items.append(self.__get_fileitem(item, parent=fileitem.path))
if len(resp.get("items")) < 100:
break
@@ -467,7 +406,6 @@ class AliPan(StorageBase, metaclass=Singleton):
return None
# 缓存新目录
new_path = Path(parent_item.path) / name
self._id_cache[str(new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return self._delay_get_item(new_path)
@staticmethod
@@ -837,15 +775,9 @@ class AliPan(StorageBase, metaclass=Singleton):
if resp.get("code"):
logger.warn(f"【阿里云盘】重命名失败: {resp.get('message')}")
return False
if fileitem.path in self._id_cache:
del self._id_cache[fileitem.path]
for key in list(self._id_cache.keys()):
if key.startswith(fileitem.path):
del self._id_cache[key]
self._id_cache[str(Path(fileitem.path).parent / name)] = (resp.get("drive_id"), resp.get("file_id"))
return True
def get_item(self, path: Path) -> Optional[schemas.FileItem]:
def get_item(self, path: Path, drive_id: str = None) -> Optional[schemas.FileItem]:
"""
获取指定路径的文件/目录项
"""
@@ -854,7 +786,7 @@ class AliPan(StorageBase, metaclass=Singleton):
"POST",
"/adrive/v1.0/openFile/get_by_path",
json={
"drive_id": self._default_drive_id,
"drive_id": drive_id or self._default_drive_id,
"file_path": str(path)
}
)
@@ -910,9 +842,15 @@ class AliPan(StorageBase, metaclass=Singleton):
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
企业级复制实现(支持目录递归复制)
复制文件到指定路径
:param fileitem: 要复制的文件项
:param path: 目标目录路径
:param new_name: 新文件名
"""
dest_cid = self._path_to_id(fileitem.drive_id, str(path))
dest_fileitem = self.get_item(path, drive_id=fileitem.drive_id)
if not dest_fileitem or dest_fileitem.type != "dir":
logger.warn(f"【阿里云盘】目标路径 {path} 不存在或不是目录!")
return False
resp = self._request_api(
"POST",
"/adrive/v1.0/openFile/copy",
@@ -920,7 +858,7 @@ class AliPan(StorageBase, metaclass=Singleton):
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid,
"to_drive_id": fileitem.drive_id,
"to_parent_file_id": dest_cid
"to_parent_file_id": dest_fileitem.fileid,
}
)
if not resp:
@@ -932,18 +870,20 @@ class AliPan(StorageBase, metaclass=Singleton):
new_path = Path(path) / fileitem.name
new_file = self._delay_get_item(new_path)
self.rename(new_file, new_name)
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return True
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
原子性移动操作实现
移动文件到指定路径
:param fileitem: 要移动的文件项
:param path: 目标目录路径
:param new_name: 新文件名
"""
src_fid = fileitem.fileid
target_id = self._path_to_id(fileitem.drive_id, str(path))
target_fileitem = self.get_item(path, drive_id=fileitem.drive_id)
if not target_fileitem or target_fileitem.type != "dir":
logger.warn(f"【阿里云盘】目标路径 {path} 不存在或不是目录!")
return False
resp = self._request_api(
"POST",
@@ -951,7 +891,7 @@ class AliPan(StorageBase, metaclass=Singleton):
json={
"drive_id": fileitem.drive_id,
"file_id": src_fid,
"to_parent_file_id": target_id,
"to_parent_file_id": target_fileitem.fileid,
"new_name": new_name
}
)
@@ -960,10 +900,6 @@ class AliPan(StorageBase, metaclass=Singleton):
if resp.get("code"):
logger.warn(f"【阿里云盘】移动文件失败: {resp.get('message')}")
return False
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return True
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:

View File

@@ -5,7 +5,7 @@ import secrets
import threading
import time
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union
import oss2
import requests
@@ -51,9 +51,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
# 基础url
base_url = "https://proapi.115.com"
# CID和路径缓存
_id_cache: Dict[str, str] = {}
def __init__(self):
super().__init__()
self.session = requests.Session()
@@ -238,58 +235,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
return ret_data.get(result_key)
return ret_data
def _path_to_id(self, path: str) -> str:
"""
路径转FID带缓存机制
"""
# 根目录
if path == "/":
return '0'
if len(path) > 1 and path.endswith("/"):
path = path[:-1]
# 检查缓存
if path in self._id_cache:
return self._id_cache[path]
# 逐级查找缓存
current_id = 0
parent_path = "/"
for p in Path(path).parents:
if str(p) in self._id_cache:
parent_path = str(p)
current_id = self._id_cache[parent_path]
break
# 计算相对路径
rel_path = Path(path).relative_to(parent_path)
for part in Path(rel_path).parts:
offset = 0
find_part = False
while True:
resp = self._request_api(
"GET",
"/open/ufile/files",
"data",
params={"cid": current_id, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1}
)
if not resp:
break
for item in resp:
if item["fn"] == part:
current_id = item["fid"]
find_part = True
break
if find_part:
break
if len(resp) < 1000:
break
offset += len(resp)
if not find_part:
raise FileNotFoundError(f"【115】{path} 不存在")
if not current_id:
raise FileNotFoundError(f"【115】{path} 不存在")
# 缓存路径
self._id_cache[path] = str(current_id)
return str(current_id)
@staticmethod
def _calc_sha1(filepath: Path, size: Optional[int] = None) -> str:
"""
@@ -335,7 +280,11 @@ class U115Pan(StorageBase, metaclass=Singleton):
else:
cid = fileitem.fileid
if not cid:
cid = self._path_to_id(fileitem.path)
_fileitem = self.get_item(Path(fileitem.path))
if not _fileitem:
logger.warn(f"【115】获取目录 {fileitem.path} 失败!")
return []
cid = _fileitem.fileid
items = []
offset = 0
@@ -354,8 +303,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
for item in resp:
# 更新缓存
path = f"{fileitem.path}{item['fn']}"
self._id_cache[path] = str(item["fid"])
file_path = path + ("/" if item["fc"] == "0" else "")
items.append(schemas.FileItem(
storage=self.schema.value,
@@ -398,8 +345,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
return self.get_item(new_path)
logger.warn(f"【115】创建目录失败: {resp.get('error')}")
return None
# 缓存新目录
self._id_cache[str(new_path)] = str(resp["data"]["file_id"])
return schemas.FileItem(
storage=self.schema.value,
fileid=str(resp["data"]["file_id"]),
@@ -716,13 +661,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not resp:
return False
if resp["state"]:
if fileitem.path in self._id_cache:
del self._id_cache[fileitem.path]
for key in list(self._id_cache.keys()):
if key.startswith(fileitem.path):
del self._id_cache[key]
new_path = Path(fileitem.path).parent / name
self._id_cache[str(new_path)] = fileitem.fileid
return True
return False
@@ -731,15 +669,12 @@ class U115Pan(StorageBase, metaclass=Singleton):
获取指定路径的文件/目录项
"""
try:
file_id = self._path_to_id(str(path))
if not file_id:
return None
resp = self._request_api(
"GET",
"POST",
"/open/folder/get_info",
"data",
params={
"file_id": int(file_id)
data={
"path": str(path)
}
)
if not resp:
@@ -753,7 +688,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
basename=Path(resp["file_name"]).stem,
extension=Path(resp["file_name"]).suffix[1:] if resp["file_category"] == "1" else None,
pickcode=resp["pick_code"],
size=StringUtils.num_filesize(resp['size']) if resp["file_category"] == "1" else None,
size=resp['size_byte'] if resp["file_category"] == "1" else None,
modify_time=resp["utime"]
)
except Exception as e:
@@ -805,14 +740,17 @@ class U115Pan(StorageBase, metaclass=Singleton):
企业级复制实现(支持目录递归复制)
"""
src_fid = fileitem.fileid
dest_cid = self._path_to_id(str(path))
dest_fileitem = self.get_item(path)
if not dest_fileitem or dest_fileitem.type != "dir":
logger.warn(f"【115】目标路径 {path} 不是一个有效的目录!")
return False
resp = self._request_api(
"POST",
"/open/ufile/copy",
data={
"file_id": int(src_fid),
"pid": int(dest_cid)
"pid": int(dest_fileitem.fileid),
}
)
if not resp:
@@ -821,10 +759,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
new_path = Path(path) / fileitem.name
new_item = self._delay_get_item(new_path)
self.rename(new_item, new_name)
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = new_item.fileid
return True
return False
@@ -833,14 +767,16 @@ class U115Pan(StorageBase, metaclass=Singleton):
原子性移动操作实现
"""
src_fid = fileitem.fileid
dest_cid = self._path_to_id(str(path))
dest_fileitem = self.get_item(path)
if not dest_fileitem or dest_fileitem.type != "dir":
logger.warn(f"【115】目标路径 {path} 不是一个有效的目录!")
return False
resp = self._request_api(
"POST",
"/open/ufile/move",
data={
"file_ids": int(src_fid),
"to_cid": int(dest_cid)
"to_cid": int(dest_fileitem.fileid),
}
)
if not resp:
@@ -849,10 +785,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
new_path = Path(path) / fileitem.name
new_file = self._delay_get_item(new_path)
self.rename(new_file, new_name)
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = src_fid
return True
return False

View File

@@ -30,6 +30,7 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更重新初始化Jellyfin模块...")
self.init_module()
@staticmethod

View File

@@ -30,6 +30,7 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更重新初始化Plex模块...")
self.init_module()
@staticmethod

View File

@@ -36,6 +36,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Downloaders.value]:
return
logger.info("配置变更重新加载Qbittorrent模块...")
self.init_module()
@staticmethod

View File

@@ -32,6 +32,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载Slack模块...")
self.init_module()
@staticmethod
@@ -81,8 +82,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def message_parser(self, source: str, body: Any, form: Any,
args: Any) -> Optional[CommingMessage]:
def message_parser(self, source: str, body: Any, form: Any, args: Any) -> Optional[CommingMessage]:
"""
解析消息内容,返回字典,注意以下约定值:
userid: 用户ID
@@ -219,8 +219,32 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
username = msg_json.get("user")
elif msg_json.get("type") == "block_actions":
userid = msg_json.get("user", {}).get("id")
text = msg_json.get("actions")[0].get("value")
callback_data = msg_json.get("actions")[0].get("value")
# 使用CALLBACK前缀标识按钮回调
text = f"CALLBACK:{callback_data}"
username = msg_json.get("user", {}).get("name")
# 获取原消息信息用于编辑
message_info = msg_json.get("message", {})
# Slack消息的时间戳作为消息ID
message_ts = message_info.get("ts")
channel_id = msg_json.get("channel", {}).get("id") or msg_json.get("container", {}).get("channel_id")
logger.info(f"收到来自 {client_config.name} 的Slack按钮回调"
f"userid={userid}, username={username}, callback_data={callback_data}")
# 创建包含回调信息的CommingMessage
return CommingMessage(
channel=MessageChannel.Slack,
source=client_config.name,
userid=userid,
username=username,
text=text,
is_callback=True,
callback_data=callback_data,
message_id=message_ts,
chat_id=channel_id
)
elif msg_json.get("type") == "event_callback":
userid = msg_json.get('event', {}).get('user')
text = re.sub(r"<@[0-9A-Z]+>", "", msg_json.get("event", {}).get("text"), flags=re.IGNORECASE).strip()
@@ -259,7 +283,10 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
client: Slack = self.get_instance(conf.name)
if client:
client.send_msg(title=message.title, text=message.text,
image=message.image, userid=userid, link=message.link)
image=message.image, userid=userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
@@ -273,7 +300,10 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
continue
client: Slack = self.get_instance(conf.name)
if client:
client.send_medias_msg(title=message.title, medias=medias, userid=message.userid)
client.send_medias_msg(title=message.title, medias=medias, userid=message.userid,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
@@ -288,4 +318,29 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
client: Slack = self.get_instance(conf.name)
if client:
client.send_torrents_msg(title=message.title, torrents=torrents,
userid=message.userid)
userid=message.userid, buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def delete_message(self, channel: MessageChannel, source: str,
message_id: str, chat_id: Optional[str] = None) -> bool:
"""
删除消息
:param channel: 消息渠道
:param source: 指定的消息源
:param message_id: 消息IDSlack中为时间戳
:param chat_id: 聊天ID频道ID
:return: 删除是否成功
"""
success = False
for conf in self.get_configs().values():
if channel != self._channel:
break
if source != conf.name:
continue
client: Slack = self.get_instance(conf.name)
if client:
result = client.delete_msg(message_id=message_id, chat_id=chat_id)
if result:
success = True
return success

View File

@@ -13,18 +13,16 @@ from app.core.metainfo import MetaInfo
from app.log import logger
from app.utils.string import StringUtils
lock = Lock()
class Slack:
_client: WebClient = None
_service: SocketModeHandler = None
_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/message?token={settings.API_TOKEN}"
_channel = ""
def __init__(self, SLACK_OAUTH_TOKEN: Optional[str] = None, SLACK_APP_TOKEN: Optional[str] = None,
def __init__(self, SLACK_OAUTH_TOKEN: Optional[str] = None, SLACK_APP_TOKEN: Optional[str] = None,
SLACK_CHANNEL: Optional[str] = None, **kwargs):
if not SLACK_OAUTH_TOKEN or not SLACK_APP_TOKEN:
@@ -52,7 +50,7 @@ class Slack:
with requests.post(self._ds_url, json=message, timeout=10) as local_res:
logger.debug("message: %s processed, response is: %s" % (message, local_res.text))
@slack_app.action(re.compile(r"actionId-\d+"))
@slack_app.action(re.compile(r"actionId-.*"))
def slack_action(ack, body):
ack()
with requests.post(self._ds_url, json=body, timeout=60) as local_res:
@@ -101,15 +99,21 @@ class Slack:
"""
return True if self._client else False
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None, link: Optional[str] = None, userid: Optional[str] = None):
def send_msg(self, title: str, text: Optional[str] = None,
image: Optional[str] = None, link: Optional[str] = None,
userid: Optional[str] = None, buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[str] = None,
original_chat_id: Optional[str] = None):
"""
发送Telegram消息
发送Slack消息
:param title: 消息标题
:param text: 消息内容
:param image: 消息图片地址
:param link: 点击消息转转的URL
:param userid: 用户ID如有则只发消息给该用户
:user_id: 发送消息的目标用户ID为空则发给管理员
:param buttons: 消息按钮列表,格式为 [[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
:param original_message_id: 原消息的时间戳,如果提供则编辑原消息
:param original_chat_id: 原消息的频道ID编辑消息时需要
"""
if not self._client:
return False, "消息客户端未就绪"
@@ -139,8 +143,42 @@ class Slack:
"image_url": f"{image}",
"alt_text": f"{title}"
}})
# 链接
if link:
# 自定义按钮
if buttons:
for button_row in buttons:
elements = []
for button in button_row:
if "url" in button:
# URL按钮
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"url": button["url"],
"action_id": f"actionId-url-{button.get('text', 'url')}-{len(elements)}"
})
else:
# 回调按钮
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"value": button["callback_data"],
"action_id": f"actionId-{button['callback_data']}"
})
if elements:
blocks.append({
"type": "actions",
"elements": elements
})
elif link:
# 默认链接按钮
blocks.append({
"type": "actions",
"elements": [
@@ -157,21 +195,41 @@ class Slack:
}
]
})
# 发送
result = self._client.chat_postMessage(
channel=channel,
text=message_text[:1000],
blocks=blocks,
mrkdwn=True
)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
result = self._client.chat_update(
channel=original_chat_id,
ts=original_message_id,
text=message_text[:1000],
blocks=blocks or []
)
else:
# 发送新消息
result = self._client.chat_postMessage(
channel=channel,
text=message_text[:1000],
blocks=blocks,
mrkdwn=True
)
return True, result
except Exception as msg_e:
logger.error(f"Slack消息发送失败: {msg_e}")
return False, str(msg_e)
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None, title: Optional[str] = None) -> Optional[bool]:
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None, title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[str] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送列表消息
发送媒体列表消息
:param medias: 媒体信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息的时间戳,如果提供则编辑原消息
:param original_chat_id: 原消息的频道ID编辑消息时需要
"""
if not self._client:
return False
@@ -198,64 +256,148 @@ class Slack:
"type": "divider"
})
index = 1
for media in medias:
if media.get_poster_image():
if media.vote_star:
text = f"{index}. *<{media.detail_link}|{media.title_year}>*" \
f"\n类型:{media.type.value}" \
f"\n{media.vote_star}" \
f"\n{media.get_overview_string(50)}"
else:
text = f"{index}. *<{media.detail_link}|{media.title_year}>*" \
f"\n类型:{media.type.value}" \
f"\n{media.get_overview_string(50)}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
},
"accessory": {
"type": "image",
"image_url": f"{media.get_poster_image()}",
"alt_text": f"{media.title_year}"
}
}
)
blocks.append(
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
# 如果有自定义按钮,先添加所有媒体项,然后添加统一的按钮
if buttons:
# 添加媒体列表(不带单独的选择按钮)
for media in medias:
if media.get_poster_image():
if media.vote_star:
text = f"{index}. *<{media.detail_link}|{media.title_year}>*" \
f"\n类型:{media.type.value}" \
f"\n{media.vote_star}" \
f"\n{media.get_overview_string(50)}"
else:
text = f"{index}. *<{media.detail_link}|{media.title_year}>*" \
f"\n类型:{media.type.value}" \
f"\n{media.get_overview_string(50)}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
},
"accessory": {
"type": "image",
"image_url": f"{media.get_poster_image()}",
"alt_text": f"{media.title_year}"
}
]
}
)
index += 1
# 发送
result = self._client.chat_postMessage(
channel=channel,
text=title,
blocks=blocks
)
}
)
index += 1
# 添加统一的自定义按钮(在所有媒体项之后)
for button_row in buttons:
elements = []
for button in button_row:
if "url" in button:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"url": button["url"],
"action_id": f"actionId-url-{button.get('text', 'url')}-{len(elements)}"
})
else:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"value": button["callback_data"],
"action_id": f"actionId-{button['callback_data']}"
})
if elements:
blocks.append({
"type": "actions",
"elements": elements
})
else:
# 使用默认的每个媒体项单独按钮
for media in medias:
if media.get_poster_image():
if media.vote_star:
text = f"{index}. *<{media.detail_link}|{media.title_year}>*" \
f"\n类型:{media.type.value}" \
f"\n{media.vote_star}" \
f"\n{media.get_overview_string(50)}"
else:
text = f"{index}. *<{media.detail_link}|{media.title_year}>*" \
f"\n类型:{media.type.value}" \
f"\n{media.get_overview_string(50)}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
},
"accessory": {
"type": "image",
"image_url": f"{media.get_poster_image()}",
"alt_text": f"{media.title_year}"
}
}
)
# 使用默认选择按钮
blocks.append(
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
}
]
}
)
index += 1
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
result = self._client.chat_update(
channel=original_chat_id,
ts=original_message_id,
text=title,
blocks=blocks or []
)
else:
# 发送新消息
result = self._client.chat_postMessage(
channel=channel,
text=title,
blocks=blocks
)
return True if result else False
except Exception as msg_e:
logger.error(f"Slack消息发送失败: {msg_e}")
return False
def send_torrents_msg(self, torrents: List[Context],
userid: Optional[str] = None, title: Optional[str] = None) -> Optional[bool]:
def send_torrents_msg(self, torrents: List[Context], userid: Optional[str] = None, title: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[str] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送列表消息
发送种子列表消息
:param torrents: 种子信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息的时间戳,如果提供则编辑原消息
:param original_chat_id: 原消息的频道ID编辑消息时需要
"""
if not self._client:
return None
@@ -279,60 +421,172 @@ class Slack:
}]
# 列表
index = 1
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title = re.sub(r"\s+", " ", title).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
description = torrent.description
text = f"{index}. 【{site_name}】<{link}|{title}> " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}\n" \
f"{description}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
# 如果有自定义按钮,先添加种子列表,然后添加统一的按钮
if buttons:
# 添加种子列表(不带单独的选择按钮)
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title_text = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title_text = re.sub(r"\s+", " ", title_text).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
description = torrent.description
text = f"{index}. 【{site_name}】<{link}|{title_text}> " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}\n" \
f"{description}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
}
}
}
)
blocks.append(
{
"type": "actions",
"elements": [
{
)
index += 1
# 添加统一的自定义按钮
for button_row in buttons:
elements = []
for button in button_row:
if "url" in button:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"text": button["text"],
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
"url": button["url"],
"action_id": f"actionId-url-{button.get('text', 'url')}-{len(elements)}"
})
else:
elements.append({
"type": "button",
"text": {
"type": "plain_text",
"text": button["text"],
"emoji": True
},
"value": button["callback_data"],
"action_id": f"actionId-{button['callback_data']}"
})
if elements:
blocks.append({
"type": "actions",
"elements": elements
})
else:
# 使用默认的每个种子单独按钮
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
meta = MetaInfo(torrent.title, torrent.description)
link = torrent.page_url
title_text = f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group}"
title_text = re.sub(r"\s+", " ", title_text).strip()
free = torrent.volume_factor
seeder = f"{torrent.seeders}"
description = torrent.description
text = f"{index}. 【{site_name}】<{link}|{title_text}> " \
f"{StringUtils.str_filesize(torrent.size)} {free} {seeder}\n" \
f"{description}"
blocks.append(
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": text
}
]
}
}
)
blocks.append(
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "选择",
"emoji": True
},
"value": f"{index}",
"action_id": f"actionId-{index}"
}
]
}
)
index += 1
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
result = self._client.chat_update(
channel=original_chat_id,
ts=original_message_id,
text=title,
blocks=blocks or []
)
else:
# 发送新消息
result = self._client.chat_postMessage(
channel=channel,
text=title,
blocks=blocks
)
index += 1
# 发送
result = self._client.chat_postMessage(
channel=channel,
text=title,
blocks=blocks
)
return True if result else False
except Exception as msg_e:
logger.error(f"Slack消息发送失败: {msg_e}")
return False
def delete_msg(self, message_id: str, chat_id: Optional[str] = None) -> Optional[bool]:
"""
删除Slack消息
:param message_id: 消息时间戳Slack消息ID
:param chat_id: 频道ID
:return: 删除是否成功
"""
if not self._client:
return None
try:
# 确定要删除消息的频道ID
if chat_id:
target_channel = chat_id
else:
target_channel = self.__find_public_channel()
if not target_channel:
logger.error("无法确定要删除消息的Slack频道")
return False
# 删除消息
result = self._client.chat_delete(
channel=target_channel,
ts=message_id
)
if result.get("ok"):
logger.info(f"成功删除Slack消息: channel={target_channel}, ts={message_id}")
return True
else:
logger.error(f"删除Slack消息失败: {result.get('error', 'unknown error')}")
return False
except Exception as e:
logger.error(f"删除Slack消息异常: {str(e)}")
return False
def __find_public_channel(self):
"""
查找公共频道

View File

@@ -30,6 +30,7 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载SynologyChat模块...")
self.init_module()
@staticmethod

View File

@@ -9,7 +9,8 @@ from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MessageBase
from app.modules.telegram.telegram import Telegram
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, ConfigChangeEventData
from app.schemas import MessageChannel, CommingMessage, Notification, CommandRegisterEventData, ConfigChangeEventData, \
NotificationConf
from app.schemas.types import ModuleType, ChainEventType, SystemConfigKey, EventType
from app.utils.structures import DictUtils
@@ -35,6 +36,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载Telegram模块...")
self.init_module()
@staticmethod
@@ -98,6 +100,7 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
:return: 渠道、消息体
"""
"""
普通消息格式:
{
'update_id': ,
'message': {
@@ -119,6 +122,16 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
'text': ''
}
}
按钮回调格式:
{
'callback_query': {
'id': '',
'from': {...},
'message': {...},
'data': 'callback_data'
}
}
"""
# 获取服务配置
client_config = self.get_config(source)
@@ -130,32 +143,88 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
except Exception as err:
logger.debug(f"解析Telegram消息失败{str(err)}")
return None
if message:
text = message.get("text")
user_id = message.get("from", {}).get("id")
# 获取用户名
user_name = message.get("from", {}).get("username")
if text:
logger.info(f"收到来自 {client_config.name} 的Telegram消息"
f"userid={user_id}, username={user_name}, text={text}")
# 检查权限
admin_users = client_config.config.get("TELEGRAM_ADMINS")
user_list = client_config.config.get("TELEGRAM_USERS")
chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
if text.startswith("/"):
if admin_users \
and str(user_id) not in admin_users.split(',') \
and str(user_id) != chat_id:
client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
return None
else:
if user_list \
and not str(user_id) in user_list.split(','):
logger.info(f"用户{user_id}不在用户白名单中,无法使用此机器人")
client.send_msg(title="你不在用户白名单中,无法使用此机器人", userid=user_id)
return None
return CommingMessage(channel=MessageChannel.Telegram, source=client_config.name,
userid=user_id, username=user_name, text=text)
# 处理按钮回调
if "callback_query" in message:
return self._handle_callback_query(message, client_config)
# 处理普通消息
return self._handle_text_message(message, client_config, client)
return None
@staticmethod
def _handle_callback_query(message: dict, client_config: NotificationConf) -> Optional[CommingMessage]:
"""
处理按钮回调查询
"""
callback_query = message.get("callback_query", {})
user_info = callback_query.get("from", {})
callback_data = callback_query.get("data", "")
user_id = user_info.get("id")
user_name = user_info.get("username")
if callback_data and user_id:
logger.info(f"收到来自 {client_config.name} 的Telegram按钮回调"
f"userid={user_id}, username={user_name}, callback_data={callback_data}")
# 将callback_data作为特殊格式的text返回以便主程序识别这是按钮回调
callback_text = f"CALLBACK:{callback_data}"
# 创建包含完整回调信息的CommingMessage
return CommingMessage(
channel=MessageChannel.Telegram,
source=client_config.name,
userid=user_id,
username=user_name,
text=callback_text,
is_callback=True,
callback_data=callback_data,
message_id=callback_query.get("message", {}).get("message_id"),
chat_id=str(callback_query.get("message", {}).get("chat", {}).get("id", "")),
callback_query=callback_query
)
return None
@staticmethod
def _handle_text_message(msg: dict, client_config: NotificationConf, client: Telegram) -> Optional[CommingMessage]:
"""
处理普通文本消息
"""
text = msg.get("text")
user_id = msg.get("from", {}).get("id")
user_name = msg.get("from", {}).get("username")
if text and user_id:
logger.info(f"收到来自 {client_config.name} 的Telegram消息"
f"userid={user_id}, username={user_name}, text={text}")
# 检查权限
admin_users = client_config.config.get("TELEGRAM_ADMINS")
user_list = client_config.config.get("TELEGRAM_USERS")
chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
if text.startswith("/"):
if admin_users \
and str(user_id) not in admin_users.split(',') \
and str(user_id) != chat_id:
client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
return None
else:
if user_list \
and str(user_id) not in user_list.split(','):
logger.info(f"用户{user_id}不在用户白名单中,无法使用此机器人")
client.send_msg(title="你不在用户白名单中,无法使用此机器人", userid=user_id)
return None
return CommingMessage(
channel=MessageChannel.Telegram,
source=client_config.name,
userid=user_id,
username=user_name,
text=text
)
return None
def post_message(self, message: Notification) -> None:
@@ -177,7 +246,10 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
client: Telegram = self.get_instance(conf.name)
if client:
client.send_msg(title=message.title, text=message.text,
image=message.image, userid=userid, link=message.link)
image=message.image, userid=userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
@@ -192,7 +264,10 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
client: Telegram = self.get_instance(conf.name)
if client:
client.send_medias_msg(title=message.title, medias=medias,
userid=message.userid, link=message.link)
userid=message.userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def post_torrents_message(self, message: Notification, torrents: List[Context]) -> None:
"""
@@ -207,7 +282,33 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
client: Telegram = self.get_instance(conf.name)
if client:
client.send_torrents_msg(title=message.title, torrents=torrents,
userid=message.userid, link=message.link)
userid=message.userid, link=message.link,
buttons=message.buttons,
original_message_id=message.original_message_id,
original_chat_id=message.original_chat_id)
def delete_message(self, channel: MessageChannel, source: str,
message_id: int, chat_id: Optional[int] = None) -> bool:
"""
删除消息
:param channel: 消息渠道
:param source: 指定的消息源
:param message_id: 消息ID
:param chat_id: 聊天ID
:return: 删除是否成功
"""
success = False
for conf in self.get_configs().values():
if channel != self._channel:
break
if source != conf.name:
continue
client: Telegram = self.get_instance(conf.name)
if client:
result = client.delete_msg(message_id=message_id, chat_id=chat_id)
if result:
success = True
return success
def register_commands(self, commands: Dict[str, dict]):
"""

View File

@@ -3,12 +3,13 @@ import threading
import uuid
from pathlib import Path
from threading import Event
from typing import Optional, List, Dict
from typing import Optional, List, Dict, Callable
from urllib.parse import urljoin
import telebot
from telebot import apihelper
from telebot.types import InputFile
from telebot.types import InputFile, InlineKeyboardMarkup, InlineKeyboardButton
from telebot.types import InputMediaPhoto
from app.core.config import settings
from app.core.context import MediaInfo, Context
@@ -23,6 +24,7 @@ class Telegram:
_ds_url = f"http://127.0.0.1:{settings.PORT}/api/v1/message?token={settings.API_TOKEN}"
_event = Event()
_bot: telebot.TeleBot = None
_callback_handlers: Dict[str, Callable] = {} # 存储回调处理器
def __init__(self, TELEGRAM_TOKEN: Optional[str] = None, TELEGRAM_CHAT_ID: Optional[str] = None, **kwargs):
"""
@@ -57,7 +59,44 @@ class Telegram:
@_bot.message_handler(func=lambda message: True)
def echo_all(message):
RequestUtils(timeout=5).post_res(self._ds_url, json=message.json)
RequestUtils(timeout=15).post_res(self._ds_url, json=message.json)
@_bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
"""
处理按钮点击回调
"""
try:
# 解析回调数据
callback_data = call.data
user_id = str(call.from_user.id)
logger.info(f"收到按钮回调:{callback_data},用户:{user_id}")
# 发送回调数据给主程序处理
callback_json = {
"callback_query": {
"id": call.id,
"from": call.from_user.to_dict(),
"message": {
"message_id": call.message.message_id,
"chat": {
"id": call.message.chat.id,
}
},
"data": callback_data
}
}
# 先确认回调避免用户看到loading状态
_bot.answer_callback_query(call.id)
# 发送给主程序处理
RequestUtils(timeout=15).post_res(self._ds_url, json=callback_json)
except Exception as e:
logger.error(f"处理按钮回调失败:{str(e)}")
_bot.answer_callback_query(call.id, "处理失败,请重试")
def run_polling():
"""
@@ -80,7 +119,10 @@ class Telegram:
return self._bot is not None
def send_msg(self, title: str, text: Optional[str] = None, image: Optional[str] = None,
userid: Optional[str] = None, link: Optional[str] = None) -> Optional[bool]:
userid: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送Telegram消息
:param title: 消息标题
@@ -88,6 +130,9 @@ class Telegram:
:param image: 消息图片地址
:param userid: 用户ID如有则只发消息给该用户
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
:userid: 发送消息的目标用户ID为空则发给管理员
"""
if not self._telegram_token or not self._telegram_chat_id:
@@ -113,16 +158,37 @@ class Telegram:
else:
chat_id = self._telegram_chat_id
return self.__send_request(userid=chat_id, image=image, caption=caption)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
return self.__edit_message(original_chat_id, original_message_id, caption, buttons, image)
else:
# 发送新消息
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None,
title: Optional[str] = None, link: Optional[str] = None) -> Optional[bool]:
title: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[Dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送媒体列表消息
:param medias: 媒体信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
@@ -155,7 +221,18 @@ class Telegram:
else:
chat_id = self._telegram_chat_id
return self.__send_request(userid=chat_id, image=image, caption=caption)
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息
return self.__edit_message(original_chat_id, original_message_id, caption, buttons, image)
else:
# 发送新消息
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
@@ -163,19 +240,25 @@ class Telegram:
def send_torrents_msg(self, torrents: List[Context],
userid: Optional[str] = None, title: Optional[str] = None,
link: Optional[str] = None) -> Optional[bool]:
link: Optional[str] = None, buttons: Optional[List[List[Dict]]] = None,
original_message_id: Optional[int] = None,
original_chat_id: Optional[str] = None) -> Optional[bool]:
"""
发送列表消息
发送种子列表消息
:param torrents: 种子信息列表
:param userid: 用户ID如有则只发消息给该用户
:param title: 消息标题
:param link: 跳转链接
:param buttons: 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据"}]]
:param original_message_id: 原消息ID如果提供则编辑原消息
:param original_chat_id: 原消息的聊天ID编辑消息时需要
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
if not torrents:
return False
try:
index, caption = 1, "*%s*" % title
mediainfo = torrents[0].media_info
image = torrents[0].media_info.get_message_image()
for context in torrents:
torrent = context.torrent_info
site_name = torrent.site_name
@@ -200,20 +283,142 @@ class Telegram:
else:
chat_id = self._telegram_chat_id
return self.__send_request(userid=chat_id, caption=caption,
image=mediainfo.get_message_image())
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
# 判断是编辑消息还是发送新消息
if original_message_id and original_chat_id:
# 编辑消息(种子消息通常没有图片)
return self.__edit_message(original_chat_id, original_message_id, caption, buttons, image)
else:
# 发送新消息
return self.__send_request(userid=chat_id, image=image, caption=caption, reply_markup=reply_markup)
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
@staticmethod
def _create_inline_keyboard(buttons: List[List[Dict]]) -> InlineKeyboardMarkup:
"""
创建内联键盘
:param buttons: 按钮配置,格式:[[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
:return: InlineKeyboardMarkup对象
"""
keyboard = []
for row in buttons:
button_row = []
for button in row:
if "url" in button:
# URL按钮
btn = InlineKeyboardButton(text=button["text"], url=button["url"])
else:
# 回调按钮
btn = InlineKeyboardButton(text=button["text"], callback_data=button["callback_data"])
button_row.append(btn)
keyboard.append(button_row)
return InlineKeyboardMarkup(keyboard)
def answer_callback_query(self, callback_query_id: int, text: Optional[str] = None,
show_alert: bool = False) -> Optional[bool]:
"""
回应回调查询
"""
if not self._bot:
return None
try:
self._bot.answer_callback_query(callback_query_id, text=text, show_alert=show_alert)
return True
except Exception as e:
logger.error(f"回应回调查询失败:{str(e)}")
return False
def delete_msg(self, message_id: int, chat_id: Optional[int] = None) -> Optional[bool]:
"""
删除Telegram消息
:param message_id: 消息ID
:param chat_id: 聊天ID
:return: 删除是否成功
"""
if not self._telegram_token or not self._telegram_chat_id:
return None
try:
# 确定要删除消息的聊天ID
if chat_id:
target_chat_id = chat_id
else:
target_chat_id = self._telegram_chat_id
# 删除消息
result = self._bot.delete_message(chat_id=target_chat_id, message_id=int(message_id))
if result:
logger.info(f"成功删除Telegram消息: chat_id={target_chat_id}, message_id={message_id}")
return True
else:
logger.error(f"删除Telegram消息失败: chat_id={target_chat_id}, message_id={message_id}")
return False
except Exception as e:
logger.error(f"删除Telegram消息异常: {str(e)}")
return False
def __edit_message(self, chat_id: str, message_id: int, text: str,
buttons: Optional[List[List[dict]]] = None,
image: Optional[str] = None) -> Optional[bool]:
"""
编辑已发送的消息
:param chat_id: 聊天ID
:param message_id: 消息ID
:param text: 新的消息内容
:param buttons: 按钮列表
:param image: 图片URL或路径
:return: 编辑是否成功
"""
if not self._bot:
return None
try:
# 创建按钮键盘
reply_markup = None
if buttons:
reply_markup = self._create_inline_keyboard(buttons)
if image:
# 如果有图片使用edit_message_media
media = InputMediaPhoto(media=image, caption=text, parse_mode="Markdown")
self._bot.edit_message_media(
chat_id=chat_id,
message_id=message_id,
media=media,
reply_markup=reply_markup
)
else:
# 如果没有图片使用edit_message_text
self._bot.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=text,
parse_mode="Markdown",
reply_markup=reply_markup
)
return True
except Exception as e:
logger.error(f"编辑消息失败:{str(e)}")
return False
@retry(Exception, logger=logger)
def __send_request(self, userid: Optional[str] = None, image="", caption="") -> bool:
def __send_request(self, userid: Optional[str] = None, image="", caption="",
reply_markup: Optional[InlineKeyboardMarkup] = None) -> bool:
"""
向Telegram发送报文
:param reply_markup: 内联键盘
"""
if image:
res = RequestUtils(proxies=settings.PROXY).get_res(image)
res = RequestUtils(proxies=settings.PROXY, ua=settings.USER_AGENT).get_res(image)
if res is None:
raise Exception("获取图片失败")
if res.content:
@@ -227,7 +432,8 @@ class Telegram:
ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id,
photo=photo,
caption=caption,
parse_mode="Markdown")
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise Exception("发送图片消息失败")
return True
@@ -237,11 +443,13 @@ class Telegram:
for i in range(0, len(caption), 4095):
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption[i:i + 4095],
parse_mode="Markdown")
parse_mode="Markdown",
reply_markup=reply_markup if i == 0 else None)
else:
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption,
parse_mode="Markdown")
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise Exception("发送文本消息失败")
return True if ret else False

View File

@@ -36,6 +36,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Downloaders.value]:
return
logger.info("配置变更重新加载Transmission模块...")
self.init_module()
@staticmethod

View File

@@ -134,7 +134,7 @@ class Transmission:
return None
try:
torrents, error = self.get_torrents(ids=ids,
status=["downloading", "download_pending", "stopped"],
status=["downloading", "download_pending"],
tags=tags)
return None if error else torrents or []
except Exception as err:

View File

@@ -34,6 +34,7 @@ class TrimeMediaModule(_ModuleBase, _MediaServerBase[TrimeMedia]):
event_data: schemas.ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.MediaServers.value]:
return
logger.info("配置变更,重新加载飞牛影视模块...")
self.init_module()
@staticmethod

View File

@@ -31,6 +31,7 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载VoceChat模块...")
self.init_module()
@staticmethod

View File

@@ -31,6 +31,7 @@ class WebPushModule(_ModuleBase, _MessageBase):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载WebPush模块...")
self.init_module()
@staticmethod

View File

@@ -35,6 +35,7 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Notifications.value]:
return
logger.info("配置变更重新加载Wechat模块...")
self.init_module()
@staticmethod

View File

@@ -91,6 +91,7 @@ class Monitor(metaclass=Singleton):
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in [SystemConfigKey.Directories.value]:
return
logger.info("配置变更事件触发,重新初始化目录监控...")
self.init()
def init(self):

View File

@@ -68,6 +68,7 @@ class Scheduler(metaclass=Singleton):
if event_data.key not in ['DEV', 'COOKIECLOUD_INTERVAL', 'MEDIASERVER_SYNC_INTERVAL', 'SUBSCRIBE_SEARCH',
'SUBSCRIBE_MODE', 'SUBSCRIBE_RSS_INTERVAL', 'SITEDATA_REFRESH_INTERVAL']:
return
logger.info(f"配置项 {event_data.key} 变更,重新初始化定时服务...")
self.init()
def init(self):

View File

@@ -1,4 +1,6 @@
from typing import Optional, Union
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union, List, Dict, Set
from pydantic import BaseModel, Field
@@ -23,6 +25,16 @@ class CommingMessage(BaseModel):
date: Optional[str] = None
# 消息方向
action: Optional[int] = 0
# 是否为回调消息
is_callback: Optional[bool] = False
# 回调数据
callback_data: Optional[str] = None
# 消息ID用于回调时定位原消息
message_id: Optional[Union[str, int]] = None
# 聊天ID用于回调时定位聊天
chat_id: Optional[str] = None
# 完整的回调查询信息(原始数据)
callback_query: Optional[Dict] = None
def to_dict(self):
"""
@@ -65,6 +77,12 @@ class Notification(BaseModel):
action: Optional[int] = 1
# 消息目标用户ID字典未指定用户ID时使用
targets: Optional[dict] = None
# 按钮列表,格式:[[{"text": "按钮文本", "callback_data": "回调数据", "url": "链接"}]]
buttons: Optional[List[List[dict]]] = None
# 原消息ID用于编辑消息
original_message_id: Optional[Union[str, int]] = None
# 原消息的聊天ID用于编辑消息
original_chat_id: Optional[str] = None
def to_dict(self):
"""
@@ -115,3 +133,203 @@ class SubscriptionMessage(BaseModel):
icon: Optional[str] = None
url: Optional[str] = None
data: Optional[dict] = Field(default_factory=dict)
class ChannelCapability(Enum):
"""
渠道能力枚举
"""
# 支持内联按钮
INLINE_BUTTONS = "inline_buttons"
# 支持菜单命令
MENU_COMMANDS = "menu_commands"
# 支持消息编辑
MESSAGE_EDITING = "message_editing"
# 支持消息删除
MESSAGE_DELETION = "message_deletion"
# 支持回调查询
CALLBACK_QUERIES = "callback_queries"
# 支持富文本
RICH_TEXT = "rich_text"
# 支持图片
IMAGES = "images"
# 支持链接
LINKS = "links"
# 支持文件发送
FILE_SENDING = "file_sending"
@dataclass
class ChannelCapabilities:
"""
渠道能力配置
"""
channel: MessageChannel
capabilities: Set[ChannelCapability]
max_buttons_per_row: int = 5
max_button_rows: int = 10
max_button_text_length: int = 30
fallback_enabled: bool = True
class ChannelCapabilityManager:
"""
渠道能力管理器
"""
_capabilities: Dict[MessageChannel, ChannelCapabilities] = {
MessageChannel.Telegram: ChannelCapabilities(
channel=MessageChannel.Telegram,
capabilities={
ChannelCapability.INLINE_BUTTONS,
ChannelCapability.MENU_COMMANDS,
ChannelCapability.MESSAGE_EDITING,
ChannelCapability.MESSAGE_DELETION,
ChannelCapability.CALLBACK_QUERIES,
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.FILE_SENDING
},
max_buttons_per_row=4,
max_button_rows=10,
max_button_text_length=30
),
MessageChannel.Wechat: ChannelCapabilities(
channel=MessageChannel.Wechat,
capabilities={
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.MENU_COMMANDS
},
fallback_enabled=True
),
MessageChannel.Slack: ChannelCapabilities(
channel=MessageChannel.Slack,
capabilities={
ChannelCapability.INLINE_BUTTONS,
ChannelCapability.MESSAGE_EDITING,
ChannelCapability.MESSAGE_DELETION,
ChannelCapability.CALLBACK_QUERIES,
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.MENU_COMMANDS
},
max_buttons_per_row=3,
max_button_rows=8,
max_button_text_length=25,
fallback_enabled=True
),
MessageChannel.SynologyChat: ChannelCapabilities(
channel=MessageChannel.SynologyChat,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.VoceChat: ChannelCapabilities(
channel=MessageChannel.VoceChat,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.WebPush: ChannelCapabilities(
channel=MessageChannel.WebPush,
capabilities={
ChannelCapability.LINKS
},
fallback_enabled=True
),
MessageChannel.Web: ChannelCapabilities(
channel=MessageChannel.Web,
capabilities={
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS
},
fallback_enabled=True
)
}
@classmethod
def get_capabilities(cls, channel: MessageChannel) -> Optional[ChannelCapabilities]:
"""
获取渠道能力
"""
return cls._capabilities.get(channel)
@classmethod
def supports_capability(cls, channel: MessageChannel, capability: ChannelCapability) -> bool:
"""
检查渠道是否支持某项能力
"""
channel_caps = cls.get_capabilities(channel)
if not channel_caps:
return False
return capability in channel_caps.capabilities
@classmethod
def supports_buttons(cls, channel: MessageChannel) -> bool:
"""
检查渠道是否支持按钮
"""
return cls.supports_capability(channel, ChannelCapability.INLINE_BUTTONS)
@classmethod
def supports_callbacks(cls, channel: MessageChannel) -> bool:
"""
检查渠道是否支持回调
"""
return cls.supports_capability(channel, ChannelCapability.CALLBACK_QUERIES)
@classmethod
def supports_editing(cls, channel: MessageChannel) -> bool:
"""
检查渠道是否支持消息编辑
"""
return cls.supports_capability(channel, ChannelCapability.MESSAGE_EDITING)
@classmethod
def supports_deletion(cls, channel: MessageChannel) -> bool:
"""
检查渠道是否支持消息删除
"""
return cls.supports_capability(channel, ChannelCapability.MESSAGE_DELETION)
@classmethod
def get_max_buttons_per_row(cls, channel: MessageChannel) -> int:
"""
获取每行最大按钮数
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_buttons_per_row if channel_caps else 2
@classmethod
def get_max_button_rows(cls, channel: MessageChannel) -> int:
"""
获取最大按钮行数
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_button_rows if channel_caps else 5
@classmethod
def get_max_button_text_length(cls, channel: MessageChannel) -> int:
"""
获取按钮文本最大长度
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.max_button_text_length if channel_caps else 20
@classmethod
def should_use_fallback(cls, channel: MessageChannel) -> bool:
"""
是否应该使用降级策略
"""
channel_caps = cls.get_capabilities(channel)
return channel_caps.fallback_enabled if channel_caps else True

View File

@@ -63,6 +63,8 @@ class EventType(Enum):
ModuleReload = "module.reload"
# 配置项更新
ConfigChanged = "config.updated"
# 消息交互动作
MessageAction = "message.action"
# 同步链式事件

View File

@@ -8,7 +8,7 @@ from app.startup.command_initializer import init_command, stop_command, restart_
from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
from app.startup.modules_initializer import init_modules, stop_modules
from app.startup.monitor_initializer import stop_monitor, init_monitor
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins, backup_plugins, restore_plugins
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
from app.startup.routers_initializer import init_routers
from app.startup.scheduler_initializer import stop_scheduler, init_scheduler, init_plugin_scheduler
from app.startup.workflow_initializer import init_workflow, stop_workflow
@@ -41,7 +41,7 @@ async def lifespan(app: FastAPI):
# 初始化路由
init_routers(app)
# 恢复插件备份
restore_plugins()
SystemChain().restore_plugins()
# 初始化插件
init_plugins()
# 初始化定时器
@@ -70,7 +70,7 @@ async def lifespan(app: FastAPI):
except Exception as e:
print(str(e))
# 备份插件
backup_plugins()
SystemChain().backup_plugins()
# 停止内存管理器
stop_memory_manager()
# 停止工作流

View File

@@ -1,11 +1,7 @@
import asyncio
import shutil
from app.core.config import settings
from app.core.plugin import PluginManager
from app.log import logger
from app.utils.system import SystemUtils
from app.helper.system import SystemHelper
async def sync_plugins() -> bool:
@@ -79,105 +75,3 @@ def stop_plugins():
plugin_manager.stop_monitor()
except Exception as e:
logger.error(f"停止插件时发生错误:{e}", exc_info=True)
def backup_plugins():
"""
备份插件到用户配置目录仅docker环境
"""
# 非docker环境不处理
if not SystemUtils.is_docker():
return
try:
# 使用绝对路径确保准确性
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
backup_dir = settings.CONFIG_PATH / "plugins_backup"
if not plugins_dir.exists():
logger.info("插件目录不存在,跳过备份")
return
# 确保备份目录存在
backup_dir.mkdir(parents=True, exist_ok=True)
# 需要排除的文件和目录
exclude_items = {"__init__.py", "__pycache__", ".DS_Store"}
# 遍历插件目录,备份除排除项外的所有内容
for item in plugins_dir.iterdir():
if item.name in exclude_items:
continue
target_path = backup_dir / item.name
# 如果是目录
if item.is_dir():
if target_path.exists():
shutil.rmtree(target_path)
shutil.copytree(item, target_path)
logger.info(f"已备份插件目录: {item.name}")
# 如果是文件
elif item.is_file():
shutil.copy2(item, target_path)
logger.info(f"已备份插件文件: {item.name}")
logger.info(f"插件备份完成,备份位置: {backup_dir}")
except Exception as e:
logger.error(f"插件备份失败: {str(e)}")
def restore_plugins():
"""
从备份恢复插件到app/plugins目录恢复完成后删除备份仅docker环境
"""
# 非docker环境不处理
if not SystemUtils.is_docker():
return
# 使用绝对路径确保准确性
plugins_dir = settings.ROOT_PATH / "app" / "plugins"
backup_dir = settings.CONFIG_PATH / "plugins_backup"
if not backup_dir.exists():
logger.info("插件备份目录不存在,跳过恢复")
return
# 系统被重置才恢复插件
if SystemHelper().is_system_reset():
# 确保插件目录存在
plugins_dir.mkdir(parents=True, exist_ok=True)
# 遍历备份目录,恢复所有内容
restored_count = 0
for item in backup_dir.iterdir():
target_path = plugins_dir / item.name
try:
# 如果是目录,且目录内有内容
if item.is_dir() and any(item.iterdir()):
if target_path.exists():
shutil.rmtree(target_path)
shutil.copytree(item, target_path)
logger.info(f"已恢复插件目录: {item.name}")
restored_count += 1
# 如果是文件
elif item.is_file():
shutil.copy2(item, target_path)
logger.info(f"已恢复插件文件: {item.name}")
restored_count += 1
except Exception as e:
logger.error(f"恢复插件 {item.name} 时发生错误: {str(e)}")
continue
logger.info(f"插件恢复完成,共恢复 {restored_count} 个项目")
# 删除备份目录
try:
shutil.rmtree(backup_dir)
logger.info(f"已删除插件备份目录: {backup_dir}")
except Exception as e:
logger.warning(f"删除备份目录失败: {str(e)}")

View File

@@ -36,3 +36,7 @@ class Tokens:
return None
else:
return self._tokens[index]
@property
def tokens(self):
return self._tokens

View File

@@ -153,7 +153,7 @@ meta_cases = [{
"part": "",
"season": "S01",
"episode": "E02",
"restype": "WEB-DL",
"restype": "B-Global WEB-DL",
"pix": "1080p",
"video_codec": "x264",
"audio_codec": "AAC"
@@ -569,7 +569,7 @@ meta_cases = [{
"part": "",
"season": "S02",
"episode": "E05",
"restype": "WEB-DL",
"restype": "Crunchyroll WEB-DL",
"pix": "1080p",
"video_codec": "x264",
"audio_codec": "AAC"
@@ -649,7 +649,7 @@ meta_cases = [{
"part": "",
"season": "",
"episode": "",
"restype": "WEBRip",
"restype": "Netflix WEBRip",
"pix": "1080p",
"video_codec": "H264",
"audio_codec": "DDP 5.1"
@@ -681,7 +681,7 @@ meta_cases = [{
"part": "",
"season": "S01",
"episode": "E16",
"restype": "WEB-DL",
"restype": "KKTV WEB-DL",
"pix": "1080p",
"video_codec": "x264",
"audio_codec": "AAC"
@@ -921,7 +921,7 @@ meta_cases = [{
"part": "",
"season": "S06",
"episode": "E06",
"restype": "WEBRip",
"restype": "Max WEBRip",
"pix": "1080p",
"video_codec": "x264",
"audio_codec": "DD 5.1"
@@ -937,7 +937,7 @@ meta_cases = [{
"part": "",
"season": "S06",
"episode": "E05",
"restype": "WEBRip",
"restype": "Max WEBRip",
"pix": "1080p",
"video_codec": "x264",
"audio_codec": "DD 5.1"
@@ -969,7 +969,7 @@ meta_cases = [{
"part": "",
"season": "S02",
"episode": "",
"restype": "WEB-DL",
"restype": "Netflix WEB-DL",
"pix": "2160p",
"video_codec": "H265",
"audio_codec": "DDP 5.1 Atmos"

View File

@@ -1,6 +1,5 @@
import unittest
from tests.test_bluray import BluRayTest
from tests.test_metainfo import MetaInfoTest
if __name__ == '__main__':
@@ -10,9 +9,6 @@ if __name__ == '__main__':
suite.addTest(MetaInfoTest('test_metainfo'))
suite.addTest(MetaInfoTest('test_emby_format_ids'))
# 测试蓝光目录识别
suite.addTest(BluRayTest())
# 运行测试
runner = unittest.TextTestRunner()
runner.run(suite)

View File

@@ -1,178 +0,0 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pathlib import Path
from typing import List, Optional
from unittest import TestCase
from app import schemas
from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.db.models.transferhistory import TransferHistory
from app.db.systemconfig_oper import SystemConfigOper
from app.db.transferhistory_oper import TransferHistoryOper
from tests.cases.files import bluray_files
class MockTransferHistoryOper(TransferHistoryOper):
def __init__(self):
# pylint: disable=super-init-not-called
self.history = []
def get_by_src(self, src, storage=None):
self.history.append(src)
return TransferHistory()
class MockStorage(StorageChain):
def __init__(self, files: list):
# pylint: disable=super-init-not-called
self.__root = schemas.FileItem(
path="/", name="", type="dir", extension="", size=0
)
self.__all = {self.__root.path: self.__root}
def __build_child(parent: schemas.FileItem, files: list[dict]):
parent.children = []
for item in files:
children = item.get("children")
sep = "" if parent.path.endswith("/") else "/"
name: str = item["name"]
file_item = schemas.FileItem(
path=f"{parent.path}{sep}{name}",
name=name,
extension=Path(name).suffix[1:],
basename=Path(name).stem,
type="file" if children is None else "dir",
size=item.get("size", 0),
)
parent.children.append(file_item)
self.__all[file_item.path] = file_item
if children is not None:
__build_child(file_item, children)
__build_child(self.__root, files)
def list_files(
self, fileitem: schemas.FileItem, recursion: bool = False
) -> Optional[List[schemas.FileItem]]:
if fileitem.type != "dir":
return None
if recursion:
result = []
file_path = f"{fileitem.path}/"
for path, item in self.__all.items():
if path.startswith(file_path):
result.append(item)
return result
else:
return fileitem.children
def get_file_item(self, storage: str, path: Path) -> Optional[schemas.FileItem]:
"""
根据路径获取文件项
"""
path_posix = path.as_posix()
return self.__all.get(path_posix)
class MockTransferChain(TransferChain):
def __init__(self, storage: MockStorage):
# pylint: disable=super-init-not-called
self.transferhis = MockTransferHistoryOper()
self.systemconfig = SystemConfigOper()
self.storagechain = storage
def test(self, path: str):
self.transferhis.history.clear()
self.do_transfer(
force=False,
background=False,
fileitem=self.storagechain.get_file_item(None, Path(path)),
)
return self.transferhis.history
class BluRayTest(TestCase):
def __init__(self, methodName="test"):
super().__init__(methodName)
def setUp(self) -> None:
pass
def tearDown(self) -> None:
pass
def test(self):
transfer = MockTransferChain(MockStorage(bluray_files))
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2055)",
"/FOLDER/Digimon/Digimon (2099)",
"/FOLDER/Digimon/Digimon (2199)/Digimon.2199.mp4",
],
transfer.test("/FOLDER/Digimon"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2055)",
],
transfer.test("/FOLDER/Digimon/Digimon (2055)"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2055)",
],
transfer.test("/FOLDER/Digimon/Digimon (2055)/BDMV"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2055)",
],
transfer.test("/FOLDER/Digimon/Digimon (2055)/BDMV/STREAM"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2055)",
],
transfer.test("/FOLDER/Digimon/Digimon (2055)/BDMV/STREAM/00001.m2ts"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2199)/Digimon.2199.mp4",
],
transfer.test("/FOLDER/Digimon/Digimon (2199)"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2199)/Digimon.2199.mp4",
],
transfer.test("/FOLDER/Digimon/Digimon (2199)/Digimon.2199.mp4"),
)
self.assertEqual(
[
"/FOLDER/Pokemon.2029.mp4",
],
transfer.test("/FOLDER/Pokemon.2029.mp4"),
)
self.assertEqual(
[
"/FOLDER/Digimon/Digimon (2055)",
"/FOLDER/Digimon/Digimon (2099)",
"/FOLDER/Digimon/Digimon (2199)/Digimon.2199.mp4",
"/FOLDER/Pokemon (2016)",
"/FOLDER/Pokemon (2021)",
"/FOLDER/Pokemon (2028)/Pokemon.2028.mkv",
"/FOLDER/Pokemon.2029.mp4",
],
transfer.test("/FOLDER"),
)

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.5.6'
FRONTEND_VERSION = 'v2.5.6'
APP_VERSION = 'v2.5.8'
FRONTEND_VERSION = 'v2.5.8'