fix transfer chain

This commit is contained in:
jxxghp
2024-07-01 18:30:15 +08:00
parent 778b562cab
commit f16eb271da
9 changed files with 249 additions and 532 deletions

View File

@@ -120,7 +120,7 @@ def scrape(fileitem: schemas.FileItem,
if not fileitem.fileid:
return schemas.Response(success=False, message="刮削文件ID无效")
# 手动刮削
chain.manual_scrape(storage=storage, fileitem=fileitem, meta=meta, mediainfo=mediainfo)
chain.scrape_metadata(storage=storage, fileitem=fileitem, meta=meta, mediainfo=mediainfo)
return schemas.Response(success=True, message=f"{fileitem.path} 刮削完成")

View File

@@ -1,3 +1,4 @@
import json
from pathlib import Path
from typing import Any
@@ -11,7 +12,7 @@ from app.core.metainfo import MetaInfoPath
from app.core.security import verify_token, verify_apitoken
from app.db import get_db
from app.db.models.transferhistory import TransferHistory
from app.schemas import MediaType
from app.schemas import MediaType, FileItem
router = APIRouter()
@@ -46,13 +47,10 @@ def query_name(path: str, filetype: str,
@router.post("/manual", summary="手动转移", response_model=schemas.Response)
def manual_transfer(storage: str = "local",
path: str = None,
drive_id: str = None,
fileid: str = None,
filetype: str = None,
def manual_transfer(fileitem: FileItem = None,
logid: int = None,
target: str = None,
target_storage: str = None,
target_path: str = None,
tmdbid: int = None,
doubanid: str = None,
type_name: str = None,
@@ -68,13 +66,10 @@ def manual_transfer(storage: str = "local",
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
手动转移,文件或历史记录,支持自定义剧集识别格式
:param storage: 存储类型local/aliyun/u115
:param path: 转移路径或文件
:param drive_id: 云盘ID网盘等
:param fileid: 文件ID网盘等
:param filetype: 文件类型dir/file
:param fileitem: 文件信息
:param logid: 转移历史记录ID
:param target: 目标路径
:param target_storage: 目标存储
:param target_path: 目标路径
:param type_name: 媒体类型、电影/电视剧
:param tmdbid: tmdbid
:param doubanid: 豆瓣ID
@@ -90,7 +85,7 @@ def manual_transfer(storage: str = "local",
:param _: Token校验
"""
force = False
target = Path(target) if target else None
target_path = Path(target_path) if target_path else None
transfer = TransferChain()
if logid:
# 查询历史记录
@@ -101,18 +96,19 @@ def manual_transfer(storage: str = "local",
force = True
if history.status and ("move" in history.mode):
# 重新整理成功的转移,则使用成功的 dest 做 in_path
in_path = Path(history.dest)
src_fileitem = json.loads(history.dest_fileitem)
else:
# 源路径
in_path = Path(history.src)
src_fileitem = json.loads(history.src_fileitem)
# 目的路径
if history.dest and str(history.dest) != "None":
if history.dest_fileitem:
# 删除旧的已整理文件
transfer.delete_files(Path(history.dest))
elif path:
in_path = Path(path)
dest_fileitem = json.loads(history.dest_fileitem)
transfer.delete_files(dest_fileitem)
elif fileitem:
src_fileitem = fileitem
else:
return schemas.Response(success=False, message=f"缺少参数path/logid")
return schemas.Response(success=False, message=f"缺少参数")
# 类型
mtype = MediaType(type_name) if type_name else None
@@ -127,12 +123,9 @@ def manual_transfer(storage: str = "local",
)
# 开始转移
state, errormsg = transfer.manual_transfer(
storage=storage,
in_path=in_path,
drive_id=drive_id,
fileid=fileid,
filetype=filetype,
target=target,
fileitem=src_fileitem,
target_storage=target_storage,
target_path=target_path,
tmdbid=tmdbid,
doubanid=doubanid,
mtype=mtype,

View File

@@ -19,7 +19,7 @@ from app.db.message_oper import MessageOper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
WebhookEventInfo, TmdbEpisode, MediaPerson
WebhookEventInfo, TmdbEpisode, MediaPerson, FileItem
from app.schemas.types import TorrentStatus, MediaType, MediaImageType, EventType
from app.utils.object import ObjectUtils
@@ -369,24 +369,25 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("list_torrents", status=status, hashs=hashs, downloader=downloader)
def transfer(self, path: Path, meta: MetaBase, mediainfo: MediaInfo,
transfer_type: str, target: Path = None,
def transfer(self, fileitem: FileItem, meta: MetaBase, mediainfo: MediaInfo,
transfer_type: str, target_storage: str = None, target_path: Path = None,
episodes_info: List[TmdbEpisode] = None,
scrape: bool = None) -> Optional[TransferInfo]:
"""
文件转移
:param path: 文件路径
:param fileitem: 文件信息
:param meta: 预识别的元数据
:param mediainfo: 识别的媒体信息
:param transfer_type: 转移模式
:param target: 转移目标路径
:param target_storage: 目标存储
:param target_path: 目标路径
:param episodes_info: 当前季的全部集信息
:param scrape: 是否刮削元数据
:return: {path, target_path, message}
"""
return self.run_module("transfer", path=path, meta=meta, mediainfo=mediainfo,
transfer_type=transfer_type, target=target, episodes_info=episodes_info,
scrape=scrape)
return self.run_module("transfer", fileitem=fileitem, meta=meta, mediainfo=mediainfo,
transfer_type=transfer_type, target_storage=target_storage,
target_path=target_path, episodes_info=episodes_info, scrape=scrape)
def transfer_completed(self, hashs: str, path: Path = None,
downloader: str = settings.DEFAULT_DOWNLOADER) -> None:

View File

@@ -331,8 +331,8 @@ class MediaChain(ChainBase, metaclass=Singleton):
)
return None
def manual_scrape(self, storage: str, fileitem: schemas.FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None, init_folder: bool = True):
def scrape_metadata(self, storage: str, fileitem: schemas.FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None, init_folder: bool = True):
"""
手动刮削媒体信息
"""
@@ -395,9 +395,9 @@ class MediaChain(ChainBase, metaclass=Singleton):
# 电影目录
files = __list_files(_fileitem=fileitem)
for file in files:
self.manual_scrape(storage=storage, fileitem=file,
meta=meta, mediainfo=mediainfo,
init_folder=False)
self.scrape_metadata(storage=storage, fileitem=file,
meta=meta, mediainfo=mediainfo,
init_folder=False)
# 生成目录内图片文件
if init_folder:
# 图片
@@ -437,9 +437,9 @@ class MediaChain(ChainBase, metaclass=Singleton):
# 当前为目录,处理目录内的文件
files = __list_files(_fileitem=fileitem)
for file in files:
self.manual_scrape(storage=storage, fileitem=file,
meta=meta, mediainfo=mediainfo,
init_folder=True if file.type == "dir" else False)
self.scrape_metadata(storage=storage, fileitem=file,
meta=meta, mediainfo=mediainfo,
init_folder=True if file.type == "dir" else False)
# 生成目录的nfo和图片
if init_folder:
# 识别文件夹名称

View File

@@ -1,17 +1,17 @@
import json
import re
import shutil
import threading
from pathlib import Path
from typing import List, Optional, Tuple, Union, Dict
from app import schemas
from app.chain import ChainBase
from app.chain.media import MediaChain
from app.chain.storage import StorageChain
from app.chain.tmdb import TmdbChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfoPath, MetaInfo
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.models.downloadhistory import DownloadHistory
from app.db.models.transferhistory import TransferHistory
@@ -21,7 +21,7 @@ from app.helper.directory import DirectoryHelper
from app.helper.format import FormatParser
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat
from app.schemas import TransferInfo, TransferTorrent, Notification, EpisodeFormat, FileItem
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey
from app.utils.string import StringUtils
@@ -42,6 +42,7 @@ class TransferChain(ChainBase):
self.progress = ProgressHelper()
self.mediachain = MediaChain()
self.tmdbchain = TmdbChain()
self.storagechain = StorageChain()
self.systemconfig = SystemConfigOper()
self.directoryhelper = DirectoryHelper()
self.all_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIO_TRACK_EXT
@@ -92,8 +93,18 @@ class TransferChain(ChainBase):
mediainfo = None
# 执行转移
self.__do_transfer(storage="local", path=torrent.path,
mediainfo=mediainfo, download_hash=torrent.hash)
file_path = Path(torrent.path)
self.__do_transfer(
fileitem=FileItem(
storage="local",
path=torrent.path,
type="dir" if not file_path.is_file() else "file",
name=file_path.name,
size=file_path.stat().st_size,
extension=file_path.suffix.lstrip('.'),
),
mediainfo=mediainfo, download_hash=torrent.hash
)
# 设置下载任务状态
self.transfer_completed(hashs=torrent.hash, path=torrent.path)
@@ -101,24 +112,21 @@ class TransferChain(ChainBase):
logger.info("下载器文件转移执行完成")
return True
def __do_transfer(self, storage: str, path: Path, drive_id: str = None, fileid: str = None, filetype: str = None,
def __do_transfer(self, fileitem: FileItem,
meta: MetaBase = None, mediainfo: MediaInfo = None,
download_hash: str = None,
target: Path = None, transfer_type: str = None,
download_hash: str = None, target_storage: str = None,
target_path: Path = None, transfer_type: str = None,
season: int = None, epformat: EpisodeFormat = None,
min_filesize: int = 0, scrape: bool = None,
force: bool = False) -> Tuple[bool, str]:
"""
执行一个复杂目录的转移操作
:param storage: 存储器
:param path: 待转移目录或文件
:param drive_id: 网盘ID
:param fileid: 文件ID
:param filetype: 文件类型
:param fileitem: 文件项
:param meta: 元数据
:param mediainfo: 媒体信息
:param download_hash: 下载记录hash
:param target: 目标路径
:param target_storage: 目标存储器
:param target_path: 目标路径
:param transfer_type: 转移类型
:param season: 季
:param epformat: 剧集格式
@@ -142,55 +150,6 @@ class TransferChain(ChainBase):
# 开始进度
self.progress.start(ProgressKey.FileTransfer)
# 本地存储
if storage == "local":
# 本地整理
result = self.__transfer_local(path=path, meta=meta, mediainfo=mediainfo,
formaterHandler=formaterHandler,
transfer_exclude_words=transfer_exclude_words,
min_filesize=min_filesize, transfer_type=transfer_type,
target=target, season=season, scrape=scrape,
download_hash=download_hash, force=force)
else:
# 网盘整理
result = self.__transfer_online(storage=storage,
fileitem=schemas.FileItem(
path=str(path) + ("/" if filetype == "dir" else ""),
type=filetype,
drive_id=drive_id,
fileid=fileid,
name=path.name
),
meta=meta,
mediainfo=mediainfo)
if result and result[0] and scrape:
# 刮削元数据
self.progress.update(value=0,
text=f"正在刮削 {path} ...",
key=ProgressKey.FileTransfer)
self.mediachain.manual_scrape(storage=storage,
fileitem=schemas.FileItem(
path=str(path) + ("/" if filetype == "dir" else ""),
type=filetype,
drive_id=drive_id,
fileid=fileid,
name=path.name
),
meta=meta,
mediainfo=mediainfo)
# 结速进度
self.progress.end(ProgressKey.FileTransfer)
return result
def __transfer_local(self, path: Path, meta: MetaBase = None, mediainfo: MediaInfo = None,
formaterHandler: FormatParser = None, transfer_exclude_words: List[str] = None,
min_filesize: int = 0, transfer_type: str = None, target: Path = None,
season: int = None, scrape: bool = None, download_hash: str = None,
force: bool = False) -> Tuple[bool, str]:
"""
整理一个本地目录
"""
# 汇总错误信息
err_msgs: List[str] = []
# 已处理数量
@@ -200,27 +159,33 @@ class TransferChain(ChainBase):
# 跳过数量
skip_num = 0
# 获取待转移路径清单
trans_paths = self.__get_trans_paths(path)
if not trans_paths:
logger.warn(f"{path.name} 没有找到可转移的媒体文件")
return False, f"{path.name} 没有找到可转移的媒体文件"
# 目录所有文件清单
transfer_files = SystemUtils.list_files(directory=path,
extensions=settings.RMT_MEDIAEXT,
min_filesize=min_filesize)
if formaterHandler:
# 有集自定义格式,过滤文件
transfer_files = [f for f in transfer_files if formaterHandler.match(f.name)]
transfer_files = self.storagechain.list_files(fileitem=fileitem)
if transfer_files:
# 过滤后缀和大小
transfer_files = [f for f in transfer_files
if (f".{f.extension.lower()}" in self.all_exts
and (not min_filesize or f.size > min_filesize * 1024 * 1024))]
if formaterHandler:
# 有集自定义格式,过滤文件
transfer_files = [f for f in transfer_files if formaterHandler.match(f.name)]
else:
return False, f"{fileitem.name} 没有找到可转移的媒体文件"
# 总文件数
total_num = len(transfer_files)
self.progress.update(value=0,
text=f"开始转移 {path},共 {total_num} 个文件 ...",
text=f"开始转移 {fileitem.path},共 {total_num} 个文件 ...",
key=ProgressKey.FileTransfer)
# 获取待转移路径清单
trans_items = self.__get_trans_fileitems(fileitem)
if not trans_items:
logger.warn(f"{fileitem.path} 没有找到可转移的媒体文件")
return False, f"{fileitem.name} 没有找到可转移的媒体文件"
# 处理所有待转移目录或文件,默认一个转移路径或文件只有一个媒体信息
for trans_path in trans_paths:
for trans_item in trans_items:
# 汇总季集清单
season_episodes: Dict[Tuple, List[int]] = {}
# 汇总元数据
@@ -230,29 +195,34 @@ class TransferChain(ChainBase):
# 汇总转移信息
transfers: Dict[Tuple, TransferInfo] = {}
item_path = Path(trans_item.path)
# 如果是目录且不是⼀蓝光原盘,获取所有文件并转移
if (not trans_path.is_file()
and not SystemUtils.is_bluray_dir(trans_path)):
if (trans_item.type == "dir"
and not (trans_item.storage == "local" and not SystemUtils.is_bluray_dir(item_path))):
# 遍历获取下载目录所有文件
file_paths = SystemUtils.list_files(directory=trans_path,
extensions=settings.RMT_MEDIAEXT,
min_filesize=min_filesize)
file_items = self.storagechain.list_files(trans_item)
if not file_items:
continue
# 过滤后缀和大小
file_items = [f for f in file_items
if (f".{f.extension.lower()}" in self.all_exts
and (not min_filesize or f.size > min_filesize * 1024 * 1024))]
else:
file_paths = [trans_path]
file_items = [trans_item]
if formaterHandler:
# 有集自定义格式,过滤文件
file_paths = [f for f in file_paths if formaterHandler.match(f.name)]
file_items = [f for f in file_items if formaterHandler.match(f.name)]
# 转移所有文件
for file_path in file_paths:
for file_item in file_items:
file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
file_path_str = str(file_path)
if file_path_str.find('/@Recycle/') != -1 \
or file_path_str.find('/#recycle/') != -1 \
or file_path_str.find('/.') != -1 \
or file_path_str.find('/@eaDir') != -1:
logger.debug(f"{file_path_str} 是回收站或隐藏的文件")
if file_item.path.find('/@Recycle/') != -1 \
or file_item.path.find('/#recycle/') != -1 \
or file_item.path.find('/.') != -1 \
or file_item.path.find('/@eaDir') != -1:
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
# 计数
processed_num += 1
skip_num += 1
@@ -264,12 +234,12 @@ class TransferChain(ChainBase):
for keyword in transfer_exclude_words:
if not keyword:
continue
if keyword and re.search(r"%s" % keyword, file_path_str, re.IGNORECASE):
logger.info(f"{file_path} 命中整理屏蔽词 {keyword},不处理")
if keyword and re.search(r"%s" % keyword, file_item.path, re.IGNORECASE):
logger.info(f"{file_item.path} 命中整理屏蔽词 {keyword},不处理")
is_blocked = True
break
if is_blocked:
err_msgs.append(f"{file_path.name} 命中整理屏蔽词")
err_msgs.append(f"{file_item.name} 命中整理屏蔽词")
# 计数
processed_num += 1
skip_num += 1
@@ -277,9 +247,9 @@ class TransferChain(ChainBase):
# 转移成功的不再处理
if not force:
transferd = self.transferhis.get_by_src(file_path_str)
transferd = self.transferhis.get_by_src(file_item.path)
if transferd and transferd.status:
logger.info(f"{file_path} 已成功转移过,如需重新处理,请删除历史记录。")
logger.info(f"{file_item.path} 已成功转移过,如需重新处理,请删除历史记录。")
# 计数
processed_num += 1
skip_num += 1
@@ -287,7 +257,7 @@ class TransferChain(ChainBase):
# 更新进度
self.progress.update(value=processed_num / total_num * 100,
text=f"正在转移 {processed_num + 1}/{total_num}{file_path.name} ...",
text=f"正在转移 {processed_num + 1}/{total_num}{file_item.name} ...",
key=ProgressKey.FileTransfer)
if not meta:
@@ -328,7 +298,7 @@ class TransferChain(ChainBase):
logger.warn(f'{file_path} 未识别到媒体信息')
# 新增转移失败历史记录
his = self.transferhis.add_fail(
src_path=file_path,
fileitem=file_item,
mode=transfer_type,
meta=file_meta,
download_hash=download_hash
@@ -367,16 +337,17 @@ class TransferChain(ChainBase):
# 获取下载hash
if not download_hash:
download_file = self.downloadhis.get_file_by_fullpath(file_path_str)
download_file = self.downloadhis.get_file_by_fullpath(file_item.path)
if download_file:
download_hash = download_file.download_hash
# 执行转移
transferinfo: TransferInfo = self.transfer(meta=file_meta,
transferinfo: TransferInfo = self.transfer(fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
path=file_path,
transfer_type=transfer_type,
target=target,
target_storage=target_storage,
target_path=target_path,
episodes_info=episodes_info,
scrape=scrape)
if not transferinfo:
@@ -388,7 +359,7 @@ class TransferChain(ChainBase):
err_msgs.append(f"{file_path.name} {transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
src_path=file_path,
fileitem=file_item,
mode=transfer_type,
download_hash=download_hash,
meta=file_meta,
@@ -428,7 +399,7 @@ class TransferChain(ChainBase):
# 新增转移成功历史记录
self.transferhis.add_success(
src_path=file_path,
fileitem=file_item,
mode=transfer_type,
download_hash=download_hash,
meta=file_meta,
@@ -437,10 +408,10 @@ class TransferChain(ChainBase):
)
# 刮削单个文件
if transferinfo.need_scrape:
self.scrape_metadata(path=transferinfo.target_path,
mediainfo=file_mediainfo,
transfer_type=transfer_type,
metainfo=file_meta)
self.mediachain.scrape_metadata(storage=target_storage,
fileitem=transferinfo.target_item,
meta=file_meta,
mediainfo=file_mediainfo)
# 更新进度
processed_num += 1
self.progress.update(value=processed_num / total_num * 100,
@@ -448,16 +419,13 @@ class TransferChain(ChainBase):
key=ProgressKey.FileTransfer)
# 目录或文件转移完成
self.progress.update(text=f"{trans_path} 转移完成,正在执行后续处理 ...",
self.progress.update(text=f"{trans_item.path} 转移完成,正在执行后续处理 ...",
key=ProgressKey.FileTransfer)
# 执行后续处理
for mkey, media in medias.items():
transfer_meta = metas[mkey]
transfer_info = transfers[mkey]
# 媒体目录
if transfer_info.target_path.is_file():
transfer_info.target_path = transfer_info.target_path.parent
# 发送通知
se_str = None
if media.type == MediaType.TV:
@@ -473,264 +441,64 @@ class TransferChain(ChainBase):
'transferinfo': transfer_info
})
# 结束进度
logger.info(f"{path} 转移完成,共 {total_num} 个文件,"
logger.info(f"{fileitem.path} 转移完成,共 {total_num} 个文件,"
f"失败 {fail_num} 个,跳过 {skip_num}")
self.progress.update(value=100,
text=f"{path} 转移完成,共 {total_num} 个文件,"
text=f"{fileitem.path} 转移完成,共 {total_num} 个文件,"
f"失败 {fail_num} 个,跳过 {skip_num}",
key=ProgressKey.FileTransfer)
# 结速进度
self.progress.end(ProgressKey.FileTransfer)
return True, "\n".join(err_msgs)
def __transfer_online(self, storage: str, fileitem: schemas.FileItem,
meta: MetaBase, mediainfo: MediaInfo) -> Tuple[bool, str]:
"""
整理一个远程目录
"""
def __list_files(_storage: str, _fileid: str,
_path: str = None, _drive_id: str = None) -> List[schemas.FileItem]:
"""
列出下级文件
"""
if _storage == "aliyun":
return AliyunHelper().list(drive_id=_drive_id, parent_file_id=_fileid, path=_path)
elif _storage == "u115":
return U115Helper().list(parent_file_id=_fileid, path=_path)
return []
def __rename_file(_storage: str, _deive_id: str, _fileid: str, _name: str) -> bool:
"""
重命名文件
"""
if _storage == "aliyun":
return AliyunHelper().rename(drive_id=_deive_id, file_id=_fileid, name=_name)
elif _storage == "u115":
return U115Helper().rename(file_id=_fileid, name=_name)
return False
def __create_folder(_storage: str, _drive_id: str, _parent_fileid: str,
_name: str, _path: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
if _storage == "aliyun":
return AliyunHelper().create_folder(drive_id=_drive_id, parent_file_id=_parent_fileid,
name=_name, path=_path)
elif _storage == "u115":
return U115Helper().create_folder(parent_file_id=_parent_fileid, name=_name, path=_path)
return None
def __move_file(_storage: str, _drive_id: str, _fileid: str, _target_fileid: str) -> bool:
"""
移动文件
"""
if _storage == "aliyun":
return AliyunHelper().move(drive_id=_drive_id, file_id=_fileid, target_id=_target_fileid)
elif _storage == "u115":
return U115Helper().move(file_id=_fileid, target_id=_target_fileid)
return False
def __remove_dir(_storage: str, _drive_id: str, _fileid: str) -> bool:
"""
删除目录
"""
if _storage == "aliyun":
return AliyunHelper().delete(drive_id=_drive_id, file_id=_fileid)
elif _storage == "u115":
return U115Helper().delete(file_id=_fileid)
return False
logger.info(f"开始整理 {fileitem.path} ...")
self.progress.update(value=0,
text=f"正在整理 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 重新识别
if not meta:
# 文件元数据
meta = MetaInfoPath(Path(fileitem.path))
if not mediainfo:
mediainfo = self.mediachain.recognize_by_meta(meta)
if not mediainfo:
logger.warn(f"{fileitem.name} 未识别到媒体信息")
return False, f"{fileitem.name} 未识别到媒体信息"
# 获取完整的路径命名
full_names = self.recommend_name(meta=meta, mediainfo=mediainfo)
if not full_names:
logger.warn(f"{fileitem.path} 未获取到命名")
return False, f"{fileitem.path} 未获取到命名"
if mediainfo.type == MediaType.TV:
# 电视剧
[folder_name, season_name, file_name] = Path(full_names).parts
else:
# 电影
season_name = None
[folder_name, file_name] = Path(full_names).parts
# 如果是单个文件,则直接重命名
if fileitem.type == "file":
# 重命名文件
logger.info(f"正在整理 {fileitem.name} => {file_name} ...")
if not __rename_file(_storage=storage, _deive_id=fileitem.drive_id, _fileid=fileitem.fileid, _name=file_name):
logger.error(f"{fileitem.name} 重命名失败")
return False, f"{fileitem.name} 重命名失败"
logger.info(f"{fileitem.path} 整理完成")
else:
# 目录处理
if mediainfo.type == MediaType.MOVIE:
# 电影目录
# 重命名当前目录
logger.info(f"正在重命名 {fileitem.path} => {folder_name} ...")
if not __rename_file(_storage=storage, _deive_id=fileitem.drive_id,
_fileid=fileitem.fileid, _name=folder_name):
logger.error(f"{fileitem.path} 重命名失败")
return False, f"{fileitem.path} 重命名失败"
logger.info(f"{fileitem.path} 重命名完成")
# 处理所有子文件或目录
files = __list_files(_storage=storage, _fileid=fileitem.fileid,
_drive_id=fileitem.drive_id, _path=fileitem.path)
if not files:
logger.info(f"{fileitem.path} 未找到文件,删除空目录")
if not __remove_dir(_storage=storage, _drive_id=fileitem.drive_id, _fileid=fileitem.fileid):
logger.error(f"{fileitem.path} 删除失败")
return False, f"{fileitem.path} 删除失败"
return True, ""
for file in files:
# 过滤不处理的文件
if file.type == "file" and str(file.extension) in ['nfo', 'jpg', 'png']:
continue
# 重新识别文件或目录
file_meta = MetaInfoPath(Path(file.path))
if not file_meta.name:
# 过滤掉无效文件
continue
file_media = self.mediachain.recognize_by_meta(file_meta)
if not file_media:
logger.warn(f"{file.name} 未识别到媒体信息")
continue
# 整理这个文件或目录
self.__transfer_online(storage=storage, fileitem=file, meta=file_meta, mediainfo=file_media)
else:
# 电视剧目录
# 判断当前目录类型
folder_meta = MetaInfo(fileitem.name)
if folder_meta.begin_season and not folder_meta.name:
# 季目录
logger.info(f"正在重命名 {fileitem.path} => {season_name} ...")
if not __rename_file(_storage=storage, _deive_id=fileitem.drive_id,
_fileid=fileitem.fileid, _name=season_name):
logger.error(f"{fileitem.path} 重命名失败")
return False, f"{fileitem.path} 重命名失败"
logger.info(f"{fileitem.path} 重命名完成")
elif folder_meta.name:
# 根目录,重命名当前目录
logger.info(f"正在重命名 {fileitem.path} => {folder_name} ...")
if not __rename_file(_storage=storage, _deive_id=fileitem.drive_id,
_fileid=fileitem.fileid, _name=folder_name):
logger.error(f"{fileitem.path} 重命名失败")
return False, f"{fileitem.path} 重命名失败"
logger.info(f"{fileitem.path} 重命名完成")
# 是否有季
if folder_meta.begin_season:
# 创建季目录
logger.info(f"正在创建目录 {fileitem.path}{season_name} ...")
season_dir = __create_folder(_storage=storage, _drive_id=fileitem.drive_id,
_parent_fileid=fileitem.fileid, _name=season_name,
_path=fileitem.path)
if not season_dir:
logger.error(f"{fileitem.path}/{season_name} 创建失败")
return False, f"{fileitem.path}/{season_name} 创建失败"
logger.info(f"{fileitem.path}/{season_name} 创建完成")
# 移动当前目录下的所有文件到季目录
files = __list_files(_storage=storage, _fileid=fileitem.fileid,
_drive_id=fileitem.drive_id, _path=fileitem.path)
if not files:
logger.error(f"{fileitem.path} 未找到文件,删除空目录")
if not __remove_dir(_storage=storage, _drive_id=fileitem.drive_id, _fileid=fileitem.fileid):
logger.error(f"{fileitem.path} 删除失败")
return False, f"{fileitem.path} 删除失败"
logger.info(f"{fileitem.path} 已删除")
return True, ""
for file in files:
if file.type == "dir":
continue
logger.info(f"正在移动 {file.path} => {season_dir.path}...")
if not __move_file(_storage=storage, _drive_id=fileitem.drive_id,
_fileid=file.fileid, _target_fileid=season_dir.fileid):
logger.error(f"{file.name} 移动失败")
return False, f"{file.name} 移动失败"
logger.info(f"{file.path} 移动完成")
# 修改当前目录为季目录
fileitem = season_dir
# 列出当前目录下所有的文件或目录,并进行重命名整理
files = __list_files(_storage=storage, _fileid=fileitem.fileid,
_drive_id=fileitem.drive_id, _path=fileitem.path)
if not files:
logger.info(f"{fileitem.path} 未找到文件,删除空目录")
if not __remove_dir(_storage=storage, _drive_id=fileitem.drive_id, _fileid=fileitem.fileid):
logger.error(f"{fileitem.path} 删除失败")
return False, f"{fileitem.path} 删除失败"
logger.info(f"{fileitem.path} 已删除")
return True, ""
for file in files:
# 过滤不处理的文件
if file.type == "file" and str(file.extension) in ['nfo', 'jpg', 'png']:
continue
# 重新识别文件或目录
file_meta = MetaInfoPath(Path(file.path))
file_media = self.mediachain.recognize_by_meta(file_meta)
if not file_media:
logger.warn(f"{file.name} 未识别到媒体信息")
continue
# 整理这个文件或目录
self.__transfer_online(storage=storage, fileitem=file, meta=file_meta, mediainfo=file_media)
logger.info(f"{fileitem.path} 整理完成")
self.progress.update(value=0,
text=f"{fileitem.path} 整理完成",
key=ProgressKey.FileTransfer)
return True, ""
@staticmethod
def __get_trans_paths(directory: Path):
def __get_trans_fileitems(self, fileitem: FileItem):
"""
获取转移目录列表
"""
if not directory.exists():
logger.warn(f"目录不存在:{directory}")
file_path = Path(fileitem.path)
if fileitem.storage == "local" and not file_path.exists():
logger.warn(f"目录不存在:{fileitem.path}")
return []
# 单文件
if directory.is_file():
return [directory]
if fileitem.type == "file":
return [fileitem]
# 蓝光原盘
if SystemUtils.is_bluray_dir(directory):
return [directory]
if fileitem.storage == "local" and SystemUtils.is_bluray_dir(file_path):
return [fileitem]
# 需要转移的路径列表
trans_paths = []
# 需要转移的文件项列表
trans_items = []
# 先检查当前目录的下级目录,以支持合集的情况
for sub_dir in SystemUtils.list_sub_directory(directory):
# 如果是蓝光原盘
if SystemUtils.is_bluray_dir(sub_dir):
trans_paths.append(sub_dir)
# 没有媒体文件的目录跳过
elif SystemUtils.list_files(sub_dir, extensions=settings.RMT_MEDIAEXT):
trans_paths.append(sub_dir)
for sub_dir in self.storagechain.list_files(fileitem):
subfile_path = Path(sub_dir.path)
# 添加蓝光原盘
if sub_dir.storage == "local" \
and sub_dir.type == "dir" \
and SystemUtils.is_bluray_dir(subfile_path):
trans_items.append(sub_dir)
# 添加目录
elif sub_dir.type == "dir":
trans_items.append(sub_dir)
if not trans_paths:
if not trans_items:
# 没有有效子目录,直接转移当前目录
trans_paths.append(directory)
trans_items.append(fileitem)
else:
# 有子目录时,把当前目录的文件添加到转移任务中
trans_paths.extend(
SystemUtils.list_sub_files(directory, extensions=settings.RMT_MEDIAEXT)
)
return trans_paths
sub_items = self.storagechain.list_files(fileitem)
if sub_items:
sub_files = [f for f in sub_items if f.type == "file" and f".{f.extension.lower()}" in self.all_exts]
if sub_files:
trans_items.extend(sub_files)
return trans_items
def remote_transfer(self, arg_str: str, channel: MessageChannel, userid: Union[str, int] = None):
"""
@@ -805,27 +573,28 @@ class TransferChain(ChainBase):
logger.info(f"{src_path.name} 识别为:{mediainfo.title_year}")
# 删除旧的已整理文件
if history.dest:
self.delete_files(Path(history.dest))
if history.dest_fileitem:
# 解析目标文件对象
dest_fileitem = FileItem(**json.loads(history.dest_fileitem))
self.delete_files(dest_fileitem)
# 强制转移
state, errmsg = self.__do_transfer(storage="local",
path=src_path,
mediainfo=mediainfo,
download_hash=history.download_hash,
force=True)
if not state:
return False, errmsg
if history.src_fileitem:
# 解析源文件对象
fileitem = FileItem(**json.loads(history.src_fileitem))
state, errmsg = self.__do_transfer(fileitem=fileitem,
mediainfo=mediainfo,
download_hash=history.download_hash,
force=True)
if not state:
return False, errmsg
return True, ""
def manual_transfer(self,
storage: str,
in_path: Path,
drive_id: str = None,
fileid: str = None,
filetype: str = None,
target: Path = None,
fileitem: FileItem,
target_storage: str = None,
target_path: Path = None,
tmdbid: int = None,
doubanid: str = None,
mtype: MediaType = None,
@@ -837,12 +606,9 @@ class TransferChain(ChainBase):
force: bool = False) -> Tuple[bool, Union[str, list]]:
"""
手动转移,支持复杂条件,带进度显示
:param storage: 存储器
:param in_path: 源文件路径
:param drive_id: 网盘ID
:param fileid: 文件ID
:param filetype: 文件类型
:param target: 目标路径
:param fileitem: 文件项
:param target_storage: 目标存储
:param target_path: 目标路径
:param tmdbid: TMDB ID
:param doubanid: 豆瓣ID
:param mtype: 媒体类型
@@ -853,7 +619,7 @@ class TransferChain(ChainBase):
:param scrape: 是否刮削元数据
:param force: 是否强制转移
"""
logger.info(f"手动转移:{in_path} ...")
logger.info(f"手动转移:{fileitem.path} ...")
if tmdbid or doubanid:
# 有输入TMDBID时单个识别
@@ -867,17 +633,14 @@ class TransferChain(ChainBase):
# 开始进度
self.progress.start(ProgressKey.FileTransfer)
self.progress.update(value=0,
text=f"开始转移 {in_path} ...",
text=f"开始转移 {fileitem.path} ...",
key=ProgressKey.FileTransfer)
# 开始转移
state, errmsg = self.__do_transfer(
storage=storage,
path=in_path,
drive_id=drive_id,
fileid=fileid,
filetype=filetype,
fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
mediainfo=mediainfo,
target=target,
transfer_type=transfer_type,
season=season,
epformat=epformat,
@@ -889,16 +652,13 @@ class TransferChain(ChainBase):
return False, errmsg
self.progress.end(ProgressKey.FileTransfer)
logger.info(f"{in_path} 转移完成")
logger.info(f"{fileitem.path} 转移完成")
return True, ""
else:
# 没有输入TMDBID时按文件识别
state, errmsg = self.__do_transfer(storage=storage,
path=in_path,
drive_id=drive_id,
fileid=fileid,
filetype=filetype,
target=target,
state, errmsg = self.__do_transfer(fileitem=fileitem,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
season=season,
epformat=epformat,
@@ -931,61 +691,10 @@ class TransferChain(ChainBase):
title=msg_title, text=msg_str, image=mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')))
def delete_files(self, path: Path) -> Tuple[bool, str]:
def delete_files(self, fileitem: FileItem) -> Tuple[bool, str]:
"""
删除转移后的文件以及空目录
:param path: 文件路径
TODO 删除转移后的文件以及空目录
:param fileitem: 文件
:return: 成功标识,错误信息
"""
logger.info(f"开始删除文件以及空目录:{path} ...")
if not path.exists():
return True, f"文件或目录不存在:{path}"
if path.is_file():
# 删除文件、nfo、jpg等同名文件
pattern = path.stem.replace('[', '?').replace(']', '?')
files = path.parent.glob(f"{pattern}.*")
for file in files:
Path(file).unlink()
logger.warn(f"文件 {path} 已删除")
# 需要删除父目录
elif str(path.parent) == str(path.root):
# 根目录,不删除
logger.warn(f"根目录 {path} 不能删除!")
return False, f"根目录 {path} 不能删除!"
else:
# 非根目录,才删除目录
shutil.rmtree(path)
# 删除目录
logger.warn(f"目录 {path} 已删除")
# 需要删除父目录
# 判断当前媒体父路径下是否有媒体文件,如有则无需遍历父级
if not SystemUtils.exits_files(path.parent, settings.RMT_MEDIAEXT):
# 所有媒体库根目录的名称
library_roots = self.directoryhelper.get_library_dirs()
library_root_names = [Path(library_root.path).name for library_root in library_roots if library_root.path]
# 所有二级分类的名称
category_names = []
category_conf = self.media_category()
if category_conf:
category_names += list(category_conf.keys())
for cats in category_conf.values():
category_names += cats
# 判断父目录是否为空, 为空则删除
for parent_path in path.parents:
# 遍历父目录到媒体库二级分类根路径
if parent_path.name in library_root_names:
break
if parent_path.name in category_names:
continue
if str(parent_path.parent) != str(path.root):
# 父目录非根目录,才删除父目录
if not SystemUtils.exits_files(parent_path, settings.RMT_MEDIAEXT):
# 当前路径下没有媒体文件则删除
try:
shutil.rmtree(parent_path)
logger.warn(f"目录 {parent_path} 已删除")
except Exception as e:
logger.error(f"删除目录 {parent_path} 失败:{str(e)}")
return False, f"删除目录 {parent_path} 失败:{str(e)}"
return True, ""
pass

View File

@@ -11,10 +11,18 @@ class TransferHistory(Base):
转移历史记录
"""
id = Column(Integer, Sequence('id'), primary_key=True, index=True)
# 源目录
# 源路径
src = Column(String, index=True)
# 目标目录
# 源存储
src_storage = Column(String)
# 源文件项
src_fileitem = Column(String)
# 目标路径
dest = Column(String)
# 目标存储
dest_storage = Column(String)
# 目标文件项
dest_fileitem = Column(String)
# 转移模式 move/copy/link...
mode = Column(String)
# 类型 电影/电视剧

View File

@@ -1,13 +1,12 @@
import json
import time
from pathlib import Path
from typing import Any, List
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.db import DbOper
from app.db.models.transferhistory import TransferHistory
from app.schemas import TransferInfo
from app.schemas import TransferInfo, FileItem
class TransferHistoryOper(DbOper):
@@ -119,15 +118,19 @@ class TransferHistoryOper(DbOper):
"""
TransferHistory.update_download_hash(self._db, historyid, download_hash)
def add_success(self, src_path: Path, mode: str, meta: MetaBase,
def add_success(self, fileitem: FileItem, mode: str, meta: MetaBase,
mediainfo: MediaInfo, transferinfo: TransferInfo,
download_hash: str = None):
"""
新增转移成功历史记录
"""
self.add_force(
src=str(src_path),
dest=str(transferinfo.target_path or ''),
src=fileitem.path,
src_storage=fileitem.storage,
src_fileitem=json.dumps(fileitem.dict()),
dest=transferinfo.target_item.path,
dest_storage=transferinfo.target_item.storage,
dest_fileitem=json.dumps(transferinfo.target_item.dict()),
mode=mode,
type=mediainfo.type.value,
category=mediainfo.category,
@@ -145,15 +148,19 @@ class TransferHistoryOper(DbOper):
files=json.dumps(transferinfo.file_list)
)
def add_fail(self, src_path: Path, mode: str, meta: MetaBase, mediainfo: MediaInfo = None,
def add_fail(self, fileitem: FileItem, mode: str, meta: MetaBase, mediainfo: MediaInfo = None,
transferinfo: TransferInfo = None, download_hash: str = None):
"""
新增转移失败历史记录
"""
if mediainfo and transferinfo:
his = self.add_force(
src=str(src_path),
dest=str(transferinfo.target_path or ''),
src=fileitem.path,
src_storage=fileitem.storage,
src_fileitem=json.dumps(fileitem.dict()),
dest=transferinfo.target_item.path,
dest_storage=transferinfo.target_item.storage,
dest_fileitem=json.dumps(transferinfo.target_item.dict()),
mode=mode,
type=mediainfo.type.value,
category=mediainfo.category,
@@ -175,7 +182,9 @@ class TransferHistoryOper(DbOper):
his = self.add_force(
title=meta.name,
year=meta.year,
src=str(src_path),
src=fileitem.path,
src_storage=fileitem.storage,
src_fileitem=json.dumps(fileitem.dict()),
mode=mode,
seasons=meta.season,
episodes=meta.episode,

View File

@@ -1,34 +0,0 @@
from typing import Optional
from pydantic import BaseModel
class FileItem(BaseModel):
# 存储类型
storage: Optional[str] = "local"
# 类型 dir/file
type: Optional[str] = None
# 文件路径
path: Optional[str] = "/"
# 文件名
name: Optional[str] = None
# 文件名
basename: Optional[str] = None
# 文件后缀
extension: Optional[str] = None
# 文件大小
size: Optional[int] = None
# 修改时间
modify_time: Optional[float] = None
# 子节点
children: Optional[list] = []
# ID
fileid: Optional[str] = None
# 父ID
parent_fileid: Optional[str] = None
# 缩略图
thumbnail: Optional[str] = None
# 115 pickcode
pickcode: Optional[str] = None
# drive_id
drive_id: Optional[str] = None

View File

@@ -4,6 +4,37 @@ from typing import Optional
from pydantic import BaseModel
class FileItem(BaseModel):
# 存储类型
storage: Optional[str] = "local"
# 类型 dir/file
type: Optional[str] = None
# 文件路径
path: Optional[str] = "/"
# 文件名
name: Optional[str] = None
# 文件名
basename: Optional[str] = None
# 文件后缀
extension: Optional[str] = None
# 文件大小
size: Optional[int] = None
# 修改时间
modify_time: Optional[float] = None
# 子节点
children: Optional[list] = []
# ID
fileid: Optional[str] = None
# 父ID
parent_fileid: Optional[str] = None
# 缩略图
thumbnail: Optional[str] = None
# 115 pickcode
pickcode: Optional[str] = None
# drive_id
drive_id: Optional[str] = None
class TransferTorrent(BaseModel):
"""
待转移任务信息
@@ -43,9 +74,9 @@ class TransferInfo(BaseModel):
# 是否成功标志
success: bool = True
# 整理⼁路径
path: Optional[Path] = None
fileitem: Optional[FileItem] = None
# 转移后路径
target_path: Optional[Path] = None
target_item: Optional[FileItem] = None
# 处理文件数
file_count: Optional[int] = 0
# 处理文件清单
@@ -65,9 +96,9 @@ class TransferInfo(BaseModel):
"""
返回字典
"""
dicts = vars(self).copy() # 创建一个字典的副本以避免修改原始数据
dicts["path"] = str(self.path) if self.path else None
dicts["target_path"] = str(self.target_path) if self.target_path else None
dicts = vars(self).copy()
dicts["fileitem"] = self.fileitem.dict() if self.fileitem else None
dicts["target_item"] = self.target_item.dict() if self.target_item else None
return dicts