From a0b3800f6bf4857bf4f889a63d44350eb8380f28 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 5 Jun 2026 17:42:43 +0800 Subject: [PATCH] fix: prevent cloud storage download path traversal --- app/modules/filemanager/storages/__init__.py | 34 ++- app/modules/filemanager/storages/alipan.py | 4 +- app/modules/filemanager/storages/rclone.py | 4 +- app/modules/filemanager/storages/smb.py | 4 +- app/modules/filemanager/storages/u115.py | 4 +- tests/test_storage_download_path.py | 230 +++++++++++++++++++ 6 files changed, 275 insertions(+), 5 deletions(-) create mode 100644 tests/test_storage_download_path.py diff --git a/app/modules/filemanager/storages/__init__.py b/app/modules/filemanager/storages/__init__.py index 3adeca74..5c495484 100644 --- a/app/modules/filemanager/storages/__init__.py +++ b/app/modules/filemanager/storages/__init__.py @@ -1,5 +1,5 @@ from abc import ABCMeta, abstractmethod -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Optional, List, Dict, Tuple, Callable, Union from tqdm import tqdm @@ -105,6 +105,38 @@ class StorageBase(metaclass=ABCMeta): self.storagehelper.reset_storage(self.schema.value) self.init_storage() + @staticmethod + def _safe_download_name(name: Optional[str]) -> Optional[str]: + """ + 提取可安全落盘的文件名。 + """ + if not name: + return None + + safe_name = PurePosixPath(str(name).replace("\\", "/")).name + if safe_name in ("", ".", ".."): + return None + return safe_name + + def _build_download_path( + self, fileitem: schemas.FileItem, path: Path + ) -> Optional[Path]: + """ + 构造本地下载路径,避免远端文件名携带目录片段时越过目标目录。 + """ + safe_name = self._safe_download_name(fileitem.name) + if not safe_name: + logger.error(f"【存储】下载文件名无效:{fileitem.name}") + return None + + local_path = path / safe_name + try: + local_path.resolve().relative_to(path.resolve()) + except ValueError: + logger.error(f"【存储】下载路径越界:{fileitem.name} -> {local_path}") + return None + return local_path + @abstractmethod def check(self) -> bool: """ diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 18b71cd8..7fae228a 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -741,7 +741,9 @@ class AliPan(StorageBase, metaclass=WeakSingleton): logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}") return None - local_path = (path or settings.TEMP_PATH) / fileitem.name + local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH) + if not local_path: + return None # 获取文件大小 file_size = fileitem.size diff --git a/app/modules/filemanager/storages/rclone.py b/app/modules/filemanager/storages/rclone.py index 5613ea68..84e94697 100644 --- a/app/modules/filemanager/storages/rclone.py +++ b/app/modules/filemanager/storages/rclone.py @@ -340,7 +340,9 @@ class Rclone(StorageBase): """ 带实时进度显示的下载 """ - local_path = (path or settings.TEMP_PATH) / fileitem.name + local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH) + if not local_path: + return None # 初始化进度条 logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}") diff --git a/app/modules/filemanager/storages/smb.py b/app/modules/filemanager/storages/smb.py index 163a02e0..44fece2a 100644 --- a/app/modules/filemanager/storages/smb.py +++ b/app/modules/filemanager/storages/smb.py @@ -511,7 +511,9 @@ class SMB(StorageBase, metaclass=WeakSingleton): """ 带实时进度显示的下载 """ - local_path = (path or settings.TEMP_PATH) / fileitem.name + local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH) + if not local_path: + return None smb_path = self._normalize_path(fileitem.path) try: self._check_connection() diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 56ba8ae4..8ecd2be2 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -830,7 +830,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): logger.error(f"【115】下载链接为空: {fileitem.name}") return None - local_path = (path or settings.TEMP_PATH) / fileitem.name + local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH) + if not local_path: + return None # 获取文件大小 file_size = detail.size diff --git a/tests/test_storage_download_path.py b/tests/test_storage_download_path.py new file mode 100644 index 00000000..68751d84 --- /dev/null +++ b/tests/test_storage_download_path.py @@ -0,0 +1,230 @@ +from pathlib import Path +from typing import Iterator, Union +from unittest.mock import PropertyMock, patch + +import pytest + +from app import schemas +from app.modules.filemanager.storages.alipan import AliPan +from app.modules.filemanager.storages.rclone import Rclone +from app.modules.filemanager.storages.u115 import U115Pan + + +PAYLOAD = b"safe-download\n" + + +def _noop_progress(_percent: Union[int, float]) -> None: + """忽略测试中的进度更新。""" + return None + + +class _FakeAliPanStream: + """模拟阿里云盘下载流。""" + + def __init__(self, payload: bytes) -> None: + self._payload = payload + + def raise_for_status(self) -> None: + """模拟响应状态检查。""" + return None + + def iter_content(self, chunk_size: int) -> Iterator[bytes]: + """返回下载内容分块。""" + yield self._payload + + def __enter__(self) -> "_FakeAliPanStream": + """进入上下文。""" + return self + + def __exit__(self, *args: object) -> None: + """退出上下文。""" + return None + + +class _FakeU115Stream: + """模拟 115 下载流。""" + + def __init__(self, payload: bytes) -> None: + self._payload = payload + + def raise_for_status(self) -> None: + """模拟响应状态检查。""" + return None + + def iter_bytes(self, chunk_size: int) -> Iterator[bytes]: + """返回下载内容分块。""" + yield self._payload + + def close(self) -> None: + """模拟关闭响应流。""" + return None + + def __enter__(self) -> "_FakeU115Stream": + """进入上下文。""" + return self + + def __exit__(self, *args: object) -> None: + """退出上下文。""" + return None + + +class _FakeU115Session: + """模拟 115 HTTP 会话。""" + + def __init__(self, payload: bytes) -> None: + self._payload = payload + + def stream(self, method: str, url: str) -> _FakeU115Stream: + """返回伪造的下载流。""" + return _FakeU115Stream(self._payload) + + +class _FakeRcloneProcess: + """模拟 rclone 子进程。""" + + stdout: list[str] = [] + + def wait(self) -> int: + """返回成功退出码。""" + return 0 + + +@pytest.mark.parametrize( + ("name", "expected"), + [ + ("../proof.txt", "proof.txt"), + ("..\\proof.txt", "proof.txt"), + ("/tmp/proof.txt", "proof.txt"), + ], +) +def test_build_download_path_strips_remote_directory_segments( + tmp_path: Path, name: str, expected: str +) -> None: + """本地下载路径应剥离远端文件名中的目录片段。""" + storage = Rclone.__new__(Rclone) + fileitem = schemas.FileItem(path=f"/remote/{expected}", name=name) + + local_path = storage._build_download_path(fileitem, tmp_path) + + assert local_path == tmp_path / expected + assert local_path.resolve().relative_to(tmp_path.resolve()) == Path(expected) + + +@pytest.mark.parametrize("name", ["", ".", "..", "subdir/.."]) +def test_build_download_path_rejects_unsafe_filename( + tmp_path: Path, name: str +) -> None: + """本地下载路径应拒绝无法安全落盘的文件名。""" + storage = Rclone.__new__(Rclone) + fileitem = schemas.FileItem(path="/remote/proof.txt", name=name) + + assert storage._build_download_path(fileitem, tmp_path) is None + + +def test_alipan_download_writes_sanitized_filename(tmp_path: Path) -> None: + """阿里云盘下载应将路径穿越文件名写入目标目录内。""" + alipan = AliPan.__new__(AliPan) + alipan.chunk_size = 8192 + fileitem = schemas.FileItem( + storage="alipan", + type="file", + path="/remote/proof.txt", + name="../proof.txt", + size=len(PAYLOAD), + fileid="file-id", + drive_id="drive-id", + ) + + with ( + patch.object( + alipan, + "_request_api", + return_value={"url": "https://example.invalid/proof.txt"}, + ), + patch.object(AliPan, "access_token", new_callable=PropertyMock, return_value=None), + patch( + "app.modules.filemanager.storages.alipan.transfer_process", + return_value=_noop_progress, + ), + patch( + "app.modules.filemanager.storages.alipan.global_vars.is_transfer_stopped", + return_value=False, + ), + patch("app.modules.filemanager.storages.alipan.RequestUtils") as request_utils, + ): + request_utils.return_value.get_stream.return_value = _FakeAliPanStream(PAYLOAD) + result = alipan.download(fileitem, path=tmp_path) + + expected_path = tmp_path / "proof.txt" + assert result == expected_path + assert expected_path.read_bytes() == PAYLOAD + assert not (tmp_path.parent / "proof.txt").exists() + + +def test_u115_download_writes_sanitized_filename(tmp_path: Path) -> None: + """115 下载应将路径穿越文件名写入目标目录内。""" + u115 = U115Pan.__new__(U115Pan) + u115.chunk_size = 8192 + u115.session = _FakeU115Session(PAYLOAD) + detail = schemas.FileItem(size=len(PAYLOAD), pickcode="pick-code") + fileitem = schemas.FileItem( + storage="u115", + type="file", + path="/remote/proof.txt", + name="../proof.txt", + size=len(PAYLOAD), + ) + + with ( + patch.object(u115, "get_item", return_value=detail), + patch.object( + u115, + "_request_api", + return_value={"file-id": {"url": {"url": "https://example.invalid/proof.txt"}}}, + ), + patch( + "app.modules.filemanager.storages.u115.transfer_process", + return_value=_noop_progress, + ), + patch( + "app.modules.filemanager.storages.u115.global_vars.is_transfer_stopped", + return_value=False, + ), + ): + result = u115.download(fileitem, path=tmp_path) + + expected_path = tmp_path / "proof.txt" + assert result == expected_path + assert expected_path.read_bytes() == PAYLOAD + assert not (tmp_path.parent / "proof.txt").exists() + + +def test_rclone_download_uses_sanitized_target_path(tmp_path: Path) -> None: + """rclone 下载应把清洗后的本地路径传给 copyto。""" + storage = Rclone.__new__(Rclone) + fileitem = schemas.FileItem( + storage="rclone", + type="file", + path="/remote/proof.txt", + name="../proof.txt", + size=len(PAYLOAD), + ) + captured_cmd: dict[str, list[str]] = {} + + def fake_popen(cmd: list[str], *args: object, **kwargs: object) -> _FakeRcloneProcess: + captured_cmd["cmd"] = cmd + return _FakeRcloneProcess() + + with ( + patch( + "app.modules.filemanager.storages.rclone.transfer_process", + return_value=_noop_progress, + ), + patch("app.modules.filemanager.storages.rclone.subprocess.Popen", side_effect=fake_popen), + ): + result = storage.download(fileitem, path=tmp_path) + + expected_path = tmp_path / "proof.txt" + assert result == expected_path + assert captured_cmd["cmd"][-1] == str(expected_path) + assert expected_path.resolve().relative_to(tmp_path.resolve()) == Path("proof.txt")