From b1259fdc02917ee0ee8210813be6886053f441f1 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 27 May 2026 13:26:01 +0800 Subject: [PATCH] feat: add manual transfer target path matching --- app/api/endpoints/transfer.py | 146 ++++++++++++++++++++++ app/schemas/transfer.py | 21 ++++ skills/moviepilot-api/SKILL.md | 3 +- tests/test_transfer_history_retransfer.py | 138 +++++++++++++++++++- 4 files changed, 305 insertions(+), 3 deletions(-) diff --git a/app/api/endpoints/transfer.py b/app/api/endpoints/transfer.py index fa5403cc..9a599da6 100644 --- a/app/api/endpoints/transfer.py +++ b/app/api/endpoints/transfer.py @@ -92,6 +92,152 @@ async def remove_queue( return schemas.Response(success=True) +def _resolve_manual_transfer_source_fileitems( + transer_item: ManualTransferItem, db: Session +) -> tuple[List[FileItem], Optional[str]]: + """ + 从手动整理请求中解析源文件项。 + """ + if transer_item.logids: + fileitems: List[FileItem] = [] + for logid in transer_item.logids: + history: TransferHistory = TransferHistory.get(db, logid) + if not history: + return [], f"整理记录不存在,ID:{logid}" + if history.status and ("move" in history.mode): + fileitems.append(FileItem(**history.dest_fileitem)) + else: + fileitems.append(FileItem(**history.src_fileitem)) + return fileitems, None + + if transer_item.logid: + history: TransferHistory = TransferHistory.get(db, transer_item.logid) + if not history: + return [], f"整理记录不存在,ID:{transer_item.logid}" + if history.status and ("move" in history.mode): + return [FileItem(**history.dest_fileitem)], None + return [FileItem(**history.src_fileitem)], None + + if transer_item.fileitems: + return [fileitem for fileitem in transer_item.fileitems if fileitem], None + if transer_item.fileitem: + return [transer_item.fileitem], None + return [], None + + +def _deduplicate_fileitems(fileitems: List[FileItem]) -> List[FileItem]: + """ + 按存储和路径去重文件项。 + """ + dedup_fileitems: List[FileItem] = [] + seen_paths = set() + for current_fileitem in fileitems: + storage = current_fileitem.storage or "local" + path = current_fileitem.path + if not path: + continue + key = (storage, path) + if key in seen_paths: + continue + seen_paths.add(key) + dedup_fileitems.append(current_fileitem) + return dedup_fileitems + + +def _build_manual_transfer_target_path( + directory: Optional[schemas.TransferDirectoryConf] = None, +) -> schemas.ManualTransferTargetPath: + """ + 根据目录配置生成手动整理目的路径响应。 + """ + if not directory or not directory.library_path: + return schemas.ManualTransferTargetPath() + + return schemas.ManualTransferTargetPath( + target_storage=directory.library_storage or "local", + target_path=directory.library_path, + transfer_type=directory.transfer_type, + scrape=directory.scraping or False, + library_type_folder=directory.library_type_folder or False, + library_category_folder=directory.library_category_folder or False, + ) + + +def _get_manual_transfer_target_key( + directory: schemas.TransferDirectoryConf, +) -> tuple[Optional[str], Optional[str]]: + """ + 生成目的目录唯一键。 + """ + return ( + directory.library_storage or "local", + Path(directory.library_path).as_posix() if directory.library_path else None, + ) + + +@router.post( + "/manual/target-path", + summary="匹配手动转移目的路径", + response_model=schemas.Response, +) +def match_manual_transfer_target_path( + transer_item: ManualTransferItem, + db: Session = Depends(get_db), + _: User = Depends(get_current_active_superuser), +) -> Any: + """ + 根据源文件匹配手动整理目的路径。 + + :param transer_item: 手工整理项 + :param db: 数据库 + :param _: Token校验 + """ + src_fileitems, error_message = _resolve_manual_transfer_source_fileitems( + transer_item=transer_item, + db=db, + ) + if error_message: + return schemas.Response(success=False, message=error_message) + + matched_directories: List[schemas.TransferDirectoryConf] = [] + target_storage = transer_item.target_storage or None + for src_fileitem in _deduplicate_fileitems(src_fileitems): + directory = DirectoryHelper().get_dir( + media=None, + storage=src_fileitem.storage or "local", + src_path=Path(src_fileitem.path), + target_storage=target_storage, + ) + if not directory or not directory.library_path: + return schemas.Response( + success=True, + data=schemas.ManualTransferTargetPath().model_dump(), + ) + matched_directories.append(directory) + + if not matched_directories: + return schemas.Response( + success=True, + data=schemas.ManualTransferTargetPath().model_dump(), + ) + + first_directory = matched_directories[0] + first_key = _get_manual_transfer_target_key(first_directory) + if any( + _get_manual_transfer_target_key(directory) != first_key + for directory in matched_directories[1:] + ): + return schemas.Response( + success=True, + data=schemas.ManualTransferTargetPath().model_dump(), + ) + + return schemas.Response( + success=True, + data=_build_manual_transfer_target_path(first_directory).model_dump(), + ) + + @router.post("/manual", summary="手动转移", response_model=schemas.Response) def manual_transfer( transer_item: ManualTransferItem, diff --git a/app/schemas/transfer.py b/app/schemas/transfer.py index dc7a623c..a36d98d7 100644 --- a/app/schemas/transfer.py +++ b/app/schemas/transfer.py @@ -192,6 +192,8 @@ class ManualTransferItem(BaseModel): fileitems: Optional[List[FileItem]] = None # 日志ID logid: Optional[int] = None + # 日志ID列表(前端多选历史记录时传入) + logids: Optional[List[int]] = None # 目标存储 target_storage: Optional[str] = None # 目标路径 @@ -228,3 +230,22 @@ class ManualTransferItem(BaseModel): episode_group: Optional[str] = None # 仅预览,不执行整理 preview: Optional[bool] = False + + +class ManualTransferTargetPath(BaseModel): + """ + 手动整理目的路径匹配结果 + """ + + # 目标存储 + target_storage: Optional[str] = None + # 目标路径 + target_path: Optional[str] = None + # 整理方式 + transfer_type: Optional[str] = None + # 刮削 + scrape: Optional[bool] = False + # 媒体库类型子目录 + library_type_folder: Optional[bool] = False + # 媒体库类别子目录 + library_category_folder: Optional[bool] = False diff --git a/skills/moviepilot-api/SKILL.md b/skills/moviepilot-api/SKILL.md index 2462a98f..4482ce7e 100644 --- a/skills/moviepilot-api/SKILL.md +++ b/skills/moviepilot-api/SKILL.md @@ -231,13 +231,14 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as ` | POST | `/api/v1/storage/save/{name}` | Save storage config. Body: JSON object | | GET | `/api/v1/storage/reset/{name}` | Reset storage config | -### Transfer (5 endpoints) +### Transfer (6 endpoints) | Method | Path | Description | |--------|------|-------------| | GET | `/api/v1/transfer/name` | Preview transfer name. Params: `path` (required), `filetype` (required) | | GET | `/api/v1/transfer/queue` | Transfer queue | | DELETE | `/api/v1/transfer/queue` | Remove from transfer queue. Body: FileItem JSON | +| POST | `/api/v1/transfer/manual/target-path` | Match manual transfer target path. Body: ManualTransferItem JSON | | POST | `/api/v1/transfer/manual` | Manual transfer. Params: `background`. Body: ManualTransferItem JSON | | GET | `/api/v1/transfer/now` | Run immediate transfer | diff --git a/tests/test_transfer_history_retransfer.py b/tests/test_transfer_history_retransfer.py index 225010c1..e132e503 100644 --- a/tests/test_transfer_history_retransfer.py +++ b/tests/test_transfer_history_retransfer.py @@ -6,8 +6,12 @@ import sys sys.modules.setdefault("app.helper.sites", ModuleType("app.helper.sites")) setattr(sys.modules["app.helper.sites"], "SitesHelper", object) -from app.api.endpoints.transfer import manual_transfer, recommend_episode_format -from app.schemas import ManualTransferItem, EpisodeFormatRecommendItem +from app.api.endpoints.transfer import ( + manual_transfer, + match_manual_transfer_target_path, + recommend_episode_format, +) +from app.schemas import EpisodeFormatRecommendItem, ManualTransferItem, TransferDirectoryConf def test_manual_transfer_from_history_preserves_download_context(monkeypatch): @@ -180,6 +184,136 @@ def test_manual_transfer_preview_multi_select_collects_failures(monkeypatch): assert resp.data["items"][1]["success"] is False +def test_match_manual_transfer_target_path_returns_directory_match(monkeypatch): + captured = {} + + class FakeDirectoryHelper: + def get_dir(self, **kwargs): + captured.update(kwargs) + return TransferDirectoryConf( + library_storage="rclone", + library_path="/library/tv", + transfer_type="copy", + scraping=True, + library_type_folder=True, + library_category_folder=False, + ) + + monkeypatch.setattr("app.api.endpoints.transfer.DirectoryHelper", FakeDirectoryHelper) + + resp = match_manual_transfer_target_path( + transer_item=ManualTransferItem( + fileitem={ + "storage": "local", + "path": "/downloads/Test Show/Test.Show.S01E01.mkv", + "name": "Test.Show.S01E01.mkv", + "type": "file", + }, + ), + db=object(), + _="token", + ) + + assert resp.success is True + assert captured["storage"] == "local" + assert captured["src_path"].as_posix() == "/downloads/Test Show/Test.Show.S01E01.mkv" + assert captured["target_storage"] is None + assert resp.data == { + "target_storage": "rclone", + "target_path": "/library/tv", + "transfer_type": "copy", + "scrape": True, + "library_type_folder": True, + "library_category_folder": False, + } + + +def test_match_manual_transfer_target_path_returns_null_for_ambiguous_matches(monkeypatch): + class FakeDirectoryHelper: + def get_dir(self, **kwargs): + src_path = kwargs["src_path"].as_posix() + return TransferDirectoryConf( + library_storage="local", + library_path="/library/tv" if "E01" in src_path else "/library/movie", + transfer_type="copy", + ) + + monkeypatch.setattr("app.api.endpoints.transfer.DirectoryHelper", FakeDirectoryHelper) + + resp = match_manual_transfer_target_path( + transer_item=ManualTransferItem( + fileitems=[ + { + "storage": "local", + "path": "/downloads/Test Show/Test.Show.S01E01.mkv", + "name": "Test.Show.S01E01.mkv", + "type": "file", + }, + { + "storage": "local", + "path": "/downloads/Test Show/Test.Show.S01E02.mkv", + "name": "Test.Show.S01E02.mkv", + "type": "file", + }, + ], + ), + db=object(), + _="token", + ) + + assert resp.success is True + assert resp.data["target_path"] is None + assert resp.data["target_storage"] is None + + +def test_match_manual_transfer_target_path_accepts_multiple_history_records(monkeypatch): + histories = { + 1: SimpleNamespace( + status=0, + mode="copy", + src_fileitem={ + "storage": "local", + "path": "/downloads/Show/Show.S01E01.mkv", + "name": "Show.S01E01.mkv", + "type": "file", + }, + ), + 2: SimpleNamespace( + status=0, + mode="copy", + src_fileitem={ + "storage": "local", + "path": "/downloads/Show/Show.S01E02.mkv", + "name": "Show.S01E02.mkv", + "type": "file", + }, + ), + } + + def fake_get(_db, logid): + return histories.get(logid) + + class FakeDirectoryHelper: + def get_dir(self, **_kwargs): + return TransferDirectoryConf( + library_storage="local", + library_path="/library/tv", + transfer_type="copy", + ) + + monkeypatch.setattr("app.api.endpoints.transfer.TransferHistory.get", fake_get) + monkeypatch.setattr("app.api.endpoints.transfer.DirectoryHelper", FakeDirectoryHelper) + + resp = match_manual_transfer_target_path( + transer_item=ManualTransferItem(logids=[1, 2]), + db=object(), + _="token", + ) + + assert resp.success is True + assert resp.data["target_path"] == "/library/tv" + + def test_recommend_episode_format_passes_selected_fileitems(monkeypatch): selected_fileitems = [ {