mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-09 17:50:23 +08:00
Merge pull request #2826 from InfinityPacer/feature/security
This commit is contained in:
@@ -32,10 +32,14 @@ class ConfigModel(BaseModel):
|
||||
FRONTEND_PATH: str = "/public"
|
||||
# 密钥
|
||||
SECRET_KEY: str = secrets.token_urlsafe(32)
|
||||
# RESOURCE密钥
|
||||
RESOURCE_SECRET_KEY: str = secrets.token_urlsafe(32)
|
||||
# 允许的域名
|
||||
ALLOWED_HOSTS: list = ["*"]
|
||||
# TOKEN过期时间
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8
|
||||
# RESOURCE_TOKEN过期时间
|
||||
RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS: int = 60 * 10
|
||||
# 时区
|
||||
TZ: str = "Asia/Shanghai"
|
||||
# API监听地址
|
||||
|
||||
@@ -11,8 +11,8 @@ import jwt
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from cryptography.fernet import Fernet
|
||||
from fastapi import HTTPException, status, Security
|
||||
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery
|
||||
from fastapi import HTTPException, status, Security, Request, Response
|
||||
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery, APIKeyCookie
|
||||
from passlib.context import CryptContext
|
||||
|
||||
from app import schemas
|
||||
@@ -27,8 +27,8 @@ oauth2_scheme = OAuth2PasswordBearer(
|
||||
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
|
||||
)
|
||||
|
||||
# JWT TOKEN 通过 QUERY 认证
|
||||
jwt_token_query = APIKeyQuery(name="token", auto_error=False, scheme_name="jwt_token_query")
|
||||
# RESOURCE TOKEN 通过 Cookie 认证
|
||||
resource_token_cookie = APIKeyCookie(name=settings.PROJECT_NAME, auto_error=False, scheme_name="resource_token_cookie")
|
||||
|
||||
# API TOKEN 通过 QUERY 认证
|
||||
api_token_query = APIKeyQuery(name="token", auto_error=False, scheme_name="api_token_query")
|
||||
@@ -45,50 +45,124 @@ def create_access_token(
|
||||
username: str,
|
||||
super_user: bool = False,
|
||||
expires_delta: Optional[timedelta] = None,
|
||||
level: int = 1
|
||||
level: int = 1,
|
||||
purpose: Optional[str] = "authentication"
|
||||
) -> str:
|
||||
"""
|
||||
创建 JWT 访问令牌,包含用户 ID、用户名、是否为超级用户以及权限等级
|
||||
:param userid: 用户的唯一标识符,通常是字符串或整数
|
||||
:param username: 用户名,用于标识用户的账户名
|
||||
:param super_user: 是否为超级用户,默认值为 False
|
||||
:param expires_delta: 令牌的有效期时长,如果不提供则使用默认过期时间
|
||||
:param expires_delta: 令牌的有效期时长,如果不提供则根据用途使用默认过期时间
|
||||
:param level: 用户的权限级别,默认为 1
|
||||
:param purpose: 令牌的用途,"authentication" 或 "resource"
|
||||
:return: 编码后的 JWT 令牌字符串
|
||||
:raises ValueError: 如果 expires_delta 为负数
|
||||
"""
|
||||
if purpose == "resource":
|
||||
default_expire = timedelta(seconds=settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS)
|
||||
secret_key = settings.RESOURCE_SECRET_KEY
|
||||
else:
|
||||
default_expire = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
secret_key = settings.SECRET_KEY
|
||||
|
||||
if expires_delta is not None:
|
||||
if expires_delta.total_seconds() <= 0:
|
||||
raise ValueError("过期时间必须为正数")
|
||||
expire = datetime.utcnow() + expires_delta
|
||||
else:
|
||||
expire = datetime.utcnow() + timedelta(
|
||||
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
)
|
||||
expire = datetime.utcnow() + default_expire
|
||||
|
||||
to_encode = {
|
||||
"exp": expire,
|
||||
"sub": str(userid),
|
||||
"username": username,
|
||||
"super_user": super_user,
|
||||
"level": level
|
||||
"level": level,
|
||||
"purpose": purpose
|
||||
}
|
||||
|
||||
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
|
||||
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def __verify_token(token: str) -> schemas.TokenPayload:
|
||||
def __set_or_refresh_resource_token_cookie(request: Request, response: Response, payload: schemas.TokenPayload):
|
||||
"""
|
||||
设置资源令牌 Cookie
|
||||
:param request: 包含请求相关的上下文数据
|
||||
:param response: 用于在服务器响应时设置 Cookie
|
||||
:param payload: 已通过身份验证的 TokenPayload 对象
|
||||
"""
|
||||
resource_token = request.cookies.get(settings.PROJECT_NAME)
|
||||
|
||||
if resource_token:
|
||||
# 检查令牌剩余时间
|
||||
try:
|
||||
decoded_token = jwt.decode(resource_token, settings.RESOURCE_SECRET_KEY, algorithms=[ALGORITHM])
|
||||
exp = decoded_token.get("exp")
|
||||
if exp:
|
||||
remaining_time = datetime.utcfromtimestamp(exp) - datetime.utcnow()
|
||||
# 如果剩余时间少于 2 分钟,刷新令牌
|
||||
if remaining_time < timedelta(minutes=2):
|
||||
raise jwt.ExpiredSignatureError
|
||||
except jwt.PyJWTError:
|
||||
logger.debug(f"Token error occurred. refreshing token")
|
||||
except Exception as e:
|
||||
logger.debug(f"Unexpected error occurred while decoding token: {e}")
|
||||
else:
|
||||
# 如果令牌有效且没有即将过期,则不需要刷新
|
||||
return
|
||||
|
||||
# 创建新的资源访问令牌
|
||||
resource_token_expires = timedelta(seconds=settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS)
|
||||
resource_token = create_access_token(
|
||||
userid=payload.sub,
|
||||
username=payload.username,
|
||||
super_user=payload.super_user,
|
||||
expires_delta=resource_token_expires,
|
||||
level=payload.level,
|
||||
purpose="resource"
|
||||
)
|
||||
|
||||
# 设置会话级别的 HttpOnly Cookie
|
||||
response.set_cookie(
|
||||
key=settings.PROJECT_NAME,
|
||||
value=resource_token,
|
||||
httponly=True,
|
||||
secure=request.url.scheme == "https",
|
||||
samesite="strict"
|
||||
)
|
||||
|
||||
|
||||
def __verify_token(token: str, purpose: str = "authentication") -> schemas.TokenPayload:
|
||||
"""
|
||||
使用 JWT Token 进行身份认证并解析 Token 的内容
|
||||
:param token: JWT 令牌
|
||||
:param purpose: 期望的令牌用途,默认为 "authentication"
|
||||
:return: 包含用户身份信息的 Token 负载数据
|
||||
:raises HTTPException: 如果令牌无效或用途不匹配
|
||||
"""
|
||||
使用 JWT Token 进行身份认证并解析 Token 的内容
|
||||
:param token: JWT 令牌
|
||||
:return: 包含用户身份信息的 Token 负载数据
|
||||
:raises HTTPException: 如果令牌无效或解码失败,抛出 403 错误
|
||||
"""
|
||||
try:
|
||||
if purpose == "resource":
|
||||
secret_key = settings.RESOURCE_SECRET_KEY
|
||||
else:
|
||||
secret_key = settings.SECRET_KEY
|
||||
|
||||
if not token:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"{purpose} token not found"
|
||||
)
|
||||
|
||||
payload = jwt.decode(
|
||||
token, settings.SECRET_KEY, algorithms=[ALGORITHM]
|
||||
token, secret_key, algorithms=[ALGORITHM]
|
||||
)
|
||||
|
||||
token_payload = schemas.TokenPayload(**payload)
|
||||
|
||||
if token_payload.purpose != purpose:
|
||||
raise jwt.InvalidTokenError("令牌用途不匹配")
|
||||
|
||||
return schemas.TokenPayload(**payload)
|
||||
except (jwt.DecodeError, jwt.InvalidTokenError, jwt.ImmatureSignatureError):
|
||||
raise HTTPException(
|
||||
@@ -97,24 +171,39 @@ def __verify_token(token: str) -> schemas.TokenPayload:
|
||||
)
|
||||
|
||||
|
||||
def verify_token(token: str = Security(oauth2_scheme)) -> schemas.TokenPayload:
|
||||
def verify_token(
|
||||
request: Request,
|
||||
response: Response,
|
||||
token: str = Security(oauth2_scheme)
|
||||
) -> schemas.TokenPayload:
|
||||
"""
|
||||
使用 JWT Token 进行身份认证并解析 Token 的内容
|
||||
:param token: JWT 令牌,从请求的 Authorization 头部获取
|
||||
:return: 包含用户身份信息的 Token 负载数据
|
||||
:raises HTTPException: 如果令牌无效或解码失败,抛出 403 错误
|
||||
验证 JWT 令牌并自动处理 resource_token 写入
|
||||
:param request: 请求对象,用于访问 Cookie 和请求信息
|
||||
:param response: 响应对象,用于设置 Cookie
|
||||
:param token: 从 Authorization 头部获取的 JWT 令牌
|
||||
:return: 解析后的 TokenPayload
|
||||
:raises HTTPException: 如果令牌无效或用途不匹配
|
||||
"""
|
||||
return __verify_token(token)
|
||||
# 验证并解析 JWT 认证令牌
|
||||
payload = __verify_token(token=token, purpose="authentication")
|
||||
|
||||
# 如果没有 resource_token,生成并写入到 Cookie
|
||||
__set_or_refresh_resource_token_cookie(request, response, payload)
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
def verify_uri_token(token: str = Security(jwt_token_query)) -> schemas.TokenPayload:
|
||||
def verify_resource_token(
|
||||
resource_token: str = Security(resource_token_cookie)
|
||||
) -> schemas.TokenPayload:
|
||||
"""
|
||||
使用 JWT Token 进行身份认证并解析 Token 的内容
|
||||
:param token: JWT 令牌,从 URL 中的 `token` 查询参数获取
|
||||
:return: 包含用户身份信息的 Token 负载数据
|
||||
:raises HTTPException: 如果令牌无效或解码失败,抛出 403 错误
|
||||
验证资源访问令牌(从 Cookie 中获取)
|
||||
:param resource_token: 从 Cookie 中获取的资源访问令牌
|
||||
:return: 解析后的 TokenPayload
|
||||
:raises HTTPException: 如果资源访问令牌无效
|
||||
"""
|
||||
return __verify_token(token)
|
||||
# 验证并解析资源访问令牌
|
||||
return __verify_token(token=resource_token, purpose="resource")
|
||||
|
||||
|
||||
def __get_api_token(
|
||||
|
||||
Reference in New Issue
Block a user