Compare commits

...

11 Commits

Author SHA1 Message Date
jxxghp
4b3f04c73f fix 目录监控控重 2024-12-31 12:42:28 +08:00
jxxghp
bb478c949a 更新 version.py 2024-12-31 07:15:18 +08:00
jxxghp
11b1003d4d fix 加入队列等待时间,以便聚合消息发送 2024-12-30 19:25:29 +08:00
jxxghp
c0ad5f2970 fix 整理队列锁 2024-12-30 19:02:16 +08:00
jxxghp
54c98cf3a1 fix 目录监控消息重复发送 2024-12-30 18:59:20 +08:00
jxxghp
dfbe8a2c0e fix 目录监控消息重复发送 2024-12-30 18:57:45 +08:00
jxxghp
873f80d534 fix 重复添加队列任务 2024-12-30 18:42:36 +08:00
jxxghp
089992db74 Merge pull request #3640 from InfinityPacer/feature/transfer 2024-12-30 07:00:53 +08:00
jxxghp
f07ab73fde Merge pull request #3639 from InfinityPacer/feature/site 2024-12-30 06:59:02 +08:00
InfinityPacer
166674bfe7 feat(transfer): match source dir in subdirs or prioritize same drive 2024-12-30 02:11:48 +08:00
InfinityPacer
adb4a8fe01 feat(site): add proxy support for site sync 2024-12-30 00:37:54 +08:00
6 changed files with 113 additions and 78 deletions

View File

@@ -319,6 +319,7 @@ class SiteChain(ChainBase):
continue
# 新增站点
domain_url = __indexer_domain(inx=indexer, sub_domain=domain)
proxy = False
res = RequestUtils(cookies=cookie,
ua=settings.USER_AGENT
).get_res(url=domain_url)
@@ -336,16 +337,37 @@ class SiteChain(ChainBase):
logger.warn(f"站点 {indexer.get('name')} 连接状态码:{res.status_code},无法添加站点")
continue
else:
_fail_count += 1
logger.warn(f"站点 {indexer.get('name')} 连接失败,无法添加站点")
continue
if not settings.PROXY_HOST:
_fail_count += 1
logger.warn(f"站点 {indexer.get('name')} 连接失败,无法添加站点")
continue
else:
# 如果配置了代理,尝试通过代理重试
logger.info(f"站点 {indexer.get('name')} 初次连接失败,尝试通过代理重试...")
proxy = True
res = RequestUtils(cookies=cookie,
ua=settings.USER_AGENT,
proxies=settings.PROXY
).get_res(url=domain_url)
if res and res.status_code in [200, 500, 403]:
if not indexer.get("public") and not SiteUtils.is_logged_in(res.text):
logger.warn(f"站点 {indexer.get('name')} 登录失败,即使通过代理,无法添加站点")
_fail_count += 1
continue
logger.info(f"站点 {indexer.get('name')} 通过代理连接成功")
else:
logger.warn(f"站点 {indexer.get('name')} 通过代理连接失败,无法添加站点")
_fail_count += 1
continue
# 获取rss地址
rss_url = None
if not indexer.get("public") and domain_url:
# 自动生成rss地址
rss_url, errmsg = self.rsshelper.get_rss_link(url=domain_url,
cookie=cookie,
ua=settings.USER_AGENT)
ua=settings.USER_AGENT,
proxy=proxy)
if errmsg:
logger.warn(errmsg)
# 插入数据库
@@ -355,6 +377,7 @@ class SiteChain(ChainBase):
domain=domain,
cookie=cookie,
rss=rss_url,
proxy=1 if proxy else 0,
public=1 if indexer.get("public") else 0)
_add_count += 1

View File

