Harden system nettest SSRF handling

This commit is contained in:
jxxghp
2026-04-18 17:43:38 +08:00
parent 787db8f5ac
commit 4c32ad902b
3 changed files with 494 additions and 43 deletions

View File

@@ -3,7 +3,8 @@ import json
import re
from collections import deque
from datetime import datetime
from typing import Optional, Union, Annotated
from typing import Any, Optional, Union, Annotated
from urllib.parse import urljoin, urlparse
import aiofiles
import pillow_avif # noqa 用于自动注册AVIF支持
@@ -48,6 +49,236 @@ from version import APP_VERSION
router = APIRouter()
_NETTEST_REDIRECT_STATUS_CODES = {301, 302, 303, 307, 308}
def _match_nettest_prefix(url: str, prefix: str) -> bool:
parsed_url = urlparse(url)
parsed_prefix = urlparse(prefix)
if parsed_url.scheme.lower() != parsed_prefix.scheme.lower():
return False
if (parsed_url.hostname or "").lower() != (parsed_prefix.hostname or "").lower():
return False
url_port = parsed_url.port or (443 if parsed_url.scheme.lower() == "https" else 80)
prefix_port = parsed_prefix.port or (443 if parsed_prefix.scheme.lower() == "https" else 80)
if url_port != prefix_port:
return False
return parsed_url.path.startswith(parsed_prefix.path or "/")
def _build_nettest_rules() -> list[dict[str, Any]]:
github_proxy = UrlUtils.standardize_base_url(settings.GITHUB_PROXY or "")
pip_proxy = UrlUtils.standardize_base_url(
settings.PIP_PROXY or "https://pypi.org/simple/"
)
tmdb_key = settings.TMDB_API_KEY
tmdb_domain = settings.TMDB_API_DOMAIN or "api.themoviedb.org"
github_readme_url = "https://github.com/jxxghp/MoviePilot/blob/v2/README.md"
raw_readme_url = "https://raw.githubusercontent.com/jxxghp/MoviePilot/v2/README.md"
rules = [
{
"id": "tmdb_api",
"name": "api.themoviedb.org",
"icon": "tmdb",
"url": f"https://api.themoviedb.org/3/movie/550?api_key={tmdb_key}",
"proxy": True,
"allowed_redirect_prefixes": [
"https://api.themoviedb.org/3/",
],
},
{
"id": "tmdb_api_alt",
"name": "api.tmdb.org",
"icon": "tmdb",
"url": f"https://api.tmdb.org/3/movie/550?api_key={tmdb_key}",
"proxy": True,
"allowed_redirect_prefixes": [
"https://api.tmdb.org/3/",
],
},
{
"id": "tmdb_web",
"name": "www.themoviedb.org",
"icon": "tmdb",
"url": "https://www.themoviedb.org",
"proxy": True,
"allowed_redirect_prefixes": ["https://www.themoviedb.org/"],
},
{
"id": "tvdb_api",
"name": "api.thetvdb.com",
"icon": "tvdb",
"url": "https://api.thetvdb.com/series/81189",
"proxy": True,
"allowed_redirect_prefixes": ["https://api.thetvdb.com/"],
},
{
"id": "fanart_api",
"name": "webservice.fanart.tv",
"icon": "fanart",
"url": "https://webservice.fanart.tv",
"proxy": True,
"allowed_redirect_prefixes": ["https://webservice.fanart.tv/"],
},
{
"id": "telegram_api",
"name": "api.telegram.org",
"icon": "telegram",
"url": "https://api.telegram.org",
"proxy": True,
"allowed_redirect_prefixes": ["https://api.telegram.org/"],
},
{
"id": "wechat_api",
"name": "qyapi.weixin.qq.com",
"icon": "wechat",
"url": "https://qyapi.weixin.qq.com/cgi-bin/gettoken",
"proxy": False,
"allowed_redirect_prefixes": ["https://qyapi.weixin.qq.com/"],
},
{
"id": "douban_api",
"name": "frodo.douban.com",
"icon": "douban",
"url": "https://frodo.douban.com",
"proxy": False,
"allowed_redirect_prefixes": ["https://frodo.douban.com/"],
},
{
"id": "slack_api",
"name": "slack.com",
"icon": "slack",
"url": "https://slack.com",
"proxy": False,
"allowed_redirect_prefixes": [
"https://slack.com/",
"https://www.slack.com/",
],
},
{
"id": "pip_proxy",
"name": "pypi.org",
"icon": "python",
"url": f"{pip_proxy}rsa/",
"proxy": True,
"allowed_redirect_prefixes": [
pip_proxy,
"https://pypi.org/simple/",
],
"expected_text": "pypi:repository-version",
"invalid_message": "PIP加速代理已失效请检查配置",
"proxy_name": "PIP加速代理",
},
{
"id": "github_proxy_web",
"name": "github.com",
"icon": "github",
"url": f"{github_proxy}{github_readme_url}" if github_proxy else github_readme_url,
"proxy": True,
"allowed_redirect_prefixes": [
"https://github.com/",
*((f"{github_proxy}https://github.com/",) if github_proxy else ()),
],
"expected_text": "MoviePilot",
"invalid_message": "Github加速代理已失效请检查配置" if github_proxy else "无效响应",
"proxy_name": "Github加速代理" if github_proxy else "",
"headers": settings.GITHUB_HEADERS,
},
{
"id": "github_api",
"name": "api.github.com",
"icon": "github",
"url": "https://api.github.com",
"proxy": True,
"allowed_redirect_prefixes": ["https://api.github.com/"],
"headers": settings.GITHUB_HEADERS,
},
{
"id": "github_codeload",
"name": "codeload.github.com",
"icon": "github",
"url": "https://codeload.github.com",
"proxy": True,
"allowed_redirect_prefixes": ["https://codeload.github.com/"],
"headers": settings.GITHUB_HEADERS,
},
{
"id": "github_proxy_raw",
"name": "raw.githubusercontent.com",
"icon": "github",
"url": f"{github_proxy}{raw_readme_url}" if github_proxy else raw_readme_url,
"proxy": True,
"allowed_redirect_prefixes": [
"https://raw.githubusercontent.com/",
*((f"{github_proxy}https://raw.githubusercontent.com/",) if github_proxy else ()),
],
"expected_text": "MoviePilot",
"invalid_message": "Github加速代理已失效请检查配置" if github_proxy else "无效响应",
"proxy_name": "Github加速代理" if github_proxy else "",
"headers": settings.GITHUB_HEADERS,
},
]
if tmdb_domain not in {"api.themoviedb.org", "api.tmdb.org"}:
rules.insert(
2,
{
"id": "tmdb_api_configured",
"name": tmdb_domain,
"icon": "tmdb",
"url": f"https://{tmdb_domain}/3/movie/550?api_key={tmdb_key}",
"proxy": True,
"allowed_redirect_prefixes": [
f"https://{tmdb_domain}/3/",
],
},
)
return rules
def _validate_nettest_url(url: str) -> Optional[str]:
parsed = urlparse(url)
if parsed.scheme.lower() != "https":
return "测试地址仅支持 HTTPS"
if not parsed.netloc:
return "测试地址无效"
if parsed.username or parsed.password:
return "测试地址不支持携带账号信息"
if not _get_nettest_rule(url):
return "测试地址不在允许的测试目标列表中"
return None
def _get_nettest_rule(url: Optional[str] = None, target_id: Optional[str] = None) -> Optional[dict[str, Any]]:
for rule in _build_nettest_rules():
if target_id and rule.get("id") == target_id:
return rule
if url and rule.get("url") == url:
return rule
return None
def _is_allowed_nettest_redirect(url: str, rule: dict[str, Any]) -> bool:
parsed = urlparse(url)
if parsed.scheme.lower() != "https" or not parsed.netloc:
return False
if parsed.username or parsed.password:
return False
return any(
_match_nettest_prefix(url, prefix)
for prefix in rule.get("allowed_redirect_prefixes", [])
)
async def _close_nettest_response(response: Any) -> None:
if response is None or not hasattr(response, "aclose"):
return
try:
await response.aclose()
except Exception as err:
logger.debug(f"关闭网络测试响应失败: {err}")
async def fetch_image(
url: str,
@@ -541,72 +772,101 @@ def ruletest(
)
@router.get("/nettest/targets", summary="获取网络测试目标", response_model=schemas.Response)
async def nettest_targets(_: schemas.TokenPayload = Depends(verify_token)):
"""
获取网络测试目标
"""
return schemas.Response(
success=True,
data=[
{
"id": item["id"],
"name": item["name"],
"icon": item["icon"],
}
for item in _build_nettest_rules()
],
)
@router.get("/nettest", summary="测试网络连通性")
async def nettest(
url: str,
proxy: bool,
target_id: Optional[str] = None,
url: Optional[str] = None,
proxy: Optional[bool] = None,
include: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token),
):
"""
测试网络连通性
"""
target = _get_nettest_rule(url=url, target_id=target_id)
if not target:
return schemas.Response(success=False, message="测试目标不存在")
# 记录开始的毫秒数
start_time = datetime.now()
headers = None
# 当前使用的加速代理
proxy_name = ""
if "github" in url:
# 这是github的连通性测试
headers = settings.GITHUB_HEADERS
if "{GITHUB_PROXY}" in url:
url = url.replace(
"{GITHUB_PROXY}", UrlUtils.standardize_base_url(settings.GITHUB_PROXY or "")
)
if settings.GITHUB_PROXY:
proxy_name = "Github加速代理"
if "{PIP_PROXY}" in url:
url = url.replace(
"{PIP_PROXY}",
UrlUtils.standardize_base_url(
settings.PIP_PROXY or "https://pypi.org/simple/"
),
)
if settings.PIP_PROXY:
proxy_name = "PIP加速代理"
url = url.replace("{TMDBAPIKEY}", settings.TMDB_API_KEY)
result = await AsyncRequestUtils(
proxies=settings.PROXY if proxy else None,
headers=headers,
url = target["url"]
invalid_message = _validate_nettest_url(url)
if invalid_message:
logger.warning(f"拦截不安全的网络测试地址: {url}")
return schemas.Response(success=False, message=invalid_message)
if include:
logger.debug("nettest include 参数已忽略,改为服务端固定校验")
request_utils = AsyncRequestUtils(
proxies=settings.PROXY if target.get("proxy") else None,
headers=target.get("headers"),
timeout=10,
ua=settings.NORMAL_USER_AGENT,
).get_res(url)
verify=True,
follow_redirects=False,
)
result = None
current_url = url
redirect_count = 0
while redirect_count <= 3:
result = await request_utils.get_res(current_url, allow_redirects=False)
if result is None:
break
if result.status_code not in _NETTEST_REDIRECT_STATUS_CODES:
break
location = result.headers.get("location")
if not location:
break
next_url = urljoin(current_url, location)
if not _is_allowed_nettest_redirect(next_url, target):
await _close_nettest_response(result)
logger.warning(f"拦截网络测试重定向: {current_url} -> {next_url}")
return schemas.Response(success=False, message="测试目标发生了未授权跳转")
await _close_nettest_response(result)
current_url = next_url
redirect_count += 1
if redirect_count > 3:
return schemas.Response(success=False, message="测试目标重定向次数过多")
# 计时结束的毫秒数
end_time = datetime.now()
time = round((end_time - start_time).total_seconds() * 1000)
# 计算相关秒数
if result is None:
return schemas.Response(
success=False, message=f"{proxy_name}无法连接", data={"time": time}
success=False,
message=f"{target.get('proxy_name') or target.get('name')}无法连接",
data={"time": time},
)
elif result.status_code == 200:
if include and not re.search(r"%s" % include, result.text, re.IGNORECASE):
# 通常是被加速代理跳转到其它页面了
logger.error(f"{url} 的响应内容不匹配包含规则 {include}")
if proxy_name:
message = f"{proxy_name}已失效,请检查配置"
else:
message = f"无效响应,不匹配 {include}"
expected_text = target.get("expected_text")
if expected_text and expected_text.lower() not in (result.text or "").lower():
return schemas.Response(
success=False,
message=message,
message=target.get("invalid_message") or "无效响应",
data={"time": time},
)
return schemas.Response(success=True, data={"time": time})
else:
if proxy_name:
if target.get("proxy_name"):
# 加速代理失败
message = f"{proxy_name}已失效,错误码:{result.status_code}"
message = f"{target['proxy_name']}已失效,错误码:{result.status_code}"
else:
message = f"错误码:{result.status_code}"
if "github" in url:

