mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-06 20:42:43 +08:00
fix: serialize rclone folder creation during concurrent transfers
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
import json
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
from typing import Optional, List, Union
|
||||
|
||||
from app import schemas
|
||||
from app.core.config import settings
|
||||
@@ -11,6 +13,9 @@ from app.schemas.types import StorageSchema
|
||||
from app.utils.string import StringUtils
|
||||
from app.utils.system import SystemUtils
|
||||
|
||||
_folder_locks: dict[str, threading.Lock] = {}
|
||||
_folder_locks_guard = threading.Lock()
|
||||
|
||||
|
||||
class Rclone(StorageBase):
|
||||
"""
|
||||
@@ -120,6 +125,43 @@ class Rclone(StorageBase):
|
||||
modify_time=StringUtils.str_to_timestamp(item.get("ModTime"))
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def __normalize_remote_path(path: Union[Path, str]) -> str:
|
||||
"""
|
||||
规范化远端路径,统一目录锁键值。
|
||||
"""
|
||||
path_str = Path(str(path or "/")).as_posix()
|
||||
if not path_str.startswith("/"):
|
||||
path_str = f"/{path_str}"
|
||||
if path_str != "/":
|
||||
path_str = path_str.rstrip("/")
|
||||
return path_str or "/"
|
||||
|
||||
@staticmethod
|
||||
def __get_path_lock(path: Union[Path, str]) -> threading.Lock:
|
||||
"""
|
||||
获取指定远端路径的模块级锁。
|
||||
"""
|
||||
normalized = Rclone.__normalize_remote_path(path)
|
||||
with _folder_locks_guard:
|
||||
if normalized not in _folder_locks:
|
||||
_folder_locks[normalized] = threading.Lock()
|
||||
return _folder_locks[normalized]
|
||||
|
||||
def __wait_for_item(
|
||||
self, path: Path, retries: int = 3, delay: float = 0.2
|
||||
) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
等待目录或文件在远端可见,兼容云盘最终一致性延迟。
|
||||
"""
|
||||
for attempt in range(retries):
|
||||
item = self.get_item(path)
|
||||
if item:
|
||||
return item
|
||||
if attempt < retries - 1:
|
||||
time.sleep(delay)
|
||||
return None
|
||||
|
||||
def check(self) -> bool:
|
||||
"""
|
||||
检查存储是否可用
|
||||
@@ -163,50 +205,53 @@ class Rclone(StorageBase):
|
||||
:param fileitem: 父目录
|
||||
:param name: 目录名
|
||||
"""
|
||||
path = Path(self.__normalize_remote_path(Path(fileitem.path) / name))
|
||||
try:
|
||||
retcode = subprocess.run(
|
||||
[
|
||||
'rclone', 'mkdir',
|
||||
f'MP:{Path(fileitem.path) / name}'
|
||||
f'MP:{path}'
|
||||
],
|
||||
startupinfo=self.__get_hidden_shell()
|
||||
).returncode
|
||||
if retcode == 0:
|
||||
return self.get_item(Path(fileitem.path) / name)
|
||||
folder = self.__wait_for_item(path)
|
||||
if folder:
|
||||
return folder
|
||||
logger.warn(f"【rclone】目录 {path} 创建成功后暂未可见")
|
||||
return None
|
||||
folder = self.__wait_for_item(path, retries=2)
|
||||
if folder:
|
||||
logger.info(f"【rclone】目录 {path} 已存在,忽略重复创建")
|
||||
return folder
|
||||
except Exception as err:
|
||||
logger.error(f"【rclone】创建目录失败:{err}")
|
||||
folder = self.__wait_for_item(path, retries=2)
|
||||
if folder:
|
||||
logger.info(f"【rclone】目录 {path} 已存在,忽略创建异常")
|
||||
return folder
|
||||
return None
|
||||
|
||||
def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
根据文件路程获取目录,不存在则创建
|
||||
"""
|
||||
|
||||
def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
查找下级目录中匹配名称的目录
|
||||
"""
|
||||
for sub_folder in self.list(_fileitem):
|
||||
if sub_folder.type != "dir":
|
||||
continue
|
||||
if sub_folder.name == _name:
|
||||
return sub_folder
|
||||
return None
|
||||
normalized = Path(self.__normalize_remote_path(path))
|
||||
|
||||
# 是否已存在
|
||||
folder = self.get_item(path)
|
||||
folder = self.get_item(normalized)
|
||||
if folder:
|
||||
return folder
|
||||
# 逐级查找和创建目录
|
||||
fileitem = schemas.FileItem(storage=self.schema.value, path="/")
|
||||
for part in path.parts[1:]:
|
||||
dir_file = __find_dir(fileitem, part)
|
||||
if dir_file:
|
||||
fileitem = dir_file
|
||||
else:
|
||||
dir_file = self.create_folder(fileitem, part)
|
||||
fileitem = schemas.FileItem(storage=self.schema.value, type="dir", path="/")
|
||||
for part in normalized.parts[1:]:
|
||||
current_path = Path(self.__normalize_remote_path(Path(fileitem.path) / part))
|
||||
with self.__get_path_lock(current_path):
|
||||
dir_file = self.get_item(current_path)
|
||||
if not dir_file:
|
||||
logger.warn(f"【rclone】创建目录 {fileitem.path}{part} 失败!")
|
||||
dir_file = self.create_folder(fileitem, part)
|
||||
if not dir_file:
|
||||
logger.warn(f"【rclone】创建目录 {current_path} 失败!")
|
||||
return None
|
||||
fileitem = dir_file
|
||||
return fileitem
|
||||
|
||||
205
tests/test_rclone_storage.py
Normal file
205
tests/test_rclone_storage.py
Normal file
@@ -0,0 +1,205 @@
|
||||
import threading
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from typing import Union
|
||||
from unittest.mock import patch
|
||||
|
||||
from app import schemas
|
||||
from app.modules.filemanager.storages import rclone as rclone_module
|
||||
from app.modules.filemanager.storages.rclone import Rclone
|
||||
|
||||
|
||||
class RcloneStorageTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
with rclone_module._folder_locks_guard:
|
||||
rclone_module._folder_locks.clear()
|
||||
|
||||
@staticmethod
|
||||
def _normalize(path: Union[Path, str]) -> str:
|
||||
return Rclone._Rclone__normalize_remote_path(path)
|
||||
|
||||
def _make_dir_item(self, path: Union[Path, str]) -> schemas.FileItem:
|
||||
normalized = self._normalize(path)
|
||||
name = Path(normalized).name or "/"
|
||||
return schemas.FileItem(
|
||||
storage="rclone",
|
||||
type="dir",
|
||||
path="/" if normalized == "/" else f"{normalized}/",
|
||||
name=name,
|
||||
basename=name,
|
||||
)
|
||||
|
||||
def test_get_folder_serializes_same_target_directory_creation(self):
|
||||
storage = Rclone()
|
||||
thread_count = 4
|
||||
start_event = threading.Event()
|
||||
missing_barrier = threading.Barrier(thread_count)
|
||||
state_lock = threading.Lock()
|
||||
existing_paths = {"/"}
|
||||
mkdir_calls = []
|
||||
results = []
|
||||
errors = []
|
||||
|
||||
def fake_get_item(_self, path: Path):
|
||||
normalized = self._normalize(path)
|
||||
with state_lock:
|
||||
exists = normalized in existing_paths
|
||||
if not exists and normalized == "/Show":
|
||||
try:
|
||||
missing_barrier.wait(timeout=0.1)
|
||||
except threading.BrokenBarrierError:
|
||||
pass
|
||||
with state_lock:
|
||||
exists = normalized in existing_paths
|
||||
if exists:
|
||||
return self._make_dir_item(normalized)
|
||||
return None
|
||||
|
||||
def fake_run(cmd, *args, **kwargs):
|
||||
target = self._normalize(cmd[-1].removeprefix("MP:"))
|
||||
with state_lock:
|
||||
mkdir_calls.append(target)
|
||||
existing_paths.add(target)
|
||||
return SimpleNamespace(returncode=0)
|
||||
|
||||
def worker():
|
||||
try:
|
||||
start_event.wait()
|
||||
results.append(storage.get_folder(Path("/Show/Season 1")))
|
||||
except Exception as err: # pragma: no cover - 仅用于调试失败
|
||||
errors.append(err)
|
||||
|
||||
threads = [threading.Thread(target=worker) for _ in range(thread_count)]
|
||||
|
||||
with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item):
|
||||
with patch(
|
||||
"app.modules.filemanager.storages.rclone.subprocess.run",
|
||||
side_effect=fake_run,
|
||||
):
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
start_event.set()
|
||||
for thread in threads:
|
||||
thread.join(timeout=1)
|
||||
|
||||
self.assertFalse(errors)
|
||||
self.assertTrue(all(not thread.is_alive() for thread in threads))
|
||||
self.assertEqual(thread_count, len(results))
|
||||
self.assertTrue(all(result and result.path == "/Show/Season 1/" for result in results))
|
||||
self.assertEqual(1, mkdir_calls.count("/Show"))
|
||||
self.assertEqual(1, mkdir_calls.count("/Show/Season 1"))
|
||||
|
||||
def test_get_folder_serializes_shared_parent_creation(self):
|
||||
storage = Rclone()
|
||||
thread_count = 4
|
||||
start_event = threading.Event()
|
||||
missing_barrier = threading.Barrier(thread_count)
|
||||
state_lock = threading.Lock()
|
||||
existing_paths = {"/"}
|
||||
mkdir_calls = []
|
||||
results = []
|
||||
errors = []
|
||||
targets = [
|
||||
Path("/Show/Season 1"),
|
||||
Path("/Show/Season 2"),
|
||||
Path("/Show/Season 1"),
|
||||
Path("/Show/Season 2"),
|
||||
]
|
||||
|
||||
def fake_get_item(_self, path: Path):
|
||||
normalized = self._normalize(path)
|
||||
with state_lock:
|
||||
exists = normalized in existing_paths
|
||||
if not exists and normalized == "/Show":
|
||||
try:
|
||||
missing_barrier.wait(timeout=0.1)
|
||||
except threading.BrokenBarrierError:
|
||||
pass
|
||||
with state_lock:
|
||||
exists = normalized in existing_paths
|
||||
if exists:
|
||||
return self._make_dir_item(normalized)
|
||||
return None
|
||||
|
||||
def fake_run(cmd, *args, **kwargs):
|
||||
target = self._normalize(cmd[-1].removeprefix("MP:"))
|
||||
with state_lock:
|
||||
mkdir_calls.append(target)
|
||||
existing_paths.add(target)
|
||||
return SimpleNamespace(returncode=0)
|
||||
|
||||
def worker(target: Path):
|
||||
try:
|
||||
start_event.wait()
|
||||
results.append(storage.get_folder(target))
|
||||
except Exception as err: # pragma: no cover - 仅用于调试失败
|
||||
errors.append(err)
|
||||
|
||||
threads = [threading.Thread(target=worker, args=(target,)) for target in targets]
|
||||
|
||||
with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item):
|
||||
with patch(
|
||||
"app.modules.filemanager.storages.rclone.subprocess.run",
|
||||
side_effect=fake_run,
|
||||
):
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
start_event.set()
|
||||
for thread in threads:
|
||||
thread.join(timeout=1)
|
||||
|
||||
self.assertFalse(errors)
|
||||
self.assertTrue(all(not thread.is_alive() for thread in threads))
|
||||
self.assertEqual(4, len(results))
|
||||
self.assertEqual(1, mkdir_calls.count("/Show"))
|
||||
self.assertEqual(1, mkdir_calls.count("/Show/Season 1"))
|
||||
self.assertEqual(1, mkdir_calls.count("/Show/Season 2"))
|
||||
|
||||
def test_create_folder_retries_visibility_after_successful_mkdir(self):
|
||||
storage = Rclone()
|
||||
expected = self._make_dir_item("/Show")
|
||||
responses = [None, expected]
|
||||
|
||||
def fake_get_item(_self, path: Path):
|
||||
return responses.pop(0)
|
||||
|
||||
with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item):
|
||||
with patch(
|
||||
"app.modules.filemanager.storages.rclone.subprocess.run",
|
||||
return_value=SimpleNamespace(returncode=0),
|
||||
) as run_mock:
|
||||
with patch("app.modules.filemanager.storages.rclone.time.sleep", return_value=None):
|
||||
folder = storage.create_folder(
|
||||
schemas.FileItem(storage="rclone", type="dir", path="/"),
|
||||
"Show",
|
||||
)
|
||||
|
||||
self.assertEqual("/Show/", folder.path)
|
||||
run_mock.assert_called_once()
|
||||
|
||||
def test_create_folder_accepts_existing_directory_after_failed_mkdir(self):
|
||||
storage = Rclone()
|
||||
expected = self._make_dir_item("/Show")
|
||||
responses = [None, expected]
|
||||
|
||||
def fake_get_item(_self, path: Path):
|
||||
return responses.pop(0)
|
||||
|
||||
with patch.object(Rclone, "get_item", autospec=True, side_effect=fake_get_item):
|
||||
with patch(
|
||||
"app.modules.filemanager.storages.rclone.subprocess.run",
|
||||
return_value=SimpleNamespace(returncode=1),
|
||||
) as run_mock:
|
||||
with patch("app.modules.filemanager.storages.rclone.time.sleep", return_value=None):
|
||||
folder = storage.create_folder(
|
||||
schemas.FileItem(storage="rclone", type="dir", path="/"),
|
||||
"Show",
|
||||
)
|
||||
|
||||
self.assertEqual("/Show/", folder.path)
|
||||
run_mock.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user