Compare commits

...

11 Commits

Author SHA1 Message Date
jxxghp
4b3f04c73f fix 目录监控控重 2024-12-31 12:42:28 +08:00
jxxghp
bb478c949a 更新 version.py 2024-12-31 07:15:18 +08:00
jxxghp
11b1003d4d fix 中入队列等待时间,以例聚合消息发送 2024-12-30 19:25:29 +08:00
jxxghp
c0ad5f2970 fix 整理队列锁 2024-12-30 19:02:16 +08:00
jxxghp
54c98cf3a1 fix 目录监控消息重复发送 2024-12-30 18:59:20 +08:00
jxxghp
dfbe8a2c0e fix 目录监控消息重复发送 2024-12-30 18:57:45 +08:00
jxxghp
873f80d534 fix 重复添加队列任务 2024-12-30 18:42:36 +08:00
jxxghp
089992db74 Merge pull request #3640 from InfinityPacer/feature/transfer 2024-12-30 07:00:53 +08:00
jxxghp
f07ab73fde Merge pull request #3639 from InfinityPacer/feature/site 2024-12-30 06:59:02 +08:00
InfinityPacer
166674bfe7 feat(transfer): match source dir in subdirs or prioritize same drive 2024-12-30 02:11:48 +08:00
InfinityPacer
adb4a8fe01 feat(site): add proxy support for site sync 2024-12-30 00:37:54 +08:00
6 changed files with 113 additions and 78 deletions

View File

@@ -319,6 +319,7 @@ class SiteChain(ChainBase):
continue continue
# 新增站点 # 新增站点
domain_url = __indexer_domain(inx=indexer, sub_domain=domain) domain_url = __indexer_domain(inx=indexer, sub_domain=domain)
proxy = False
res = RequestUtils(cookies=cookie, res = RequestUtils(cookies=cookie,
ua=settings.USER_AGENT ua=settings.USER_AGENT
).get_res(url=domain_url) ).get_res(url=domain_url)
@@ -336,16 +337,37 @@ class SiteChain(ChainBase):
logger.warn(f"站点 {indexer.get('name')} 连接状态码:{res.status_code},无法添加站点") logger.warn(f"站点 {indexer.get('name')} 连接状态码:{res.status_code},无法添加站点")
continue continue
else: else:
_fail_count += 1 if not settings.PROXY_HOST:
logger.warn(f"站点 {indexer.get('name')} 连接失败,无法添加站点") _fail_count += 1
continue logger.warn(f"站点 {indexer.get('name')} 连接失败,无法添加站点")
continue
else:
# 如果配置了代理,尝试通过代理重试
logger.info(f"站点 {indexer.get('name')} 初次连接失败,尝试通过代理重试...")
proxy = True
res = RequestUtils(cookies=cookie,
ua=settings.USER_AGENT,
proxies=settings.PROXY
).get_res(url=domain_url)
if res and res.status_code in [200, 500, 403]:
if not indexer.get("public") and not SiteUtils.is_logged_in(res.text):
logger.warn(f"站点 {indexer.get('name')} 登录失败,即使通过代理,无法添加站点")
_fail_count += 1
continue
logger.info(f"站点 {indexer.get('name')} 通过代理连接成功")
else:
logger.warn(f"站点 {indexer.get('name')} 通过代理连接失败,无法添加站点")
_fail_count += 1
continue
# 获取rss地址 # 获取rss地址
rss_url = None rss_url = None
if not indexer.get("public") and domain_url: if not indexer.get("public") and domain_url:
# 自动生成rss地址 # 自动生成rss地址
rss_url, errmsg = self.rsshelper.get_rss_link(url=domain_url, rss_url, errmsg = self.rsshelper.get_rss_link(url=domain_url,
cookie=cookie, cookie=cookie,
ua=settings.USER_AGENT) ua=settings.USER_AGENT,
proxy=proxy)
if errmsg: if errmsg:
logger.warn(errmsg) logger.warn(errmsg)
# 插入数据库 # 插入数据库
@@ -355,6 +377,7 @@ class SiteChain(ChainBase):
domain=domain, domain=domain,
cookie=cookie, cookie=cookie,
rss=rss_url, rss=rss_url,
proxy=1 if proxy else 0,
public=1 if indexer.get("public") else 0) public=1 if indexer.get("public") else 0)
_add_count += 1 _add_count += 1