View File

@@ -581,7 +581,9 @@ class AsyncRequestUtils:
timeout: int = None,
referer: str = None,
content_type: str = None,
accept_type: str = None):
accept_type: str = None,
verify: bool = False,
follow_redirects: bool = True):
"""
:param headers: 请求头部信息
:param ua: User-Agent字符串
@@ -592,10 +594,14 @@ class AsyncRequestUtils:
:param referer: Referer头部信息
:param content_type: 请求的Content-Type默认为 "application/x-www-form-urlencoded; charset=UTF-8"
:param accept_type: Accept头部信息默认为 "application/json"
:param verify: 是否校验证书
:param follow_redirects: 客户端默认是否跟随重定向
"""
self._proxies = self._convert_proxies_for_httpx(proxies)
self._client = client
self._timeout = timeout or 20
self._verify = verify
self._follow_redirects = follow_redirects
if not content_type:
content_type = "application/x-www-form-urlencoded; charset=UTF-8"
if headers:
@@ -681,8 +687,8 @@ class AsyncRequestUtils:
async with httpx.AsyncClient(
proxy=self._proxies,
timeout=self._timeout,
verify=False,
follow_redirects=True,
verify=self._verify,
follow_redirects=self._follow_redirects,
cookies=self._cookies # 在创建客户端时传入Cookie
) as client:
return await self._make_request(client, method, url, raise_exception, **kwargs)

