mirror of
https://github.com/snailyp/gemini-balance.git
synced 2026-05-20 15:50:10 +08:00
重构项目目录结构,提高代码组织性和可维护性 将schemas目录重命名为domain,更好地表达领域模型概念 将services目录细分为service/chat、service/image等子目录 将api目录重命名为router,更符合FastAPI惯例 创建utils目录存放通用工具函数 更新FastAPI应用程序生命周期管理 替换已弃用的on_event方法为推荐的lifespan事件处理器 添加应用程序关闭时的日志记录 代码质量改进 抽取常量到constants.py,减少硬编码值 添加helpers.py提供通用工具函数 优化配置管理,使用环境变量和默认值 完善文档字符串,提高代码可读性
64 lines
2.5 KiB
Python
64 lines
2.5 KiB
Python
from fastapi import HTTPException, Header
|
|
from typing import Optional
|
|
from app.logger.logger import get_security_logger
|
|
from app.config.config import settings
|
|
|
|
logger = get_security_logger()
|
|
|
|
def verify_auth_token(token: str) -> bool:
|
|
return token == settings.AUTH_TOKEN
|
|
|
|
class SecurityService:
|
|
def __init__(self, allowed_tokens: list, auth_token: str):
|
|
self.allowed_tokens = allowed_tokens
|
|
self.auth_token = auth_token
|
|
|
|
async def verify_key(self, key: str):
|
|
if key not in self.allowed_tokens and key != self.auth_token:
|
|
logger.error("Invalid key")
|
|
raise HTTPException(status_code=401, detail="Invalid key")
|
|
return key
|
|
|
|
async def verify_authorization(
|
|
self, authorization: Optional[str] = Header(None)
|
|
) -> str:
|
|
if not authorization:
|
|
logger.error("Missing Authorization header")
|
|
raise HTTPException(status_code=401, detail="Missing Authorization header")
|
|
|
|
if not authorization.startswith("Bearer "):
|
|
logger.error("Invalid Authorization header format")
|
|
raise HTTPException(
|
|
status_code=401, detail="Invalid Authorization header format"
|
|
)
|
|
|
|
token = authorization.replace("Bearer ", "")
|
|
if token not in self.allowed_tokens and token != self.auth_token:
|
|
logger.error("Invalid token")
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
return token
|
|
|
|
async def verify_goog_api_key(self, x_goog_api_key: Optional[str] = Header(None)) -> str:
|
|
"""验证Google API Key"""
|
|
if not x_goog_api_key:
|
|
logger.error("Missing x-goog-api-key header")
|
|
raise HTTPException(status_code=401, detail="Missing x-goog-api-key header")
|
|
|
|
if x_goog_api_key not in self.allowed_tokens and x_goog_api_key != self.auth_token:
|
|
logger.error("Invalid x-goog-api-key")
|
|
raise HTTPException(status_code=401, detail="Invalid x-goog-api-key")
|
|
|
|
return x_goog_api_key
|
|
|
|
async def verify_auth_token(self, authorization: Optional[str] = Header(None)) -> str:
|
|
if not authorization:
|
|
logger.error("Missing auth_token header")
|
|
raise HTTPException(status_code=401, detail="Missing auth_token header")
|
|
token = authorization.replace("Bearer ", "")
|
|
if token != self.auth_token:
|
|
logger.error("Invalid auth_token")
|
|
raise HTTPException(status_code=401, detail="Invalid auth_token")
|
|
|
|
return token
|