From 4c32ad902bba1039e2433567d483598299781f47 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sat, 18 Apr 2026 17:43:38 +0800 Subject: [PATCH] Harden system nettest SSRF handling --- app/api/endpoints/system.py | 340 ++++++++++++++++++++++++++++++----- app/utils/http.py | 12 +- tests/test_system_nettest.py | 185 +++++++++++++++++++ 3 files changed, 494 insertions(+), 43 deletions(-) create mode 100644 tests/test_system_nettest.py diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index 6bb912b1..0cce3d44 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -3,7 +3,8 @@ import json import re from collections import deque from datetime import datetime -from typing import Optional, Union, Annotated +from typing import Any, Optional, Union, Annotated +from urllib.parse import urljoin, urlparse import aiofiles import pillow_avif # noqa 用于自动注册AVIF支持 @@ -48,6 +49,236 @@ from version import APP_VERSION router = APIRouter() +_NETTEST_REDIRECT_STATUS_CODES = {301, 302, 303, 307, 308} + + +def _match_nettest_prefix(url: str, prefix: str) -> bool: + parsed_url = urlparse(url) + parsed_prefix = urlparse(prefix) + if parsed_url.scheme.lower() != parsed_prefix.scheme.lower(): + return False + if (parsed_url.hostname or "").lower() != (parsed_prefix.hostname or "").lower(): + return False + url_port = parsed_url.port or (443 if parsed_url.scheme.lower() == "https" else 80) + prefix_port = parsed_prefix.port or (443 if parsed_prefix.scheme.lower() == "https" else 80) + if url_port != prefix_port: + return False + return parsed_url.path.startswith(parsed_prefix.path or "/") + + +def _build_nettest_rules() -> list[dict[str, Any]]: + github_proxy = UrlUtils.standardize_base_url(settings.GITHUB_PROXY or "") + pip_proxy = UrlUtils.standardize_base_url( + settings.PIP_PROXY or "https://pypi.org/simple/" + ) + tmdb_key = settings.TMDB_API_KEY + tmdb_domain = settings.TMDB_API_DOMAIN or "api.themoviedb.org" + + github_readme_url = "https://github.com/jxxghp/MoviePilot/blob/v2/README.md" + raw_readme_url = "https://raw.githubusercontent.com/jxxghp/MoviePilot/v2/README.md" + + rules = [ + { + "id": "tmdb_api", + "name": "api.themoviedb.org", + "icon": "tmdb", + "url": f"https://api.themoviedb.org/3/movie/550?api_key={tmdb_key}", + "proxy": True, + "allowed_redirect_prefixes": [ + "https://api.themoviedb.org/3/", + ], + }, + { + "id": "tmdb_api_alt", + "name": "api.tmdb.org", + "icon": "tmdb", + "url": f"https://api.tmdb.org/3/movie/550?api_key={tmdb_key}", + "proxy": True, + "allowed_redirect_prefixes": [ + "https://api.tmdb.org/3/", + ], + }, + { + "id": "tmdb_web", + "name": "www.themoviedb.org", + "icon": "tmdb", + "url": "https://www.themoviedb.org", + "proxy": True, + "allowed_redirect_prefixes": ["https://www.themoviedb.org/"], + }, + { + "id": "tvdb_api", + "name": "api.thetvdb.com", + "icon": "tvdb", + "url": "https://api.thetvdb.com/series/81189", + "proxy": True, + "allowed_redirect_prefixes": ["https://api.thetvdb.com/"], + }, + { + "id": "fanart_api", + "name": "webservice.fanart.tv", + "icon": "fanart", + "url": "https://webservice.fanart.tv", + "proxy": True, + "allowed_redirect_prefixes": ["https://webservice.fanart.tv/"], + }, + { + "id": "telegram_api", + "name": "api.telegram.org", + "icon": "telegram", + "url": "https://api.telegram.org", + "proxy": True, + "allowed_redirect_prefixes": ["https://api.telegram.org/"], + }, + { + "id": "wechat_api", + "name": "qyapi.weixin.qq.com", + "icon": "wechat", + "url": "https://qyapi.weixin.qq.com/cgi-bin/gettoken", + "proxy": False, + "allowed_redirect_prefixes": ["https://qyapi.weixin.qq.com/"], + }, + { + "id": "douban_api", + "name": "frodo.douban.com", + "icon": "douban", + "url": "https://frodo.douban.com", + "proxy": False, + "allowed_redirect_prefixes": ["https://frodo.douban.com/"], + }, + { + "id": "slack_api", + "name": "slack.com", + "icon": "slack", + "url": "https://slack.com", + "proxy": False, + "allowed_redirect_prefixes": [ + "https://slack.com/", + "https://www.slack.com/", + ], + }, + { + "id": "pip_proxy", + "name": "pypi.org", + "icon": "python", + "url": f"{pip_proxy}rsa/", + "proxy": True, + "allowed_redirect_prefixes": [ + pip_proxy, + "https://pypi.org/simple/", + ], + "expected_text": "pypi:repository-version", + "invalid_message": "PIP加速代理已失效,请检查配置", + "proxy_name": "PIP加速代理", + }, + { + "id": "github_proxy_web", + "name": "github.com", + "icon": "github", + "url": f"{github_proxy}{github_readme_url}" if github_proxy else github_readme_url, + "proxy": True, + "allowed_redirect_prefixes": [ + "https://github.com/", + *((f"{github_proxy}https://github.com/",) if github_proxy else ()), + ], + "expected_text": "MoviePilot", + "invalid_message": "Github加速代理已失效,请检查配置" if github_proxy else "无效响应", + "proxy_name": "Github加速代理" if github_proxy else "", + "headers": settings.GITHUB_HEADERS, + }, + { + "id": "github_api", + "name": "api.github.com", + "icon": "github", + "url": "https://api.github.com", + "proxy": True, + "allowed_redirect_prefixes": ["https://api.github.com/"], + "headers": settings.GITHUB_HEADERS, + }, + { + "id": "github_codeload", + "name": "codeload.github.com", + "icon": "github", + "url": "https://codeload.github.com", + "proxy": True, + "allowed_redirect_prefixes": ["https://codeload.github.com/"], + "headers": settings.GITHUB_HEADERS, + }, + { + "id": "github_proxy_raw", + "name": "raw.githubusercontent.com", + "icon": "github", + "url": f"{github_proxy}{raw_readme_url}" if github_proxy else raw_readme_url, + "proxy": True, + "allowed_redirect_prefixes": [ + "https://raw.githubusercontent.com/", + *((f"{github_proxy}https://raw.githubusercontent.com/",) if github_proxy else ()), + ], + "expected_text": "MoviePilot", + "invalid_message": "Github加速代理已失效,请检查配置" if github_proxy else "无效响应", + "proxy_name": "Github加速代理" if github_proxy else "", + "headers": settings.GITHUB_HEADERS, + }, + ] + if tmdb_domain not in {"api.themoviedb.org", "api.tmdb.org"}: + rules.insert( + 2, + { + "id": "tmdb_api_configured", + "name": tmdb_domain, + "icon": "tmdb", + "url": f"https://{tmdb_domain}/3/movie/550?api_key={tmdb_key}", + "proxy": True, + "allowed_redirect_prefixes": [ + f"https://{tmdb_domain}/3/", + ], + }, + ) + return rules + + +def _validate_nettest_url(url: str) -> Optional[str]: + parsed = urlparse(url) + if parsed.scheme.lower() != "https": + return "测试地址仅支持 HTTPS" + if not parsed.netloc: + return "测试地址无效" + if parsed.username or parsed.password: + return "测试地址不支持携带账号信息" + if not _get_nettest_rule(url): + return "测试地址不在允许的测试目标列表中" + return None + + +def _get_nettest_rule(url: Optional[str] = None, target_id: Optional[str] = None) -> Optional[dict[str, Any]]: + for rule in _build_nettest_rules(): + if target_id and rule.get("id") == target_id: + return rule + if url and rule.get("url") == url: + return rule + return None + + +def _is_allowed_nettest_redirect(url: str, rule: dict[str, Any]) -> bool: + parsed = urlparse(url) + if parsed.scheme.lower() != "https" or not parsed.netloc: + return False + if parsed.username or parsed.password: + return False + return any( + _match_nettest_prefix(url, prefix) + for prefix in rule.get("allowed_redirect_prefixes", []) + ) + + +async def _close_nettest_response(response: Any) -> None: + if response is None or not hasattr(response, "aclose"): + return + try: + await response.aclose() + except Exception as err: + logger.debug(f"关闭网络测试响应失败: {err}") + async def fetch_image( url: str, @@ -541,72 +772,101 @@ def ruletest( ) +@router.get("/nettest/targets", summary="获取网络测试目标", response_model=schemas.Response) +async def nettest_targets(_: schemas.TokenPayload = Depends(verify_token)): + """ + 获取网络测试目标 + """ + return schemas.Response( + success=True, + data=[ + { + "id": item["id"], + "name": item["name"], + "icon": item["icon"], + } + for item in _build_nettest_rules() + ], + ) + + @router.get("/nettest", summary="测试网络连通性") async def nettest( - url: str, - proxy: bool, + target_id: Optional[str] = None, + url: Optional[str] = None, + proxy: Optional[bool] = None, include: Optional[str] = None, _: schemas.TokenPayload = Depends(verify_token), ): """ 测试网络连通性 """ + target = _get_nettest_rule(url=url, target_id=target_id) + if not target: + return schemas.Response(success=False, message="测试目标不存在") # 记录开始的毫秒数 start_time = datetime.now() - headers = None - # 当前使用的加速代理 - proxy_name = "" - if "github" in url: - # 这是github的连通性测试 - headers = settings.GITHUB_HEADERS - if "{GITHUB_PROXY}" in url: - url = url.replace( - "{GITHUB_PROXY}", UrlUtils.standardize_base_url(settings.GITHUB_PROXY or "") - ) - if settings.GITHUB_PROXY: - proxy_name = "Github加速代理" - if "{PIP_PROXY}" in url: - url = url.replace( - "{PIP_PROXY}", - UrlUtils.standardize_base_url( - settings.PIP_PROXY or "https://pypi.org/simple/" - ), - ) - if settings.PIP_PROXY: - proxy_name = "PIP加速代理" - url = url.replace("{TMDBAPIKEY}", settings.TMDB_API_KEY) - result = await AsyncRequestUtils( - proxies=settings.PROXY if proxy else None, - headers=headers, + url = target["url"] + invalid_message = _validate_nettest_url(url) + if invalid_message: + logger.warning(f"拦截不安全的网络测试地址: {url}") + return schemas.Response(success=False, message=invalid_message) + if include: + logger.debug("nettest include 参数已忽略,改为服务端固定校验") + + request_utils = AsyncRequestUtils( + proxies=settings.PROXY if target.get("proxy") else None, + headers=target.get("headers"), timeout=10, ua=settings.NORMAL_USER_AGENT, - ).get_res(url) + verify=True, + follow_redirects=False, + ) + result = None + current_url = url + redirect_count = 0 + while redirect_count <= 3: + result = await request_utils.get_res(current_url, allow_redirects=False) + if result is None: + break + if result.status_code not in _NETTEST_REDIRECT_STATUS_CODES: + break + location = result.headers.get("location") + if not location: + break + next_url = urljoin(current_url, location) + if not _is_allowed_nettest_redirect(next_url, target): + await _close_nettest_response(result) + logger.warning(f"拦截网络测试重定向: {current_url} -> {next_url}") + return schemas.Response(success=False, message="测试目标发生了未授权跳转") + await _close_nettest_response(result) + current_url = next_url + redirect_count += 1 + if redirect_count > 3: + return schemas.Response(success=False, message="测试目标重定向次数过多") # 计时结束的毫秒数 end_time = datetime.now() time = round((end_time - start_time).total_seconds() * 1000) # 计算相关秒数 if result is None: return schemas.Response( - success=False, message=f"{proxy_name}无法连接", data={"time": time} + success=False, + message=f"{target.get('proxy_name') or target.get('name')}无法连接", + data={"time": time}, ) elif result.status_code == 200: - if include and not re.search(r"%s" % include, result.text, re.IGNORECASE): - # 通常是被加速代理跳转到其它页面了 - logger.error(f"{url} 的响应内容不匹配包含规则 {include}") - if proxy_name: - message = f"{proxy_name}已失效,请检查配置" - else: - message = f"无效响应,不匹配 {include}" + expected_text = target.get("expected_text") + if expected_text and expected_text.lower() not in (result.text or "").lower(): return schemas.Response( success=False, - message=message, + message=target.get("invalid_message") or "无效响应", data={"time": time}, ) return schemas.Response(success=True, data={"time": time}) else: - if proxy_name: + if target.get("proxy_name"): # 加速代理失败 - message = f"{proxy_name}已失效,错误码:{result.status_code}" + message = f"{target['proxy_name']}已失效,错误码:{result.status_code}" else: message = f"错误码:{result.status_code}" if "github" in url: diff --git a/app/utils/http.py b/app/utils/http.py index 137ca6f5..0cbcaeef 100644 --- a/app/utils/http.py +++ b/app/utils/http.py @@ -581,7 +581,9 @@ class AsyncRequestUtils: timeout: int = None, referer: str = None, content_type: str = None, - accept_type: str = None): + accept_type: str = None, + verify: bool = False, + follow_redirects: bool = True): """ :param headers: 请求头部信息 :param ua: User-Agent字符串 @@ -592,10 +594,14 @@ class AsyncRequestUtils: :param referer: Referer头部信息 :param content_type: 请求的Content-Type,默认为 "application/x-www-form-urlencoded; charset=UTF-8" :param accept_type: Accept头部信息,默认为 "application/json" + :param verify: 是否校验证书 + :param follow_redirects: 客户端默认是否跟随重定向 """ self._proxies = self._convert_proxies_for_httpx(proxies) self._client = client self._timeout = timeout or 20 + self._verify = verify + self._follow_redirects = follow_redirects if not content_type: content_type = "application/x-www-form-urlencoded; charset=UTF-8" if headers: @@ -681,8 +687,8 @@ class AsyncRequestUtils: async with httpx.AsyncClient( proxy=self._proxies, timeout=self._timeout, - verify=False, - follow_redirects=True, + verify=self._verify, + follow_redirects=self._follow_redirects, cookies=self._cookies # 在创建客户端时传入Cookie ) as client: return await self._make_request(client, method, url, raise_exception, **kwargs) diff --git a/tests/test_system_nettest.py b/tests/test_system_nettest.py new file mode 100644 index 00000000..6798ba89 --- /dev/null +++ b/tests/test_system_nettest.py @@ -0,0 +1,185 @@ +import asyncio +import sys +import unittest +from types import ModuleType, SimpleNamespace +from unittest.mock import patch + + +def _stub_module(name: str, **attrs): + module = sys.modules.get(name) + if module is None: + module = ModuleType(name) + sys.modules[name] = module + for key, value in attrs.items(): + setattr(module, key, value) + return module + + +class _Dummy: + def __init__(self, *args, **kwargs): + pass + + +for _module_name in ("pillow_avif", "aiofiles", "psutil"): + _stub_module(_module_name) + +_stub_module("app.helper.sites", SitesHelper=_Dummy) +_stub_module("app.chain.mediaserver", MediaServerChain=_Dummy) +_stub_module("app.chain.search", SearchChain=_Dummy) +_stub_module("app.chain.system", SystemChain=_Dummy) +_stub_module("app.core.event", eventmanager=_Dummy()) +_stub_module("app.core.metainfo", MetaInfo=_Dummy) +_stub_module("app.core.module", ModuleManager=_Dummy) +_stub_module( + "app.core.security", + verify_apitoken=_Dummy, + verify_resource_token=_Dummy, + verify_token=_Dummy, +) +_stub_module("app.db.models", User=_Dummy) +_stub_module("app.db.systemconfig_oper", SystemConfigOper=_Dummy) +_stub_module( + "app.db.user_oper", + get_current_active_superuser=_Dummy, + get_current_active_superuser_async=_Dummy, + get_current_active_user_async=_Dummy, +) +_stub_module("app.helper.llm", LLMHelper=_Dummy) +_stub_module("app.helper.mediaserver", MediaServerHelper=_Dummy) +_stub_module("app.helper.message", MessageHelper=_Dummy) +_stub_module("app.helper.progress", ProgressHelper=_Dummy) +_stub_module("app.helper.rule", RuleHelper=_Dummy) +_stub_module("app.helper.subscribe", SubscribeHelper=_Dummy) +_stub_module("app.helper.system", SystemHelper=_Dummy) +_stub_module("app.helper.image", ImageHelper=_Dummy) +_stub_module("app.scheduler", Scheduler=_Dummy) +_stub_module("app.utils.crypto", HashUtils=_Dummy) +_stub_module("app.utils.http", RequestUtils=_Dummy, AsyncRequestUtils=_Dummy) +_stub_module("version", APP_VERSION="test") + +from app.api.endpoints import system as system_endpoint + + +class NettestSecurityTest(unittest.TestCase): + def test_nettest_targets_are_served_by_backend(self): + resp = asyncio.run(system_endpoint.nettest_targets(_="token")) + + self.assertTrue(resp.success) + self.assertTrue(any(item["id"] == "pip_proxy" for item in resp.data)) + self.assertTrue(any(item["id"] == "github_proxy_web" for item in resp.data)) + + def test_nettest_blocks_unknown_target(self): + class FailIfCalled: + def __init__(self, *args, **kwargs): + raise AssertionError("nettest should reject unknown targets before any outbound request") + + with patch.object(system_endpoint, "AsyncRequestUtils", FailIfCalled): + resp = asyncio.run( + system_endpoint.nettest( + target_id="unknown-target", + _="token", + ) + ) + + self.assertFalse(resp.success) + self.assertIn("不存在", resp.message) + + def test_nettest_blocks_unapproved_redirect(self): + captured = {"calls": 0} + + class FakeResponse: + def __init__(self, status_code, headers=None, text=""): + self.status_code = status_code + self.headers = headers or {} + self.text = text + + async def aclose(self): + return None + + class FakeAsyncRequestUtils: + def __init__(self, **kwargs): + captured["init_kwargs"] = kwargs + + async def get_res(self, url, allow_redirects=True): + captured["calls"] += 1 + return FakeResponse( + 302, + headers={"location": "https://169.254.169.254/latest/meta-data/"}, + ) + + with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object( + system_endpoint.settings, + "GITHUB_PROXY", + "https://ghproxy.example/", + ): + resp = asyncio.run( + system_endpoint.nettest( + target_id="github_proxy_web", + _="token", + ) + ) + + self.assertFalse(resp.success) + self.assertIn("跳转", resp.message) + self.assertEqual(captured["calls"], 1) + + def test_nettest_uses_safe_http_options_and_server_side_content_check(self): + captured = {} + + class FakeAsyncRequestUtils: + def __init__(self, **kwargs): + captured["init_kwargs"] = kwargs + + async def get_res(self, url, allow_redirects=True): + captured["url"] = url + captured["allow_redirects"] = allow_redirects + return SimpleNamespace(status_code=200, text="MoviePilot README") + + with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object( + system_endpoint.settings, + "GITHUB_PROXY", + "https://ghproxy.example/", + ): + resp = asyncio.run( + system_endpoint.nettest( + target_id="github_proxy_web", + include="tag_name", + _="token", + ) + ) + + self.assertTrue(resp.success) + self.assertEqual( + captured["url"], + "https://ghproxy.example/https://github.com/jxxghp/MoviePilot/blob/v2/README.md", + ) + self.assertFalse(captured["allow_redirects"]) + self.assertTrue(captured["init_kwargs"]["verify"]) + self.assertFalse(captured["init_kwargs"]["follow_redirects"]) + + def test_nettest_fails_when_expected_content_is_missing(self): + class FakeAsyncRequestUtils: + def __init__(self, **kwargs): + pass + + async def get_res(self, url, allow_redirects=True): + return SimpleNamespace(status_code=200, text="proxy landing page") + + with patch.object(system_endpoint, "AsyncRequestUtils", FakeAsyncRequestUtils), patch.object( + system_endpoint.settings, + "PIP_PROXY", + "https://pypi.tuna.tsinghua.edu.cn/simple/", + ): + resp = asyncio.run( + system_endpoint.nettest( + target_id="pip_proxy", + _="token", + ) + ) + + self.assertFalse(resp.success) + self.assertIn("PIP加速代理", resp.message) + + +if __name__ == "__main__": + unittest.main()