Compare commits

...

27 Commits

Author SHA1 Message Date
jxxghp
abbd2253ad fix deadlock 2026-01-21 22:46:04 +08:00
jxxghp
46466624ae fix:优化下载器整理控制逻辑 2026-01-21 22:21:17 +08:00
jxxghp
0ba8d51b2a fix:优化下载器整理 2026-01-21 21:31:55 +08:00
jxxghp
a1408ee18f feat:TRANSFER_THREADS 变更监听 2026-01-21 20:46:34 +08:00
jxxghp
58030bbcff fix #5392 2026-01-21 20:12:05 +08:00
jxxghp
e1b3e6ef01 fix:只有媒体文件整完成才触发事件,以保持与历史一致 2026-01-21 20:07:18 +08:00
jxxghp
298a6ba8ab 更新 update_subscribe.py 2026-01-21 19:36:12 +08:00
jxxghp
e5bf47629f 更新 config.py 2026-01-21 19:13:36 +08:00
jxxghp
ea29ee9f66 Merge pull request #5390 from xiaoQQya/develop 2026-01-21 18:39:06 +08:00
jxxghp
868c2254de v2.9.5 2026-01-21 17:59:52 +08:00
jxxghp
567522c87a fix:统一调整文件类型支持 2026-01-21 17:59:18 +08:00
jxxghp
25fd47f57b Merge pull request #5389 from hyuan280/v2 2026-01-21 17:22:27 +08:00
hyuan280
f89d6342d1 fix: 修复Cookie解码二进制数据导致请求发送时UnicodeEncodeError 2026-01-21 16:36:28 +08:00
jxxghp
b02affdea3 Merge pull request #5388 from cddjr/fix_tmdb_img_url 2026-01-21 13:24:39 +08:00
景大侠
6e5ade943b 修复 订阅无法查看文件列表的问题
TMDB图片路径参数增加空值检查
2026-01-21 12:47:39 +08:00
jxxghp
a6ed0c0d00 fix:优化transhandler线程安全 2026-01-21 08:42:57 +08:00
jxxghp
68402aadd7 fix:去除文件操作全局锁 2026-01-21 08:31:51 +08:00
jxxghp
85cacd447b feat: 为文件整理服务引入多线程处理并优化进度管理。 2026-01-21 08:16:02 +08:00
xiaoQQya
11262b321a fix(rousi pro): 修复 Rousi Pro 站点未读消息未推送通知的问题 2026-01-20 22:12:31 +08:00
jxxghp
bf290f063d Merge pull request #5386 from PKC278/v2 2026-01-20 22:09:02 +08:00
PKC278
7ac0fbaf76 fix(otp): 修正 OTP 关闭逻辑 2026-01-20 19:53:59 +08:00
PKC278
7489c76722 feat(passkey): 允许在未开启 OTP 时注册通行密钥 2026-01-20 19:35:36 +08:00
jxxghp
bcdf1b6efe 更新 transhandler.py 2026-01-20 15:29:28 +08:00
jxxghp
8a9dbe212c Merge pull request #5385 from cddjr/feature_optimize_transfer 2026-01-20 15:25:38 +08:00
景大侠
16bd71a6cb 优化整理代码效率、减少额外递归 2026-01-20 14:38:41 +08:00
jxxghp
71caad0655 feat:优化蓝光目录判断,减少目录遍历 2026-01-20 13:38:52 +08:00
jxxghp
2c62ffe34a feat:优化字幕和音频文件整理方式 2026-01-20 13:24:35 +08:00
21 changed files with 721 additions and 691 deletions

View File

@@ -29,7 +29,7 @@ class UpdateSubscribeInput(BaseModel):
include: Optional[str] = Field(None, description="Include filter as regular expression (optional)")
exclude: Optional[str] = Field(None, description="Exclude filter as regular expression (optional)")
filter: Optional[str] = Field(None, description="Filter rule as regular expression (optional)")
state: Optional[str] = Field(None, description="Subscription state: 'R' for enabled, 'P' for disabled, 'S' for paused (optional)")
state: Optional[str] = Field(None, description="Subscription state: 'R' for enabled, 'P' for pending, 'S' for stopped (optional)")
sites: Optional[List[int]] = Field(None, description="List of site IDs to search from (optional)")
downloader: Optional[str] = Field(None, description="Downloader name (optional)")
save_path: Optional[str] = Field(None, description="Save path for downloaded files (optional)")

View File

