feat: add manual transfer target path matching

This commit is contained in:
jxxghp
2026-05-27 13:26:01 +08:00
parent 0e5c592862
commit b1259fdc02
4 changed files with 305 additions and 3 deletions

View File

@@ -92,6 +92,152 @@ async def remove_queue(
return schemas.Response(success=True)
def _resolve_manual_transfer_source_fileitems(
transer_item: ManualTransferItem, db: Session
) -> tuple[List[FileItem], Optional[str]]:
"""
从手动整理请求中解析源文件项。
"""
if transer_item.logids:
fileitems: List[FileItem] = []
for logid in transer_item.logids:
history: TransferHistory = TransferHistory.get(db, logid)
if not history:
return [], f"整理记录不存在ID{logid}"
if history.status and ("move" in history.mode):
fileitems.append(FileItem(**history.dest_fileitem))
else:
fileitems.append(FileItem(**history.src_fileitem))
return fileitems, None
if transer_item.logid:
history: TransferHistory = TransferHistory.get(db, transer_item.logid)
if not history:
return [], f"整理记录不存在ID{transer_item.logid}"
if history.status and ("move" in history.mode):
return [FileItem(**history.dest_fileitem)], None
return [FileItem(**history.src_fileitem)], None
if transer_item.fileitems:
return [fileitem for fileitem in transer_item.fileitems if fileitem], None
if transer_item.fileitem:
return [transer_item.fileitem], None
return [], None
def _deduplicate_fileitems(fileitems: List[FileItem]) -> List[FileItem]:
"""
按存储和路径去重文件项。
"""
dedup_fileitems: List[FileItem] = []
seen_paths = set()
for current_fileitem in fileitems:
storage = current_fileitem.storage or "local"
path = current_fileitem.path
if not path:
continue
key = (storage, path)
if key in seen_paths:
continue
seen_paths.add(key)
dedup_fileitems.append(current_fileitem)
return dedup_fileitems
def _build_manual_transfer_target_path(
directory: Optional[schemas.TransferDirectoryConf] = None,
) -> schemas.ManualTransferTargetPath:
"""
根据目录配置生成手动整理目的路径响应。
"""
if not directory or not directory.library_path:
return schemas.ManualTransferTargetPath()
return schemas.ManualTransferTargetPath(
target_storage=directory.library_storage or "local",
target_path=directory.library_path,
transfer_type=directory.transfer_type,
scrape=directory.scraping or False,
library_type_folder=directory.library_type_folder or False,
library_category_folder=directory.library_category_folder or False,
)
def _get_manual_transfer_target_key(
directory: schemas.TransferDirectoryConf,
) -> tuple[Optional[str], Optional[str]]:
"""
生成目的目录唯一键。
"""
return (
directory.library_storage or "local",
Path(directory.library_path).as_posix() if directory.library_path else None,
)
@router.post(
"/manual/target-path",
summary="匹配手动转移目的路径",
response_model=schemas.Response,
)
def match_manual_transfer_target_path(
transer_item: ManualTransferItem,
db: Session = Depends(get_db),
_: User = Depends(get_current_active_superuser),
) -> Any:
"""
根据源文件匹配手动整理目的路径。
:param transer_item: 手工整理项
:param db: 数据库
:param _: Token校验
"""
src_fileitems, error_message = _resolve_manual_transfer_source_fileitems(
transer_item=transer_item,
db=db,
)
if error_message:
return schemas.Response(success=False, message=error_message)
matched_directories: List[schemas.TransferDirectoryConf] = []
target_storage = transer_item.target_storage or None
for src_fileitem in _deduplicate_fileitems(src_fileitems):
directory = DirectoryHelper().get_dir(
media=None,
storage=src_fileitem.storage or "local",
src_path=Path(src_fileitem.path),
target_storage=target_storage,
)
if not directory or not directory.library_path:
return schemas.Response(
success=True,
data=schemas.ManualTransferTargetPath().model_dump(),
)
matched_directories.append(directory)
if not matched_directories:
return schemas.Response(
success=True,
data=schemas.ManualTransferTargetPath().model_dump(),
)
first_directory = matched_directories[0]
first_key = _get_manual_transfer_target_key(first_directory)
if any(
_get_manual_transfer_target_key(directory) != first_key
for directory in matched_directories[1:]
):
return schemas.Response(
success=True,
data=schemas.ManualTransferTargetPath().model_dump(),
)
return schemas.Response(
success=True,
data=_build_manual_transfer_target_path(first_directory).model_dump(),
)
@router.post("/manual", summary="手动转移", response_model=schemas.Response)
def manual_transfer(
transer_item: ManualTransferItem,

View File

@@ -192,6 +192,8 @@ class ManualTransferItem(BaseModel):
fileitems: Optional[List[FileItem]] = None
# 日志ID
logid: Optional[int] = None
# 日志ID列表前端多选历史记录时传入
logids: Optional[List[int]] = None
# 目标存储
target_storage: Optional[str] = None
# 目标路径
@@ -228,3 +230,22 @@ class ManualTransferItem(BaseModel):
episode_group: Optional[str] = None
# 仅预览,不执行整理
preview: Optional[bool] = False
class ManualTransferTargetPath(BaseModel):
"""
手动整理目的路径匹配结果
"""
# 目标存储
target_storage: Optional[str] = None
# 目标路径
target_path: Optional[str] = None
# 整理方式
transfer_type: Optional[str] = None
# 刮削
scrape: Optional[bool] = False
# 媒体库类型子目录
library_type_folder: Optional[bool] = False
# 媒体库类别子目录
library_category_folder: Optional[bool] = False

View File

@@ -231,13 +231,14 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| POST | `/api/v1/storage/save/{name}` | Save storage config. Body: JSON object |
| GET | `/api/v1/storage/reset/{name}` | Reset storage config |
### Transfer (5 endpoints)
### Transfer (6 endpoints)
| Method | Path | Description |
|--------|------|-------------|
| GET | `/api/v1/transfer/name` | Preview transfer name. Params: `path` (required), `filetype` (required) |
| GET | `/api/v1/transfer/queue` | Transfer queue |
| DELETE | `/api/v1/transfer/queue` | Remove from transfer queue. Body: FileItem JSON |
| POST | `/api/v1/transfer/manual/target-path` | Match manual transfer target path. Body: ManualTransferItem JSON |
| POST | `/api/v1/transfer/manual` | Manual transfer. Params: `background`. Body: ManualTransferItem JSON |
| GET | `/api/v1/transfer/now` | Run immediate transfer |

View File

@@ -6,8 +6,12 @@ import sys
sys.modules.setdefault("app.helper.sites", ModuleType("app.helper.sites"))
setattr(sys.modules["app.helper.sites"], "SitesHelper", object)
from app.api.endpoints.transfer import manual_transfer, recommend_episode_format
from app.schemas import ManualTransferItem, EpisodeFormatRecommendItem
from app.api.endpoints.transfer import (
manual_transfer,
match_manual_transfer_target_path,
recommend_episode_format,
)
from app.schemas import EpisodeFormatRecommendItem, ManualTransferItem, TransferDirectoryConf
def test_manual_transfer_from_history_preserves_download_context(monkeypatch):
@@ -180,6 +184,136 @@ def test_manual_transfer_preview_multi_select_collects_failures(monkeypatch):
assert resp.data["items"][1]["success"] is False
def test_match_manual_transfer_target_path_returns_directory_match(monkeypatch):
captured = {}
class FakeDirectoryHelper:
def get_dir(self, **kwargs):
captured.update(kwargs)
return TransferDirectoryConf(
library_storage="rclone",
library_path="/library/tv",
transfer_type="copy",
scraping=True,
library_type_folder=True,
library_category_folder=False,
)
monkeypatch.setattr("app.api.endpoints.transfer.DirectoryHelper", FakeDirectoryHelper)
resp = match_manual_transfer_target_path(
transer_item=ManualTransferItem(
fileitem={
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E01.mkv",
"name": "Test.Show.S01E01.mkv",
"type": "file",
},
),
db=object(),
_="token",
)
assert resp.success is True
assert captured["storage"] == "local"
assert captured["src_path"].as_posix() == "/downloads/Test Show/Test.Show.S01E01.mkv"
assert captured["target_storage"] is None
assert resp.data == {
"target_storage": "rclone",
"target_path": "/library/tv",
"transfer_type": "copy",
"scrape": True,
"library_type_folder": True,
"library_category_folder": False,
}
def test_match_manual_transfer_target_path_returns_null_for_ambiguous_matches(monkeypatch):
class FakeDirectoryHelper:
def get_dir(self, **kwargs):
src_path = kwargs["src_path"].as_posix()
return TransferDirectoryConf(
library_storage="local",
library_path="/library/tv" if "E01" in src_path else "/library/movie",
transfer_type="copy",
)
monkeypatch.setattr("app.api.endpoints.transfer.DirectoryHelper", FakeDirectoryHelper)
resp = match_manual_transfer_target_path(
transer_item=ManualTransferItem(
fileitems=[
{
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E01.mkv",
"name": "Test.Show.S01E01.mkv",
"type": "file",
},
{
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E02.mkv",
"name": "Test.Show.S01E02.mkv",
"type": "file",
},
],
),
db=object(),
_="token",
)
assert resp.success is True
assert resp.data["target_path"] is None
assert resp.data["target_storage"] is None
def test_match_manual_transfer_target_path_accepts_multiple_history_records(monkeypatch):
histories = {
1: SimpleNamespace(
status=0,
mode="copy",
src_fileitem={
"storage": "local",
"path": "/downloads/Show/Show.S01E01.mkv",
"name": "Show.S01E01.mkv",
"type": "file",
},
),
2: SimpleNamespace(
status=0,
mode="copy",
src_fileitem={
"storage": "local",
"path": "/downloads/Show/Show.S01E02.mkv",
"name": "Show.S01E02.mkv",
"type": "file",
},
),
}
def fake_get(_db, logid):
return histories.get(logid)
class FakeDirectoryHelper:
def get_dir(self, **_kwargs):
return TransferDirectoryConf(
library_storage="local",
library_path="/library/tv",
transfer_type="copy",
)
monkeypatch.setattr("app.api.endpoints.transfer.TransferHistory.get", fake_get)
monkeypatch.setattr("app.api.endpoints.transfer.DirectoryHelper", FakeDirectoryHelper)
resp = match_manual_transfer_target_path(
transer_item=ManualTransferItem(logids=[1, 2]),
db=object(),
_="token",
)
assert resp.success is True
assert resp.data["target_path"] == "/library/tv"
def test_recommend_episode_format_passes_selected_fileitems(monkeypatch):
selected_fileitems = [
{