View File

@@ -5,6 +5,7 @@ import traceback
from copy import deepcopy from copy import deepcopy
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from time import sleep
from typing import List, Optional, Tuple, Union, Dict, Callable from typing import List, Optional, Tuple, Union, Dict, Callable
from app import schemas from app import schemas
@@ -26,15 +27,15 @@ from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper from app.helper.progress import ProgressHelper
from app.log import logger from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \ from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \ from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey SystemConfigKey
from app.utils.singleton import Singleton from app.utils.singleton import Singleton
from app.utils.string import StringUtils from app.utils.string import StringUtils
from schemas import TransferJob, TransferJobTask
downloader_lock = threading.Lock() downloader_lock = threading.Lock()
job_lock = threading.Lock() job_lock = threading.Lock()
task_lock = threading.Lock()
class JobManager: class JobManager:
@@ -109,7 +110,7 @@ class JobManager:
""" """
if not any([task, task.meta, task.fileitem]): if not any([task, task.meta, task.fileitem]):
return return
with (job_lock): with job_lock:
__mediaid__ = self.__get_id(task) __mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view: if __mediaid__ not in self._job_view:
self._job_view[__mediaid__] = TransferJob( self._job_view[__mediaid__] = TransferJob(
@@ -124,6 +125,9 @@ class JobManager:
)] )]
) )
else: else:
# 不重复添加任务
if any([t.fileitem == task.fileitem for t in self._job_view[__mediaid__].tasks]):
return
self._job_view[__mediaid__].tasks.append( self._job_view[__mediaid__].tasks.append(
TransferJobTask( TransferJobTask(
fileitem=task.fileitem, fileitem=task.fileitem,
@@ -360,8 +364,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 文件整理线程 # 文件整理线程
_transfer_thread = None _transfer_thread = None
# 文件整理检查间隔(秒) # 队列间隔时间(秒)
_transfer_interval = 10 _transfer_interval = 15
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@@ -441,48 +445,51 @@ class TransferChain(ChainBase, metaclass=Singleton):
'download_hash': task.download_hash, 'download_hash': task.download_hash,
}) })
# 全部整理成功时 with task_lock:
if self.jobview.is_success(task): # 全部整理成功时
# 移动模式删除空目录 if self.jobview.is_success(task):
if transferinfo.transfer_type in ["move"]: # 移动模式删除空目录
# 所有成功的业务 if transferinfo.transfer_type in ["move"]:
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season) # 所有成功的业务
for t in tasks: tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
# 下载器hash for t in tasks:
if t.download_hash: # 下载器hash
if self.remove_torrents(t.download_hash, downloader=t.downloader): if t.download_hash:
logger.info(f"移动模式删除种子成功:{t.download_hash} ") if self.remove_torrents(t.download_hash, downloader=t.downloader):
# 删除残留目录 logger.info(f"移动模式删除种子成功:{t.download_hash} ")
if t.fileitem: # 删除残留目录
self.storagechain.delete_media_file(t.fileitem, delete_self=False) if t.fileitem:
self.storagechain.delete_media_file(t.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
# 发送通知
if transferinfo.need_notify:
se_str = None
if task.mediainfo.type == MediaType.TV:
season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season)
if season_episodes:
se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}"
else:
se_str = f"{task.meta.season}"
# 更新文件数量
transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1
# 更新文件大小
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str)
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem
})
# 整理完成且有成功的任务 # 移除已完成的任务
if self.jobview.is_finished(task): self.jobview.remove_job(task)
# 发送通知
if transferinfo.need_notify:
se_str = None
if task.mediainfo.type == MediaType.TV:
season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season)
if season_episodes:
se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}"
else:
se_str = f"{task.meta.season}"
# 更新文件数量
transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1
# 更新文件大小
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str)
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem
})
return True, "" return True, ""
@@ -495,7 +502,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not task: if not task:
return return
# 维护整理任务视图 # 维护整理任务视图
self.jobview.add_task(task) with task_lock:
self.jobview.add_task(task)
# 添加到队列 # 添加到队列
self._queue.put(TransferQueue( self._queue.put(TransferQueue(
task=task, task=task,
@@ -525,7 +533,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
while not global_vars.is_system_stopped: while not global_vars.is_system_stopped:
try: try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval) item: TransferQueue = self._queue.get(block=False)
if item: if item:
task = item.task task = item.task
if not task: if not task:
@@ -569,8 +577,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=__process_msg, text=__process_msg,
key=ProgressKey.FileTransfer) key=ProgressKey.FileTransfer)
# 移除已完成的任务 # 移除已完成的任务
if self.jobview.is_done(task): with task_lock:
self.jobview.remove_job(task) if self.jobview.is_done(task):
self.jobview.remove_job(task)
except queue.Empty: except queue.Empty:
if not __queue_start: if not __queue_start:
# 结束进度 # 结束进度
@@ -585,6 +594,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num = 0 fail_num = 0
# 标记为新队列 # 标记为新队列
__queue_start = True __queue_start = True
# 等待一定时间,以让其他任务加入队列
sleep(self._transfer_interval)
continue continue
except Exception as e: except Exception as e:
logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}") logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}")
@@ -657,21 +669,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 查询整理目标目录 # 查询整理目标目录
if not task.target_directory: if not task.target_directory:
if task.src_match: if task.target_path:
# 按源目录匹配,以便找到更合适的目录配置
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage)
elif task.target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配 # 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
dest_path=task.target_path, dest_path=task.target_path,
target_storage=task.target_storage) target_storage=task.target_storage)
else: else:
# 未指定目标路径,根据媒体信息获取目标目录 # 启用源目录匹配时,根据源目录匹配下载目录,否则按源目录同盘优先原则,如无源目录,则根据媒体信息获取目标目录
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo, task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage, storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage) target_storage=task.target_storage)
# 执行整理 # 执行整理
@@ -696,7 +703,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
return transferinfo.success, transferinfo.message return transferinfo.success, transferinfo.message
def get_queue_tasks(self) -> List[dict]: def get_queue_tasks(self) -> List[TransferJob]:
""" """
获取整理任务列表 获取整理任务列表
""" """
@@ -788,8 +795,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
), ),
mediainfo=mediainfo, mediainfo=mediainfo,
downloader=torrent.downloader, downloader=torrent.downloader,
download_hash=torrent.hash, download_hash=torrent.hash
src_match=True
) )
# 设置下载任务状态 # 设置下载任务状态
@@ -879,8 +885,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
library_type_folder: bool = None, library_category_folder: bool = None, library_type_folder: bool = None, library_category_folder: bool = None,
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0, season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None, downloader: str = None, download_hash: str = None,
force: bool = False, src_match: bool = False, force: bool = False, background: bool = True) -> Tuple[bool, str]:
background: bool = True) -> Tuple[bool, str]:
""" """
执行一个复杂目录的整理操作 执行一个复杂目录的整理操作
:param fileitem: 文件项 :param fileitem: 文件项
@@ -899,7 +904,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param downloader: 下载器 :param downloader: 下载器
:param download_hash: 下载记录hash :param download_hash: 下载记录hash
:param force: 是否强制整理 :param force: 是否强制整理
:param src_match: 是否源目录匹配
:param background: 是否后台运行 :param background: 是否后台运行
返回:成功标识,错误信息 返回:成功标识,错误信息
""" """
@@ -1072,7 +1076,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
target_storage=target_storage, target_storage=target_storage,
target_path=target_path, target_path=target_path,
transfer_type=transfer_type, transfer_type=transfer_type,
src_match=src_match,
scrape=scrape, scrape=scrape,
library_type_folder=library_type_folder, library_type_folder=library_type_folder,
library_category_folder=library_category_folder, library_category_folder=library_category_folder,

