feat: 支持多文件手动整理与集数定位模板推荐 (#5820)

This commit is contained in:
Album
2026-05-23 09:23:50 +08:00
committed by GitHub
parent 7cbfeb2377
commit 2eb7f57a4c
6 changed files with 703 additions and 18 deletions

View File

@@ -485,6 +485,101 @@ def test_transfer_chain_recommend_episode_format_passes_helper_data(monkeypatch)
assert data == helper_data
def test_transfer_chain_recommend_episode_format_uses_selected_fileitems(monkeypatch):
chain = object.__new__(TransferChain)
monkeypatch.setattr(chain, "_media_exts", [".mkv", ".mp4"], raising=False)
monkeypatch.setattr(chain, "_subtitle_exts", [".ass", ".ssa"], raising=False)
monkeypatch.setattr(chain, "_audio_exts", [".mka", ".aac"], raising=False)
selected_fileitems = [
FileItem(
storage="local",
path="/downloads/Show/Show - 01.mkv",
type="file",
name="Show - 01.mkv",
extension="mkv",
),
FileItem(
storage="local",
path="/downloads/Show/Show - 01.ass",
type="file",
name="Show - 01.ass",
extension="ass",
),
]
helper_data = {
"rule_name": "智能分析",
"episode_format": "Show - {ep}.{a}",
"sample_file": "Show - 01.mkv",
"pattern": None,
"sample_count": 2,
"majority_count": 2,
"confidence": "high",
"size_filter_relaxed": False,
"native_verified_count": 0,
"native_fallback_count": 0,
"native_conflict_count": 0,
"reason": "selected_samples",
"reasons": ["selected_samples"],
"message": "已基于选中文件生成集数定位模板",
}
monkeypatch.setattr(
chain,
"_TransferChain__get_episode_format_rules",
lambda: [],
)
monkeypatch.setattr(
"app.chain.transfer.EpisodeFormatRuleHelper.recommend",
lambda self, rules, sample_files: (True, "", {
**helper_data,
"received_samples": [item.name for item in sample_files],
}),
)
state, errmsg, data = TransferChain.recommend_episode_format(
chain,
fileitem=None,
fileitems=selected_fileitems,
)
assert state is True
assert errmsg == ""
assert data["received_samples"] == [item.name for item in selected_fileitems]
def test_transfer_chain_recommend_episode_format_rejects_invalid_selected_fileitems():
chain = object.__new__(TransferChain)
chain._media_exts = [".mkv", ".mp4"]
chain._subtitle_exts = [".ass", ".ssa"]
chain._audio_exts = [".mka", ".aac"]
selected_fileitems = [
FileItem(
storage="local",
path="/downloads/Show/Show - 01.mkv",
type="file",
name="Show - 01.mkv",
extension="mkv",
),
FileItem(
storage="local",
path="/downloads/Other/Show - 02.mkv",
type="file",
name="Show - 02.mkv",
extension="mkv",
),
]
state, errmsg, data = TransferChain.recommend_episode_format(
chain,
fileitem=None,
fileitems=selected_fileitems,
)
assert state is False
assert errmsg == "当前选择不满足智能识别条件"
assert data is None
def test_transfer_chain_episode_format_samples_include_extra_files(monkeypatch):
chain = object.__new__(TransferChain)
directory = FileItem(

View File

@@ -6,8 +6,8 @@ import sys
sys.modules.setdefault("app.helper.sites", ModuleType("app.helper.sites"))
setattr(sys.modules["app.helper.sites"], "SitesHelper", object)
from app.api.endpoints.transfer import manual_transfer
from app.schemas import ManualTransferItem
from app.api.endpoints.transfer import manual_transfer, recommend_episode_format
from app.schemas import ManualTransferItem, EpisodeFormatRecommendItem
def test_manual_transfer_from_history_preserves_download_context(monkeypatch):
@@ -52,3 +52,206 @@ def test_manual_transfer_from_history_preserves_download_context(monkeypatch):
assert captured["download_hash"] == "abc123"
assert captured["episode_group"] == "WEB-DL"
assert captured["season"] == 1
def test_manual_transfer_preview_uses_explicit_fileitems_instead_of_directory(monkeypatch):
dir_item = {
"storage": "local",
"path": "/downloads/Test Show/",
"name": "Test Show",
"type": "dir",
}
file_paths = [
"/downloads/Test Show/Test.Show.S01E01.mkv",
"/downloads/Test Show/Test.Show.S01E02.mkv",
"/downloads/Test Show/Test.Show.S01E03.mkv",
]
selected_fileitems = [
{
"storage": "local",
"path": file_path,
"name": file_path.rsplit("/", 1)[-1],
"type": "file",
}
for file_path in file_paths
]
captured = []
class FakeTransferChain:
def manual_transfer(self, **kwargs):
captured.append(kwargs)
fileitem = kwargs["fileitem"]
return True, {
"summary": {"total": 1, "success": 1, "failed": 0},
"items": [
{
"source": fileitem.path,
"target": f"/library/{fileitem.name}",
"target_dir": "/library",
"success": True,
"message": "",
"type": "电视剧",
"title": "Test Show (2026)",
"season": 1,
"episode": 1,
"episode_end": None,
"part": None,
}
],
"message": "",
}
monkeypatch.setattr("app.api.endpoints.transfer.TransferChain", FakeTransferChain)
resp = manual_transfer(
transer_item=ManualTransferItem(
fileitem=dir_item,
fileitems=selected_fileitems,
preview=True,
),
background=False,
db=object(),
_="token",
)
assert resp.success is True
assert len(captured) == 3
assert [item["fileitem"].path for item in captured] == file_paths
assert all(item["sync_extra_files"] is False for item in captured)
assert resp.data["summary"] == {"total": 3, "success": 3, "failed": 0}
assert [item["source"] for item in resp.data["items"]] == file_paths
def test_manual_transfer_preview_multi_select_collects_failures(monkeypatch):
file_paths = [
"/downloads/Test Show/Test.Show.S01E01.mkv",
"/downloads/Test Show/Test.Show.S01E02.mkv",
]
selected_fileitems = [
{
"storage": "local",
"path": file_path,
"name": file_path.rsplit("/", 1)[-1],
"type": "file",
}
for file_path in file_paths
]
class FakeTransferChain:
def manual_transfer(self, **kwargs):
fileitem = kwargs["fileitem"]
if fileitem.path.endswith("E02.mkv"):
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
return True, {
"summary": {"total": 1, "success": 1, "failed": 0},
"items": [
{
"source": fileitem.path,
"target": f"/library/{fileitem.name}",
"target_dir": "/library",
"success": True,
"message": "",
"type": "电视剧",
"title": "Test Show (2026)",
"season": 1,
"episode": 1,
"episode_end": None,
"part": None,
}
],
"message": "",
}
monkeypatch.setattr("app.api.endpoints.transfer.TransferChain", FakeTransferChain)
resp = manual_transfer(
transer_item=ManualTransferItem(
fileitems=selected_fileitems,
preview=True,
),
background=False,
db=object(),
_="token",
)
assert resp.success is True
assert resp.data["summary"] == {"total": 2, "success": 1, "failed": 1}
assert [item["source"] for item in resp.data["items"]] == file_paths
assert resp.data["items"][1]["success"] is False
def test_recommend_episode_format_passes_selected_fileitems(monkeypatch):
selected_fileitems = [
{
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E01.mkv",
"name": "Test.Show.S01E01.mkv",
"type": "file",
},
{
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E02.mkv",
"name": "Test.Show.S01E02.mkv",
"type": "file",
},
]
captured = {}
class FakeTransferChain:
def recommend_episode_format(self, **kwargs):
captured.update(kwargs)
return True, "", {"episode_format": "Show.S01E{ep}.mkv"}
monkeypatch.setattr("app.api.endpoints.transfer.TransferChain", FakeTransferChain)
resp = recommend_episode_format(
recommend_item=EpisodeFormatRecommendItem(
fileitem=selected_fileitems[0],
fileitems=selected_fileitems,
),
_="token",
)
assert resp.success is True
assert captured["fileitem"].path == selected_fileitems[0]["path"]
assert [item.path for item in captured["fileitems"]] == [
item["path"] for item in selected_fileitems
]
def test_recommend_episode_format_accepts_fileitems_without_fileitem(monkeypatch):
selected_fileitems = [
{
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E01.mkv",
"name": "Test.Show.S01E01.mkv",
"type": "file",
},
{
"storage": "local",
"path": "/downloads/Test Show/Test.Show.S01E02.mkv",
"name": "Test.Show.S01E02.mkv",
"type": "file",
},
]
captured = {}
class FakeTransferChain:
def recommend_episode_format(self, **kwargs):
captured.update(kwargs)
return True, "", {"episode_format": "Show.S01E{ep}.mkv"}
monkeypatch.setattr("app.api.endpoints.transfer.TransferChain", FakeTransferChain)
resp = recommend_episode_format(
recommend_item=EpisodeFormatRecommendItem(
fileitems=selected_fileitems,
),
_="token",
)
assert resp.success is True
assert captured["fileitem"] is None
assert [item.path for item in captured["fileitems"]] == [
item["path"] for item in selected_fileitems
]

View File

@@ -682,6 +682,184 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual("", errmsg)
self.assertEqual([main_fileitem.path], planned)
def test_manual_transfer_enables_sync_extra_files(self):
chain = make_transfer_chain()
captured = {}
fileitem = make_fileitem("/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv")
def fake_do_transfer(**kwargs):
captured.update(kwargs)
return True, ""
chain.do_transfer = fake_do_transfer
state, errmsg = TransferChain.manual_transfer(
chain,
fileitem=fileitem,
preview=True,
)
self.assertTrue(state)
self.assertEqual("", errmsg)
self.assertTrue(captured["manual"])
self.assertTrue(captured["sync_extra_files"])
def test_manual_transfer_respects_sync_extra_files_argument(self):
chain = make_transfer_chain()
captured = {}
fileitem = make_fileitem("/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv")
def fake_do_transfer(**kwargs):
captured.update(kwargs)
return True, ""
chain.do_transfer = fake_do_transfer
state, errmsg = TransferChain.manual_transfer(
chain,
fileitem=fileitem,
preview=True,
sync_extra_files=False,
)
self.assertTrue(state)
self.assertEqual("", errmsg)
self.assertFalse(captured["sync_extra_files"])
def test_do_transfer_keeps_manual_single_extra_file_when_epformat_misses(self):
chain = make_transfer_chain()
planned = []
subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Show - 01.sc.ass"
)
chain._TransferChain__put_to_jobview = lambda task: True
chain._TransferChain__register_scrape_batch_task = lambda task: None
chain._TransferChain__close_scrape_batch = lambda batch_id: None
def fake_handle_transfer(task, callback=None):
planned.append((task.fileitem.path, task.meta.begin_episode))
return True, ""
chain._TransferChain__handle_transfer = fake_handle_transfer
transfer_history_oper = SimpleNamespace(get_by_src=lambda src, storage=None: None)
download_history_oper = SimpleNamespace(
get_by_hash=lambda download_hash: None,
get_file_by_fullpath=lambda fullpath: None,
get_files_by_savepath=lambda savepath: [],
get_by_path=lambda path: None,
)
system_config_oper = SimpleNamespace(get=lambda key: None)
storage_chain = SimpleNamespace(get_item=lambda fileitem: subtitle_fileitem)
with patch(
"app.chain.transfer.TransferHistoryOper",
return_value=transfer_history_oper,
), patch(
"app.chain.transfer.DownloadHistoryOper",
return_value=download_history_oper,
), patch(
"app.chain.transfer.SystemConfigOper",
return_value=system_config_oper,
), patch(
"app.chain.transfer.StorageChain",
return_value=storage_chain,
), patch(
"app.chain.transfer.MetaInfoPath",
side_effect=lambda path, custom_words=None: FakeMeta(1),
):
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=subtitle_fileitem,
background=False,
manual=True,
sync_extra_files=True,
epformat=EpisodeFormat(format="Show - {ep}.mkv"),
)
self.assertTrue(state)
self.assertEqual("", errmsg)
self.assertEqual([(subtitle_fileitem.path, 1)], planned)
def test_do_transfer_syncs_extra_files_when_epformat_only_matches_main_video(self):
chain = make_transfer_chain()
planned = []
main_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Show - 01.mkv"
)
subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Show - 01.sc.ass"
)
parent_fileitem = FileItem(
storage="local",
path="/downloads/Test Show (2026)/",
type="dir",
name="Test Show (2026)",
)
chain._TransferChain__get_trans_fileitems = lambda fileitem, predicate: [
(main_fileitem, False)
]
chain._TransferChain__put_to_jobview = lambda task: True
chain._TransferChain__register_scrape_batch_task = lambda task: None
chain._TransferChain__close_scrape_batch = lambda batch_id: None
def fake_handle_transfer(task, callback=None):
planned.append((task.fileitem.path, task.meta.begin_episode))
return True, ""
chain._TransferChain__handle_transfer = fake_handle_transfer
transfer_history_oper = SimpleNamespace(get_by_src=lambda src, storage=None: None)
download_history_oper = SimpleNamespace(
get_by_hash=lambda download_hash: None,
get_file_by_fullpath=lambda fullpath: None,
get_files_by_savepath=lambda savepath: [],
get_by_path=lambda path: None,
)
system_config_oper = SimpleNamespace(get=lambda key: None)
storage_chain = SimpleNamespace(
get_parent_item=lambda fileitem: parent_fileitem,
list_files=lambda fileitem, recursion=False: [
main_fileitem,
subtitle_fileitem,
],
)
with patch(
"app.chain.transfer.TransferHistoryOper",
return_value=transfer_history_oper,
), patch(
"app.chain.transfer.DownloadHistoryOper",
return_value=download_history_oper,
), patch(
"app.chain.transfer.SystemConfigOper",
return_value=system_config_oper,
), patch(
"app.chain.transfer.StorageChain",
return_value=storage_chain,
), patch(
"app.chain.transfer.MetaInfoPath",
side_effect=lambda path, custom_words=None: FakeMeta(1),
):
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=main_fileitem,
background=False,
manual=True,
sync_extra_files=True,
epformat=EpisodeFormat(format="Show - {ep}.mkv"),
)
self.assertTrue(state)
self.assertEqual("", errmsg)
self.assertEqual(
[
(main_fileitem.path, 1),
(subtitle_fileitem.path, 1),
],
planned,
)
def test_do_transfer_syncs_matching_extra_files_for_each_main_video(self):
chain = make_transfer_chain()
planned = []