fix: sign media server image proxy URLs

This commit is contained in:
jxxghp
2026-05-25 12:41:55 +08:00
parent d713ea54c1
commit 63b9994b0e
6 changed files with 327 additions and 160 deletions

View File

@@ -1,16 +1,21 @@
import hmac
import ipaddress
import socket
import time
from hashlib import sha256
from pathlib import Path
from typing import List, Optional, Set, Union
from urllib.parse import quote, urlparse
from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse
from anyio import Path as AsyncPath
from app.core.config import settings
from app.log import logger
class SecurityUtils:
_SIGNED_URL_PURPOSE = "image-proxy"
_SIGNED_URL_EXPIRE_SECONDS = 86400
@staticmethod
def is_safe_path(base_path: Path, user_path: Path,
@@ -107,6 +112,103 @@ class SecurityUtils:
return False
return True
@staticmethod
def _url_signature_payload(url: str, expires_at: int, purpose: str) -> bytes:
"""
构造 URL 签名载荷。
签名覆盖用途、过期时间和完整 URL确保同一个签名不能挪用到其它
内网地址或其它代理用途。
"""
return f"{purpose}\n{expires_at}\n{url}".encode("utf-8")
@staticmethod
def _sign_url_payload(url: str, expires_at: int, purpose: str) -> str:
"""
使用 RESOURCE_SECRET_KEY 对 URL 签名载荷生成 HMAC。
"""
return hmac.new(
settings.RESOURCE_SECRET_KEY.encode("utf-8"),
SecurityUtils._url_signature_payload(url, expires_at, purpose),
sha256,
).hexdigest()
@staticmethod
def strip_url_signature(url: str) -> str:
"""
移除 URL fragment 中的代理签名信息,得到真正要请求的地址。
图片代理签名放在 fragment 中,浏览器会把它传给 MoviePilot但 HTTP
客户端请求媒体服务器前不能把这些内部参数带过去。
"""
if not url:
return url
parsed_url = urlparse(url)
return urlunparse(parsed_url._replace(fragment=""))
@staticmethod
def sign_url(
url: str,
expires_in: int = _SIGNED_URL_EXPIRE_SECONDS,
purpose: str = _SIGNED_URL_PURPOSE,
) -> str:
"""
给服务端返回的资源 URL 添加临时签名。
该签名用于允许 `/system/img` 代理访问服务端已经确认过的私网图片 URL
避免代理端点重新依赖媒体服务器的具体路径规则。
"""
if not url:
return url
parsed_url = urlparse(url)
if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
return url
clean_url = SecurityUtils.strip_url_signature(url)
expires_at = int(time.time() + expires_in)
signature = SecurityUtils._sign_url_payload(clean_url, expires_at, purpose)
fragment = urlencode(
{
"mp_exp": str(expires_at),
"mp_sig": signature,
"mp_purpose": purpose,
}
)
return urlunparse(urlparse(clean_url)._replace(fragment=fragment))
@staticmethod
def verify_signed_url(
url: str,
purpose: str = _SIGNED_URL_PURPOSE,
) -> Optional[str]:
"""
验证 URL fragment 中的代理签名,成功时返回去签名后的真实 URL。
"""
if not url:
return None
parsed_url = urlparse(url)
if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
return None
fragment_params = dict(parse_qsl(parsed_url.fragment, keep_blank_values=True))
expires_at = fragment_params.get("mp_exp")
signature = fragment_params.get("mp_sig")
signed_purpose = fragment_params.get("mp_purpose")
if not expires_at or not signature or signed_purpose != purpose:
return None
try:
expires_at_int = int(expires_at)
except ValueError:
return None
if expires_at_int < int(time.time()):
return None
clean_url = SecurityUtils.strip_url_signature(url)
expected_signature = SecurityUtils._sign_url_payload(
clean_url, expires_at_int, purpose
)
if not hmac.compare_digest(signature, expected_signature):
return None
return clean_url
@staticmethod
def is_safe_url(
url: str,