fix: restrict sensitive system endpoints

This commit is contained in:
jxxghp
2026-06-09 21:45:51 +08:00
parent d1cf584af9
commit dc2b6910a4
10 changed files with 476 additions and 78 deletions

View File

@@ -0,0 +1,242 @@
import asyncio
import io
import inspect
from types import SimpleNamespace
import pytest
from fastapi import HTTPException
from starlette.requests import Request
from starlette.responses import Response
from app.api.endpoints import login as login_endpoint
from app.api.endpoints import plugin as plugin_endpoint
from app.api.endpoints import system as system_endpoint
from app.api.endpoints import user as user_endpoint
from app.api.endpoints import dashboard as dashboard_endpoint
from app.core.security import verify_resource_token
from app.db.user_oper import (
get_current_active_superuser,
get_current_active_superuser_async,
get_current_active_user_async,
)
from app.schemas.types import SystemConfigKey
def _dependency_of(func, parameter_name: str):
"""读取 FastAPI 函数参数上声明的依赖函数。"""
return inspect.signature(func).parameters[parameter_name].default.dependency
def _build_request() -> Request:
"""构造最小测试请求。"""
return Request(
{
"type": "http",
"method": "POST",
"path": "/api/v1/login/access-token",
"headers": [(b"host", b"testserver")],
"scheme": "http",
"server": ("testserver", 80),
"client": ("testclient", 123),
}
)
def test_system_sensitive_read_endpoints_require_superuser():
"""系统敏感读取接口必须只允许管理员访问。"""
assert _dependency_of(system_endpoint.get_env_setting, "_") is get_current_active_superuser_async
assert _dependency_of(system_endpoint.get_setting, "_") is get_current_active_superuser_async
def test_system_public_read_endpoints_require_active_user():
"""公开读取接口只要求登录且启用的用户。"""
assert _dependency_of(system_endpoint.ping, "_") is get_current_active_user_async
assert _dependency_of(system_endpoint.get_public_setting, "_") is get_current_active_user_async
def test_dashboard_endpoints_require_superuser():
"""仪表板页面相关接口必须只允许管理员访问。"""
assert _dependency_of(dashboard_endpoint.statistic, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.storage, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.processes, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.downloader, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.schedule, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.transfer, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.cpu, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.memory, "_") is get_current_active_superuser
assert _dependency_of(dashboard_endpoint.network, "_") is get_current_active_superuser
def test_plugin_dashboard_endpoints_require_superuser():
"""插件仪表板接口必须只允许管理员访问。"""
assert _dependency_of(plugin_endpoint.plugin_dashboard_meta, "_") is get_current_active_superuser
assert _dependency_of(plugin_endpoint.plugin_dashboard_by_key, "_") is get_current_active_superuser
assert _dependency_of(plugin_endpoint.plugin_dashboard, "_") is get_current_active_superuser
def test_system_public_setting_allows_only_non_sensitive_keys(monkeypatch):
"""公开系统设置接口只能读取明确列入白名单的非敏感配置。"""
calls = []
class FakeSystemConfigOper:
"""返回测试配置值的系统配置桩。"""
def get(self, key):
"""返回测试配置值。"""
calls.append(key)
return [{"path": "/downloads"}]
monkeypatch.setattr(system_endpoint, "SystemConfigOper", FakeSystemConfigOper)
response = asyncio.run(
system_endpoint.get_public_setting(SystemConfigKey.Directories.value)
)
assert response.success is True
assert response.data == {"value": [{"path": "/downloads"}]}
assert calls == [SystemConfigKey.Directories]
response = asyncio.run(system_endpoint.get_public_setting("PLUGIN_MARKET"))
assert response.success is True
assert response.data == {"value": system_endpoint.settings.PLUGIN_MARKET}
assert calls == [SystemConfigKey.Directories]
with pytest.raises(HTTPException) as exc_info:
asyncio.run(system_endpoint.get_public_setting("API_TOKEN"))
assert exc_info.value.status_code == 404
assert exc_info.value.detail == "配置项不存在"
def test_system_ping_returns_success():
"""服务存活检测接口返回标准成功响应。"""
response = asyncio.run(system_endpoint.ping())
assert response.success is True
def test_login_sets_resource_token_cookie(monkeypatch):
"""登录成功时应立即写入资源 Cookie避免插件静态文件抢先加载失败。"""
class FakeUserChain:
"""返回登录成功用户的用户链桩。"""
def user_authenticate(self, username, password, mfa_code=None):
"""返回认证成功结果。"""
return True, SimpleNamespace(
id=1,
name=username,
is_superuser=False,
avatar="",
permissions={"discovery": True},
)
class FakeSystemConfigOper:
"""返回已完成向导状态的系统配置桩。"""
def get(self, key):
"""返回测试配置值。"""
return "1"
form_data = SimpleNamespace(username="user", password="password")
request = _build_request()
response = Response()
monkeypatch.setattr(login_endpoint, "UserChain", FakeUserChain)
monkeypatch.setattr(login_endpoint, "SystemConfigOper", FakeSystemConfigOper)
token = login_endpoint.login_access_token(
request=request,
response=response,
form_data=form_data,
)
assert token.user_id == 1
assert token.permissions == {"discovery": True}
assert "set-cookie" in response.headers
resource_cookie = response.headers["set-cookie"].split("=", 1)[1].split(";", 1)[0]
payload = verify_resource_token(resource_cookie)
assert payload.sub == 1
assert payload.username == "user"
assert payload.purpose == "resource"
def test_plugin_static_file_requires_resource_token_by_default(monkeypatch):
"""普通插件静态资源必须校验资源令牌。"""
calls = []
class FakePluginManager:
"""返回空认证提供方的插件管理器桩。"""
def get_plugin_auth_providers(self):
"""返回插件认证入口列表。"""
return []
monkeypatch.setattr(plugin_endpoint, "PluginManager", FakePluginManager)
monkeypatch.setattr(plugin_endpoint, "verify_resource_token", lambda token: calls.append(token))
plugin_endpoint._verify_plugin_static_file_access(
plugin_id="DemoPlugin",
filepath="dist/remoteEntry.js",
resource_token="resource-token",
)
assert calls == ["resource-token"]
def test_plugin_auth_remote_files_allow_anonymous_bootstrap(monkeypatch):
"""插件登录认证远程组件需要允许登录前匿名加载。"""
calls = []
class FakePluginManager:
"""返回认证插件 remote 信息的插件管理器桩。"""
def get_plugin_auth_providers(self):
"""返回插件认证入口列表。"""
return [
{
"remote": {
"id": "AuthPlugin",
"url": "/plugin/file/AuthPlugin/dist/remoteEntry.js",
}
}
]
monkeypatch.setattr(plugin_endpoint, "PluginManager", FakePluginManager)
monkeypatch.setattr(plugin_endpoint, "verify_resource_token", lambda token: calls.append(token))
plugin_endpoint._verify_plugin_static_file_access(
plugin_id="AuthPlugin",
filepath="dist/remoteEntry.js",
)
plugin_endpoint._verify_plugin_static_file_access(
plugin_id="AuthPlugin",
filepath="dist/assets/chunk.js",
)
plugin_endpoint._verify_plugin_static_file_access(
plugin_id="authplugin",
filepath="dist/assets/chunk.js",
)
assert calls == []
def test_upload_avatar_rejects_other_user_for_non_superuser():
"""普通用户不能通过 user_id 参数修改其他用户头像。"""
current_user = SimpleNamespace(id=1, is_superuser=False)
upload_file = SimpleNamespace(file=io.BytesIO(b"avatar"), filename="avatar.png")
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
user_endpoint.upload_avatar(
user_id=2,
db=object(),
file=upload_file,
current_user=current_user,
)
)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "用户权限不足"