fix: prevent duplicate transfer uploads

This commit is contained in:
jxxghp
2026-05-20 16:39:07 +08:00
parent 72ead2970c
commit e00aa42f94
4 changed files with 182 additions and 6 deletions

View File

@@ -101,6 +101,18 @@ class JobManager:
return None, season
return media.tmdb_id or media.douban_id, season
@staticmethod
def __get_file_key(fileitem: FileItem) -> Optional[Tuple[str, str]]:
"""
获取源文件唯一键,用于跨媒体作业识别同一个整理任务。
"""
if not fileitem or not fileitem.path:
return None
normalized_path = (
Path(str(fileitem.path).replace("\\", "/")).as_posix().rstrip("/") or "/"
)
return fileitem.storage or "local", normalized_path
def __get_id(self, task: TransferTask = None) -> Tuple:
"""
获取作业ID
@@ -146,8 +158,19 @@ class JobManager:
"""
if not all([task, task.meta, task.fileitem]):
return False
file_key = self.__get_file_key(task.fileitem)
if not file_key:
return False
with job_lock:
__mediaid__ = self.__get_id(task)
# 同一个源文件可能在识别前后落入不同作业,必须跨作业去重。
if any(
self.__get_file_key(t.fileitem) == file_key
for job in self._job_view.values()
for t in job.tasks
):
logger.debug(f"任务 {task.fileitem.name} 已存在,跳过重复添加")
return False
if __mediaid__ not in self._job_view:
self._job_view[__mediaid__] = TransferJob(
media=self.__get_media(task),
@@ -166,7 +189,7 @@ class JobManager:
# 不重复添加任务
if any(
[
t.fileitem == task.fileitem
self.__get_file_key(t.fileitem) == file_key
for t in self._job_view[__mediaid__].tasks
]
):
@@ -282,10 +305,13 @@ class JobManager:
"""
if not task or not task.fileitem:
return
file_key = self.__get_file_key(task.fileitem)
if not file_key:
return
with job_lock:
for mediaid, job in self._job_view.items():
for job_task in job.tasks:
if job_task.fileitem != task.fileitem:
if self.__get_file_key(job_task.fileitem) != file_key:
continue
if job_task.state not in ["completed", "failed"]:
job_task.state = "failed"
@@ -309,11 +335,14 @@ class JobManager:
"""
根据文件项移除任务并返回任务所在的作业ID
"""
file_key = self.__get_file_key(fileitem)
if not file_key:
return None, None
with job_lock:
for mediaid in list(self._job_view):
job = self._job_view[mediaid]
for task in job.tasks:
if task.fileitem == fileitem:
if self.__get_file_key(task.fileitem) == file_key:
job.tasks.remove(task)
# 如果没有作业了,则移除作业
if not job.tasks:
@@ -1594,7 +1623,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 更新任务信息
task.mediainfo = mediainfo
# 更新队列任务
self.jobview.migrate_task(task)
if not self.jobview.migrate_task(task):
logger.info(f"{task.fileitem.name} 已存在整理任务,跳过重复处理")
return False, f"{task.fileitem.name} 已在整理队列中"
# 获取集数据
if task.mediainfo.type == MediaType.TV and not task.episodes_info:

View File

@@ -524,6 +524,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
"""
def encode_callback(cb: str) -> str:
"""
编码 115 OSS 回调参数。
"""
return oss2.utils.b64encode_as_string(cb)
target_name = new_name or local_path.name
@@ -631,7 +634,10 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
else None,
modify_time=info_resp["utime"],
)
return self.get_item(target_path)
uploaded_item = self.get_item(target_path)
return uploaded_item or self.__build_uploaded_fileitem(
target_path, local_path, file_size
)
# Step 4: 获取上传凭证
token_resp = self._request_api("GET", "/open/upload/get_token", "data")
@@ -740,7 +746,30 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
)
return None
# 返回结果
return self.get_item(target_path)
uploaded_item = self.get_item(target_path)
if uploaded_item:
return uploaded_item
logger.warn(
f"【115】{target_name} 上传已完成但元数据暂不可见,使用目标路径构造整理结果"
)
return self.__build_uploaded_fileitem(target_path, local_path, file_size)
def __build_uploaded_fileitem(
self, target_path: Path, local_path: Path, file_size: int
) -> schemas.FileItem:
"""
构造已上传文件项,用于兼容 115 上传成功后目录索引延迟刷新。
"""
return schemas.FileItem(
storage=self.schema.value,
path=target_path.as_posix(),
type="file",
name=target_path.name,
basename=target_path.stem,
extension=target_path.suffix[1:] or None,
size=file_size,
modify_time=local_path.stat().st_mtime if local_path.exists() else None,
)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""

