mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-07 06:12:43 +08:00
250 lines
8.3 KiB
Python
250 lines
8.3 KiB
Python
import asyncio
|
|
import sys
|
|
import unittest
|
|
from types import ModuleType, SimpleNamespace
|
|
from unittest.mock import patch
|
|
|
|
|
|
def _stub_module(name: str, **attrs):
|
|
module = sys.modules.get(name)
|
|
if module is None:
|
|
module = ModuleType(name)
|
|
sys.modules[name] = module
|
|
for key, value in attrs.items():
|
|
setattr(module, key, value)
|
|
return module
|
|
|
|
|
|
class _Dummy:
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
|
|
def __getattr__(self, _name):
|
|
return lambda *args, **kwargs: None
|
|
|
|
|
|
class _DummyError(Exception):
|
|
def __init__(self, message="", duration_ms=None):
|
|
super().__init__(message)
|
|
self.duration_ms = duration_ms
|
|
|
|
|
|
for _module_name in ("pillow_avif", "aiofiles", "psutil"):
|
|
_stub_module(_module_name)
|
|
|
|
_stub_module("app.helper.sites", SitesHelper=_Dummy)
|
|
_stub_module("app.chain.mediaserver", MediaServerChain=_Dummy)
|
|
_stub_module("app.chain.search", SearchChain=_Dummy)
|
|
_stub_module("app.chain.system", SystemChain=_Dummy)
|
|
_stub_module("app.core.event", eventmanager=_Dummy())
|
|
_stub_module("app.core.metainfo", MetaInfo=_Dummy)
|
|
_stub_module("app.core.module", ModuleManager=_Dummy)
|
|
_stub_module(
|
|
"app.core.security",
|
|
verify_apitoken=_Dummy,
|
|
verify_resource_token=_Dummy,
|
|
verify_token=_Dummy,
|
|
)
|
|
_stub_module("app.db.models", User=_Dummy)
|
|
_stub_module("app.db.systemconfig_oper", SystemConfigOper=_Dummy)
|
|
_stub_module(
|
|
"app.db.user_oper",
|
|
get_current_active_superuser=_Dummy,
|
|
get_current_active_superuser_async=_Dummy,
|
|
get_current_active_user_async=_Dummy,
|
|
)
|
|
_stub_module(
|
|
"app.helper.llm",
|
|
LLMHelper=_Dummy,
|
|
LLMTestError=_DummyError,
|
|
LLMTestTimeout=_DummyError,
|
|
)
|
|
_stub_module("app.helper.mediaserver", MediaServerHelper=_Dummy)
|
|
_stub_module("app.helper.message", MessageHelper=_Dummy)
|
|
_stub_module("app.helper.progress", ProgressHelper=_Dummy)
|
|
_stub_module("app.helper.rule", RuleHelper=_Dummy)
|
|
_stub_module("app.helper.subscribe", SubscribeHelper=_Dummy)
|
|
_stub_module("app.helper.system", SystemHelper=_Dummy)
|
|
_stub_module("app.helper.image", ImageHelper=_Dummy)
|
|
_stub_module("app.scheduler", Scheduler=_Dummy)
|
|
_stub_module(
|
|
"app.log",
|
|
logger=_Dummy(),
|
|
log_settings=_Dummy(),
|
|
LogConfigModel=type("LogConfigModel", (), {}),
|
|
)
|
|
_stub_module("app.utils.crypto", HashUtils=_Dummy)
|
|
_stub_module("app.utils.http", RequestUtils=_Dummy, AsyncRequestUtils=_Dummy)
|
|
_stub_module("version", APP_VERSION="test")
|
|
|
|
from app.api.endpoints import system as system_endpoint
|
|
|
|
|
|
class NettestSecurityTest(unittest.TestCase):
|
|
def test_nettest_targets_are_served_by_backend(self):
|
|
resp = asyncio.run(system_endpoint.nettest_targets(_="token"))
|
|
|
|
self.assertTrue(resp.success)
|
|
self.assertTrue(any(item["id"] == "pip_proxy" for item in resp.data))
|
|
self.assertTrue(any(item["id"] == "github_proxy_web" for item in resp.data))
|
|
|
|
def test_nettest_blocks_unknown_target(self):
|
|
class FailIfCalled:
|
|
def __init__(self, *args, **kwargs):
|
|
raise AssertionError("nettest should reject unknown targets before any outbound request")
|
|
|
|
with patch.object(system_endpoint, "AsyncRequestUtils", FailIfCalled):
|
|
resp = asyncio.run(
|
|
system_endpoint.nettest(
|
|
target_id="unknown-target",
|
|
_="token",
|
|
)
|
|
)
|
|
|
|
self.assertFalse(resp.success)
|
|
self.assertIn("不存在", resp.message)
|
|
|
|
def test_nettest_blocks_unapproved_redirect(self):
|
|
captured = {"calls": 0}
|
|
|
|
class FakeResponse:
|
|
def __init__(self, status_code, headers=None, text=""):
|
|
self.status_code = status_code
|
|
self.headers = headers or {}
|
|
self.text = text
|
|
|
|
async def aclose(self):
|
|
return None
|
|
|
|
class FakeAsyncRequestUtils:
|
|
def __init__(self, **kwargs):
|
|
captured["init_kwargs"] = kwargs
|
|
|
|
async def get_res(self, url, allow_redirects=True):
|
|
captured["calls"] += 1
|
|
return FakeResponse(
|
|
302,
|
|
headers={"location": "https://169.254.169.254/latest/meta-data/"},
|
|
)
|
|
|
|
with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object(
|
|
system_endpoint.settings,
|
|
"GITHUB_PROXY",
|
|
"https://ghproxy.example/",
|
|
):
|
|
resp = asyncio.run(
|
|
system_endpoint.nettest(
|
|
target_id="github_proxy_web",
|
|
_="token",
|
|
)
|
|
)
|
|
|
|
self.assertFalse(resp.success)
|
|
self.assertIn("跳转", resp.message)
|
|
self.assertEqual(captured["calls"], 1)
|
|
|
|
def test_nettest_allows_known_external_redirects(self):
|
|
cases = {
|
|
"telegram_api": "https://core.telegram.org/bots",
|
|
"douban_api": "https://www.douban.com/doubanapp/frodo?wechat=0&os=Other",
|
|
"github_codeload": "https://github.com/",
|
|
}
|
|
|
|
for target_id, redirect_url in cases.items():
|
|
call_urls = []
|
|
|
|
class FakeResponse:
|
|
def __init__(self, status_code, headers=None, text=""):
|
|
self.status_code = status_code
|
|
self.headers = headers or {}
|
|
self.text = text
|
|
|
|
async def aclose(self):
|
|
return None
|
|
|
|
class FakeAsyncRequestUtils:
|
|
def __init__(self, **kwargs):
|
|
pass
|
|
|
|
async def get_res(self, url, allow_redirects=True):
|
|
call_urls.append(url)
|
|
if len(call_urls) == 1:
|
|
return FakeResponse(302, headers={"location": redirect_url})
|
|
return FakeResponse(200, text="ok")
|
|
|
|
with self.subTest(target_id=target_id), patch.object(
|
|
system_endpoint,
|
|
"AsyncRequestUtils",
|
|
FakeAsyncRequestUtils,
|
|
):
|
|
resp = asyncio.run(
|
|
system_endpoint.nettest(
|
|
target_id=target_id,
|
|
_="token",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(resp.success)
|
|
self.assertEqual(len(call_urls), 2)
|
|
|
|
def test_nettest_uses_safe_http_options_and_server_side_content_check(self):
|
|
captured = {}
|
|
|
|
class FakeAsyncRequestUtils:
|
|
def __init__(self, **kwargs):
|
|
captured["init_kwargs"] = kwargs
|
|
|
|
async def get_res(self, url, allow_redirects=True):
|
|
captured["url"] = url
|
|
captured["allow_redirects"] = allow_redirects
|
|
return SimpleNamespace(status_code=200, text="MoviePilot README")
|
|
|
|
with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object(
|
|
system_endpoint.settings,
|
|
"GITHUB_PROXY",
|
|
"https://ghproxy.example/",
|
|
):
|
|
resp = asyncio.run(
|
|
system_endpoint.nettest(
|
|
target_id="github_proxy_web",
|
|
include="tag_name",
|
|
_="token",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(resp.success)
|
|
self.assertEqual(
|
|
captured["url"],
|
|
"https://ghproxy.example/https://github.com/jxxghp/MoviePilot/blob/v2/README.md",
|
|
)
|
|
self.assertFalse(captured["allow_redirects"])
|
|
self.assertTrue(captured["init_kwargs"]["verify"])
|
|
self.assertFalse(captured["init_kwargs"]["follow_redirects"])
|
|
|
|
def test_nettest_fails_when_expected_content_is_missing(self):
|
|
class FakeAsyncRequestUtils:
|
|
def __init__(self, **kwargs):
|
|
pass
|
|
|
|
async def get_res(self, url, allow_redirects=True):
|
|
return SimpleNamespace(status_code=200, text="proxy landing page")
|
|
|
|
with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object(
|
|
system_endpoint.settings,
|
|
"PIP_PROXY",
|
|
"https://pypi.tuna.tsinghua.edu.cn/simple/",
|
|
):
|
|
resp = asyncio.run(
|
|
system_endpoint.nettest(
|
|
target_id="pip_proxy",
|
|
_="token",
|
|
)
|
|
)
|
|
|
|
self.assertFalse(resp.success)
|
|
self.assertIn("PIP加速代理", resp.message)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|