@@ -161,9 +161,9 @@ async def otp_disable(
current_user: User = Depends(get_current_active_user_async)
) -> Any:
"""关闭当前用户的 OTP 验证功能"""
# 安全检查:如果存在 PassKey不允许关闭 OTP
# 安全检查:如果存在 PassKey默认不允许关闭 OTP,除非配置允许
has_passkey = await _check_user_has_passkey(db, current_user.id)
if has_passkey:
if has_passkey and not settings.PASSKEY_ALLOW_REGISTER_WITHOUT_OTP:
return schemas.Response(
success=False,
message="您已注册通行密钥,为了防止域名配置变更导致无法登录,请先删除所有通行密钥再关闭 OTP 验证"
@@ -207,8 +207,8 @@ def passkey_register_start(
) -> Any:
"""开始注册 PassKey - 生成注册选项"""
try:
# 安全检查:必须先启用 OTP
if not current_user.is_otp:
# 安全检查:默认需要先启用 OTP,除非配置允许在未启用 OTP 时注册
if not current_user.is_otp and not settings.PASSKEY_ALLOW_REGISTER_WITHOUT_OTP:
return schemas.Response(
success=False,
message="为了确保在域名配置错误时仍能找回访问权限,请先启用 OTP 验证码再注册通行密钥"

View File

@@ -167,7 +167,7 @@ def rename(fileitem: schemas.FileItem,
# 重命名目录内文件
if recursive:
transferchain = TransferChain()
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIO_TRACK_EXT
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
# 递归修改目录内文件(智能识别命名)
sub_files: List[schemas.FileItem] = StorageChain().list_files(fileitem)
if sub_files:

View File

@@ -163,7 +163,8 @@ async def get_user_global_setting(_: User = Depends(get_current_active_user_asyn
include={
"RECOGNIZE_SOURCE",
"SEARCH_SOURCE",
"AI_RECOMMEND_ENABLED"
"AI_RECOMMEND_ENABLED",
"PASSKEY_ALLOW_REGISTER_WITHOUT_OTP"
}
)
# 智能助手总开关未开启智能推荐状态强制返回False

View File

@@ -332,9 +332,10 @@ class DownloadChain(ChainBase):
if not file_meta.begin_episode \
or file_meta.begin_episode not in episodes:
continue
# 只处理视频格式
# 只处理视频、字幕格式
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if not Path(file).suffix \
or Path(file).suffix.lower() not in settings.RMT_MEDIAEXT:
or Path(file).suffix.lower() not in media_exts:
continue
files_to_add.append({
"download_hash": _hash,

View File

@@ -139,7 +139,11 @@ class StorageChain(ChainBase):
"""
if not fileitem or fileitem.type != "dir":
return False
return self.contains_bluray_subdirectories(self.list_files(fileitem))
if self.get_file_item(storage=fileitem.storage, path=Path(fileitem.path) / "BDMV"):
return True
if self.get_file_item(storage=fileitem.storage, path=Path(fileitem.path) / "CERTIFICATE"):
return True
return False
@staticmethod
def contains_bluray_subdirectories(fileitems: Optional[List[schemas.FileItem]]) -> bool:
@@ -147,10 +151,10 @@ class StorageChain(ChainBase):
判断是否包含蓝光必备的文件夹
"""
required_files = ("BDMV", "CERTIFICATE")
for item in fileitems or []:
if item.type == "dir" and item.name in required_files:
return True
return False
return any(
item.type == "dir" and item.name in required_files
for item in fileitems or []
)
def delete_media_file(self, fileitem: schemas.FileItem, delete_self: bool = True) -> bool:
"""

View File

@@ -4,7 +4,6 @@ import threading
import traceback
from copy import deepcopy
from pathlib import Path
from time import sleep
from typing import List, Optional, Tuple, Union, Dict, Callable
from app import schemas
@@ -31,12 +30,16 @@ from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeForm
TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey, ChainEventType, ContentType
from app.utils.mixins import ConfigReloadMixin
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
# 下载器锁
downloader_lock = threading.Lock()
# 作业锁
job_lock = threading.Lock()
# 任务锁
task_lock = threading.Lock()
@@ -148,7 +151,7 @@ class JobManager:
def running_task(self, task: TransferTask):
"""
任务运行中
设置任务运行中
"""
with job_lock:
__mediaid__ = self.__get_id(task)
@@ -162,7 +165,7 @@ class JobManager:
def finish_task(self, task: TransferTask):
"""
任务完成
设置任务完成
"""
with job_lock:
__mediaid__ = self.__get_id(task)
@@ -176,7 +179,7 @@ class JobManager:
def fail_task(self, task: TransferTask):
"""
任务失败
设置任务失败
"""
with job_lock:
__mediaid__ = self.__get_id(task)
@@ -195,7 +198,7 @@ class JobManager:
def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]:
"""
移除所有作业中的整理任务
移除任务
"""
with job_lock:
for mediaid in list(self._job_view):
@@ -218,9 +221,8 @@ class JobManager:
"""
移除作业
"""
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
with job_lock:
# 移除作业
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
if __mediaid__ in self._job_view:
# 移除季集信息
if __mediaid__ in self._season_episodes:
@@ -230,7 +232,7 @@ class JobManager:
def is_done(self, task: TransferTask) -> bool:
"""
检查某项作业是否整理完成(不管成功还是失败)
检查任务对应的作业是否整理完成(不管成功还是失败)
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
@@ -253,7 +255,7 @@ class JobManager:
def is_finished(self, task: TransferTask) -> bool:
"""
检查某项作业是否已完成且有成功的记录
检查任务对应的作业是否已完成且有成功的记录
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
@@ -279,7 +281,7 @@ class JobManager:
def is_success(self, task: TransferTask) -> bool:
"""
检查某项作业是否全部成功
检查任务对应的作业是否全部成功
"""
__metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season)
__mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season)
@@ -300,49 +302,65 @@ class JobManager:
media_success = True
return meta_success and media_success
def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool:
"""
判断是否有任务正在处理
"""
if mediainfo:
__mediaid__ = self.__get_media_id(media=meta, season=season)
if __mediaid__ in self._job_view:
return True
__metaid__ = self.__get_meta_id(meta=meta, season=season)
return __metaid__ in self._job_view
def success_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]:
"""
获取某项任务成功的任务
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
if __mediaid__ not in self._job_view:
return []
return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]
def all_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]:
"""
获取全部任务
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return []
return self._job_view[__mediaid__].tasks
def count(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取某项任务成功总数
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
# 计算状态为完成的任务数
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
if __mediaid__ not in self._job_view:
return 0
return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"])
def size(self, media: MediaInfo, season: Optional[int] = None) -> int:
"""
获取某项任务成功文件总大小
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
# 计算状态为完成的任务数
if __mediaid__ not in self._job_view:
return 0
return sum([
task.fileitem.size if task.fileitem.size is not None
else (SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0)
for task in self._job_view[__mediaid__].tasks
if task.state == "completed"
])
if __mediaid__ not in self._job_view:
return 0
return sum([
task.fileitem.size if task.fileitem.size is not None
else (
SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0)
for task in self._job_view[__mediaid__].tasks
if task.state == "completed"
])
def total(self) -> int:
"""
获取所有task任务总数
"""
with job_lock:
return sum([len(job.tasks) for job in self._job_view.values()])
return sum([len(job.tasks) for job in self._job_view.values()])
def list_jobs(self) -> List[TransferJob]:
"""
@@ -355,39 +373,105 @@ class JobManager:
获取季集清单
"""
__mediaid__ = self.__get_media_id(media=media, season=season)
with job_lock:
return self._season_episodes.get(__mediaid__) or []
return self._season_episodes.get(__mediaid__) or []
class TransferChain(ChainBase, metaclass=Singleton):
class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
"""
文件整理处理链
"""
CONFIG_WATCH = {
"TRANSFER_THREADS",
}
def __init__(self):
super().__init__()
# 可处理的文件后缀
self.all_exts = settings.RMT_MEDIAEXT
# 主要媒体文件后缀
self._media_exts = settings.RMT_MEDIAEXT
# 附加文件后缀
self._extra_exts = settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
# 可处理的文件后缀(视频文件、字幕、音频文件)
self._allowed_exts = self._media_exts + self._extra_exts
# 待整理任务队列
self._queue = queue.Queue()
# 文件整理线程
self._transfer_thread = None
self._transfer_threads = []
# 队列间隔时间(秒)
self._transfer_interval = 15
# 事件管理器
self.jobview = JobManager()
# 转移成功的文件清单
self._success_target_files: Dict[str, List[str]] = {}
# 整理进度进度
self._progress = ProgressHelper(ProgressKey.FileTransfer)
# 队列相关状态
self._threads = []
self._queue_active = False
self._active_tasks = 0
self._processed_num = 0
self._fail_num = 0
self._total_num = 0
# 启动整理任务
self.__init()
def __init(self):
"""
初始化
启动文件整理线程
"""
# 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()
self._queue_active = True
for i in range(settings.TRANSFER_THREADS):
logger.info(f"启动文件整理线程 {i + 1} ...")
thread = threading.Thread(target=self.__start_transfer,
name=f"transfer-{i}",
daemon=True)
self._threads.append(thread)
thread.start()
def __stop(self):
"""
停止文件整理进程
"""
self._queue_active = False
for thread in self._threads:
thread.join()
self._threads = []
logger.info("文件整理线程已停止")
def on_config_changed(self):
self.__stop()
self.__init()
def __is_allowed_file(self, fileitem: FileItem) -> bool:
"""
判断是否允许的扩展名
"""
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._allowed_exts else False
def __is_extra_file(self, fileitem: FileItem) -> bool:
"""
判断是否额外的扩展名
"""
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._extra_exts else False
def __is_media_file(self, fileitem: FileItem) -> bool:
"""
判断是否为主要媒体文件
"""
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._media_exts else False
@staticmethod
def __is_allow_filesize(fileitem: FileItem, min_filesize: int) -> bool:
"""
判断是否满足最小文件大小
"""
return True if not min_filesize or (fileitem.size or 0) > min_filesize * 1024 * 1024 else False
def __default_callback(self, task: TransferTask,
transferinfo: TransferInfo, /) -> Tuple[bool, str]:
@@ -395,7 +479,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
整理完成后处理
"""
def __do_finished():
def __all_finished():
"""
完成时发送消息、刮削事件、移除任务等
"""
@@ -405,7 +489,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
# 更新文件清单
transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, [])
with job_lock:
transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, [])
# 发送通知,实时手动整理时不发
if transferinfo.need_notify and (task.background or not task.manual):
se_str = None
@@ -420,8 +506,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
transferinfo=transferinfo,
season_episode=se_str,
username=task.username)
# 刮削事件
if transferinfo.need_scrape:
if transferinfo.need_scrape and self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
@@ -429,6 +516,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
'file_list': transferinfo.file_list_new,
'overwrite': False
})
# 移除已完成的任务
self.jobview.remove_job(task)
@@ -436,6 +524,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not transferinfo.success:
# 转移失败
logger.warn(f"{task.fileitem.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
transferhis.add_fail(
fileitem=task.fileitem,
@@ -446,6 +535,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
mediainfo=task.mediainfo,
transferinfo=transferinfo
)
# 整理失败事件
if self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.TransferFailed, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
# 发送失败消息
self.post_message(Notification(
mtype=NotificationType.Manual,
@@ -455,19 +556,23 @@ class TransferChain(ChainBase, metaclass=Singleton):
username=task.username,
link=settings.MP_DOMAIN('#/history')
))
# 整理失败
self.jobview.fail_task(task)
with task_lock:
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
__do_finished()
# 全部整理完成且有成功的任务时
if self.jobview.is_finished(task):
# 发送消息、刮削事件、移除任务
__all_finished()
return False, transferinfo.message
# 转移成功
# task转移成功
self.jobview.finish_task(task)
logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}")
# 新增转移成功历史记录
# 新增task转移成功历史记录
transferhis.add_success(
fileitem=task.fileitem,
mode=transferinfo.transfer_type if transferinfo else '',
@@ -478,43 +583,55 @@ class TransferChain(ChainBase, metaclass=Singleton):
transferinfo=transferinfo
)
# 整理完成事件
self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
# task整理完成事件
if self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
with task_lock:
# 登记转移成功文件清单
target_dir_path = transferinfo.target_diritem.path
target_files = transferinfo.file_list_new
# task登记转移成功文件清单
target_dir_path = transferinfo.target_diritem.path
target_files = transferinfo.file_list_new
with job_lock:
if self._success_target_files.get(target_dir_path):
self._success_target_files[target_dir_path].extend(target_files)
else:
self._success_target_files[target_dir_path] = target_files
# 全部整理成功时
if self.jobview.is_success(task):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
storagechain = StorageChain()
# 获取整理屏蔽词
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
for t in tasks:
if t.download_hash and self._can_delete_torrent(t.download_hash, t.downloader,
transfer_exclude_words):
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash}")
if t.fileitem:
storagechain.delete_media_file(t.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
__do_finished()
# 全部整理成功时
if self.jobview.is_success(task):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
# 获取整理屏蔽词
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
for t in tasks:
if t.download_hash and self._can_delete_torrent(t.download_hash, t.downloader,
transfer_exclude_words):
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash}")
if t.fileitem:
StorageChain().delete_media_file(t.fileitem, delete_self=False)
# 全部整理完成且有成功的任务时
if self.jobview.is_finished(task):
# 发送消息、刮削事件、移除任务
__all_finished()
# 全部整理完成不管成功还是失败
if self.jobview.is_done(task):
# 所有任务
tasks = self.jobview.all_tasks()
for t in tasks:
if t.download_hash:
# 设置种子状态为已整理
self.transfer_completed(hashs=t.download_hash, downloader=t.downloader)
return True, ""
@@ -537,8 +654,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
添加到作业视图
"""
with task_lock:
self.jobview.add_task(task)
self.jobview.add_task(task)
def remove_from_queue(self, fileitem: FileItem):
"""
@@ -552,75 +668,88 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
处理队列
"""
# 队列开始标识
__queue_start = True
# 任务总数
total_num = 0
# 已处理总数
processed_num = 0
# 失败数量
fail_num = 0
progress = ProgressHelper(ProgressKey.FileTransfer)
while not global_vars.is_system_stopped:
while not global_vars.is_system_stopped and self._queue_active:
try:
item: TransferQueue = self._queue.get(block=False)
if item:
task = item.task
if not task:
continue
# 文件信息
fileitem = task.fileitem
# 开始新队列
if __queue_start:
item: TransferQueue = self._queue.get(block=True, timeout=self._transfer_interval)
if not item:
continue
task = item.task
if not task:
self._queue.task_done()
continue
# 文件信息
fileitem = task.fileitem
with task_lock:
# 获取当前最新总数
current_total = self.jobview.total()
# 更新总数,取当前总数和当前已处理+运行中+队列中的最大值
self._total_num = max(self._total_num, current_total)
# 如果当前没有在运行的任务且处理数为0说明是一个新序列的开始
if self._active_tasks == 0 and self._processed_num == 0:
logger.info("开始整理队列处理...")
# 启动进度
progress.start()
self._progress.start()
# 重置计数
processed_num = 0
fail_num = 0
total_num = self.jobview.total()
__process_msg = f"开始整理队列处理,当前共 {total_num} 个文件 ..."
self._processed_num = 0
self._fail_num = 0
__process_msg = f"开始整理队列处理,当前共 {self._total_num} 个文件 ..."
logger.info(__process_msg)
progress.update(value=0,
text=__process_msg)
# 队列已开始
__queue_start = False
self._progress.update(value=0,
text=__process_msg)
# 增加运行中的任务数
self._active_tasks += 1
try:
# 更新进度
__process_msg = f"正在整理 {fileitem.name} ..."
logger.info(__process_msg)
progress.update(value=processed_num / total_num * 100,
text=__process_msg,
data={})
with task_lock:
self._progress.update(
value=(self._processed_num / self._total_num * 100) if self._total_num else 0,
text=__process_msg)
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
if not state:
# 任务失败
fail_num += 1
# 更新进度
processed_num += 1
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
progress.update(value=(processed_num / total_num) * 100,
text=__process_msg,
data={})
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
progress.update(value=100,
text=__end_msg)
progress.end()
# 重置计数
processed_num = 0
fail_num = 0
# 标记为新队列
__queue_start = True
# 等待一定时间,以让其他任务加入队列
sleep(self._transfer_interval)
with task_lock:
if not state:
# 任务失败
self._fail_num += 1
# 更新进度
self._processed_num += 1
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
self._progress.update(
value=(self._processed_num / self._total_num * 100) if self._total_num else 100,
text=__process_msg)
except Exception as e:
logger.error(f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}")
with task_lock:
self._processed_num += 1
self._fail_num += 1
finally:
self._queue.task_done()
with task_lock:
# 减少运行中的任务数
self._active_tasks -= 1
# 检查是否所有任务都已完成且队列为空
if self._active_tasks == 0 and self._queue.empty():
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {self._processed_num} 个文件,失败 {self._fail_num}"
logger.info(__end_msg)
self._progress.update(value=100,
text=__end_msg)
self._progress.end()
# 重置计数
self._processed_num = 0
self._fail_num = 0
except queue.Empty:
# 即使队列空了,如果还有任务在运行,也不应该结束进度
# 这部分逻辑已经在 finally 的 active_tasks == 0 中处理了
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}")
@@ -670,7 +799,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!",
text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。",
text=f"回复:\n```\n/redo {his.id} [tmdbid]|[类型]\n```\n手动识别整理。",
username=task.username,
link=settings.MP_DOMAIN('#/history')
))
@@ -776,9 +905,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
finally:
# 移除已完成的任务
with task_lock:
if self.jobview.is_done(task):
self.jobview.remove_job(task)
if self.jobview.is_done(task):
self.jobview.remove_job(task)
def get_queue_tasks(self) -> List[TransferJob]:
"""
@@ -799,15 +927,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
获取下载器中的种子列表,并执行整理
"""
# 全局锁,避免重复处理
# 全局锁,避免定时服务重复
with downloader_lock:
# 获取下载器监控目录
download_dirs = DirectoryHelper().get_download_dirs()
# 如果没有下载器监控的目录则不处理
if not any(dir_info.monitor_type == "downloader" and dir_info.storage == "local"
for dir_info in download_dirs):
return True
logger.info("开始整理下载器中已经完成下载的文件 ...")
# 从下载器获取种子列表
@@ -822,11 +951,13 @@ class TransferChain(ChainBase, metaclass=Singleton):
for torrent in torrents:
if global_vars.is_system_stopped:
break
# 文件路径
file_path = torrent.path
if not file_path.exists():
logger.warn(f"文件不存在:{file_path}")
continue
# 检查是否为下载器监控目录中的文件
is_downloader_monitor = False
for dir_info in download_dirs:
@@ -840,15 +971,25 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not is_downloader_monitor:
logger.debug(f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理")
continue
# 查询下载记录识别情况
downloadhis: DownloadHistory = DownloadHistoryOper().get_by_hash(torrent.hash)
if downloadhis:
# 获取自定义识别词
custom_words_list = None
if downloadhis.custom_words:
custom_words_list = downloadhis.custom_words.split('\n')
# 类型
try:
mtype = MediaType(downloadhis.type)
except ValueError:
mtype = MediaType.TV
# 按TMDBID识别
# 识别元数据
metainfo = MetaInfoPath(file_path, custom_words=custom_words_list)
# 识别媒体信息
mediainfo = self.recognize_media(mtype=mtype,
tmdbid=downloadhis.tmdbid,
doubanid=downloadhis.doubanid,
@@ -859,12 +1000,19 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 更新自定义媒体类别
if downloadhis.media_category:
mediainfo.category = downloadhis.media_category
else:
# 非MoviePilot下载的任务按文件识别
metainfo = MetaInfoPath(file_path)
mediainfo = None
# 执行实时整理,匹配源目录
state, errmsg = self.do_transfer(
# 检查是否已经有任务处理中,如有则跳过本次整理
if self.jobview.has_tasks(meta=metainfo, mediainfo=mediainfo):
logger.info(f"有任务正在整理中,跳过本次整理 ...")
return False
# 执行异步整理,匹配源目录
self.do_transfer(
fileitem=FileItem(
storage="local",
path=file_path.as_posix() + ("/" if file_path.is_dir() else ""),
@@ -875,30 +1023,23 @@ class TransferChain(ChainBase, metaclass=Singleton):
),
mediainfo=mediainfo,
downloader=torrent.downloader,
download_hash=torrent.hash,
background=False,
download_hash=torrent.hash
)
# 设置下载任务状态
if not state:
logger.warn(f"整理下载器任务失败:{torrent.hash} - {errmsg}")
self.transfer_completed(hashs=torrent.hash, downloader=torrent.downloader)
finally:
torrents.clear()
del torrents
# 结束
logger.info("所有下载器中下载完成的文件已整理完成")
return True
def __get_trans_fileitems(
self, fileitem: FileItem, depth: int = 1
self, fileitem: FileItem, check: bool = True
) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
:param fileitem: 文件项
:param depth: 递归深度默认为1
:param check: 检查文件是否存在默认为True
"""
storagechain = StorageChain()
@@ -917,44 +1058,40 @@ class TransferChain(ChainBase, metaclass=Singleton):
return storagechain.get_file_item(storage=_storage, path=p.parent)
return None
latest_fileitem = storagechain.get_item(fileitem)
if not latest_fileitem:
logger.warn(f"目录或文件不存在:{fileitem.path}")
return []
# 确保从历史记录重新整理时 能获得最新的源文件大小、修改日期等
fileitem = latest_fileitem
if check:
latest_fileitem = storagechain.get_item(fileitem)
if not latest_fileitem:
logger.warn(f"目录或文件不存在:{fileitem.path}")
return []
# 确保从历史记录重新整理时 能获得最新的源文件大小、修改日期等
fileitem = latest_fileitem
# 蓝光原盘子目录或文件
# 是否蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path):
dir_item = __get_bluray_dir(fileitem.storage, Path(fileitem.path))
if dir_item:
if dir_item := __get_bluray_dir(fileitem.storage, Path(fileitem.path)):
# 返回该文件所在的原盘根目录
return [(dir_item, True)]
# 单文件
if fileitem.type == "file":
return [(fileitem, False)]
# 蓝光原盘根目录
sub_items = storagechain.list_files(fileitem) or []
# 是否蓝光原盘根目录
sub_items = storagechain.list_files(fileitem, recursion=False) or []
if storagechain.contains_bluray_subdirectories(sub_items):
# 当前目录是原盘根目录,不需要递归
return [(fileitem, True)]
# 需要整理的文件项列表
trans_items = []
# 先检查当前目录的下级目录,以支持合集的情况
for sub_dir in sub_items if depth >= 1 else []:
if sub_dir.type == "dir":
trans_items.extend(self.__get_trans_fileitems(sub_dir, depth=depth - 1))
if not trans_items:
# 没有有效子目录,直接整理当前目录
trans_items.append((fileitem, False))
else:
# 有子目录时,把当前目录的文件添加到整理任务中
if sub_items:
trans_items.extend([(f, False) for f in sub_items if f.type == "file"])
return trans_items
# 不是原盘根目录 递归获取目录内需要整理的文件项列表
return [
item
for sub_item in sub_items
for item in (
self.__get_trans_fileitems(sub_item, check=False)
if sub_item.type == "dir"
else [(sub_item, False)]
)
]
def do_transfer(self, fileitem: FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None,
@@ -989,19 +1126,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param continue_callback: 继续处理回调
返回:成功标识,错误信息
"""
def __is_allow_extensions(_ext: str) -> bool:
"""
判断是否允许的扩展名
"""
return True if not self.all_exts or f".{_ext.lower()}" in self.all_exts else False
def __is_allow_filesize(_size: int, _min_filesize: int) -> bool:
"""
判断是否满足最小文件大小
"""
return True if not _min_filesize or _size > _min_filesize * 1024 * 1024 else False
# 是否全部成功
all_success = True
@@ -1015,34 +1139,22 @@ class TransferChain(ChainBase, metaclass=Singleton):
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
# 汇总错误信息
err_msgs: List[str] = []
# 待整理目录或文件项
trans_items = self.__get_trans_fileitems(
fileitem, depth=2 # 为解决 issue#4371 深度至少需要>=2
)
# 待整理的文件列表
file_items: List[Tuple[FileItem, bool]] = []
# 递归获取待整理的文件/目录列表
file_items = self.__get_trans_fileitems(fileitem)
if not trans_items:
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
# 转换为所有待处理的文件清单
for trans_item, bluray_dir in trans_items:
# 如果是目录且不是蓝光原盘,获取所有文件并整理
if trans_item.type == "dir" and not bluray_dir:
# 遍历获取下载目录所有文件(递归)
if files := StorageChain().list_files(trans_item, recursion=True):
file_items.extend([(file, False) for file in files])
else:
file_items.append((trans_item, bluray_dir))
# 有集自定义格式,过滤文件
if formaterHandler:
file_items = [f for f in file_items if formaterHandler.match(f[0].name)]
# 过滤后缀和大小
file_items = [f for f in file_items if f[1] # 蓝光目录不过滤
or __is_allow_extensions(f[0].extension) and __is_allow_filesize(f[0].size, min_filesize)]
# 过滤后缀和大小(蓝光目录、附加文件不过滤大小)
file_items = [f for f in file_items if f[1] or
self.__is_extra_file(f[0]) or
(self.__is_allowed_file(f[0]) and self.__is_allow_filesize(f[0], min_filesize))]
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
@@ -1469,7 +1581,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
for file in torrent_files:
file_path = save_path / file.name
# 如果存在未被屏蔽的媒体文件,则不删除种子
if (file_path.suffix in self.all_exts
if (file_path.suffix in self._allowed_exts
and not self._is_blocked_by_exclude_words(file_path.as_posix(), transfer_exclude_words)
and file_path.exists()):
return False

View File

@@ -219,7 +219,7 @@ class ConfigModel(BaseModel):
AUTO_UPDATE_RESOURCE: bool = True
# ==================== 媒体文件格式配置 ====================
# 支持的后缀格式
# 支持的视频文件后缀格式
RMT_MEDIAEXT: list = Field(
default_factory=lambda: ['.mp4', '.mkv', '.ts', '.iso',
'.rmvb', '.avi', '.mov', '.mpeg',
@@ -230,8 +230,6 @@ class ConfigModel(BaseModel):
# 支持的字幕文件后缀格式
RMT_SUBEXT: list = Field(default_factory=lambda: ['.srt', '.ass', '.ssa', '.sup'])
# 支持的音轨文件后缀格式
RMT_AUDIO_TRACK_EXT: list = Field(default_factory=lambda: ['.mka'])
# 音轨文件后缀格式
RMT_AUDIOEXT: list = Field(
default_factory=lambda: ['.aac', '.ac3', '.amr', '.caf', '.cda', '.dsf',
'.dff', '.kar', '.m4a', '.mp1', '.mp2', '.mp3',
@@ -305,6 +303,8 @@ class ConfigModel(BaseModel):
COOKIECLOUD_BLACKLIST: Optional[str] = None
# ==================== 整理配置 ====================
# 文件整理线程数
TRANSFER_THREADS: int = 1
# 电影重命名格式
MOVIE_RENAME_FORMAT: str = "{{title}}{% if year %} ({{year}}){% endif %}" \
"/{{title}}{% if year %} ({{year}}){% endif %}{% if part %}-{{part}}{% endif %}{% if videoFormat %} - {{videoFormat}}{% endif %}" \
@@ -395,6 +395,8 @@ class ConfigModel(BaseModel):
SECURITY_IMAGE_SUFFIXES: list = Field(default=[".jpg", ".jpeg", ".png", ".webp", ".gif", ".svg", ".avif"])
# PassKey 是否强制用户验证(生物识别等)
PASSKEY_REQUIRE_UV: bool = True
# 允许在未启用 OTP 时直接注册 PassKey
PASSKEY_ALLOW_REGISTER_WITHOUT_OTP: bool = False
# ==================== 工作流配置 ====================
# 工作流数据共享
@@ -849,14 +851,18 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
rename_format = re.sub(r'/+', '/', rename_format)
return rename_format.strip("/")
def TMDB_IMAGE_URL(self, file_path: str, file_size: str = "original") -> str:
def TMDB_IMAGE_URL(
self, file_path: Optional[str], file_size: str = "original"
) -> Optional[str]:
"""
获取TMDB图片网址
:param file_path: TMDB API返回的xxx_path
:param file_size: 图片大小,例如:'original', 'w500'
:return: 图片的完整URL
:return: 图片的完整URL,如果 file_path 为空则返回 None
"""
if not file_path:
return None
return (
f"https://{self.TMDB_IMAGE_DOMAIN}/t/p/{file_size}/{file_path.removeprefix('/')}"
)

View File

@@ -301,7 +301,8 @@ class MetaVideo(MetaBase):
return
else:
# 后缀名不要
if ".%s".lower() % token in settings.RMT_MEDIAEXT:
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if ".%s".lower() % token in media_exts:
return
# 英文或者英文+数字,拼装起来
if self.en_name:

View File

@@ -25,7 +25,8 @@ def MetaInfo(title: str, subtitle: Optional[str] = None, custom_words: List[str]
# 获取标题中媒体信息
title, metainfo = find_metainfo(title)
# 判断是否处理文件
if title and Path(title).suffix.lower() in settings.RMT_MEDIAEXT:
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if title and Path(title).suffix.lower() in media_exts:
isfile = True
# 去掉后缀
title = Path(title).stem

View File

@@ -126,7 +126,7 @@ class LocalStorage(StorageBase):
return None
path_obj = Path(fileitem.path) / name
if not path_obj.exists():
path_obj.mkdir(parents=True)
path_obj.mkdir(parents=True, exist_ok=True)
return self.__get_diritem(path_obj)
def get_folder(self, path: Path) -> Optional[schemas.FileItem]:

View File

@@ -45,7 +45,7 @@ class Rclone(StorageBase):
logger.info(f"【rclone】配置写入文件{filepath}")
path = Path(filepath)
if not path.parent.exists():
path.parent.mkdir(parents=True)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(conf.get('content'), encoding='utf-8')
@staticmethod

View File

@@ -1,6 +1,5 @@
import re
from pathlib import Path
from threading import Lock
from typing import Optional, List, Tuple
from jinja2 import Template
@@ -19,53 +18,43 @@ from app.schemas import TransferInfo, TmdbEpisode, TransferDirectoryConf, FileIt
from app.schemas.types import MediaType, ChainEventType
from app.utils.system import SystemUtils
lock = Lock()
class TransHandler:
"""
文件转移整理类
"""
inner_lock: Lock = Lock()
def __init__(self):
self.result = None
pass
def __reset_result(self):
@staticmethod
def __update_result(result: TransferInfo, **kwargs):
    """
    Merge key/value pairs into the given TransferInfo result, in place.

    Merge semantics depend on the attribute's current value:
    - None      -> replaced by the new value
    - list      -> extended by a list value, or appended to for a scalar
    - dict      -> updated from a dict value, or the scalar stored under its own key
    - bool      -> replaced (must be checked before int: bool subclasses int)
    - int       -> accumulated (a None value is treated as 0)
    - otherwise -> replaced

    :param result: TransferInfo instance to update
    :param kwargs: attribute name / value pairs to merge; unknown names are ignored
    """
    for key, value in kwargs.items():
        if not hasattr(result, key):
            # silently skip names TransferInfo does not define
            continue
        current_value = getattr(result, key)
        if current_value is None:
            current_value = value
        elif isinstance(current_value, list):
            if isinstance(value, list):
                current_value.extend(value)
            else:
                current_value.append(value)
        elif isinstance(current_value, dict):
            if isinstance(value, dict):
                current_value.update(value)
            else:
                current_value[key] = value
        elif isinstance(current_value, bool):
            # bool is tested before int on purpose (bool is an int subclass)
            current_value = value
        elif isinstance(current_value, int):
            current_value += (value or 0)
        else:
            current_value = value
        setattr(result, key, current_value)
def transfer_media(self,
fileitem: FileItem,
@@ -100,8 +89,32 @@ class TransHandler:
:return: TransferInfo、错误信息
"""
# 重置结果
self.__reset_result()
def __is_subtitle_file(_fileitem: FileItem) -> bool:
    """
    Check whether the file item is a subtitle file.

    :param _fileitem: file item to inspect
    :return: True when its extension is one of the configured subtitle suffixes
    """
    ext = _fileitem.extension
    return bool(ext) and f".{ext.lower()}" in settings.RMT_SUBEXT
def __is_extra_file(_fileitem: FileItem) -> bool:
    """
    Check whether the file item is an extra (subtitle or audio-track) file.

    :param _fileitem: file item to inspect
    :return: True when its extension is a configured subtitle or audio suffix
    """
    ext = _fileitem.extension
    extra_exts = settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
    return bool(ext) and f".{ext.lower()}" in extra_exts
# 整理结果
result = TransferInfo()
try:
@@ -122,20 +135,20 @@ class TransHandler:
rename_format, rename_path=new_path
)
if not new_path:
self.__set_result(
self.__update_result(
result=result,
success=False,
message="重命名格式无效",
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify,
)
return self.result.model_copy()
return result
else:
new_path = target_path / fileitem.name
# 在整理目录前先尝试获取原盘大小避免整理记录出现0字节的情况
# TODO 当前只计算STREAM目录内的文件大小如果需要精确则递归完整目录
# 原盘大小只计算STREAM目录内的文件大小
if stream_fileitem := source_oper.get_item(
Path(fileitem.path) / "BDMV" / "STREAM"
Path(fileitem.path) / "BDMV" / "STREAM"
):
fileitem.size = 0
files = source_oper.list(stream_fileitem) or []
@@ -148,39 +161,43 @@ class TransHandler:
target_oper=target_oper,
target_storage=target_storage,
target_path=new_path,
transfer_type=transfer_type)
transfer_type=transfer_type,
result=result)
if not new_diritem:
logger.error(f"文件夹 {fileitem.path} 整理失败:{errmsg}")
self.__set_result(success=False,
message=errmsg,
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
self.__update_result(result=result,
success=False,
message=errmsg,
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify)
return result
logger.info(f"文件夹 {fileitem.path} 整理成功")
# 返回整理后的路径
self.__set_result(success=True,
fileitem=fileitem,
target_item=new_diritem,
target_diritem=new_diritem,
need_scrape=need_scrape,
need_notify=need_notify,
transfer_type=transfer_type)
return self.result.model_copy()
self.__update_result(result=result,
success=True,
fileitem=fileitem,
target_item=new_diritem,
target_diritem=new_diritem,
need_scrape=need_scrape,
need_notify=need_notify,
transfer_type=transfer_type)
return result
else:
# 整理单个文件
if mediainfo.type == MediaType.TV:
# 电视剧
if in_meta.begin_episode is None:
logger.warn(f"文件 {fileitem.path} 整理失败:未识别到文件集数")
self.__set_result(success=False,
message="未识别到文件集数",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
self.__update_result(result=result,
success=False,
message="未识别到文件集数",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return result
# 文件结束季为空
in_meta.end_season = None
@@ -204,11 +221,18 @@ class TransHandler:
file_ext=f".{fileitem.extension}"
)
)
# 针对字幕文件,文件名中补充额外标识信息
if __is_subtitle_file(fileitem):
new_file = self.__rename_subtitles(fileitem, new_file)
# 文件目录
folder_path = DirectoryHelper.get_media_root_path(
rename_format, rename_path=new_file
)
if not folder_path:
self.__set_result(
self.__update_result(
result=result,
success=False,
message="重命名格式无效",
fileitem=fileitem,
@@ -216,75 +240,81 @@ class TransHandler:
transfer_type=transfer_type,
need_notify=need_notify,
)
return self.result.model_copy()
return result
else:
new_file = target_path / fileitem.name
folder_path = target_path
# 判断是否要覆盖
overflag = False
# 目标目录
target_diritem = target_oper.get_folder(folder_path)
if not target_diritem:
logger.error(f"目标目录 {folder_path} 获取失败")
self.__set_result(success=False,
message=f"目标目录 {folder_path} 获取失败",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
# 目标文件
target_item = target_oper.get_item(new_file)
if target_item:
# 目标文件已存在
target_file = new_file
if target_storage == "local" and new_file.is_symlink():
target_file = new_file.readlink()
if not target_file.exists():
overflag = True
if not overflag:
self.__update_result(result=result,
success=False,
message=f"目标目录 {folder_path} 获取失败",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return result
# 判断是否要覆盖,附加文件强制覆盖
overflag = False
if not __is_extra_file(fileitem):
# 目标文件
target_item = target_oper.get_item(new_file)
if target_item:
# 目标文件已存在
logger.info(
f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
if overwrite_mode == 'always':
# 总是覆盖同名文件
overflag = True
elif overwrite_mode == 'size':
# 存在时大覆盖小
if target_item.size < fileitem.size:
logger.info(f"目标文件文件大小更小,将覆盖:{new_file}")
target_file = new_file
if target_storage == "local" and new_file.is_symlink():
target_file = new_file.readlink()
if not target_file.exists():
overflag = True
else:
self.__set_result(success=False,
message=f"媒体库存在同名文件,且质量更好",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
elif overwrite_mode == 'never':
# 存在不覆盖
self.__set_result(success=False,
message=f"媒体库存在同名文件,当前覆盖模式为不覆盖",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
elif overwrite_mode == 'latest':
# 仅保留最新版本
logger.info(f"当前整理覆盖模式设置为仅保留最新版本,将覆盖:{new_file}")
overflag = True
else:
if overwrite_mode == 'latest':
# 文件不存在,但仅保留最新版本
logger.info(f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ...")
self.__delete_version_files(target_oper, new_file)
if not overflag:
# 目标文件已存在
logger.info(
f"目的文件系统中已经存在同名文件 {target_file},当前整理覆盖模式设置为 {overwrite_mode}")
if overwrite_mode == 'always':
# 总是覆盖同名文件
overflag = True
elif overwrite_mode == 'size':
# 存在时大覆盖小
if target_item.size < fileitem.size:
logger.info(f"目标文件文件大小更小,将覆盖:{new_file}")
overflag = True
else:
self.__update_result(result=result,
success=False,
message=f"媒体库存在同名文件,且质量更好",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return result
elif overwrite_mode == 'never':
# 存在不覆盖
self.__update_result(result=result,
message=f"媒体库存在同名文件,当前覆盖模式为不覆盖",
fileitem=fileitem,
target_item=target_item,
target_diritem=target_diritem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return result
elif overwrite_mode == 'latest':
# 仅保留最新版本
logger.info(f"当前整理覆盖模式设置为仅保留最新版本,将覆盖:{new_file}")
overflag = True
else:
if overwrite_mode == 'latest':
# 文件不存在,但仅保留最新版本
logger.info(
f"当前整理覆盖模式设置为 {overwrite_mode},仅保留最新版本,正在删除已有版本文件 ...")
self.__delete_version_files(target_oper, new_file)
# 整理文件
new_item, err_msg = self.__transfer_file(fileitem=fileitem,
mediainfo=mediainfo,
@@ -293,28 +323,32 @@ class TransHandler:
transfer_type=transfer_type,
over_flag=overflag,
source_oper=source_oper,
target_oper=target_oper)
target_oper=target_oper,
result=result)
if not new_item:
logger.error(f"文件 {fileitem.path} 整理失败:{err_msg}")
self.__set_result(success=False,
message=err_msg,
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
self.__update_result(result=result,
success=False,
message=err_msg,
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
need_notify=need_notify)
return result
logger.info(f"文件 {fileitem.path} 整理成功")
self.__set_result(success=True,
fileitem=fileitem,
target_item=new_item,
target_diritem=target_diritem,
need_scrape=need_scrape,
transfer_type=transfer_type,
need_notify=need_notify)
return self.result.model_copy()
finally:
self.result = None
self.__update_result(result=result,
success=True,
fileitem=fileitem,
target_item=new_item,
target_diritem=target_diritem,
need_scrape=need_scrape,
transfer_type=transfer_type,
need_notify=need_notify)
return result
except Exception as e:
logger.error(f"媒体整理出错:{e}")
return TransferInfo(success=False, message=str(e))
@staticmethod
def __transfer_command(fileitem: FileItem, target_storage: str,
@@ -350,158 +384,118 @@ class TransHandler:
and fileitem.storage != "local" and target_storage != "local"):
return None, f"不支持 {fileitem.storage}{target_storage} 的文件整理"
# 加锁
with lock:
if fileitem.storage == "local" and target_storage == "local":
# 创建目录
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True)
# 本地到本地
if transfer_type == "copy":
state = source_oper.copy(fileitem, target_file.parent, target_file.name)
elif transfer_type == "move":
state = source_oper.move(fileitem, target_file.parent, target_file.name)
elif transfer_type == "link":
state = source_oper.link(fileitem, target_file)
elif transfer_type == "softlink":
state = source_oper.softlink(fileitem, target_file)
if fileitem.storage == "local" and target_storage == "local":
# 创建目录
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True, exist_ok=True)
# 本地到本地
if transfer_type == "copy":
state = source_oper.copy(fileitem, target_file.parent, target_file.name)
elif transfer_type == "move":
state = source_oper.move(fileitem, target_file.parent, target_file.name)
elif transfer_type == "link":
state = source_oper.link(fileitem, target_file)
elif transfer_type == "softlink":
state = source_oper.softlink(fileitem, target_file)
else:
return None, f"不支持的整理方式:{transfer_type}"
if state:
return __get_targetitem(target_file), ""
else:
return None, f"{fileitem.path} {transfer_type} 失败"
elif fileitem.storage == "local" and target_storage != "local":
# 本地到网盘
filepath = Path(fileitem.path)
if not filepath.exists():
return None, f"文件 {filepath} 不存在"
if transfer_type == "copy":
# 复制
# 根据目的路径创建文件夹
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
# 上传文件
new_item = target_oper.upload(target_fileitem, filepath, target_file.name)
if new_item:
return new_item, ""
else:
return None, f"{fileitem.path} 上传 {target_storage} 失败"
else:
return None, f"不支持的整理方式:{transfer_type}"
if state:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "move":
# 移动
# 根据目的路径获取文件夹
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
# 上传文件
new_item = target_oper.upload(target_fileitem, filepath, target_file.name)
if new_item:
# 删除源文件
source_oper.delete(fileitem)
return new_item, ""
else:
return None, f"{fileitem.path} 上传 {target_storage} 失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif fileitem.storage != "local" and target_storage == "local":
# 网盘到本地
if target_file.exists():
logger.warn(f"文件已存在:{target_file}")
return __get_targetitem(target_file), ""
# 网盘到本地
if transfer_type in ["copy", "move"]:
# 下载
tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent)
if tmp_file:
# 创建目录
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True, exist_ok=True)
# 将tmp_file移动后target_file
SystemUtils.move(tmp_file, target_file)
if transfer_type == "move":
# 删除源文件
source_oper.delete(fileitem)
return __get_targetitem(target_file), ""
else:
return None, f"{fileitem.path} {transfer_type} 失败"
elif fileitem.storage == "local" and target_storage != "local":
# 本地到网盘
filepath = Path(fileitem.path)
if not filepath.exists():
return None, f"文件 {filepath} 不存在"
if transfer_type == "copy":
# 复制
# 根据目的路径创建文件夹
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
# 上传文件
new_item = target_oper.upload(target_fileitem, filepath, target_file.name)
if new_item:
return new_item, ""
else:
return None, f"{fileitem.path} 上传 {target_storage} 失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "move":
# 移动
# 根据目的路径获取文件夹
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
# 上传文件
new_item = target_oper.upload(target_fileitem, filepath, target_file.name)
if new_item:
# 删除源文件
source_oper.delete(fileitem)
return new_item, ""
else:
return None, f"{fileitem.path} 上传 {target_storage} 失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif fileitem.storage != "local" and target_storage == "local":
# 网盘到本地
if target_file.exists():
logger.warn(f"文件已存在:{target_file}")
return __get_targetitem(target_file), ""
# 网盘到本地
if transfer_type in ["copy", "move"]:
# 下载
tmp_file = source_oper.download(fileitem=fileitem, path=target_file.parent)
if tmp_file:
# 创建目录
if not target_file.parent.exists():
target_file.parent.mkdir(parents=True)
# 将tmp_file移动后target_file
SystemUtils.move(tmp_file, target_file)
if transfer_type == "move":
# 删除源文件
source_oper.delete(fileitem)
return __get_targetitem(target_file), ""
else:
return None, f"{fileitem.path} {fileitem.storage} 下载失败"
elif fileitem.storage == target_storage:
# 同一网盘
if not source_oper.is_support_transtype(transfer_type):
return None, f"存储 {fileitem.storage} 不支持 {transfer_type} 整理方式"
return None, f"{fileitem.path} {fileitem.storage} 下载失败"
elif fileitem.storage == target_storage:
# 同一网盘
if not source_oper.is_support_transtype(transfer_type):
return None, f"存储 {fileitem.storage} 不支持 {transfer_type} 整理方式"
if transfer_type == "copy":
# 复制文件到新目录
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
if source_oper.copy(fileitem, Path(target_fileitem.path), target_file.name):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 复制文件失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "move":
# 移动文件到新目录
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 移动文件失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "link":
if source_oper.link(fileitem, target_file):
if transfer_type == "copy":
# 复制文件到新目录
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
if source_oper.copy(fileitem, Path(target_fileitem.path), target_file.name):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 创建硬链接失败"
return None, f"{target_storage}{fileitem.path} 复制文件失败"
else:
return None, f"不支持的整理方式:{transfer_type}"
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "move":
# 移动文件到新目录
target_fileitem = target_oper.get_folder(target_file.parent)
if target_fileitem:
if source_oper.move(fileitem, Path(target_fileitem.path), target_file.name):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 移动文件失败"
else:
return None, f"{target_storage}{target_file.parent} 目录获取失败"
elif transfer_type == "link":
if source_oper.link(fileitem, target_file):
return target_oper.get_item(target_file), ""
else:
return None, f"{target_storage}{fileitem.path} 创建硬链接失败"
else:
return None, f"不支持的整理方式:{transfer_type}"
return None, "未知错误"
def __transfer_other_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str) -> Tuple[bool, str]:
@staticmethod
def __rename_subtitles(sub_item: FileItem, new_file: Path) -> Path:
"""
根据文件名整理其他相关文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标路径
:param transfer_type: 整理方式
"""
# 整理字幕
state, errmsg = self.__transfer_subtitles(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
if not state:
return False, errmsg
# 整理音轨文件
state, errmsg = self.__transfer_audio_track_files(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
return state, errmsg
def __transfer_subtitles(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str) -> Tuple[bool, str]:
"""
根据文件名整理对应字幕文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标路径
:param transfer_type: 整理方式
重命名字幕文件,补充附加信息
"""
# 字幕正则式
_zhcn_sub_re = r"([.\[(](((zh[-_])?(cn|ch[si]|sg|sc))|zho?" \
@@ -517,149 +511,33 @@ class TransHandler:
r"|(?<![a-z0-9])big5(?![a-z0-9])"
_eng_sub_re = r"[.\[(]eng[.\])]"
# 比对文件名并整理字幕
org_path = Path(fileitem.path)
# 查找上级文件项
parent_item: FileItem = source_oper.get_parent(fileitem)
if not parent_item:
return False, f"{org_path} 上级目录获取失败"
# 字幕文件列表
file_list: List[FileItem] = source_oper.list(parent_item) or []
file_list = [f for f in file_list if f.type == "file" and f.extension
and f".{f.extension.lower()}" in settings.RMT_SUBEXT]
if len(file_list) == 0:
logger.info(f"{parent_item.path} 目录下没有找到字幕文件...")
else:
logger.info(f"字幕文件清单:{[f.name for f in file_list]}")
# 识别文件名
metainfo = MetaInfoPath(org_path)
for sub_item in file_list:
# 识别字幕文件名
sub_file_name = re.sub(_zhtw_sub_re,
".",
re.sub(_zhcn_sub_re,
".",
sub_item.name,
flags=re.I),
flags=re.I)
sub_file_name = re.sub(_eng_sub_re, ".", sub_file_name, flags=re.I)
sub_metainfo = MetaInfoPath(Path(sub_item.path))
# 匹配字幕文件名
if (org_path.stem == Path(sub_file_name).stem) or \
(sub_metainfo.cn_name and sub_metainfo.cn_name == metainfo.cn_name) \
or (sub_metainfo.en_name and sub_metainfo.en_name == metainfo.en_name):
if metainfo.part and metainfo.part != sub_metainfo.part:
continue
if metainfo.season \
and metainfo.season != sub_metainfo.season:
continue
if metainfo.episode \
and metainfo.episode != sub_metainfo.episode:
continue
new_file_type = ""
# 兼容jellyfin字幕识别(多重识别), emby则会识别最后一个后缀
if re.search(_zhcn_sub_re, sub_item.name, re.I):
new_file_type = ".chi.zh-cn"
elif re.search(_zhtw_sub_re, sub_item.name,
re.I):
new_file_type = ".zh-tw"
elif re.search(_eng_sub_re, sub_item.name, re.I):
new_file_type = ".eng"
# 通过对比字幕文件大小 尽量整理所有存在的字幕
file_ext = f".{sub_item.extension}"
new_sub_tag_dict = {
".eng": ".英文",
".chi.zh-cn": ".简体中文",
".zh-tw": ".繁体中文"
}
new_sub_tag_list = [
(".default" + new_file_type if (
(settings.DEFAULT_SUB == "zh-cn" and new_file_type == ".chi.zh-cn") or
(settings.DEFAULT_SUB == "zh-tw" and new_file_type == ".zh-tw") or
(settings.DEFAULT_SUB == "eng" and new_file_type == ".eng")
) else new_file_type) if t == 0 else "%s%s(%s)" % (new_file_type,
new_sub_tag_dict.get(
new_file_type, ""
),
t) for t in range(6)
]
for new_sub_tag in new_sub_tag_list:
new_file: Path = target_file.with_name(target_file.stem + new_sub_tag + file_ext)
# 如果字幕文件不存在, 直接整理字幕, 并跳出循环
try:
logger.debug(f"正在处理字幕:{sub_item.name}")
new_item, errmsg = self.__transfer_command(fileitem=sub_item,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=new_file,
transfer_type=transfer_type)
if new_item:
logger.info(f"字幕 {sub_item.name} 整理完成")
self.__set_result(
subtitle_list=[sub_item.path],
subtitle_list_new=[new_item.path],
)
break
else:
logger.error(f"字幕 {sub_item.name} 整理失败:{errmsg}")
return False, errmsg
except Exception as error:
logger.info(f"字幕 {new_file} 出错了,原因: {str(error)}")
return True, ""
# 原文件后缀
file_ext = f".{sub_item.extension}"
# 新文件后缀
new_file_type = ""
def __transfer_audio_track_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
target_file: Path, transfer_type: str) -> Tuple[bool, str]:
"""
根据文件名整理对应音轨文件
:param fileitem: 源文件
:param target_storage: 目标存储
:param source_oper: 源存储操作对象
:param target_oper: 目标存储操作对象
:param target_file: 目标路径
:param transfer_type: 整理方式
"""
org_path = Path(fileitem.path)
# 查找上级文件项
parent_item: FileItem = source_oper.get_parent(fileitem)
if not parent_item:
return False, f"{org_path} 上级目录获取失败"
file_list: List[FileItem] = source_oper.list(parent_item)
# 匹配音轨文件
pending_file_list: List[FileItem] = [file for file in file_list
if Path(file.name).stem == org_path.stem
and file.type == "file" and file.extension
and f".{file.extension.lower()}" in settings.RMT_AUDIOEXT]
if len(pending_file_list) == 0:
return True, f"{parent_item.path} 目录下没有找到匹配的音轨文件"
logger.debug("音轨文件清单:" + str(pending_file_list))
for track_file in pending_file_list:
track_ext = f".{track_file.extension}"
new_track_file = target_file.with_name(target_file.stem + track_ext)
try:
logger.info(f"正在整理音轨文件:{track_file}{new_track_file}")
new_item, errmsg = self.__transfer_command(fileitem=track_file,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=new_track_file,
transfer_type=transfer_type)
if new_item:
logger.info(f"音轨文件 {org_path.name} 整理完成")
self.__set_result(
audio_list=[track_file.path],
audio_list_new=[new_item.path],
)
else:
logger.error(f"音轨文件 {org_path.name} 整理失败:{errmsg}")
except Exception as error:
logger.error(f"音轨文件 {org_path.name} 整理失败:{str(error)}")
return True, ""
# 识别字幕语言
if re.search(_zhcn_sub_re, sub_item.name, re.I):
new_file_type = ".chi.zh-cn"
elif re.search(_zhtw_sub_re, sub_item.name, re.I):
new_file_type = ".zh-tw"
elif re.search(_eng_sub_re, sub_item.name, re.I):
new_file_type = ".eng"
# 添加默认字幕标识
if ((settings.DEFAULT_SUB == "zh-cn" and new_file_type == ".chi.zh-cn")
or (settings.DEFAULT_SUB == "zh-tw" and new_file_type == ".zh-tw")
or (settings.DEFAULT_SUB == "eng" and new_file_type == ".eng")):
new_sub_tag = ".default" + new_file_type
else:
new_sub_tag = new_file_type
return new_file.with_name(new_file.stem + new_sub_tag + file_ext)
def __transfer_dir(self, fileitem: FileItem, mediainfo: MediaInfo,
source_oper: StorageBase, target_oper: StorageBase,
transfer_type: str, target_storage: str, target_path: Path) -> Tuple[Optional[FileItem], str]:
transfer_type: str, target_storage: str, target_path: Path,
result: TransferInfo) -> Tuple[Optional[FileItem], str]:
"""
整理整个文件夹
:param fileitem: 源文件
@@ -696,7 +574,8 @@ class TransHandler:
source_oper=source_oper,
target_oper=target_oper,
target_path=target_path,
transfer_type=transfer_type)
transfer_type=transfer_type,
result=result)
if state:
return target_item, errmsg
else:
@@ -704,7 +583,8 @@ class TransHandler:
def __transfer_dir_files(self, fileitem: FileItem, target_storage: str,
source_oper: StorageBase, target_oper: StorageBase,
transfer_type: str, target_path: Path) -> Tuple[bool, str]:
transfer_type: str, target_path: Path,
result: TransferInfo) -> Tuple[bool, str]:
"""
按目录结构整理目录下所有文件
:param fileitem: 源文件
@@ -725,7 +605,8 @@ class TransHandler:
source_oper=source_oper,
target_oper=target_oper,
transfer_type=transfer_type,
target_path=new_path)
target_path=new_path,
result=result)
if not state:
return False, errmsg
else:
@@ -739,7 +620,8 @@ class TransHandler:
transfer_type=transfer_type)
if not new_item:
return False, errmsg
self.__set_result(
self.__update_result(
result=result,
file_list=[item.path],
file_list_new=[new_item.path],
)
@@ -749,7 +631,8 @@ class TransHandler:
def __transfer_file(self, fileitem: FileItem, mediainfo: MediaInfo,
source_oper: StorageBase, target_oper: StorageBase,
target_storage: str, target_file: Path,
transfer_type: str, over_flag: Optional[bool] = False) -> Tuple[Optional[FileItem], str]:
transfer_type: str, result: TransferInfo,
over_flag: Optional[bool] = False) -> Tuple[Optional[FileItem], str]:
"""
整理一个文件,同时处理其他相关文件
:param fileitem: 原文件
@@ -808,19 +691,13 @@ class TransHandler:
target_file=target_file,
transfer_type=transfer_type)
if new_item:
self.__set_result(
self.__update_result(
result=result,
file_list=[fileitem.path],
file_list_new=[new_item.path],
file_count=1,
total_size=fileitem.size,
)
# 处理其他相关文件
self.__transfer_other_files(fileitem=fileitem,
target_storage=target_storage,
source_oper=source_oper,
target_oper=target_oper,
target_file=target_file,
transfer_type=transfer_type)
return new_item, errmsg
return None, errmsg
@@ -913,7 +790,8 @@ class TransHandler:
continue
if media_file.type != "file":
continue
if f".{media_file.extension.lower()}" not in settings.RMT_MEDIAEXT:
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if f".{media_file.extension.lower()}" not in media_exts:
continue
# 识别文件中的季集信息
filemeta = MetaInfoPath(media_path)

View File

@@ -216,6 +216,7 @@ class RousiSiteUserInfo(SiteParserBase):
messages.extend(res.get("messages", []))
page += 1
self.message_unread = len(messages)
for messsage in messages:
head = messsage.get("title")
date = StringUtils.unify_datetime_str(messsage.get("created_at"))

View File

@@ -80,7 +80,7 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
# 快照文件缓存
self._snapshot_cache = FileCache(base=settings.CACHE_PATH / "snapshots")
# 监控的文件扩展名
self.all_exts = settings.RMT_MEDIAEXT
self.all_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
# 启动目录监控和文件整理
self.init()

View File

@@ -38,8 +38,10 @@ class EventType(Enum):
SiteUpdated = "site.updated"
# 站点已刷新
SiteRefreshed = "site.refreshed"
# 转移完成
# 整理完成
TransferComplete = "transfer.complete"
# 整理失败
TransferFailed = "transfer.failed"
# 下载已添加
DownloadAdded = "download.added"
# 删除历史记录

View File

@@ -10,6 +10,7 @@ import requests
import urllib3
from requests import Response, Session
from urllib3.exceptions import InsecureRequestWarning
from urllib.parse import unquote, quote
from app.core.config import settings
from app.log import logger
@@ -17,6 +18,25 @@ from app.log import logger
urllib3.disable_warnings(InsecureRequestWarning)
def _url_decode_if_latin(original: str) -> str:
    """
    URL-decode a string only when decoding is round-trip safe.

    The value is decoded as latin-1, fully re-encoded, and decoded again; the
    decoded text is returned only when the round trip reproduces it, so values
    carrying raw binary data are passed through unchanged.

    :param original: possibly URL-encoded string
    :return: decoded text, or the original string when decoding is not safe
    """
    try:
        candidate = unquote(original, encoding='latin-1')
        round_trip = unquote(quote(candidate, safe=''), encoding='latin-1')
        if round_trip == candidate:
            return candidate
    except Exception as e:
        logger.error(f"latin-1解码URL编码失败{e}")
    return original
def cookie_parse(cookies_str: str, array: bool = False) -> Union[list, dict]:
    """
    Parse a raw cookie string into a dict, or a list of name/value dicts.

    :param cookies_str: raw cookie string, e.g. "a=1; b=2"
    :param array: when True, return [{"name": ..., "value": ...}, ...] instead of a dict
    :return: mapping of cookie name to value (or the list form); {} for empty input
    """
    if not cookies_str:
        return {}
    cookie_dict = {}
    for cookie in cookies_str.split(";"):
        # split on the first '=' only: the cookie value itself may contain '='
        cstr = cookie.split("=", 1)
        if len(cstr) > 1:
            # URL-decode the value binary-safely; the cookie name is kept as-is
            cookie_dict[cstr[0].strip()] = _url_decode_if_latin(cstr[1].strip())
    if array:
        return [{"name": k, "value": v} for k, v in cookie_dict.items()]
    return cookie_dict

View File

@@ -65,7 +65,8 @@ class ScanFileAction(BaseAction):
for file in files:
if global_vars.is_workflow_stopped(workflow_id):
break
if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT:
media_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
if not file.extension or f".{file.extension.lower()}" not in media_exts:
continue
# 添加文件到队列,而不是目录
self._fileitems.append(file)

View File

@@ -94,6 +94,7 @@ bluray_files = [
("Pokemon.2029.mp4", 104857600),
("Pokemon.2039.mp4", 104857600),
("Pokemon (2030)", [("S", 104857600)]),
("Pokemon (2031)", [("Pokemon (2031).mp4", 104857600)]),
],
)
]

View File

@@ -126,8 +126,9 @@ class BluRayTest(TestCase):
"/FOLDER/Pokemon (2028)/Pokemon.2028.mkv",
"/FOLDER/Pokemon.2029.mp4",
"/FOLDER/Pokemon.2039.mp4",
"/FOLDER/Pokemon (2031)/Pokemon (2031).mp4",
],
__test_do_transfer("/FOLDER"),
__test_do_transfer("/"),
)
def _test_scrape_metadata(self, mock_metadata_nfo):

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.9.4'
FRONTEND_VERSION = 'v2.9.4'
# Backend application version
APP_VERSION = 'v2.9.5'
# Frontend bundle version (equal to APP_VERSION in this release)
FRONTEND_VERSION = 'v2.9.5'