@@ -5,6 +5,7 @@ import traceback
from copy import deepcopy
from pathlib import Path
from queue import Queue
from time import sleep
from typing import List, Optional, Tuple, Union, Dict, Callable
from app import schemas
@@ -26,15 +27,15 @@ from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue
TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
from schemas import TransferJob, TransferJobTask
downloader_lock = threading.Lock()
job_lock = threading.Lock()
task_lock = threading.Lock()
class JobManager:
@@ -109,7 +110,7 @@ class JobManager:
"""
if not any([task, task.meta, task.fileitem]):
return
with (job_lock):
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
self._job_view[__mediaid__] = TransferJob(
@@ -124,6 +125,9 @@ class JobManager:
)]
)
else:
# 不重复添加任务
if any([t.fileitem == task.fileitem for t in self._job_view[__mediaid__].tasks]):
return
self._job_view[__mediaid__].tasks.append(
TransferJobTask(
fileitem=task.fileitem,
@@ -360,8 +364,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 文件整理线程
_transfer_thread = None
# 文件整理检查间隔(秒)
_transfer_interval = 10
# 队列间隔时间(秒)
_transfer_interval = 15
def __init__(self):
super().__init__()
@@ -441,48 +445,51 @@ class TransferChain(ChainBase, metaclass=Singleton):
'download_hash': task.download_hash,
})
# 全部整理成功时
if self.jobview.is_success(task):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
for t in tasks:
# 下载器hash
if t.download_hash:
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash} ")
# 删除残留目录
if t.fileitem:
self.storagechain.delete_media_file(t.fileitem, delete_self=False)
with task_lock:
# 全部整理成功时
if self.jobview.is_success(task):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
for t in tasks:
# 下载器hash
if t.download_hash:
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash} ")
# 删除残留目录
if t.fileitem:
self.storagechain.delete_media_file(t.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task):
# 发送通知
if transferinfo.need_notify:
se_str = None
if task.mediainfo.type == MediaType.TV:
season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season)
if season_episodes:
se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}"
else:
se_str = f"{task.meta.season}"
# 更新文件数量
transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1
# 更新文件大小
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str)
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem
})
# 整理完成且有成功的任务
if self.jobview.is_finished(task):
# 发送通知
if transferinfo.need_notify:
se_str = None
if task.mediainfo.type == MediaType.TV:
season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season)
if season_episodes:
se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}"
else:
se_str = f"{task.meta.season}"
# 更新文件数量
transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1
# 更新文件大小
transferinfo.total_size = self.jobview.size(task.mediainfo,
task.meta.begin_season) or task.fileitem.size
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str)
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem
})
# 移除已完成的任务
self.jobview.remove_job(task)
return True, ""
@@ -495,7 +502,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
if not task:
return
# 维护整理任务视图
self.jobview.add_task(task)
with task_lock:
self.jobview.add_task(task)
# 添加到队列
self._queue.put(TransferQueue(
task=task,
@@ -525,7 +533,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
while not global_vars.is_system_stopped:
try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval)
item: TransferQueue = self._queue.get(block=False)
if item:
task = item.task
if not task:
@@ -569,8 +577,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=__process_msg,
key=ProgressKey.FileTransfer)
# 移除已完成的任务
if self.jobview.is_done(task):
self.jobview.remove_job(task)
with task_lock:
if self.jobview.is_done(task):
self.jobview.remove_job(task)
except queue.Empty:
if not __queue_start:
# 结束进度
@@ -585,6 +594,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
fail_num = 0
# 标记为新队列
__queue_start = True
# 等待一定时间,以让其他任务加入队列
sleep(self._transfer_interval)
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}")
@@ -657,21 +669,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 查询整理目标目录
if not task.target_directory:
if task.src_match:
# 按源目录匹配,以便找到更合适的目录配置
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage)
elif task.target_path:
if task.target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
dest_path=task.target_path,
target_storage=task.target_storage)
else:
# 未指定目标路径,根据媒体信息获取目标目录
# 启用源目录匹配时,根据源目录匹配下载目录,否则按源目录同盘优先原则,如无源目录,则根据媒体信息获取目标目录
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage)
# 执行整理
@@ -696,7 +703,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
return transferinfo.success, transferinfo.message
def get_queue_tasks(self) -> List[dict]:
def get_queue_tasks(self) -> List[TransferJob]:
"""
获取整理任务列表
"""
@@ -788,8 +795,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
),
mediainfo=mediainfo,
downloader=torrent.downloader,
download_hash=torrent.hash,
src_match=True
download_hash=torrent.hash
)
# 设置下载任务状态
@@ -879,8 +885,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
library_type_folder: bool = None, library_category_folder: bool = None,
season: int = None, epformat: EpisodeFormat = None, min_filesize: int = 0,
downloader: str = None, download_hash: str = None,
force: bool = False, src_match: bool = False,
background: bool = True) -> Tuple[bool, str]:
force: bool = False, background: bool = True) -> Tuple[bool, str]:
"""
执行一个复杂目录的整理操作
:param fileitem: 文件项
@@ -899,7 +904,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param downloader: 下载器
:param download_hash: 下载记录hash
:param force: 是否强制整理
:param src_match: 是否源目录匹配
:param background: 是否后台运行
返回:成功标识,错误信息
"""
@@ -1072,7 +1076,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
src_match=src_match,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,

View File

@@ -68,10 +68,16 @@ class DirectoryHelper:
# 电影/电视剧
media_type = media.type.value
dirs = self.get_dirs()
# 如果存在源目录,且源目录为任一下载目录的子目录时,则进行源目录匹配,否则,允许源目录按同盘优先的逻辑匹配
matching_dirs = [d for d in dirs if src_path.is_relative_to(d.download_path)] if src_path else []
# 根据是否有匹配的源目录,决定要考虑的目录集合
dirs_to_consider = matching_dirs if matching_dirs else dirs
# 已匹配的目录
matched_dirs: List[schemas.TransferDirectoryConf] = []
# 按照配置顺序查找
for d in dirs:
for d in dirs_to_consider:
# 没有启用整理的目录
if not d.monitor_type and not include_unsorted:
continue
@@ -81,9 +87,6 @@ class DirectoryHelper:
# 目标存储类型不匹配
if target_storage and d.library_storage != target_storage:
continue
# 有源目录时,源目录不匹配下载目录
if src_path and not src_path.is_relative_to(d.download_path):
continue
# 有目标目录时,目标目录不匹配媒体库目录
if dest_path and dest_path != Path(d.library_path):
continue

View File

@@ -6,6 +6,7 @@ from threading import Lock
from typing import Any
from apscheduler.schedulers.background import BackgroundScheduler
from cachetools import TTLCache
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver
@@ -67,6 +68,9 @@ class Monitor(metaclass=Singleton):
# 存储快照间隔(分钟)
_snapshot_interval = 5
# TTL缓存10秒钟有效
_cache = TTLCache(maxsize=1024, ttl=10)
def __init__(self):
super().__init__()
self.transferchain = TransferChain()
@@ -215,6 +219,10 @@ class Monitor(metaclass=Singleton):
"""
# 全程加锁
with lock:
# TTL缓存控重
if self._cache.get(str(event_path)):
return
self._cache[str(event_path)] = True
try:
# 开始整理
self.transferchain.do_transfer(
@@ -226,8 +234,7 @@ class Monitor(metaclass=Singleton):
basename=event_path.stem,
extension=event_path.suffix[1:],
size=file_size
),
src_match=True
)
)
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))

View File

@@ -54,7 +54,6 @@ class TransferTask(BaseModel):
target_storage: Optional[str] = None
target_path: Optional[Path] = None
transfer_type: Optional[str] = None
src_match: Optional[bool] = False
scrape: Optional[bool] = False
library_type_folder: Optional[bool] = False
library_category_folder: Optional[bool] = False

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.1.6'
FRONTEND_VERSION = 'v2.1.6'
APP_VERSION = 'v2.1.7'
FRONTEND_VERSION = 'v2.1.7'