View File

@@ -0,0 +1,185 @@
import asyncio
import sys
import unittest
from types import ModuleType, SimpleNamespace
from unittest.mock import patch
def _stub_module(name: str, **attrs):
module = sys.modules.get(name)
if module is None:
module = ModuleType(name)
sys.modules[name] = module
for key, value in attrs.items():
setattr(module, key, value)
return module
class _Dummy:
def __init__(self, *args, **kwargs):
pass
for _module_name in ("pillow_avif", "aiofiles", "psutil"):
_stub_module(_module_name)
_stub_module("app.helper.sites", SitesHelper=_Dummy)
_stub_module("app.chain.mediaserver", MediaServerChain=_Dummy)
_stub_module("app.chain.search", SearchChain=_Dummy)
_stub_module("app.chain.system", SystemChain=_Dummy)
_stub_module("app.core.event", eventmanager=_Dummy())
_stub_module("app.core.metainfo", MetaInfo=_Dummy)
_stub_module("app.core.module", ModuleManager=_Dummy)
_stub_module(
"app.core.security",
verify_apitoken=_Dummy,
verify_resource_token=_Dummy,
verify_token=_Dummy,
)
_stub_module("app.db.models", User=_Dummy)
_stub_module("app.db.systemconfig_oper", SystemConfigOper=_Dummy)
_stub_module(
"app.db.user_oper",
get_current_active_superuser=_Dummy,
get_current_active_superuser_async=_Dummy,
get_current_active_user_async=_Dummy,
)
_stub_module("app.helper.llm", LLMHelper=_Dummy)
_stub_module("app.helper.mediaserver", MediaServerHelper=_Dummy)
_stub_module("app.helper.message", MessageHelper=_Dummy)
_stub_module("app.helper.progress", ProgressHelper=_Dummy)
_stub_module("app.helper.rule", RuleHelper=_Dummy)
_stub_module("app.helper.subscribe", SubscribeHelper=_Dummy)
_stub_module("app.helper.system", SystemHelper=_Dummy)
_stub_module("app.helper.image", ImageHelper=_Dummy)
_stub_module("app.scheduler", Scheduler=_Dummy)
_stub_module("app.utils.crypto", HashUtils=_Dummy)
_stub_module("app.utils.http", RequestUtils=_Dummy, AsyncRequestUtils=_Dummy)
_stub_module("version", APP_VERSION="test")
from app.api.endpoints import system as system_endpoint
class NettestSecurityTest(unittest.TestCase):
def test_nettest_targets_are_served_by_backend(self):
resp = asyncio.run(system_endpoint.nettest_targets(_="token"))
self.assertTrue(resp.success)
self.assertTrue(any(item["id"] == "pip_proxy" for item in resp.data))
self.assertTrue(any(item["id"] == "github_proxy_web" for item in resp.data))
def test_nettest_blocks_unknown_target(self):
class FailIfCalled:
def __init__(self, *args, **kwargs):
raise AssertionError("nettest should reject unknown targets before any outbound request")
with patch.object(system_endpoint, "AsyncRequestUtils", FailIfCalled):
resp = asyncio.run(
system_endpoint.nettest(
target_id="unknown-target",
_="token",
)
)
self.assertFalse(resp.success)
self.assertIn("不存在", resp.message)
def test_nettest_blocks_unapproved_redirect(self):
captured = {"calls": 0}
class FakeResponse:
def __init__(self, status_code, headers=None, text=""):
self.status_code = status_code
self.headers = headers or {}
self.text = text
async def aclose(self):
return None
class FakeAsyncRequestUtils:
def __init__(self, **kwargs):
captured["init_kwargs"] = kwargs
async def get_res(self, url, allow_redirects=True):
captured["calls"] += 1
return FakeResponse(
302,
headers={"location": "https://169.254.169.254/latest/meta-data/"},
)
with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object(
system_endpoint.settings,
"GITHUB_PROXY",
"https://ghproxy.example/",
):
resp = asyncio.run(
system_endpoint.nettest(
target_id="github_proxy_web",
_="token",
)
)
self.assertFalse(resp.success)
self.assertIn("跳转", resp.message)
self.assertEqual(captured["calls"], 1)
def test_nettest_uses_safe_http_options_and_server_side_content_check(self):
captured = {}
class FakeAsyncRequestUtils:
def __init__(self, **kwargs):
captured["init_kwargs"] = kwargs
async def get_res(self, url, allow_redirects=True):
captured["url"] = url
captured["allow_redirects"] = allow_redirects
return SimpleNamespace(status_code=200, text="MoviePilot README")
with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object(
system_endpoint.settings,
"GITHUB_PROXY",
"https://ghproxy.example/",
):
resp = asyncio.run(
system_endpoint.nettest(
target_id="github_proxy_web",
include="tag_name",
_="token",
)
)
self.assertTrue(resp.success)
self.assertEqual(
captured["url"],
"https://ghproxy.example/https://github.com/jxxghp/MoviePilot/blob/v2/README.md",
)
self.assertFalse(captured["allow_redirects"])
self.assertTrue(captured["init_kwargs"]["verify"])
self.assertFalse(captured["init_kwargs"]["follow_redirects"])
def test_nettest_fails_when_expected_content_is_missing(self):
class FakeAsyncRequestUtils:
def __init__(self, **kwargs):
pass
async def get_res(self, url, allow_redirects=True):
return SimpleNamespace(status_code=200, text="proxy landing page")
with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object(
system_endpoint.settings,
"PIP_PROXY",
"https://pypi.tuna.tsinghua.edu.cn/simple/",
):
resp = asyncio.run(
system_endpoint.nettest(
target_id="pip_proxy",
_="token",
)
)
self.assertFalse(resp.success)
self.assertIn("PIP加速代理", resp.message)
if __name__ == "__main__":
unittest.main()