From e00aa42f94f91b7c7a65c884384c00ee5aeea035 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 20 May 2026 16:39:07 +0800 Subject: [PATCH] fix: prevent duplicate transfer uploads --- app/chain/transfer.py | 39 +++++++++- app/modules/filemanager/storages/u115.py | 33 +++++++- tests/test_transfer_job_manager.py | 17 ++++ tests/test_u115_storage.py | 99 ++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 6 deletions(-) create mode 100644 tests/test_u115_storage.py diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 3a51d055..60c9e3a7 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -101,6 +101,18 @@ class JobManager: return None, season return media.tmdb_id or media.douban_id, season + @staticmethod + def __get_file_key(fileitem: FileItem) -> Optional[Tuple[str, str]]: + """ + 获取源文件唯一键,用于跨媒体作业识别同一个整理任务。 + """ + if not fileitem or not fileitem.path: + return None + normalized_path = ( + Path(str(fileitem.path).replace("\\", "/")).as_posix().rstrip("/") or "/" + ) + return fileitem.storage or "local", normalized_path + def __get_id(self, task: TransferTask = None) -> Tuple: """ 获取作业ID @@ -146,8 +158,19 @@ class JobManager: """ if not all([task, task.meta, task.fileitem]): return False + file_key = self.__get_file_key(task.fileitem) + if not file_key: + return False with job_lock: __mediaid__ = self.__get_id(task) + # 同一个源文件可能在识别前后落入不同作业,必须跨作业去重。 + if any( + self.__get_file_key(t.fileitem) == file_key + for job in self._job_view.values() + for t in job.tasks + ): + logger.debug(f"任务 {task.fileitem.name} 已存在,跳过重复添加") + return False if __mediaid__ not in self._job_view: self._job_view[__mediaid__] = TransferJob( media=self.__get_media(task), @@ -166,7 +189,7 @@ class JobManager: # 不重复添加任务 if any( [ - t.fileitem == task.fileitem + self.__get_file_key(t.fileitem) == file_key for t in self._job_view[__mediaid__].tasks ] ): @@ -282,10 +305,13 @@ class JobManager: """ if not task or not task.fileitem: return + file_key = self.__get_file_key(task.fileitem) + if not file_key: + return with job_lock: for mediaid, job in self._job_view.items(): for job_task in job.tasks: - if job_task.fileitem != task.fileitem: + if self.__get_file_key(job_task.fileitem) != file_key: continue if job_task.state not in ["completed", "failed"]: job_task.state = "failed" @@ -309,11 +335,14 @@ class JobManager: """ 根据文件项移除任务,并返回任务所在的作业ID """ + file_key = self.__get_file_key(fileitem) + if not file_key: + return None, None with job_lock: for mediaid in list(self._job_view): job = self._job_view[mediaid] for task in job.tasks: - if task.fileitem == fileitem: + if self.__get_file_key(task.fileitem) == file_key: job.tasks.remove(task) # 如果没有作业了,则移除作业 if not job.tasks: @@ -1594,7 +1623,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 更新任务信息 task.mediainfo = mediainfo # 更新队列任务 - self.jobview.migrate_task(task) + if not self.jobview.migrate_task(task): + logger.info(f"{task.fileitem.name} 已存在整理任务,跳过重复处理") + return False, f"{task.fileitem.name} 已在整理队列中" # 获取集数据 if task.mediainfo.type == MediaType.TV and not task.episodes_info: diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 953e95db..2c683e22 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -524,6 +524,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): """ def encode_callback(cb: str) -> str: + """ + 编码 115 OSS 回调参数。 + """ return oss2.utils.b64encode_as_string(cb) target_name = new_name or local_path.name @@ -631,7 +634,10 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): else None, modify_time=info_resp["utime"], ) - return self.get_item(target_path) + uploaded_item = self.get_item(target_path) + return uploaded_item or self.__build_uploaded_fileitem( + target_path, local_path, file_size + ) # Step 4: 获取上传凭证 token_resp = self._request_api("GET", "/open/upload/get_token", "data") @@ -740,7 +746,30 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): ) return None # 返回结果 - return self.get_item(target_path) + uploaded_item = self.get_item(target_path) + if uploaded_item: + return uploaded_item + logger.warn( + f"【115】{target_name} 上传已完成但元数据暂不可见,使用目标路径构造整理结果" + ) + return self.__build_uploaded_fileitem(target_path, local_path, file_size) + + def __build_uploaded_fileitem( + self, target_path: Path, local_path: Path, file_size: int + ) -> schemas.FileItem: + """ + 构造已上传文件项,用于兼容 115 上传成功后目录索引延迟刷新。 + """ + return schemas.FileItem( + storage=self.schema.value, + path=target_path.as_posix(), + type="file", + name=target_path.name, + basename=target_path.stem, + extension=target_path.suffix[1:] or None, + size=file_size, + modify_time=local_path.stat().st_mtime if local_path.exists() else None, + ) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """ diff --git a/tests/test_transfer_job_manager.py b/tests/test_transfer_job_manager.py index 7b828ade..ff205201 100644 --- a/tests/test_transfer_job_manager.py +++ b/tests/test_transfer_job_manager.py @@ -295,6 +295,23 @@ class TransferJobManagerTest(unittest.TestCase): self.assertEqual(1, len(jobs)) self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem) + def test_same_source_file_is_deduped_across_media_jobs(self): + """ + 同一个源文件即使识别到不同媒体作业,也不能重复加入整理视图。 + """ + jobview = JobManager() + task1 = make_task(1) + task2 = make_task(1) + task1.mediainfo = FakeMedia(100) + task2.mediainfo = FakeMedia(200) + + self.assertTrue(jobview.add_task(task1)) + self.assertFalse(jobview.add_task(task2)) + + jobs = jobview.list_jobs() + self.assertEqual(1, len(jobs)) + self.assertEqual(task1.fileitem, jobs[0].tasks[0].fileitem) + def test_pre_recognized_migrations_with_same_meta_do_not_link_jobs(self): jobview = JobManager() task1 = make_task(1) diff --git a/tests/test_u115_storage.py b/tests/test_u115_storage.py new file mode 100644 index 00000000..96aca1c8 --- /dev/null +++ b/tests/test_u115_storage.py @@ -0,0 +1,99 @@ +from types import SimpleNamespace +from unittest.mock import patch + +from app.modules.filemanager.storages.u115 import U115Pan +from app.schemas import FileItem + + +def test_upload_returns_target_fileitem_when_uploaded_metadata_is_delayed(tmp_path): + """ + 115 上传完成后目录索引暂不可见时,应返回可落库的目标文件项。 + """ + local_file = tmp_path / "Test.Show.S01E01.mkv" + local_file.write_bytes(b"movie") + + target_dir = FileItem( + storage="u115", + path="/library/Test Show (2026)/Season 1", + type="dir", + name="Season 1", + fileid="100", + ) + storage = object.__new__(U115Pan) + storage._calc_sha1 = lambda *_args, **_kwargs: "sha1" + storage.get_item = lambda _path: None + + def fake_request_api(_method, endpoint, *_args, **_kwargs): + """ + 模拟 115 初始化、凭证和断点续传接口。 + """ + if endpoint == "/open/upload/init": + return { + "state": True, + "data": { + "bucket": "bucket", + "object": "object", + "callback": {"callback": "callback", "callback_var": "var"}, + "pick_code": "pickcode", + "status": 1, + }, + } + if endpoint == "/open/upload/get_token": + return { + "endpoint": "endpoint", + "AccessKeyId": "access_key_id", + "AccessKeySecret": "access_key_secret", + "SecurityToken": "security_token", + } + if endpoint == "/open/upload/resume": + return None + return None + + class FakeBucket: + """ + 模拟 OSS 分片上传客户端。 + """ + + def __init__(self, *_args, **_kwargs): + pass + + def init_multipart_upload(self, *_args, **_kwargs): + """ + 返回固定 upload_id。 + """ + return SimpleNamespace(upload_id="upload_id") + + def upload_part(self, *_args, **_kwargs): + """ + 返回固定分片 etag。 + """ + return SimpleNamespace(etag="etag") + + def complete_multipart_upload(self, *_args, **_kwargs): + """ + 模拟 OSS 完成分片上传成功。 + """ + response = SimpleNamespace(json=lambda: {"state": True}) + return SimpleNamespace( + status=200, resp=SimpleNamespace(response=response) + ) + + storage._request_api = fake_request_api + + with patch( + "app.modules.filemanager.storages.u115.oss2.StsAuth", + return_value=object(), + ), patch( + "app.modules.filemanager.storages.u115.oss2.Bucket", + FakeBucket, + ), patch( + "app.modules.filemanager.storages.u115.transfer_process", + return_value=lambda _progress: None, + ): + uploaded_item = storage.upload(target_dir, local_file) + + assert uploaded_item is not None + assert uploaded_item.storage == "u115" + assert uploaded_item.path == "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv" + assert uploaded_item.type == "file" + assert uploaded_item.size == local_file.stat().st_size