fix: prevent cloud storage download path traversal

This commit is contained in:
jxxghp
2026-06-05 17:42:43 +08:00
parent 871d1ec0d8
commit a0b3800f6b
6 changed files with 275 additions and 5 deletions

View File

@@ -1,5 +1,5 @@
from abc import ABCMeta, abstractmethod
from pathlib import Path
from pathlib import Path, PurePosixPath
from typing import Optional, List, Dict, Tuple, Callable, Union
from tqdm import tqdm
@@ -105,6 +105,38 @@ class StorageBase(metaclass=ABCMeta):
self.storagehelper.reset_storage(self.schema.value)
self.init_storage()
@staticmethod
def _safe_download_name(name: Optional[str]) -> Optional[str]:
"""
提取可安全落盘的文件名。
"""
if not name:
return None
safe_name = PurePosixPath(str(name).replace("\\", "/")).name
if safe_name in ("", ".", ".."):
return None
return safe_name
def _build_download_path(
self, fileitem: schemas.FileItem, path: Path
) -> Optional[Path]:
"""
构造本地下载路径,避免远端文件名携带目录片段时越过目标目录。
"""
safe_name = self._safe_download_name(fileitem.name)
if not safe_name:
logger.error(f"【存储】下载文件名无效:{fileitem.name}")
return None
local_path = path / safe_name
try:
local_path.resolve().relative_to(path.resolve())
except ValueError:
logger.error(f"【存储】下载路径越界:{fileitem.name} -> {local_path}")
return None
return local_path
@abstractmethod
def check(self) -> bool:
"""

View File

@@ -741,7 +741,9 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}")
return None
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
# 获取文件大小
file_size = fileitem.size

View File

@@ -340,7 +340,9 @@ class Rclone(StorageBase):
"""
带实时进度显示的下载
"""
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
# 初始化进度条
logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}")

View File

@@ -511,7 +511,9 @@ class SMB(StorageBase, metaclass=WeakSingleton):
"""
带实时进度显示的下载
"""
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
smb_path = self._normalize_path(fileitem.path)
try:
self._check_connection()

View File

@@ -830,7 +830,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
logger.error(f"【115】下载链接为空: {fileitem.name}")
return None
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
# 获取文件大小
file_size = detail.size

View File

@@ -0,0 +1,230 @@
from pathlib import Path
from typing import Iterator, Union
from unittest.mock import PropertyMock, patch
import pytest
from app import schemas
from app.modules.filemanager.storages.alipan import AliPan
from app.modules.filemanager.storages.rclone import Rclone
from app.modules.filemanager.storages.u115 import U115Pan
PAYLOAD = b"safe-download\n"
def _noop_progress(_percent: Union[int, float]) -> None:
"""忽略测试中的进度更新。"""
return None
class _FakeAliPanStream:
"""模拟阿里云盘下载流。"""
def __init__(self, payload: bytes) -> None:
self._payload = payload
def raise_for_status(self) -> None:
"""模拟响应状态检查。"""
return None
def iter_content(self, chunk_size: int) -> Iterator[bytes]:
"""返回下载内容分块。"""
yield self._payload
def __enter__(self) -> "_FakeAliPanStream":
"""进入上下文。"""
return self
def __exit__(self, *args: object) -> None:
"""退出上下文。"""
return None
class _FakeU115Stream:
"""模拟 115 下载流。"""
def __init__(self, payload: bytes) -> None:
self._payload = payload
def raise_for_status(self) -> None:
"""模拟响应状态检查。"""
return None
def iter_bytes(self, chunk_size: int) -> Iterator[bytes]:
"""返回下载内容分块。"""
yield self._payload
def close(self) -> None:
"""模拟关闭响应流。"""
return None
def __enter__(self) -> "_FakeU115Stream":
"""进入上下文。"""
return self
def __exit__(self, *args: object) -> None:
"""退出上下文。"""
return None
class _FakeU115Session:
"""模拟 115 HTTP 会话。"""
def __init__(self, payload: bytes) -> None:
self._payload = payload
def stream(self, method: str, url: str) -> _FakeU115Stream:
"""返回伪造的下载流。"""
return _FakeU115Stream(self._payload)
class _FakeRcloneProcess:
"""模拟 rclone 子进程。"""
stdout: list[str] = []
def wait(self) -> int:
"""返回成功退出码。"""
return 0
@pytest.mark.parametrize(
("name", "expected"),
[
("../proof.txt", "proof.txt"),
("..\\proof.txt", "proof.txt"),
("/tmp/proof.txt", "proof.txt"),
],
)
def test_build_download_path_strips_remote_directory_segments(
tmp_path: Path, name: str, expected: str
) -> None:
"""本地下载路径应剥离远端文件名中的目录片段。"""
storage = Rclone.__new__(Rclone)
fileitem = schemas.FileItem(path=f"/remote/{expected}", name=name)
local_path = storage._build_download_path(fileitem, tmp_path)
assert local_path == tmp_path / expected
assert local_path.resolve().relative_to(tmp_path.resolve()) == Path(expected)
@pytest.mark.parametrize("name", ["", ".", "..", "subdir/.."])
def test_build_download_path_rejects_unsafe_filename(
tmp_path: Path, name: str
) -> None:
"""本地下载路径应拒绝无法安全落盘的文件名。"""
storage = Rclone.__new__(Rclone)
fileitem = schemas.FileItem(path="/remote/proof.txt", name=name)
assert storage._build_download_path(fileitem, tmp_path) is None
def test_alipan_download_writes_sanitized_filename(tmp_path: Path) -> None:
"""阿里云盘下载应将路径穿越文件名写入目标目录内。"""
alipan = AliPan.__new__(AliPan)
alipan.chunk_size = 8192
fileitem = schemas.FileItem(
storage="alipan",
type="file",
path="/remote/proof.txt",
name="../proof.txt",
size=len(PAYLOAD),
fileid="file-id",
drive_id="drive-id",
)
with (
patch.object(
alipan,
"_request_api",
return_value={"url": "https://example.invalid/proof.txt"},
),
patch.object(AliPan, "access_token", new_callable=PropertyMock, return_value=None),
patch(
"app.modules.filemanager.storages.alipan.transfer_process",
return_value=_noop_progress,
),
patch(
"app.modules.filemanager.storages.alipan.global_vars.is_transfer_stopped",
return_value=False,
),
patch("app.modules.filemanager.storages.alipan.RequestUtils") as request_utils,
):
request_utils.return_value.get_stream.return_value = _FakeAliPanStream(PAYLOAD)
result = alipan.download(fileitem, path=tmp_path)
expected_path = tmp_path / "proof.txt"
assert result == expected_path
assert expected_path.read_bytes() == PAYLOAD
assert not (tmp_path.parent / "proof.txt").exists()
def test_u115_download_writes_sanitized_filename(tmp_path: Path) -> None:
"""115 下载应将路径穿越文件名写入目标目录内。"""
u115 = U115Pan.__new__(U115Pan)
u115.chunk_size = 8192
u115.session = _FakeU115Session(PAYLOAD)
detail = schemas.FileItem(size=len(PAYLOAD), pickcode="pick-code")
fileitem = schemas.FileItem(
storage="u115",
type="file",
path="/remote/proof.txt",
name="../proof.txt",
size=len(PAYLOAD),
)
with (
patch.object(u115, "get_item", return_value=detail),
patch.object(
u115,
"_request_api",
return_value={"file-id": {"url": {"url": "https://example.invalid/proof.txt"}}},
),
patch(
"app.modules.filemanager.storages.u115.transfer_process",
return_value=_noop_progress,
),
patch(
"app.modules.filemanager.storages.u115.global_vars.is_transfer_stopped",
return_value=False,
),
):
result = u115.download(fileitem, path=tmp_path)
expected_path = tmp_path / "proof.txt"
assert result == expected_path
assert expected_path.read_bytes() == PAYLOAD
assert not (tmp_path.parent / "proof.txt").exists()
def test_rclone_download_uses_sanitized_target_path(tmp_path: Path) -> None:
"""rclone 下载应把清洗后的本地路径传给 copyto。"""
storage = Rclone.__new__(Rclone)
fileitem = schemas.FileItem(
storage="rclone",
type="file",
path="/remote/proof.txt",
name="../proof.txt",
size=len(PAYLOAD),
)
captured_cmd: dict[str, list[str]] = {}
def fake_popen(cmd: list[str], *args: object, **kwargs: object) -> _FakeRcloneProcess:
captured_cmd["cmd"] = cmd
return _FakeRcloneProcess()
with (
patch(
"app.modules.filemanager.storages.rclone.transfer_process",
return_value=_noop_progress,
),
patch("app.modules.filemanager.storages.rclone.subprocess.Popen", side_effect=fake_popen),
):
result = storage.download(fileitem, path=tmp_path)
expected_path = tmp_path / "proof.txt"
assert result == expected_path
assert captured_cmd["cmd"][-1] == str(expected_path)
assert expected_path.resolve().relative_to(tmp_path.resolve()) == Path("proof.txt")