View File

@@ -68,10 +68,16 @@ class DirectoryHelper:
# 电影/电视剧 # 电影/电视剧
media_type = media.type.value media_type = media.type.value
dirs = self.get_dirs() dirs = self.get_dirs()
# 如果存在源目录,并源目录为任一下载目录的子目录时,则进行源目录匹配,否则,允许源目录按同盘优先的逻辑匹配
matching_dirs = [d for d in dirs if src_path.is_relative_to(d.download_path)] if src_path else []
# 根据是否有匹配的源目录,决定要考虑的目录集合
dirs_to_consider = matching_dirs if matching_dirs else dirs
# 已匹配的目录 # 已匹配的目录
matched_dirs: List[schemas.TransferDirectoryConf] = [] matched_dirs: List[schemas.TransferDirectoryConf] = []
# 按照配置顺序查找 # 按照配置顺序查找
for d in dirs: for d in dirs_to_consider:
# 没有启用整理的目录 # 没有启用整理的目录
if not d.monitor_type and not include_unsorted: if not d.monitor_type and not include_unsorted:
continue continue
@@ -81,9 +87,6 @@ class DirectoryHelper:
# 目标存储类型不匹配 # 目标存储类型不匹配
if target_storage and d.library_storage != target_storage: if target_storage and d.library_storage != target_storage:
continue continue
# 有源目录时,源目录不匹配下载目录
if src_path and not src_path.is_relative_to(d.download_path):
continue
# 有目标目录时,目标目录不匹配媒体库目录 # 有目标目录时,目标目录不匹配媒体库目录
if dest_path and dest_path != Path(d.library_path): if dest_path and dest_path != Path(d.library_path):
continue continue

