security: Implemented API key redaction for secure logging

- Add redact_key_for_logging() helper function to show only first/last 6 chars
- Fix API key exposure in app/service/key/key_manager.py line 68
- Apply key redaction across all Python files with API key logging
- Standardize logging security across 17 files including routers, services, handlers
This commit is contained in:
Shuai Lin
2025-07-21 02:03:19 +08:00
parent b25cf7d978
commit 6abda7d902
17 changed files with 78 additions and 46 deletions

View File

@@ -8,6 +8,7 @@ import json
from app.database.connection import database
from app.database.models import Settings, ErrorLog, RequestLog, FileRecord, FileState
from app.log.logger import get_database_logger
from app.utils.helpers import redact_key_for_logging
logger = get_database_logger()
@@ -143,7 +144,7 @@ async def add_error_log(
)
)
await database.execute(query)
logger.info(f"Added error log for key: {gemini_key}")
logger.info(f"Added error log for key: {redact_key_for_logging(gemini_key)}")
return True
except Exception as e:
logger.error(f"Failed to add error log: {str(e)}")

View File

@@ -4,6 +4,7 @@ from typing import Callable, TypeVar
from app.config.config import settings
from app.log.logger import get_retry_logger
from app.utils.helpers import redact_key_for_logging
T = TypeVar("T")
logger = get_retry_logger()
@@ -37,7 +38,7 @@ class RetryHandler:
new_key = await key_manager.handle_api_failure(old_key, retries)
if new_key:
kwargs[self.key_arg] = new_key
logger.info(f"Switched to new API key: {new_key}")
logger.info(f"Switched to new API key: {redact_key_for_logging(new_key)}")
else:
logger.error(f"No valid API key available after {retries} retries.")
break

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, Field
from app.core.security import verify_auth_token
from app.log.logger import Logger, get_config_routes_logger
from app.service.config.config_service import ConfigService
from app.utils.helpers import redact_key_for_logging
router = APIRouter(prefix="/api/config", tags=["config"])
@@ -63,10 +64,10 @@ class DeleteKeysRequest(BaseModel):
async def delete_single_key(key_to_delete: str, request: Request):
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning(f"Unauthorized attempt to delete key: {key_to_delete}")
logger.warning(f"Unauthorized attempt to delete key: {redact_key_for_logging(key_to_delete)}")
return RedirectResponse(url="/", status_code=302)
try:
logger.info(f"Attempting to delete key: {key_to_delete}")
logger.info(f"Attempting to delete key: {redact_key_for_logging(key_to_delete)}")
result = await ConfigService.delete_key(key_to_delete)
if not result.get("success"):
raise HTTPException(
@@ -79,7 +80,7 @@ async def delete_single_key(key_to_delete: str, request: Request):
except HTTPException as e:
raise e
except Exception as e:
logger.error(f"Error deleting key '{key_to_delete}': {e}", exc_info=True)
logger.error(f"Error deleting key '{redact_key_for_logging(key_to_delete)}': {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Error deleting key: {str(e)}")

View File

@@ -15,6 +15,7 @@ from app.log.logger import get_files_logger
from app.core.security import SecurityService
from app.service.files.files_service import get_files_service
from app.service.files.file_upload_handler import get_upload_handler
from app.utils.helpers import redact_key_for_logging
logger = get_files_logger()
@@ -202,7 +203,7 @@ async def handle_upload(
):
"""处理文件上传请求"""
try:
logger.info(f"Handling upload request: {request.method} {upload_path}, key={key}")
logger.info(f"Handling upload request: {request.method} {upload_path}, key={redact_key_for_logging(key)}")
# 從查詢參數獲取 upload_id
upload_id = request.query_params.get("upload_id")
@@ -222,7 +223,7 @@ async def handle_upload(
# 使用真實的 API key 構建完整的 Google 上傳 URL
# 保留原始 URL 的所有參數,但使用真實的 API key
upload_url = original_upload_url
logger.info(f"Using real API key for upload: {real_api_key[:8]}...{real_api_key[-4:]}")
logger.info(f"Using real API key for upload: {redact_key_for_logging(real_api_key)}")
# 代理上传请求
upload_handler = get_upload_handler()

View File

@@ -13,6 +13,7 @@ from app.service.model.model_service import ModelService
from app.handler.retry_handler import RetryHandler
from app.handler.error_handler import handle_route_errors
from app.core.constants import API_VERSION
from app.utils.helpers import redact_key_for_logging
router = APIRouter(prefix=f"/gemini/{API_VERSION}")
router_v1beta = APIRouter(prefix=f"/{API_VERSION}")
@@ -52,7 +53,7 @@ async def list_models(
api_key = await key_manager.get_first_valid_key()
if not api_key:
raise HTTPException(status_code=503, detail="No valid API keys available to fetch models.")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
models_data = await model_service.get_gemini_models(api_key)
if not models_data or "models" not in models_data:
@@ -125,7 +126,7 @@ async def generate_content(
logger.info(f"TTS responseModalities: {response_modalities}")
logger.info(f"TTS speechConfig: {speech_config}")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")
@@ -169,7 +170,7 @@ async def stream_generate_content(
async with handle_route_errors(logger, operation_name, failure_message="Streaming request initiation failed"):
logger.info(f"Handling Gemini streaming content generation for model: {model_name}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")
@@ -198,7 +199,7 @@ async def count_tokens(
async with handle_route_errors(logger, operation_name, failure_message="Token counting failed"):
logger.info(f"Handling Gemini token count request for model: {model_name}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")
@@ -274,9 +275,9 @@ async def reset_selected_key_fail_counts(
if result:
reset_count += 1
else:
logger.warning(f"Key not found during selective reset: {key}")
logger.warning(f"Key not found during selective reset: {redact_key_for_logging(key)}")
except Exception as key_error:
logger.error(f"Error resetting key {key}: {str(key_error)}")
logger.error(f"Error resetting key {redact_key_for_logging(key)}: {str(key_error)}")
errors.append(f"Key {key}: {str(key_error)}")
if errors:
@@ -303,7 +304,7 @@ async def reset_selected_key_fail_counts(
async def reset_key_fail_count(api_key: str, key_manager: KeyManager = Depends(get_key_manager)):
"""重置指定Gemini API密钥的失败计数"""
logger.info("-" * 50 + "reset_gemini_key_fail_count" + "-" * 50)
logger.info(f"Resetting failure count for API key: {api_key}")
logger.info(f"Resetting failure count for API key: {redact_key_for_logging(api_key)}")
try:
result = await key_manager.reset_key_failure_count(api_key)
@@ -348,7 +349,7 @@ async def verify_key(api_key: str, chat_service: GeminiChatService = Depends(get
async with key_manager.failure_count_lock:
if api_key in key_manager.key_failure_counts:
key_manager.key_failure_counts[api_key] += 1
logger.warning(f"Verification exception for key: {api_key}, incrementing failure count")
logger.warning(f"Verification exception for key: {redact_key_for_logging(api_key)}, incrementing failure count")
return JSONResponse({"status": "invalid", "error": str(e)})
@@ -389,14 +390,14 @@ async def verify_selected_keys(
return api_key, "valid", None
except Exception as e:
error_message = str(e)
logger.warning(f"Key verification failed for {api_key}: {error_message}")
logger.warning(f"Key verification failed for {redact_key_for_logging(api_key)}: {error_message}")
async with key_manager.failure_count_lock:
if api_key in key_manager.key_failure_counts:
key_manager.key_failure_counts[api_key] += 1
logger.warning(f"Bulk verification exception for key: {api_key}, incrementing failure count")
logger.warning(f"Bulk verification exception for key: {redact_key_for_logging(api_key)}, incrementing failure count")
else:
key_manager.key_failure_counts[api_key] = 1
logger.warning(f"Bulk verification exception for key: {api_key}, initializing failure count to 1")
logger.warning(f"Bulk verification exception for key: {redact_key_for_logging(api_key)}, initializing failure count to 1")
failed_keys[api_key] = error_message
return api_key, "invalid", error_message

View File

@@ -13,6 +13,7 @@ from app.handler.error_handler import handle_route_errors
from app.log.logger import get_openai_compatible_logger
from app.service.key.key_manager import KeyManager, get_key_manager_instance
from app.service.openai_compatiable.openai_compatiable_service import OpenAICompatiableService
from app.utils.helpers import redact_key_for_logging
router = APIRouter()
@@ -46,7 +47,7 @@ async def list_models(
async with handle_route_errors(logger, operation_name):
logger.info("Handling models list request")
api_key = await key_manager.get_first_valid_key()
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
return await openai_service.get_models(api_key)
@@ -69,7 +70,7 @@ async def chat_completion(
async with handle_route_errors(logger, operation_name):
logger.info(f"Handling chat completion request for model: {request.model}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {current_api_key}")
logger.info(f"Using API key: {redact_key_for_logging(current_api_key)}")
if is_image_chat:
response = await openai_service.create_image_chat_completion(request, current_api_key)
@@ -107,7 +108,7 @@ async def embedding(
async with handle_route_errors(logger, operation_name):
logger.info(f"Handling embedding request for model: {request.model}")
api_key = await key_manager.get_next_working_key()
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
return await openai_service.create_embeddings(
input_text=request.input, model=request.model, api_key=api_key
)

View File

@@ -18,6 +18,7 @@ from app.service.image.image_create_service import ImageCreateService
from app.service.tts.tts_service import TTSService
from app.service.key.key_manager import KeyManager, get_key_manager_instance
from app.service.model.model_service import ModelService
from app.utils.helpers import redact_key_for_logging
router = APIRouter()
logger = get_openai_logger()
@@ -60,7 +61,7 @@ async def list_models(
async with handle_route_errors(logger, operation_name):
logger.info("Handling models list request")
api_key = await key_manager.get_first_valid_key()
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
return await model_service.get_gemini_openai_models(api_key)
@@ -84,7 +85,7 @@ async def chat_completion(
async with handle_route_errors(logger, operation_name):
logger.info(f"Handling chat completion request for model: {request.model}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {current_api_key}")
logger.info(f"Using API key: {redact_key_for_logging(current_api_key)}")
if not await model_service.check_model_support(request.model):
raise HTTPException(
@@ -129,7 +130,7 @@ async def embedding(
async with handle_route_errors(logger, operation_name):
logger.info(f"Handling embedding request for model: {request.model}")
api_key = await key_manager.get_next_working_key()
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
response = await embedding_service.create_embedding(
input_text=request.input, model=request.model, api_key=api_key
)
@@ -170,6 +171,6 @@ async def text_to_speech(
async with handle_route_errors(logger, operation_name):
logger.info(f"Handling TTS request for model: {request.model}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
audio_data = await tts_service.create_tts(request, api_key)
return Response(content=audio_data, media_type="audio/wav")

View File

@@ -3,6 +3,7 @@ from starlette import status
from app.core.security import verify_auth_token
from app.service.stats.stats_service import StatsService
from app.log.logger import get_stats_logger
from app.utils.helpers import redact_key_for_logging
logger = get_stats_logger()
@@ -48,7 +49,7 @@ async def get_key_usage_details(key: str):
return {}
return usage_details
except Exception as e:
logger.error(f"Error fetching key usage details for key {key[:4]}...: {e}")
logger.error(f"Error fetching key usage details for key {redact_key_for_logging(key)}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取密钥使用详情时出错: {e}"

View File

@@ -11,6 +11,7 @@ from app.service.model.model_service import ModelService
from app.handler.retry_handler import RetryHandler
from app.handler.error_handler import handle_route_errors
from app.core.constants import API_VERSION
from app.utils.helpers import redact_key_for_logging
router = APIRouter(prefix=f"/vertex-express/{API_VERSION}")
logger = get_vertex_express_logger()
@@ -48,7 +49,7 @@ async def list_models(
api_key = await key_manager.get_first_valid_key()
if not api_key:
raise HTTPException(status_code=503, detail="No valid API keys available to fetch models.")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
models_data = await model_service.get_gemini_models(api_key)
if not models_data or "models" not in models_data:
@@ -105,7 +106,7 @@ async def generate_content(
async with handle_route_errors(logger, operation_name, failure_message="Content generation failed"):
logger.info(f"Handling Gemini content generation request for model: {model_name}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")
@@ -133,7 +134,7 @@ async def stream_generate_content(
async with handle_route_errors(logger, operation_name, failure_message="Streaming request initiation failed"):
logger.info(f"Handling Gemini streaming content generation for model: {model_name}")
logger.debug(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
logger.info(f"Using API key: {redact_key_for_logging(api_key)}")
if not await model_service.check_model_support(model_name):
raise HTTPException(status_code=400, detail=f"Model {model_name} is not supported")

View File

@@ -9,6 +9,7 @@ from app.service.error_log.error_log_service import delete_old_error_logs
from app.service.key.key_manager import get_key_manager_instance
from app.service.request_log.request_log_service import delete_old_request_logs_task
from app.service.files.files_service import get_files_service
from app.utils.helpers import redact_key_for_logging
logger = Logger.setup_logger("scheduler")
@@ -51,7 +52,7 @@ async def check_failed_keys():
for key in keys_to_check:
# 隐藏部分 key 用于日志记录
log_key = f"{key[:4]}...{key[-4:]}" if len(key) > 8 else key
log_key = redact_key_for_logging(key)
logger.info(f"Verifying key: {log_key}...")
try:
# 构造测试请求

View File

@@ -14,6 +14,7 @@ from app.log.logger import get_gemini_logger
from app.service.client.api_client import GeminiApiClient
from app.service.key.key_manager import KeyManager
from app.database.services import add_error_log, add_request_log, get_file_api_key
from app.utils.helpers import redact_key_for_logging
logger = get_gemini_logger()
@@ -296,10 +297,10 @@ class GeminiChatService:
logger.info(f"Request contains file references: {file_names}")
file_api_key = await get_file_api_key(file_names[0])
if file_api_key:
logger.info(f"Found API key for file {file_names[0]}: {file_api_key[:8]}...{file_api_key[-4:]}")
logger.info(f"Found API key for file {file_names[0]}: {redact_key_for_logging(file_api_key)}")
api_key = file_api_key # 使用文件的 API key
else:
logger.warning(f"No API key found for file {file_names[0]}, using default key: {api_key[:8]}...{api_key[-4:]}")
logger.warning(f"No API key found for file {file_names[0]}, using default key: {redact_key_for_logging(api_key)}")
payload = _build_payload(model, request)
start_time = time.perf_counter()
@@ -402,10 +403,10 @@ class GeminiChatService:
logger.info(f"Request contains file references: {file_names}")
file_api_key = await get_file_api_key(file_names[0])
if file_api_key:
logger.info(f"Found API key for file {file_names[0]}: {file_api_key[:8]}...{file_api_key[-4:]}")
logger.info(f"Found API key for file {file_names[0]}: {redact_key_for_logging(file_api_key)}")
api_key = file_api_key # 使用文件的 API key
else:
logger.warning(f"No API key found for file {file_names[0]}, using default key: {api_key[:8]}...{api_key[-4:]}")
logger.warning(f"No API key found for file {file_names[0]}, using default key: {redact_key_for_logging(api_key)}")
retries = 0
max_retries = settings.MAX_RETRIES
@@ -472,7 +473,7 @@ class GeminiChatService:
api_key = await self.key_manager.handle_api_failure(current_attempt_key, retries)
if api_key:
logger.info(f"Switched to new API key: {api_key}")
logger.info(f"Switched to new API key: {redact_key_for_logging(api_key)}")
else:
logger.error(f"No valid API key available after {retries} retries.")
break

View File

@@ -14,6 +14,7 @@ from app.log.logger import get_gemini_logger
from app.service.client.api_client import GeminiApiClient
from app.service.key.key_manager import KeyManager
from app.database.services import add_error_log, add_request_log
from app.utils.helpers import redact_key_for_logging
logger = get_gemini_logger()
@@ -340,7 +341,7 @@ class GeminiChatService:
api_key = await self.key_manager.handle_api_failure(current_attempt_key, retries)
if api_key:
logger.info(f"Switched to new API key: {api_key}")
logger.info(f"Switched to new API key: {redact_key_for_logging(api_key)}")
else:
logger.error(f"No valid API key available after {retries} retries.")
break

View File

@@ -12,6 +12,7 @@ from app.config.config import settings
from app.database import services as db_services
from app.database.models import FileState
from app.log.logger import get_files_logger
from app.utils.helpers import redact_key_for_logging
logger = get_files_logger()
@@ -144,7 +145,7 @@ class FileUploadHandler:
sha256_hash=file_data.get("sha256Hash"),
user_token=session_info["user_token"]
)
logger.info(f"Created file record: name={real_file_name}, api_key={session_info['api_key'][:8]}...{session_info['api_key'][-4:]}")
logger.info(f"Created file record: name={real_file_name}, api_key={redact_key_for_logging(session_info['api_key'])}")
else:
logger.warning(f"No upload session found for URL: {upload_url}")
else:

View File

@@ -13,6 +13,7 @@ from app.database.models import FileState
from app.domain.file_models import FileMetadata, ListFilesResponse
from fastapi import HTTPException
from app.log.logger import get_files_logger
from app.utils.helpers import redact_key_for_logging
from app.service.client.api_client import GeminiApiClient
from app.service.key.key_manager import get_key_manager_instance
@@ -102,7 +103,7 @@ class FilesService:
# 儲存上傳資訊到 headers 中,供後續使用
# 不在這裡創建數據庫記錄,等到上傳完成後再創建
logger.info(f"Upload initialized with API key: {api_key[:8]}...{api_key[-4:]}")
logger.info(f"Upload initialized with API key: {redact_key_for_logging(api_key)}")
# 解析响应 - 初始化响应可能是空的
response_data = {}
@@ -133,7 +134,7 @@ class FilesService:
"created_at": datetime.now(timezone.utc),
"upload_url": upload_url
}
logger.info(f"Stored upload session for upload_id={upload_id}: api_key={api_key[:8]}...{api_key[-4:]}")
logger.info(f"Stored upload session for upload_id={upload_id}: api_key={redact_key_for_logging(api_key)}")
logger.debug(f"Total active sessions: {len(_upload_sessions)}")
else:
logger.warning(f"No upload_id found in upload URL: {upload_url}")
@@ -202,7 +203,7 @@ class FilesService:
# 先嘗試直接查找
session = _upload_sessions.get(key)
if session:
logger.debug(f"Found session by direct key {key}")
logger.debug(f"Found session by direct key {redact_key_for_logging(key)}")
return session
# 如果是 URL嘗試提取 upload_id
@@ -217,7 +218,7 @@ class FilesService:
logger.debug(f"Found session by upload_id {upload_id} from URL")
return session
logger.debug(f"No session found for key: {key}")
logger.debug(f"No session found for key: {redact_key_for_logging(key)}")
return None
async def get_file(self, file_name: str, user_token: str) -> FileMetadata:

View File

@@ -4,6 +4,7 @@ from typing import Dict, Union
from app.config.config import settings
from app.log.logger import get_key_manager_logger
from app.utils.helpers import redact_key_for_logging
logger = get_key_manager_logger()
@@ -65,7 +66,7 @@ class KeyManager:
async with self.failure_count_lock:
if key in self.key_failure_counts:
self.key_failure_counts[key] = 0
logger.info(f"Reset failure count for key: {key}")
logger.info(f"Reset failure count for key: {redact_key_for_logging(key)}")
return True
logger.warning(
f"Attempt to reset failure count for non-existent key: {key}"
@@ -77,7 +78,7 @@ class KeyManager:
async with self.vertex_failure_count_lock:
if key in self.vertex_key_failure_counts:
self.vertex_key_failure_counts[key] = 0
logger.info(f"Reset failure count for Vertex key: {key}")
logger.info(f"Reset failure count for Vertex key: {redact_key_for_logging(key)}")
return True
logger.warning(
f"Attempt to reset failure count for non-existent Vertex key: {key}"
@@ -116,7 +117,7 @@ class KeyManager:
self.key_failure_counts[api_key] += 1
if self.key_failure_counts[api_key] >= self.MAX_FAILURES:
logger.warning(
f"API key {api_key} has failed {self.MAX_FAILURES} times"
f"API key {redact_key_for_logging(api_key)} has failed {self.MAX_FAILURES} times"
)
if retries < settings.MAX_RETRIES:
return await self.get_next_working_key()
@@ -129,7 +130,7 @@ class KeyManager:
self.vertex_key_failure_counts[api_key] += 1
if self.vertex_key_failure_counts[api_key] >= self.MAX_FAILURES:
logger.warning(
f"Vertex Express API key {api_key} has failed {self.MAX_FAILURES} times"
f"Vertex Express API key {redact_key_for_logging(api_key)} has failed {self.MAX_FAILURES} times"
)
def get_fail_count(self, key: str) -> int:

View File

@@ -13,6 +13,7 @@ from app.database.services import (
from app.domain.openai_models import ChatRequest, ImageGenerationRequest
from app.service.client.api_client import OpenaiApiClient
from app.service.key.key_manager import KeyManager
from app.utils.helpers import redact_key_for_logging
from app.log.logger import get_openai_compatible_logger
logger = get_openai_compatible_logger()
@@ -159,7 +160,7 @@ class OpenAICompatiableService:
current_attempt_key, retries
)
if api_key:
logger.info(f"Switched to new API key: {api_key}")
logger.info(f"Switched to new API key: {redact_key_for_logging(api_key)}")
else:
logger.error(
f"No valid API key available after {retries} retries."

View File

@@ -154,6 +154,22 @@ def is_valid_api_key(key: str) -> bool:
def redact_key_for_logging(key: str) -> str:
"""
Redacts API key for secure logging by showing only first and last 6 characters.
Args:
key: API key to redact
Returns:
str: Redacted key in format "first6...last6" or original if too short
"""
if not key or len(key) <= 12:
return "***"
return f"{key[:6]}...{key[-6:]}"
def get_current_version(default_version: str = "0.0.0") -> str:
"""Reads the current version from the VERSION file."""
version_file = VERSION_FILE_PATH