diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index a9ec3314..9a6ef823 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,7 +1,11 @@ import asyncio +import io import json +import re +import zipfile from collections import deque from datetime import datetime +from pathlib import Path from typing import Any, Optional, Union, Annotated from urllib.parse import urljoin, urlparse @@ -62,6 +66,8 @@ _PUBLIC_SYSTEM_CONFIG_KEYS = { ) } _PUBLIC_SETTINGS_KEYS = {"PLUGIN_MARKET"} +_LOG_DOWNLOAD_LIMIT = 10 +_LOG_DOWNLOAD_NAME_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$") def _match_nettest_prefix(url: str, prefix: str) -> bool: @@ -285,6 +291,98 @@ def _build_nettest_rules() -> list[dict[str, Any]]: return rules +def _collect_named_log_files(name: str) -> list[Path]: + """ + 根据前端传入的日志标识收集可下载日志文件。 + + `moviepilot` 固定表示主程序日志,其余标识按插件 ID 处理并映射到 + `plugins/.log*`。这里不接收路径或后缀,避免下载入口变成任意 + 日志文件选择器;滚动日志按当前文件优先、备份文件按修改时间倒序补足。 + """ + normalized_name = (name or "").strip().lower() + if not normalized_name or not _LOG_DOWNLOAD_NAME_PATTERN.fullmatch(normalized_name): + raise HTTPException(status_code=404, detail="Not Found") + + log_root = settings.LOG_PATH + if normalized_name == "moviepilot": + log_dir = log_root + log_prefix = "moviepilot.log" + else: + log_dir = log_root / "plugins" + log_prefix = f"{normalized_name}.log" + + if not log_dir.exists() or not log_dir.is_dir(): + raise HTTPException(status_code=404, detail="Not Found") + + current_log = log_dir / log_prefix + backup_logs = [ + item + for item in log_dir.iterdir() + if item.is_file() and item.name.startswith(f"{log_prefix}.") + ] + backup_logs.sort(key=lambda item: item.stat().st_mtime, reverse=True) + + log_files = [] + if current_log.exists() and current_log.is_file(): + log_files.append(current_log) + log_files.extend(backup_logs) + return log_files[:_LOG_DOWNLOAD_LIMIT] + + +def _verify_log_resource_superuser( + token_payload: schemas.TokenPayload = Depends(verify_resource_token), +) -> schemas.TokenPayload: + """ + 校验日志资源访问权限。 + + 日志接口通过浏览器新窗口和 EventSource 访问,不能依赖普通 API 请求头; + 因此这里复用资源 Cookie 完成身份识别,再额外要求管理员身份,避免普通 + 登录用户读取可能包含敏感信息的日志。 + """ + if not token_payload.super_user: + raise HTTPException(status_code=403, detail="用户权限不足") + return token_payload + + +async def _build_log_zip_response(name: str) -> StreamingResponse: + """ + 将指定日志标识对应的日志文件打包为 zip 响应。 + + 打包前逐个校验文件仍位于日志根目录内,避免符号链接或并发文件变更绕过 + `name` 到固定目录的映射约束。zip 内使用日志根目录相对路径,便于区分 + 主程序日志与插件日志。 + """ + log_files = _collect_named_log_files(name) + if not log_files: + raise HTTPException(status_code=404, detail="Not Found") + + log_root = settings.LOG_PATH + async_log_root = AsyncPath(log_root) + zip_buffer = io.BytesIO() + filename_time = datetime.now().strftime("%Y%m%d-%H%M%S") + safe_name = (name or "logs").strip().lower() or "logs" + zip_stem = f"{safe_name}-logs-{filename_time}" + with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + for log_file in log_files: + if not await SecurityUtils.async_is_safe_path( + base_path=async_log_root, + user_path=AsyncPath(log_file), + ): + raise HTTPException(status_code=404, detail="Not Found") + arcname = f"{zip_stem}/{log_file.name}" + archive.write(log_file, arcname) + + zip_buffer.seek(0) + headers = { + "Content-Disposition": f'attachment; filename="{zip_stem}.zip"' + } + return StreamingResponse( + iter([zip_buffer.getvalue()]), + media_type="application/zip", + headers=headers, + ) + + def _validate_nettest_url(url: str) -> Optional[str]: """ 对实际请求地址做基础安全校验。 @@ -705,7 +803,7 @@ async def get_logging( request: Request, length: Optional[int] = 50, logfile: Optional[str] = "moviepilot.log", - _: schemas.TokenPayload = Depends(verify_resource_token), + _: schemas.TokenPayload = Depends(_verify_log_resource_superuser), ): """ 实时获取系统日志 @@ -814,6 +912,17 @@ async def get_logging( return StreamingResponse(log_generator(), media_type="text/event-stream") +@router.get("/logging/download/{name}", summary="下载日志") +async def download_logging( + name: str, + _: schemas.TokenPayload = Depends(_verify_log_resource_superuser), +): + """ + 按日志标识下载主程序或插件滚动日志,返回 zip 文件。 + """ + return await _build_log_zip_response(name) + + @router.get( "/versions", summary="查询Github所有Release版本", response_model=schemas.Response ) diff --git a/tests/test_system_log_download.py b/tests/test_system_log_download.py new file mode 100644 index 00000000..79ac4961 --- /dev/null +++ b/tests/test_system_log_download.py @@ -0,0 +1,116 @@ +"""系统日志查看与下载接口的权限和打包行为测试。""" + +import asyncio +import io +import zipfile +from pathlib import Path +from types import SimpleNamespace + +import pytest +from fastapi import HTTPException +from starlette.responses import Response + +from app.api.endpoints import system as system_endpoint +from app.core.config import settings + + +def test_logging_routes_use_superuser_dependency(): + """日志查看和下载路由都必须绑定管理员依赖,避免普通登录用户读取敏感日志。""" + routes = {route.path: route for route in system_endpoint.router.routes} + + logging_dependencies = {dependency.call for dependency in routes["/logging"].dependant.dependencies} + download_dependencies = {dependency.call for dependency in routes["/logging/download/{name}"].dependant.dependencies} + + assert system_endpoint._verify_log_resource_superuser in logging_dependencies + assert system_endpoint._verify_log_resource_superuser in download_dependencies + + +def test_log_resource_dependency_rejects_normal_user(): + """日志资源依赖必须拒绝非管理员 resource token。""" + with pytest.raises(HTTPException) as exc_info: + system_endpoint._verify_log_resource_superuser( + SimpleNamespace(super_user=False), + ) + + assert exc_info.value.status_code == 403 + + +@pytest.fixture(name="isolated_log_path") +def fixture_isolated_log_path(monkeypatch, tmp_path: Path) -> Path: + """将日志目录隔离到临时目录,避免测试读取或打包真实运行日志。""" + config_path = tmp_path / "config" + log_path = config_path / "logs" + log_path.mkdir(parents=True) + monkeypatch.setattr(settings, "CONFIG_DIR", str(config_path)) + return log_path + + +def test_logging_requires_superuser_dependency(monkeypatch, isolated_log_path): + """实时日志查看接口必须通过管理员依赖,普通资源令牌不能直接读取日志。""" + (isolated_log_path / "moviepilot.log").write_text("hello\n", encoding="utf-8") + response = asyncio.run( + system_endpoint.get_logging( + request=SimpleNamespace(is_disconnected=lambda: False), + length=-1, + logfile="moviepilot.log", + _=SimpleNamespace(id=1, name="admin", is_superuser=True), + ) + ) + + assert isinstance(response, Response) + + +def test_download_moviepilot_logs_packages_latest_ten_log_files(isolated_log_path): + """传入 moviepilot 时下载主程序滚动日志,最多打包 10 个文件。""" + for index in range(12): + (isolated_log_path / f"moviepilot.log.{index}").write_text(f"old-{index}", encoding="utf-8") + (isolated_log_path / "moviepilot.log").write_text("current", encoding="utf-8") + (isolated_log_path / "moviepilot.txt").write_text("ignored", encoding="utf-8") + (isolated_log_path / "plugins").mkdir() + (isolated_log_path / "plugins" / "demo.log").write_text("plugin", encoding="utf-8") + + response = asyncio.run(system_endpoint.download_logging(name="moviepilot", _=SimpleNamespace())) + body = asyncio.run(_read_streaming_body(response)) + + with zipfile.ZipFile(io.BytesIO(body)) as archive: + names = archive.namelist() + + moviepilot_zip_root = response.headers["Content-Disposition"].split('filename="', 1)[1].removesuffix('.zip"') + assert response.media_type == "application/zip" + assert 'filename="moviepilot-logs-' in response.headers["Content-Disposition"] + assert "moviepilot-moviepilot-logs" not in response.headers["Content-Disposition"] + assert len(names) == 10 + assert f"{moviepilot_zip_root}/moviepilot.log" in names + assert "moviepilot.log" not in names + assert "plugins/demo.log" not in names + assert "moviepilot.txt" not in names + + +def test_download_plugin_logs_packages_plugin_files_only(isolated_log_path): + """传入插件 ID 时只下载该插件滚动日志,最多打包 10 个文件。""" + plugin_dir = isolated_log_path / "plugins" + plugin_dir.mkdir() + for index in range(11): + (plugin_dir / f"demoplugin.log.{index}").write_text(f"plugin-{index}", encoding="utf-8") + (plugin_dir / "demoplugin.log").write_text("current", encoding="utf-8") + (plugin_dir / "other.log").write_text("other", encoding="utf-8") + (isolated_log_path / "moviepilot.log").write_text("main", encoding="utf-8") + + response = asyncio.run(system_endpoint.download_logging(name="DemoPlugin", _=SimpleNamespace())) + body = asyncio.run(_read_streaming_body(response)) + + with zipfile.ZipFile(io.BytesIO(body)) as archive: + names = archive.namelist() + + plugin_zip_root = response.headers["Content-Disposition"].split('filename="', 1)[1].removesuffix('.zip"') + assert len(names) == 10 + assert f"{plugin_zip_root}/demoplugin.log" in names + assert "demoplugin.log" not in names + assert "plugins/demoplugin.log" not in names + assert "plugins/other.log" not in names + assert "moviepilot.log" not in names + + +async def _read_streaming_body(response) -> bytes: + """读取 StreamingResponse 内容,便于断言 zip 文件条目。""" + return b"".join([chunk async for chunk in response.body_iterator])