View File

@@ -6,6 +6,7 @@ from threading import Lock
from typing import Any from typing import Any
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from cachetools import TTLCache
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver from watchdog.observers.polling import PollingObserver
@@ -67,6 +68,9 @@ class Monitor(metaclass=Singleton):
# 存储过照间隔(分钟) # 存储过照间隔(分钟)
_snapshot_interval = 5 _snapshot_interval = 5
# TTL缓存10秒钟有效
_cache = TTLCache(maxsize=1024, ttl=10)
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.transferchain = TransferChain() self.transferchain = TransferChain()
@@ -215,6 +219,10 @@ class Monitor(metaclass=Singleton):
""" """
# 全程加锁 # 全程加锁
with lock: with lock:
# TTL缓存控重
if self._cache.get(str(event_path)):
return
self._cache[str(event_path)] = True
try: try:
# 开始整理 # 开始整理
self.transferchain.do_transfer( self.transferchain.do_transfer(
@@ -226,8 +234,7 @@ class Monitor(metaclass=Singleton):
basename=event_path.stem, basename=event_path.stem,
extension=event_path.suffix[1:], extension=event_path.suffix[1:],
size=file_size size=file_size
), )
src_match=True
) )
except Exception as e: except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))

View File

@@ -54,7 +54,6 @@ class TransferTask(BaseModel):
target_storage: Optional[str] = None target_storage: Optional[str] = None
target_path: Optional[Path] = None target_path: Optional[Path] = None
transfer_type: Optional[str] = None transfer_type: Optional[str] = None
src_match: Optional[bool] = False
scrape: Optional[bool] = False scrape: Optional[bool] = False
library_type_folder: Optional[bool] = False library_type_folder: Optional[bool] = False
library_category_folder: Optional[bool] = False library_category_folder: Optional[bool] = False

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.1.6' APP_VERSION = 'v2.1.7'
FRONTEND_VERSION = 'v2.1.6' FRONTEND_VERSION = 'v2.1.7'