From 6f6fcc79f2da8e6804159c671a98ec6a1d042293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AC=A8=E7=AC=A8?= Date: Mon, 20 Apr 2026 21:18:30 +0800 Subject: [PATCH] fix: serialize rclone folder creation during concurrent transfers --- app/modules/filemanager/storages/rclone.py | 91 ++++++--- tests/test_rclone_storage.py | 205 +++++++++++++++++++++ 2 files changed, 273 insertions(+), 23 deletions(-) create mode 100644 tests/test_rclone_storage.py diff --git a/app/modules/filemanager/storages/rclone.py b/app/modules/filemanager/storages/rclone.py index 3d3c8232..c4d34ade 100644 --- a/app/modules/filemanager/storages/rclone.py +++ b/app/modules/filemanager/storages/rclone.py @@ -1,7 +1,9 @@ import json import subprocess +import threading +import time from pathlib import Path -from typing import Optional, List +from typing import Optional, List, Union from app import schemas from app.core.config import settings @@ -11,6 +13,9 @@ from app.schemas.types import StorageSchema from app.utils.string import StringUtils from app.utils.system import SystemUtils +_folder_locks: dict[str, threading.Lock] = {} +_folder_locks_guard = threading.Lock() + class Rclone(StorageBase): """ @@ -120,6 +125,43 @@ class Rclone(StorageBase): modify_time=StringUtils.str_to_timestamp(item.get("ModTime")) ) + @staticmethod + def __normalize_remote_path(path: Union[Path, str]) -> str: + """ + 规范化远端路径,统一目录锁键值。 + """ + path_str = Path(str(path or "/")).as_posix() + if not path_str.startswith("/"): + path_str = f"/{path_str}" + if path_str != "/": + path_str = path_str.rstrip("/") + return path_str or "/" + + @staticmethod + def __get_path_lock(path: Union[Path, str]) -> threading.Lock: + """ + 获取指定远端路径的模块级锁。 + """ + normalized = Rclone.__normalize_remote_path(path) + with _folder_locks_guard: + if normalized not in _folder_locks: + _folder_locks[normalized] = threading.Lock() + return _folder_locks[normalized] + + def __wait_for_item( + self, path: Path, retries: int = 3, delay: float = 0.2 + ) -> Optional[schemas.FileItem]: + """ + 等待目录或文件在远端可见,兼容云盘最终一致性延迟。 + """ + for attempt in range(retries): + item = self.get_item(path) + if item: + return item + if attempt < retries - 1: + time.sleep(delay) + return None + def check(self) -> bool: """ 检查存储是否可用 @@ -163,50 +205,53 @@ class Rclone(StorageBase): :param fileitem: 父目录 :param name: 目录名 """ + path = Path(self.__normalize_remote_path(Path(fileitem.path) / name)) try: retcode = subprocess.run( [ 'rclone', 'mkdir', - f'MP:{Path(fileitem.path) / name}' + f'MP:{path}' ], startupinfo=self.__get_hidden_shell() ).returncode if retcode == 0: - return self.get_item(Path(fileitem.path) / name) + folder = self.__wait_for_item(path) + if folder: + return folder + logger.warn(f"【rclone】目录 {path} 创建成功后暂未可见") + return None + folder = self.__wait_for_item(path, retries=2) + if folder: + logger.info(f"【rclone】目录 {path} 已存在,忽略重复创建") + return folder except Exception as err: logger.error(f"【rclone】创建目录失败:{err}") + folder = self.__wait_for_item(path, retries=2) + if folder: + logger.info(f"【rclone】目录 {path} 已存在,忽略创建异常") + return folder return None def get_folder(self, path: Path) -> Optional[schemas.FileItem]: """ 根据文件路程获取目录,不存在则创建 """ - - def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]: - """ - 查找下级目录中匹配名称的目录 - """ - for sub_folder in self.list(_fileitem): - if sub_folder.type != "dir": - continue - if sub_folder.name == _name: - return sub_folder - return None + normalized = Path(self.__normalize_remote_path(path)) # 是否已存在 - folder = self.get_item(path) + folder = self.get_item(normalized) if folder: return folder # 逐级查找和创建目录 - fileitem = schemas.FileItem(storage=self.schema.value, path="/") - for part in path.parts[1:]: - dir_file = __find_dir(fileitem, part) - if dir_file: - fileitem = dir_file - else: - dir_file = self.create_folder(fileitem, part) + fileitem = schemas.FileItem(storage=self.schema.value, type="dir", path="/") + for part in normalized.parts[1:]: + current_path = Path(self.__normalize_remote_path(Path(fileitem.path) / part)) + with self.__get_path_lock(current_path): + dir_file = self.get_item(current_path) if not dir_file: - logger.warn(f"【rclone】创建目录 {fileitem.path}{part} 失败!") + dir_file = self.create_folder(fileitem, part) + if not dir_file: + logger.warn(f"【rclone】创建目录 {current_path} 失败!") return None fileitem = dir_file return fileitem diff --git a/tests/test_rclone_storage.py b/tests/test_rclone_storage.py new file mode 100644 index 00000000..a7474e2d --- /dev/null +++ b/tests/test_rclone_storage.py @@ -0,0 +1,205 @@ +import threading +import unittest +from pathlib import Path +from types import SimpleNamespace +from typing import Union +from unittest.mock import patch + +from app import schemas +from app.modules.filemanager.storages import rclone as rclone_module +from app.modules.filemanager.storages.rclone import Rclone + + +class RcloneStorageTest(unittest.TestCase): + def setUp(self): + with rclone_module._folder_locks_guard: + rclone_module._folder_locks.clear() + + @staticmethod + def _normalize(path: Union[Path, str]) -> str: + return Rclone._Rclone__normalize_remote_path(path) + + def _make_dir_item(self, path: Union[Path, str]) -> schemas.FileItem: + normalized = self._normalize(path) + name = Path(normalized).name or "/" + return schemas.FileItem( + storage="rclone", + type="dir", + path="/" if normalized == "/" else f"{normalized}/", + name=name, + basename=name, + ) + + def test_get_folder_serializes_same_target_directory_creation(self): + storage = Rclone() + thread_count = 4 + start_event = threading.Event() + missing_barrier = threading.Barrier(thread_count) + state_lock = threading.Lock() + existing_paths = {"/"} + mkdir_calls = [] + results = [] + errors = [] + + def fake_get_item(_self, path: Path): + normalized = self._normalize(path) + with state_lock: + exists = normalized in existing_paths + if not exists and normalized == "/Show": + try: + missing_barrier.wait(timeout=0.1) + except threading.BrokenBarrierError: + pass + with state_lock: + exists = normalized in existing_paths + if exists: + return self._make_dir_item(normalized) + return None + + def fake_run(cmd, *args, **kwargs): + target = self._normalize(cmd[-1].removeprefix("MP:")) + with state_lock: + mkdir_calls.append(target) + existing_paths.add(target) + return SimpleNamespace(returncode=0) + + def worker(): + try: + start_event.wait() + results.append(storage.get_folder(Path("/Show/Season 1"))) + except Exception as err: # pragma: no cover - 仅用于调试失败 + errors.append(err) + + threads = [threading.Thread(target=worker) for _ in range(thread_count)] + + with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item): + with patch( + "app.modules.filemanager.storages.rclone.subprocess.run", + side_effect=fake_run, + ): + for thread in threads: + thread.start() + start_event.set() + for thread in threads: + thread.join(timeout=1) + + self.assertFalse(errors) + self.assertTrue(all(not thread.is_alive() for thread in threads)) + self.assertEqual(thread_count, len(results)) + self.assertTrue(all(result and result.path == "/Show/Season 1/" for result in results)) + self.assertEqual(1, mkdir_calls.count("/Show")) + self.assertEqual(1, mkdir_calls.count("/Show/Season 1")) + + def test_get_folder_serializes_shared_parent_creation(self): + storage = Rclone() + thread_count = 4 + start_event = threading.Event() + missing_barrier = threading.Barrier(thread_count) + state_lock = threading.Lock() + existing_paths = {"/"} + mkdir_calls = [] + results = [] + errors = [] + targets = [ + Path("/Show/Season 1"), + Path("/Show/Season 2"), + Path("/Show/Season 1"), + Path("/Show/Season 2"), + ] + + def fake_get_item(_self, path: Path): + normalized = self._normalize(path) + with state_lock: + exists = normalized in existing_paths + if not exists and normalized == "/Show": + try: + missing_barrier.wait(timeout=0.1) + except threading.BrokenBarrierError: + pass + with state_lock: + exists = normalized in existing_paths + if exists: + return self._make_dir_item(normalized) + return None + + def fake_run(cmd, *args, **kwargs): + target = self._normalize(cmd[-1].removeprefix("MP:")) + with state_lock: + mkdir_calls.append(target) + existing_paths.add(target) + return SimpleNamespace(returncode=0) + + def worker(target: Path): + try: + start_event.wait() + results.append(storage.get_folder(target)) + except Exception as err: # pragma: no cover - 仅用于调试失败 + errors.append(err) + + threads = [threading.Thread(target=worker, args=(target,)) for target in targets] + + with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item): + with patch( + "app.modules.filemanager.storages.rclone.subprocess.run", + side_effect=fake_run, + ): + for thread in threads: + thread.start() + start_event.set() + for thread in threads: + thread.join(timeout=1) + + self.assertFalse(errors) + self.assertTrue(all(not thread.is_alive() for thread in threads)) + self.assertEqual(4, len(results)) + self.assertEqual(1, mkdir_calls.count("/Show")) + self.assertEqual(1, mkdir_calls.count("/Show/Season 1")) + self.assertEqual(1, mkdir_calls.count("/Show/Season 2")) + + def test_create_folder_retries_visibility_after_successful_mkdir(self): + storage = Rclone() + expected = self._make_dir_item("/Show") + responses = [None, expected] + + def fake_get_item(_self, path: Path): + return responses.pop(0) + + with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item): + with patch( + "app.modules.filemanager.storages.rclone.subprocess.run", + return_value=SimpleNamespace(returncode=0), + ) as run_mock: + with patch("app.modules.filemanager.storages.rclone.time.sleep", return_value=None): + folder = storage.create_folder( + schemas.FileItem(storage="rclone", type="dir", path="/"), + "Show", + ) + + self.assertEqual("/Show/", folder.path) + run_mock.assert_called_once() + + def test_create_folder_accepts_existing_directory_after_failed_mkdir(self): + storage = Rclone() + expected = self._make_dir_item("/Show") + responses = [None, expected] + + def fake_get_item(_self, path: Path): + return responses.pop(0) + + with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item): + with patch( + "app.modules.filemanager.storages.rclone.subprocess.run", + return_value=SimpleNamespace(returncode=1), + ) as run_mock: + with patch("app.modules.filemanager.storages.rclone.time.sleep", return_value=None): + folder = storage.create_folder( + schemas.FileItem(storage="rclone", type="dir", path="/"), + "Show", + ) + + self.assertEqual("/Show/", folder.path) + run_mock.assert_called_once() + + +if __name__ == "__main__": + unittest.main()