View File

@@ -295,6 +295,23 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual(1, len(jobs))
self.assertEqual(task2.fileitem, jobs[0].tasks[0].fileitem)
def test_same_source_file_is_deduped_across_media_jobs(self):
"""
同一个源文件即使识别到不同媒体作业,也不能重复加入整理视图。
"""
jobview = JobManager()
task1 = make_task(1)
task2 = make_task(1)
task1.mediainfo = FakeMedia(100)
task2.mediainfo = FakeMedia(200)
self.assertTrue(jobview.add_task(task1))
self.assertFalse(jobview.add_task(task2))
jobs = jobview.list_jobs()
self.assertEqual(1, len(jobs))
self.assertEqual(task1.fileitem, jobs[0].tasks[0].fileitem)
def test_pre_recognized_migrations_with_same_meta_do_not_link_jobs(self):
jobview = JobManager()
task1 = make_task(1)

View File

@@ -0,0 +1,99 @@
from types import SimpleNamespace
from unittest.mock import patch
from app.modules.filemanager.storages.u115 import U115Pan
from app.schemas import FileItem
def test_upload_returns_target_fileitem_when_uploaded_metadata_is_delayed(tmp_path):
"""
115 上传完成后目录索引暂不可见时,应返回可落库的目标文件项。
"""
local_file = tmp_path / "Test.Show.S01E01.mkv"
local_file.write_bytes(b"movie")
target_dir = FileItem(
storage="u115",
path="/library/Test Show (2026)/Season 1",
type="dir",
name="Season 1",
fileid="100",
)
storage = object.__new__(U115Pan)
storage._calc_sha1 = lambda *_args, **_kwargs: "sha1"
storage.get_item = lambda _path: None
def fake_request_api(_method, endpoint, *_args, **_kwargs):
"""
模拟 115 初始化、凭证和断点续传接口。
"""
if endpoint == "/open/upload/init":
return {
"state": True,
"data": {
"bucket": "bucket",
"object": "object",
"callback": {"callback": "callback", "callback_var": "var"},
"pick_code": "pickcode",
"status": 1,
},
}
if endpoint == "/open/upload/get_token":
return {
"endpoint": "endpoint",
"AccessKeyId": "access_key_id",
"AccessKeySecret": "access_key_secret",
"SecurityToken": "security_token",
}
if endpoint == "/open/upload/resume":
return None
return None
class FakeBucket:
"""
模拟 OSS 分片上传客户端。
"""
def __init__(self, *_args, **_kwargs):
pass
def init_multipart_upload(self, *_args, **_kwargs):
"""
返回固定 upload_id。
"""
return SimpleNamespace(upload_id="upload_id")
def upload_part(self, *_args, **_kwargs):
"""
返回固定分片 etag。
"""
return SimpleNamespace(etag="etag")
def complete_multipart_upload(self, *_args, **_kwargs):
"""
模拟 OSS 完成分片上传成功。
"""
response = SimpleNamespace(json=lambda: {"state": True})
return SimpleNamespace(
status=200, resp=SimpleNamespace(response=response)
)
storage._request_api = fake_request_api
with patch(
"app.modules.filemanager.storages.u115.oss2.StsAuth",
return_value=object(),
), patch(
"app.modules.filemanager.storages.u115.oss2.Bucket",
FakeBucket,
), patch(
"app.modules.filemanager.storages.u115.transfer_process",
return_value=lambda _progress: None,
):
uploaded_item = storage.upload(target_dir, local_file)
assert uploaded_item is not None
assert uploaded_item.storage == "u115"
assert uploaded_item.path == "/library/Test Show (2026)/Season 1/Test.Show.S01E01.mkv"
assert uploaded_item.type == "file"
assert uploaded_item.size == local_file.stat().st_size