fix: restrict log access and add zip download (#5936)

This commit is contained in:
InfinityPacer
2026-06-13 20:20:10 +08:00
committed by GitHub
parent ac9132cba6
commit 13cb1683ff
2 changed files with 226 additions and 1 deletions

View File

@@ -1,7 +1,11 @@
import asyncio
import io
import json
import re
import zipfile
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Union, Annotated
from urllib.parse import urljoin, urlparse
@@ -62,6 +66,8 @@ _PUBLIC_SYSTEM_CONFIG_KEYS = {
)
}
_PUBLIC_SETTINGS_KEYS = {"PLUGIN_MARKET"}
_LOG_DOWNLOAD_LIMIT = 10
_LOG_DOWNLOAD_NAME_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$")
def _match_nettest_prefix(url: str, prefix: str) -> bool:
@@ -285,6 +291,98 @@ def _build_nettest_rules() -> list[dict[str, Any]]:
return rules
def _collect_named_log_files(name: str) -> list[Path]:
"""
根据前端传入的日志标识收集可下载日志文件。
`moviepilot` 固定表示主程序日志,其余标识按插件 ID 处理并映射到
`plugins/<plugin_id>.log*`。这里不接收路径或后缀,避免下载入口变成任意
日志文件选择器;滚动日志按当前文件优先、备份文件按修改时间倒序补足。
"""
normalized_name = (name or "").strip().lower()
if not normalized_name or not _LOG_DOWNLOAD_NAME_PATTERN.fullmatch(normalized_name):
raise HTTPException(status_code=404, detail="Not Found")
log_root = settings.LOG_PATH
if normalized_name == "moviepilot":
log_dir = log_root
log_prefix = "moviepilot.log"
else:
log_dir = log_root / "plugins"
log_prefix = f"{normalized_name}.log"
if not log_dir.exists() or not log_dir.is_dir():
raise HTTPException(status_code=404, detail="Not Found")
current_log = log_dir / log_prefix
backup_logs = [
item
for item in log_dir.iterdir()
if item.is_file() and item.name.startswith(f"{log_prefix}.")
]
backup_logs.sort(key=lambda item: item.stat().st_mtime, reverse=True)
log_files = []
if current_log.exists() and current_log.is_file():
log_files.append(current_log)
log_files.extend(backup_logs)
return log_files[:_LOG_DOWNLOAD_LIMIT]
def _verify_log_resource_superuser(
token_payload: schemas.TokenPayload = Depends(verify_resource_token),
) -> schemas.TokenPayload:
"""
校验日志资源访问权限。
日志接口通过浏览器新窗口和 EventSource 访问,不能依赖普通 API 请求头;
因此这里复用资源 Cookie 完成身份识别,再额外要求管理员身份,避免普通
登录用户读取可能包含敏感信息的日志。
"""
if not token_payload.super_user:
raise HTTPException(status_code=403, detail="用户权限不足")
return token_payload
async def _build_log_zip_response(name: str) -> StreamingResponse:
"""
将指定日志标识对应的日志文件打包为 zip 响应。
打包前逐个校验文件仍位于日志根目录内,避免符号链接或并发文件变更绕过
`name` 到固定目录的映射约束。zip 内使用日志根目录相对路径,便于区分
主程序日志与插件日志。
"""
log_files = _collect_named_log_files(name)
if not log_files:
raise HTTPException(status_code=404, detail="Not Found")
log_root = settings.LOG_PATH
async_log_root = AsyncPath(log_root)
zip_buffer = io.BytesIO()
filename_time = datetime.now().strftime("%Y%m%d-%H%M%S")
safe_name = (name or "logs").strip().lower() or "logs"
zip_stem = f"{safe_name}-logs-{filename_time}"
with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
for log_file in log_files:
if not await SecurityUtils.async_is_safe_path(
base_path=async_log_root,
user_path=AsyncPath(log_file),
):
raise HTTPException(status_code=404, detail="Not Found")
arcname = f"{zip_stem}/{log_file.name}"
archive.write(log_file, arcname)
zip_buffer.seek(0)
headers = {
"Content-Disposition": f'attachment; filename="{zip_stem}.zip"'
}
return StreamingResponse(
iter([zip_buffer.getvalue()]),
media_type="application/zip",
headers=headers,
)
def _validate_nettest_url(url: str) -> Optional[str]:
"""
对实际请求地址做基础安全校验。
@@ -705,7 +803,7 @@ async def get_logging(
request: Request,
length: Optional[int] = 50,
logfile: Optional[str] = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token),
_: schemas.TokenPayload = Depends(_verify_log_resource_superuser),
):
"""
实时获取系统日志
@@ -814,6 +912,17 @@ async def get_logging(
return StreamingResponse(log_generator(), media_type="text/event-stream")
@router.get("/logging/download/{name}", summary="下载日志")
async def download_logging(
name: str,
_: schemas.TokenPayload = Depends(_verify_log_resource_superuser),
):
"""
按日志标识下载主程序或插件滚动日志,返回 zip 文件。
"""
return await _build_log_zip_response(name)
@router.get(
"/versions", summary="查询Github所有Release版本", response_model=schemas.Response
)

View File

@@ -0,0 +1,116 @@
"""系统日志查看与下载接口的权限和打包行为测试。"""
import asyncio
import io
import zipfile
from pathlib import Path
from types import SimpleNamespace
import pytest
from fastapi import HTTPException
from starlette.responses import Response
from app.api.endpoints import system as system_endpoint
from app.core.config import settings
def test_logging_routes_use_superuser_dependency():
"""日志查看和下载路由都必须绑定管理员依赖,避免普通登录用户读取敏感日志。"""
routes = {route.path: route for route in system_endpoint.router.routes}
logging_dependencies = {dependency.call for dependency in routes["/logging"].dependant.dependencies}
download_dependencies = {dependency.call for dependency in routes["/logging/download/{name}"].dependant.dependencies}
assert system_endpoint._verify_log_resource_superuser in logging_dependencies
assert system_endpoint._verify_log_resource_superuser in download_dependencies
def test_log_resource_dependency_rejects_normal_user():
"""日志资源依赖必须拒绝非管理员 resource token。"""
with pytest.raises(HTTPException) as exc_info:
system_endpoint._verify_log_resource_superuser(
SimpleNamespace(super_user=False),
)
assert exc_info.value.status_code == 403
@pytest.fixture(name="isolated_log_path")
def fixture_isolated_log_path(monkeypatch, tmp_path: Path) -> Path:
"""将日志目录隔离到临时目录,避免测试读取或打包真实运行日志。"""
config_path = tmp_path / "config"
log_path = config_path / "logs"
log_path.mkdir(parents=True)
monkeypatch.setattr(settings, "CONFIG_DIR", str(config_path))
return log_path
def test_logging_requires_superuser_dependency(monkeypatch, isolated_log_path):
"""实时日志查看接口必须通过管理员依赖,普通资源令牌不能直接读取日志。"""
(isolated_log_path / "moviepilot.log").write_text("hello\n", encoding="utf-8")
response = asyncio.run(
system_endpoint.get_logging(
request=SimpleNamespace(is_disconnected=lambda: False),
length=-1,
logfile="moviepilot.log",
_=SimpleNamespace(id=1, name="admin", is_superuser=True),
)
)
assert isinstance(response, Response)
def test_download_moviepilot_logs_packages_latest_ten_log_files(isolated_log_path):
"""传入 moviepilot 时下载主程序滚动日志,最多打包 10 个文件。"""
for index in range(12):
(isolated_log_path / f"moviepilot.log.{index}").write_text(f"old-{index}", encoding="utf-8")
(isolated_log_path / "moviepilot.log").write_text("current", encoding="utf-8")
(isolated_log_path / "moviepilot.txt").write_text("ignored", encoding="utf-8")
(isolated_log_path / "plugins").mkdir()
(isolated_log_path / "plugins" / "demo.log").write_text("plugin", encoding="utf-8")
response = asyncio.run(system_endpoint.download_logging(name="moviepilot", _=SimpleNamespace()))
body = asyncio.run(_read_streaming_body(response))
with zipfile.ZipFile(io.BytesIO(body)) as archive:
names = archive.namelist()
moviepilot_zip_root = response.headers["Content-Disposition"].split('filename="', 1)[1].removesuffix('.zip"')
assert response.media_type == "application/zip"
assert 'filename="moviepilot-logs-' in response.headers["Content-Disposition"]
assert "moviepilot-moviepilot-logs" not in response.headers["Content-Disposition"]
assert len(names) == 10
assert f"{moviepilot_zip_root}/moviepilot.log" in names
assert "moviepilot.log" not in names
assert "plugins/demo.log" not in names
assert "moviepilot.txt" not in names
def test_download_plugin_logs_packages_plugin_files_only(isolated_log_path):
"""传入插件 ID 时只下载该插件滚动日志,最多打包 10 个文件。"""
plugin_dir = isolated_log_path / "plugins"
plugin_dir.mkdir()
for index in range(11):
(plugin_dir / f"demoplugin.log.{index}").write_text(f"plugin-{index}", encoding="utf-8")
(plugin_dir / "demoplugin.log").write_text("current", encoding="utf-8")
(plugin_dir / "other.log").write_text("other", encoding="utf-8")
(isolated_log_path / "moviepilot.log").write_text("main", encoding="utf-8")
response = asyncio.run(system_endpoint.download_logging(name="DemoPlugin", _=SimpleNamespace()))
body = asyncio.run(_read_streaming_body(response))
with zipfile.ZipFile(io.BytesIO(body)) as archive:
names = archive.namelist()
plugin_zip_root = response.headers["Content-Disposition"].split('filename="', 1)[1].removesuffix('.zip"')
assert len(names) == 10
assert f"{plugin_zip_root}/demoplugin.log" in names
assert "demoplugin.log" not in names
assert "plugins/demoplugin.log" not in names
assert "plugins/other.log" not in names
assert "moviepilot.log" not in names
async def _read_streaming_body(response) -> bytes:
"""读取 StreamingResponse 内容,便于断言 zip 文件条目。"""
return b"".join([chunk async for chunk in response.body_iterator])