feat: 实现日志自动删除功能并更新配置管理

本次提交主要包含以下内容:

1.  **日志自动删除功能**:
    *   新增环境变量 (`AUTO_DELETE_ERROR_LOGS_ENABLED`, `AUTO_DELETE_ERROR_LOGS_DAYS`, `AUTO_DELETE_REQUEST_LOGS_ENABLED`, `AUTO_DELETE_REQUEST_LOGS_DAYS`) 用于控制错误日志和请求日志的自动删除策略。
    *   在 `app/config/config.py` 中添加了对这些新配置项的支持和验证逻辑 (Pydantic `validator` 更新为 `field_validator`)。
    *   修改了 `app/log/logger.py` 以适应新的日志配置。
    *   新增 `app/scheduler/scheduled_tasks.py` 用于执行定期的日志清理任务。
    *   新增 `app/service/error_log/error_log_service.py` 和 `app/service/request_log/request_log_service.py` 来处理具体的日志删除逻辑。
    *   更新了 `app/router/error_log_routes.py` 和 `app/router/scheduler_routes.py` 以集成新功能。

2.  **前端配置页面更新**:
    *   在 `app/templates/config_editor.html` 和 `app/static/js/config_editor.js` 中添加了用于配置日志自动删除选项的用户界面元素。

3.  **代码和文件结构调整**:
    *   删除了不再使用的 `app/scheduler/key_checker.py` 文件。
    *   在 `.gitignore` 文件中添加了 `default_db` 以忽略该目录。

4.  **其他**:
    *   对 `app/core/application.py` 进行了相应调整。

该更新旨在增强应用的日志管理能力,提供更灵活的日志保留策略,并优化了配置界面的用户体验。
This commit is contained in:
snaily
2025-05-08 00:31:17 +08:00
parent b86eac839d
commit e1c068ed9e
13 changed files with 3316 additions and 1836 deletions

View File

@@ -49,6 +49,14 @@ STREAM_CHUNK_SIZE=5
######################### 日志配置 #######################################
# 日志级别 (debug, info, warning, error, critical),默认为 info
LOG_LEVEL=info
# 是否开启自动删除错误日志
AUTO_DELETE_ERROR_LOGS_ENABLED=true
# 自动删除多少天前的错误日志 (1, 7, 30)
AUTO_DELETE_ERROR_LOGS_DAYS=7
# 是否开启自动删除请求日志
AUTO_DELETE_REQUEST_LOGS_ENABLED=false
# 自动删除多少天前的请求日志 (1, 7, 30)
AUTO_DELETE_REQUEST_LOGS_DAYS=30
##########################################################################
# 安全设置 (JSON 字符串格式)

3
.gitignore vendored
View File

@@ -257,4 +257,5 @@ $RECYCLE.BIN/
# Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option)
tests/
tests/
default_db

View File

@@ -1,15 +1,29 @@
"""
应用程序配置模块
"""
import datetime
import json
from typing import List, Any, Dict, Type
from typing import Any, Dict, List, Type
from pydantic import ValidationError, validator
from pydantic import ValidationError, ValidationInfo, field_validator
from pydantic_settings import BaseSettings
from sqlalchemy import insert, update, select
from sqlalchemy import insert, select, update
from app.core.constants import API_VERSION, DEFAULT_CREATE_IMAGE_MODEL, DEFAULT_FILTER_MODELS, DEFAULT_MODEL, DEFAULT_SAFETY_SETTINGS, DEFAULT_STREAM_CHUNK_SIZE, DEFAULT_STREAM_LONG_TEXT_THRESHOLD, DEFAULT_STREAM_MAX_DELAY, DEFAULT_STREAM_MIN_DELAY, DEFAULT_STREAM_SHORT_TEXT_THRESHOLD, DEFAULT_TIMEOUT, MAX_RETRIES
from app.core.constants import (
API_VERSION,
DEFAULT_CREATE_IMAGE_MODEL,
DEFAULT_FILTER_MODELS,
DEFAULT_MODEL,
DEFAULT_SAFETY_SETTINGS,
DEFAULT_STREAM_CHUNK_SIZE,
DEFAULT_STREAM_LONG_TEXT_THRESHOLD,
DEFAULT_STREAM_MAX_DELAY,
DEFAULT_STREAM_MIN_DELAY,
DEFAULT_STREAM_SHORT_TEXT_THRESHOLD,
DEFAULT_TIMEOUT,
MAX_RETRIES,
)
from app.log.logger import Logger
@@ -23,13 +37,17 @@ class Settings(BaseSettings):
MYSQL_PASSWORD: str = ""
MYSQL_DATABASE: str = ""
MYSQL_SOCKET: str = ""
# 验证 MySQL 配置
@validator('MYSQL_HOST', 'MYSQL_PORT', 'MYSQL_USER', 'MYSQL_PASSWORD', 'MYSQL_DATABASE')
def validate_mysql_config(cls, v, values):
if values.get('DATABASE_TYPE') == 'mysql':
@field_validator(
"MYSQL_HOST", "MYSQL_PORT", "MYSQL_USER", "MYSQL_PASSWORD", "MYSQL_DATABASE"
)
def validate_mysql_config(cls, v: Any, info: ValidationInfo) -> Any:
if info.data.get("DATABASE_TYPE") == "mysql":
if v is None or v == "":
raise ValueError(f"MySQL configuration is required when DATABASE_TYPE is 'mysql'")
raise ValueError(
"MySQL configuration is required when DATABASE_TYPE is 'mysql'"
)
return v
# API相关配置
@@ -41,7 +59,7 @@ class Settings(BaseSettings):
TEST_MODEL: str = DEFAULT_MODEL
TIME_OUT: int = DEFAULT_TIMEOUT
MAX_RETRIES: int = MAX_RETRIES
PROXIES: List[str] = [] # 新增:代理服务器列表
PROXIES: List[str] = [] # 新增:代理服务器列表
# 模型相关配置
SEARCH_MODELS: List[str] = ["gemini-2.0-flash-exp"]
@@ -50,9 +68,9 @@ class Settings(BaseSettings):
TOOLS_CODE_EXECUTION_ENABLED: bool = False
SHOW_SEARCH_LINK: bool = True
SHOW_THINKING_PROCESS: bool = True
THINKING_MODELS: List[str] = [] # 新增:用于思考过程的模型列表
THINKING_BUDGET_MAP: Dict[str, float] = {} # 新增:模型对应的预算映射
THINKING_MODELS: List[str] = [] # 新增:用于思考过程的模型列表
THINKING_BUDGET_MAP: Dict[str, float] = {} # 新增:模型对应的预算映射
# 图像生成相关配置
PAID_KEY: str = ""
CREATE_IMAGE_MODEL: str = DEFAULT_CREATE_IMAGE_MODEL
@@ -61,7 +79,7 @@ class Settings(BaseSettings):
PICGO_API_KEY: str = ""
CLOUDFLARE_IMGBED_URL: str = ""
CLOUDFLARE_IMGBED_AUTH_CODE: str = ""
# 流式输出优化器配置
STREAM_OPTIMIZER_ENABLED: bool = False
STREAM_MIN_DELAY: float = DEFAULT_STREAM_MIN_DELAY
@@ -71,16 +89,20 @@ class Settings(BaseSettings):
STREAM_CHUNK_SIZE: int = DEFAULT_STREAM_CHUNK_SIZE
# 调度器配置
CHECK_INTERVAL_HOURS: int = 1 # 默认检查间隔为1小时
TIMEZONE: str = "Asia/Shanghai" # 默认时区
# github
CHECK_INTERVAL_HOURS: int = 1 # 默认检查间隔为1小时
TIMEZONE: str = "Asia/Shanghai" # 默认时区
# github
GITHUB_REPO_OWNER: str = "snailyp"
GITHUB_REPO_NAME: str = "gemini-balance"
# 日志配置
LOG_LEVEL: str = "INFO" # 默认日志级别
SAFETY_SETTINGS: List[Dict[str, str]] = DEFAULT_SAFETY_SETTINGS # 新增:安全设置
LOG_LEVEL: str = "INFO" # 默认日志级别
AUTO_DELETE_ERROR_LOGS_ENABLED: bool = True # 是否开启自动删除错误日志
AUTO_DELETE_ERROR_LOGS_DAYS: int = 7 # 自动删除多少天前的错误日志 (1, 7, 30)
AUTO_DELETE_REQUEST_LOGS_ENABLED: bool = False # 是否开启自动删除请求日志
AUTO_DELETE_REQUEST_LOGS_DAYS: int = 30 # 自动删除多少天前的请求日志 (1, 7, 30)
SAFETY_SETTINGS: List[Dict[str, str]] = DEFAULT_SAFETY_SETTINGS # 新增:安全设置
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -88,13 +110,16 @@ class Settings(BaseSettings):
if not self.AUTH_TOKEN and self.ALLOWED_TOKENS:
self.AUTH_TOKEN = self.ALLOWED_TOKENS[0]
# 创建全局配置实例
settings = Settings()
def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any:
"""尝试将数据库字符串值解析为目标 Python 类型"""
from app.log.logger import get_config_logger # 函数内导入
logger = get_config_logger() # 函数内初始化
from app.log.logger import get_config_logger # 函数内导入
logger = get_config_logger() # 函数内初始化
try:
# 处理 List[str]
if target_type == List[str]:
@@ -103,9 +128,11 @@ def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any:
if isinstance(parsed, list):
return [str(item) for item in parsed]
except json.JSONDecodeError:
return [item.strip() for item in db_value.split(',') if item.strip()]
logger.warning(f"Could not parse '{db_value}' as List[str] for key '{key}', falling back to comma split or empty list.")
return [item.strip() for item in db_value.split(',') if item.strip()]
return [item.strip() for item in db_value.split(",") if item.strip()]
logger.warning(
f"Could not parse '{db_value}' as List[str] for key '{key}', falling back to comma split or empty list."
)
return [item.strip() for item in db_value.split(",") if item.strip()]
# 处理 Dict[str, float]
elif target_type == Dict[str, float]:
parsed_dict = {}
@@ -115,24 +142,34 @@ def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any:
if isinstance(parsed, dict):
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
else:
logger.warning(f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}")
logger.warning(
f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}"
)
except (json.JSONDecodeError, ValueError, TypeError) as e1:
# Second attempt: try replacing single quotes if JSONDecodeError occurred
if isinstance(e1, json.JSONDecodeError) and "'" in db_value:
logger.warning(f"Failed initial JSON parse for key '{key}'. Attempting to replace single quotes. Error: {e1}")
logger.warning(
f"Failed initial JSON parse for key '{key}'. Attempting to replace single quotes. Error: {e1}"
)
try:
corrected_db_value = db_value.replace("'", '"')
parsed = json.loads(corrected_db_value)
if isinstance(parsed, dict):
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
else:
logger.warning(f"Parsed DB value (after quote replacement) for key '{key}' is not a dictionary type. Value: {corrected_db_value}")
logger.warning(
f"Parsed DB value (after quote replacement) for key '{key}' is not a dictionary type. Value: {corrected_db_value}"
)
except (json.JSONDecodeError, ValueError, TypeError) as e2:
logger.error(f"Could not parse '{db_value}' as Dict[str, float] for key '{key}' even after replacing quotes: {e2}. Returning empty dict.")
logger.error(
f"Could not parse '{db_value}' as Dict[str, float] for key '{key}' even after replacing quotes: {e2}. Returning empty dict."
)
else:
# Log other errors (ValueError, TypeError) or JSON errors without single quotes
logger.error(f"Could not parse '{db_value}' as Dict[str, float] for key '{key}': {e1}. Returning empty dict.")
return parsed_dict # Return the parsed dict or an empty one if all attempts fail
logger.error(
f"Could not parse '{db_value}' as Dict[str, float] for key '{key}': {e1}. Returning empty dict."
)
return parsed_dict # Return the parsed dict or an empty one if all attempts fail
# 处理 List[Dict[str, str]]
elif target_type == List[Dict[str, str]]:
try:
@@ -140,28 +177,36 @@ def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any:
if isinstance(parsed, list):
# 验证列表中的每个元素是否为字典,并且键和值都是字符串
valid = all(
isinstance(item, dict) and
all(isinstance(k, str) for k in item.keys()) and
all(isinstance(v, str) for v in item.values())
isinstance(item, dict)
and all(isinstance(k, str) for k in item.keys())
and all(isinstance(v, str) for v in item.values())
for item in parsed
)
if valid:
return parsed
else:
logger.warning(f"Invalid structure in List[Dict[str, str]] for key '{key}'. Value: {db_value}")
return [] # 或者返回默认值?这里返回空列表
logger.warning(
f"Invalid structure in List[Dict[str, str]] for key '{key}'. Value: {db_value}"
)
return [] # 或者返回默认值?这里返回空列表
else:
logger.warning(f"Parsed DB value for key '{key}' is not a list type. Value: {db_value}")
return []
logger.warning(
f"Parsed DB value for key '{key}' is not a list type. Value: {db_value}"
)
return []
except json.JSONDecodeError:
logger.error(f"Could not parse '{db_value}' as JSON for List[Dict[str, str]] for key '{key}'. Returning empty list.")
logger.error(
f"Could not parse '{db_value}' as JSON for List[Dict[str, str]] for key '{key}'. Returning empty list."
)
return []
except Exception as e:
logger.error(f"Error parsing List[Dict[str, str]] for key '{key}': {e}. Value: {db_value}. Returning empty list.")
logger.error(
f"Error parsing List[Dict[str, str]] for key '{key}': {e}. Value: {db_value}. Returning empty list."
)
return []
# 处理 bool
elif target_type == bool:
return db_value.lower() in ('true', '1', 'yes', 'on')
return db_value.lower() in ("true", "1", "yes", "on")
# 处理 int
elif target_type == int:
return int(db_value)
@@ -172,8 +217,11 @@ def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any:
else:
return db_value
except (ValueError, TypeError, json.JSONDecodeError) as e:
logger.warning(f"Failed to parse db_value '{db_value}' for key '{key}' as type {target_type}: {e}. Using original string value.")
return db_value # 解析失败则返回原始字符串
logger.warning(
f"Failed to parse db_value '{db_value}' for key '{key}' as type {target_type}: {e}. Using original string value."
)
return db_value # 解析失败则返回原始字符串
async def sync_initial_settings():
"""
@@ -182,8 +230,9 @@ async def sync_initial_settings():
2. 将数据库设置合并到内存 settings (数据库优先)。
3. 将最终的内存 settings 同步回数据库。
"""
from app.log.logger import get_config_logger # 函数内导入
logger = get_config_logger() # 函数内初始化
from app.log.logger import get_config_logger # 函数内导入
logger = get_config_logger() # 函数内初始化
# 延迟导入以避免循环依赖和确保数据库连接已初始化
from app.database.connection import database
from app.database.models import Settings as SettingsModel
@@ -196,7 +245,9 @@ async def sync_initial_settings():
await database.connect()
logger.info("Database connection established for initial sync.")
except Exception as e:
logger.error(f"Failed to connect to database for initial settings sync: {e}. Skipping sync.")
logger.error(
f"Failed to connect to database for initial settings sync: {e}. Skipping sync."
)
return
try:
@@ -205,13 +256,19 @@ async def sync_initial_settings():
try:
query = select(SettingsModel.key, SettingsModel.value)
results = await database.fetch_all(query)
db_settings_raw = [{"key": row["key"], "value": row["value"]} for row in results]
db_settings_raw = [
{"key": row["key"], "value": row["value"]} for row in results
]
logger.info(f"Fetched {len(db_settings_raw)} settings from database.")
except Exception as e:
logger.error(f"Failed to fetch settings from database: {e}. Proceeding with environment/dotenv settings.")
logger.error(
f"Failed to fetch settings from database: {e}. Proceeding with environment/dotenv settings."
)
# 即使数据库读取失败,也要继续执行,确保基于 env/dotenv 的配置能同步到数据库
db_settings_map: Dict[str, str] = {s['key']: s['value'] for s in db_settings_raw}
db_settings_map: Dict[str, str] = {
s["key"]: s["value"] for s in db_settings_raw
}
# 2. 将数据库设置合并到内存 settings (数据库优先)
updated_in_memory = False
@@ -229,35 +286,52 @@ async def sync_initial_settings():
if parsed_db_value != memory_value:
# 检查类型是否匹配,以防解析函数返回了不兼容的类型
type_match = False
if target_type == List[str] and isinstance(parsed_db_value, list):
if target_type == List[str] and isinstance(
parsed_db_value, list
):
type_match = True
elif target_type == Dict[str, float] and isinstance(parsed_db_value, dict):
elif target_type == Dict[str, float] and isinstance(
parsed_db_value, dict
):
type_match = True
elif target_type not in (List[str], Dict[str, float]) and isinstance(parsed_db_value, target_type):
elif target_type not in (
List[str],
Dict[str, float],
) and isinstance(parsed_db_value, target_type):
type_match = True
if type_match:
setattr(settings, key, parsed_db_value)
logger.debug(f"Updated setting '{key}' in memory from database value ({target_type}).")
logger.debug(
f"Updated setting '{key}' in memory from database value ({target_type})."
)
updated_in_memory = True
else:
logger.warning(f"Parsed DB value type mismatch for key '{key}'. Expected {target_type}, got {type(parsed_db_value)}. Skipping update.")
logger.warning(
f"Parsed DB value type mismatch for key '{key}'. Expected {target_type}, got {type(parsed_db_value)}. Skipping update."
)
except Exception as e:
logger.error(f"Error processing database setting for key '{key}': {e}")
logger.error(
f"Error processing database setting for key '{key}': {e}"
)
else:
logger.warning(f"Database setting '{key}' not found in Settings model definition. Ignoring.")
logger.warning(
f"Database setting '{key}' not found in Settings model definition. Ignoring."
)
# 如果内存中有更新,重新验证 Pydantic 模型(可选但推荐)
if updated_in_memory:
try:
# 重新加载以确保类型转换和验证
settings = Settings(**settings.model_dump())
logger.info("Settings object re-validated after merging database values.")
logger.info(
"Settings object re-validated after merging database values."
)
except ValidationError as e:
logger.error(f"Validation error after merging database settings: {e}. Settings might be inconsistent.")
logger.error(
f"Validation error after merging database settings: {e}. Settings might be inconsistent."
)
# 3. 将最终的内存 settings 同步回数据库
final_memory_settings = settings.model_dump()
@@ -269,20 +343,22 @@ async def sync_initial_settings():
for key, value in final_memory_settings.items():
# 序列化值为字符串或 JSON 字符串
if isinstance(value, (list, dict)): # 处理列表和字典
db_value = json.dumps(value, ensure_ascii=False) # 使用 ensure_ascii=False 以支持非 ASCII 字符
if isinstance(value, (list, dict)): # 处理列表和字典
db_value = json.dumps(
value, ensure_ascii=False
) # 使用 ensure_ascii=False 以支持非 ASCII 字符
elif isinstance(value, bool):
db_value = str(value).lower()
elif value is None: # 处理 None 值
db_value = "" # 或者根据需要设为 NULL 或其他标记
elif value is None: # 处理 None 值
db_value = "" # 或者根据需要设为 NULL 或其他标记
else:
db_value = str(value)
data = {
'key': key,
'value': db_value,
'description': f"{key} configuration setting", # 默认描述
'updated_at': now
"key": key,
"value": db_value,
"description": f"{key} configuration setting", # 默认描述
"updated_at": now,
}
if key in existing_db_keys:
@@ -291,7 +367,7 @@ async def sync_initial_settings():
settings_to_update.append(data)
else:
# 如果键不在数据库中,则插入
data['created_at'] = now
data["created_at"] = now
settings_to_insert.append(data)
# 在事务中执行批量插入和更新
@@ -300,48 +376,78 @@ async def sync_initial_settings():
async with database.transaction():
if settings_to_insert:
# 获取现有描述以避免覆盖
query_existing = select(SettingsModel.key, SettingsModel.description).where(SettingsModel.key.in_([s['key'] for s in settings_to_insert]))
existing_desc = {row['key']: row['description'] for row in await database.fetch_all(query_existing)}
query_existing = select(
SettingsModel.key, SettingsModel.description
).where(
SettingsModel.key.in_(
[s["key"] for s in settings_to_insert]
)
)
existing_desc = {
row["key"]: row["description"]
for row in await database.fetch_all(query_existing)
}
for item in settings_to_insert:
item['description'] = existing_desc.get(item['key'], item['description'])
item["description"] = existing_desc.get(
item["key"], item["description"]
)
query_insert = insert(SettingsModel).values(settings_to_insert)
await database.execute(query=query_insert)
logger.info(f"Synced (inserted) {len(settings_to_insert)} settings to database.")
logger.info(
f"Synced (inserted) {len(settings_to_insert)} settings to database."
)
if settings_to_update:
# 获取现有描述以避免覆盖
query_existing = select(SettingsModel.key, SettingsModel.description).where(SettingsModel.key.in_([s['key'] for s in settings_to_update]))
existing_desc = {row['key']: row['description'] for row in await database.fetch_all(query_existing)}
query_existing = select(
SettingsModel.key, SettingsModel.description
).where(
SettingsModel.key.in_(
[s["key"] for s in settings_to_update]
)
)
existing_desc = {
row["key"]: row["description"]
for row in await database.fetch_all(query_existing)
}
for setting_data in settings_to_update:
setting_data['description'] = existing_desc.get(setting_data['key'], setting_data['description'])
setting_data["description"] = existing_desc.get(
setting_data["key"], setting_data["description"]
)
query_update = (
update(SettingsModel)
.where(SettingsModel.key == setting_data['key'])
.where(SettingsModel.key == setting_data["key"])
.values(
value=setting_data['value'],
description=setting_data['description'],
updated_at=setting_data['updated_at']
value=setting_data["value"],
description=setting_data["description"],
updated_at=setting_data["updated_at"],
)
)
await database.execute(query=query_update)
logger.info(f"Synced (updated) {len(settings_to_update)} settings to database.")
logger.info(
f"Synced (updated) {len(settings_to_update)} settings to database."
)
except Exception as e:
logger.error(f"Failed to sync settings to database during startup: {str(e)}")
logger.error(
f"Failed to sync settings to database during startup: {str(e)}"
)
else:
logger.info("No setting changes detected between memory and database during initial sync.")
logger.info(
"No setting changes detected between memory and database during initial sync."
)
# 刷新日志等级
Logger.update_log_levels(final_memory_settings.get("LOG_LEVEL"))
except Exception as e:
logger.error(f"An unexpected error occurred during initial settings sync: {e}")
finally:
if database.is_connected:
try:
pass
except Exception as e:
logger.error(f"Error disconnecting database after initial sync: {e}")
try:
pass
except Exception as e:
logger.error(f"Error disconnecting database after initial sync: {e}")
logger.info("Initial settings synchronization finished.")

View File

@@ -13,7 +13,7 @@ from app.service.key.key_manager import get_key_manager_instance
from app.database.connection import connect_to_db, disconnect_from_db
from app.utils.helpers import get_current_version # Import from helpers
from app.database.initialization import initialize_database
from app.scheduler.key_checker import start_scheduler, stop_scheduler
from app.scheduler.scheduled_tasks import start_scheduler, stop_scheduler
from app.service.update.update_service import check_for_updates
logger = get_application_logger()

View File

@@ -217,4 +217,9 @@ def get_api_client_logger():
def get_openai_compatible_logger():
return Logger.setup_logger("openai_compatible")
return Logger.setup_logger("openai_compatible")
def get_error_log_logger():
return Logger.setup_logger("error_log")

View File

@@ -1,20 +1,25 @@
"""
日志路由模块
"""
from typing import List, Optional, Dict
from datetime import datetime
from typing import Dict, List, Optional
from fastapi import (
APIRouter,
Body,
HTTPException,
Path,
Query,
Request,
Response,
status,
)
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException, Request, Query, Path, Body, Response, status
from app.core.security import verify_auth_token
from app.log.logger import get_log_routes_logger
from app.database.services import (
get_error_logs,
get_error_logs_count,
get_error_log_details,
delete_error_logs_by_ids,
delete_error_log_by_id
)
from app.service.error_log import error_log_service
router = APIRouter(prefix="/api/logs", tags=["logs"])
@@ -29,22 +34,36 @@ class ErrorLogListItem(BaseModel):
model_name: Optional[str] = None
request_time: Optional[datetime] = None
class ErrorLogListResponse(BaseModel):
logs: List[ErrorLogListItem]
total: int
@router.get("/errors", response_model=ErrorLogListResponse)
async def get_error_logs_api(
request: Request,
limit: int = Query(10, ge=1, le=1000),
offset: int = Query(0, ge=0),
key_search: Optional[str] = Query(None, description="Search term for Gemini key (partial match)"),
error_search: Optional[str] = Query(None, description="Search term for error type or log message"),
error_code_search: Optional[str] = Query(None, description="Search term for error code"),
start_date: Optional[datetime] = Query(None, description="Start datetime for filtering"),
end_date: Optional[datetime] = Query(None, description="End datetime for filtering"),
sort_by: str = Query('id', description="Field to sort by (e.g., 'id', 'request_time')"),
sort_order: str = Query('desc', description="Sort order ('asc' or 'desc')")
key_search: Optional[str] = Query(
None, description="Search term for Gemini key (partial match)"
),
error_search: Optional[str] = Query(
None, description="Search term for error type or log message"
),
error_code_search: Optional[str] = Query(
None, description="Search term for error code"
),
start_date: Optional[datetime] = Query(
None, description="Start datetime for filtering"
),
end_date: Optional[datetime] = Query(
None, description="End datetime for filtering"
),
sort_by: str = Query(
"id", description="Field to sort by (e.g., 'id', 'request_time')"
),
sort_order: str = Query("desc", description="Sort order ('asc' or 'desc')"),
):
"""
获取错误日志列表 (返回错误码),支持过滤和排序
@@ -68,9 +87,9 @@ async def get_error_logs_api(
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to error logs list")
raise HTTPException(status_code=401, detail="Not authenticated")
try:
logs_data = await get_error_logs(
result = await error_log_service.process_get_error_logs(
limit=limit,
offset=offset,
key_search=key_search,
@@ -79,20 +98,18 @@ async def get_error_logs_api(
start_date=start_date,
end_date=end_date,
sort_by=sort_by,
sort_order=sort_order
)
total_count = await get_error_logs_count(
key_search=key_search,
error_search=error_search,
error_code_search=error_code_search,
start_date=start_date,
end_date=end_date
sort_order=sort_order,
)
logs_data = result["logs"]
total_count = result["total"]
validated_logs = [ErrorLogListItem(**log) for log in logs_data]
return ErrorLogListResponse(logs=validated_logs, total=total_count)
except Exception as e:
logger.exception(f"Failed to get error logs list: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get error logs list: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Failed to get error logs list: {str(e)}"
)
class ErrorLogDetailResponse(BaseModel):
@@ -104,6 +121,7 @@ class ErrorLogDetailResponse(BaseModel):
model_name: Optional[str] = None
request_time: Optional[datetime] = None
@router.get("/errors/{log_id}/details", response_model=ErrorLogDetailResponse)
async def get_error_log_detail_api(request: Request, log_id: int = Path(..., ge=1)):
"""
@@ -111,11 +129,15 @@ async def get_error_log_detail_api(request: Request, log_id: int = Path(..., ge=
"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning(f"Unauthorized access attempt to error log details for ID: {log_id}")
logger.warning(
f"Unauthorized access attempt to error log details for ID: {log_id}"
)
raise HTTPException(status_code=401, detail="Not authenticated")
try:
log_details = await get_error_log_details(log_id=log_id)
log_details = await error_log_service.process_get_error_log_details(
log_id=log_id
)
if not log_details:
raise HTTPException(status_code=404, detail="Error log not found")
@@ -124,13 +146,14 @@ async def get_error_log_detail_api(request: Request, log_id: int = Path(..., ge=
raise http_exc
except Exception as e:
logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get error log details: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Failed to get error log details: {str(e)}"
)
@router.delete("/errors", status_code=status.HTTP_204_NO_CONTENT)
async def delete_error_logs_bulk_api(
request: Request,
payload: Dict[str, List[int]] = Body(...)
request: Request, payload: Dict[str, List[int]] = Body(...)
):
"""
批量删除错误日志 (异步)
@@ -145,20 +168,23 @@ async def delete_error_logs_bulk_api(
raise HTTPException(status_code=400, detail="No log IDs provided for deletion.")
try:
deleted_count = await delete_error_logs_by_ids(log_ids)
deleted_count = await error_log_service.process_delete_error_logs_by_ids(
log_ids
)
# 注意:异步函数返回的是尝试删除的数量,可能不是精确值
logger.info(f"Attempted bulk deletion for {deleted_count} error logs with IDs: {log_ids}")
logger.info(
f"Attempted bulk deletion for {deleted_count} error logs with IDs: {log_ids}"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
except Exception as e:
logger.exception(f"Error bulk deleting error logs with IDs {log_ids}: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error during bulk deletion")
raise HTTPException(
status_code=500, detail="Internal server error during bulk deletion"
)
@router.delete("/errors/{log_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_error_log_api(
request: Request,
log_id: int = Path(..., ge=1)
):
async def delete_error_log_api(request: Request, log_id: int = Path(..., ge=1)):
"""
删除单个错误日志 (异步)
"""
@@ -168,14 +194,18 @@ async def delete_error_log_api(
raise HTTPException(status_code=401, detail="Not authenticated")
try:
success = await delete_error_log_by_id(log_id)
success = await error_log_service.process_delete_error_log_by_id(log_id)
if not success:
# 服务层现在在未找到时返回 False我们在这里转换为 404
raise HTTPException(status_code=404, detail=f"Error log with ID {log_id} not found")
raise HTTPException(
status_code=404, detail=f"Error log with ID {log_id} not found"
)
logger.info(f"Successfully deleted error log with ID: {log_id}")
return Response(status_code=status.HTTP_204_NO_CONTENT)
except HTTPException as http_exc:
raise http_exc
except Exception as e:
logger.exception(f"Error deleting error log with ID {log_id}: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error during deletion")
raise HTTPException(
status_code=500, detail="Internal server error during deletion"
)

View File

@@ -6,7 +6,7 @@ from fastapi import APIRouter, Request, HTTPException, status
from fastapi.responses import JSONResponse
from app.core.security import verify_auth_token
from app.scheduler.key_checker import start_scheduler, stop_scheduler
from app.scheduler.scheduled_tasks import start_scheduler, stop_scheduler
from app.log.logger import get_scheduler_routes
logger = get_scheduler_routes()

View File

@@ -1,102 +0,0 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.service.key.key_manager import get_key_manager_instance
from app.service.chat.gemini_chat_service import GeminiChatService
from app.domain.gemini_models import GeminiRequest, GeminiContent
from app.config.config import settings
from app.log.logger import Logger # 导入 Logger 类
logger = Logger.setup_logger("scheduler") # 使用 Logger.setup_logger
async def check_failed_keys():
"""
定时检查失败次数大于0的API密钥并尝试验证它们。
如果验证成功,重置失败计数;如果失败,增加失败计数。
"""
logger.info("Starting scheduled check for failed API keys...")
try:
key_manager = await get_key_manager_instance()
# 确保 KeyManager 已经初始化
if not key_manager or not hasattr(key_manager, 'key_failure_counts'):
logger.warning("KeyManager instance not available or not initialized. Skipping check.")
return
# 创建 GeminiChatService 实例用于验证
# 注意:这里直接创建实例,而不是通过依赖注入,因为这是后台任务
chat_service = GeminiChatService(settings.BASE_URL, key_manager)
# 获取需要检查的 key 列表 (失败次数 > 0)
keys_to_check = []
async with key_manager.failure_count_lock: # 访问共享数据需要加锁
# 复制一份以避免在迭代时修改字典
failure_counts_copy = key_manager.key_failure_counts.copy()
keys_to_check = [key for key, count in failure_counts_copy.items() if count > 0] # 检查所有失败次数大于0的key
if not keys_to_check:
logger.info("No keys with failure count > 0 found. Skipping verification.")
return
logger.info(f"Found {len(keys_to_check)} keys with failure count > 0 to verify.")
for key in keys_to_check:
# 隐藏部分 key 用于日志记录
log_key = f"{key[:4]}...{key[-4:]}" if len(key) > 8 else key
logger.info(f"Verifying key: {log_key}...")
try:
# 构造测试请求
gemini_request = GeminiRequest(
contents=[
GeminiContent(
role="user",
parts=[{"text": "hi"}] # 使用简单的文本进行验证
)
]
)
# 调用 generate_content 进行验证
await chat_service.generate_content(
settings.TEST_MODEL, # 使用配置中定义的测试模型
gemini_request,
key
)
# 如果没有抛出异常,说明 key 有效
logger.info(f"Key {log_key} verification successful. Resetting failure count.")
await key_manager.reset_key_failure_count(key)
except Exception as e:
# 验证失败,增加失败计数
logger.warning(f"Key {log_key} verification failed: {str(e)}. Incrementing failure count.")
# 直接操作计数器,需要加锁
async with key_manager.failure_count_lock:
# 再次检查 key 是否存在且失败次数未达上限
if key in key_manager.key_failure_counts and key_manager.key_failure_counts[key] < key_manager.MAX_FAILURES:
key_manager.key_failure_counts[key] += 1
logger.info(f"Failure count for key {log_key} incremented to {key_manager.key_failure_counts[key]}.")
elif key in key_manager.key_failure_counts:
logger.warning(f"Key {log_key} reached MAX_FAILURES ({key_manager.MAX_FAILURES}). Not incrementing further.")
except Exception as e:
logger.error(f"An error occurred during the scheduled key check: {str(e)}", exc_info=True)
def setup_scheduler():
"""设置并启动 APScheduler"""
scheduler = AsyncIOScheduler(timezone=str(settings.TIMEZONE)) # 从配置读取时区
# 添加定时任务,例如每小时执行一次 (可以调整)
scheduler.add_job(check_failed_keys, 'interval', hours=settings.CHECK_INTERVAL_HOURS)
scheduler.start()
logger.info(f"Scheduler started. Key check job scheduled to run every {settings.CHECK_INTERVAL_HOURS} hour(s).")
return scheduler
# 可以在这里添加一个全局的 scheduler 实例,以便在应用关闭时优雅地停止
scheduler_instance = None
def start_scheduler():
global scheduler_instance
if scheduler_instance is None or not scheduler_instance.running:
logger.info("Starting scheduler...")
scheduler_instance = setup_scheduler()
logger.info("Scheduler is already running.")
def stop_scheduler():
global scheduler_instance
if scheduler_instance and scheduler_instance.running:
scheduler_instance.shutdown()
logger.info("Scheduler stopped.")

View File

@@ -0,0 +1,162 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.config.config import settings
from app.domain.gemini_models import GeminiContent, GeminiRequest
from app.log.logger import Logger
from app.service.chat.gemini_chat_service import GeminiChatService
from app.service.error_log.error_log_service import delete_old_error_logs
from app.service.key.key_manager import get_key_manager_instance
from app.service.request_log.request_log_service import delete_old_request_logs_task
logger = Logger.setup_logger("scheduler")
async def check_failed_keys():
"""
定时检查失败次数大于0的API密钥并尝试验证它们。
如果验证成功,重置失败计数;如果失败,增加失败计数。
"""
logger.info("Starting scheduled check for failed API keys...")
try:
key_manager = await get_key_manager_instance()
# 确保 KeyManager 已经初始化
if not key_manager or not hasattr(key_manager, "key_failure_counts"):
logger.warning(
"KeyManager instance not available or not initialized. Skipping check."
)
return
# 创建 GeminiChatService 实例用于验证
# 注意:这里直接创建实例,而不是通过依赖注入,因为这是后台任务
chat_service = GeminiChatService(settings.BASE_URL, key_manager)
# 获取需要检查的 key 列表 (失败次数 > 0)
keys_to_check = []
async with key_manager.failure_count_lock: # 访问共享数据需要加锁
# 复制一份以避免在迭代时修改字典
failure_counts_copy = key_manager.key_failure_counts.copy()
keys_to_check = [
key for key, count in failure_counts_copy.items() if count > 0
] # 检查所有失败次数大于0的key
if not keys_to_check:
logger.info("No keys with failure count > 0 found. Skipping verification.")
return
logger.info(
f"Found {len(keys_to_check)} keys with failure count > 0 to verify."
)
for key in keys_to_check:
# 隐藏部分 key 用于日志记录
log_key = f"{key[:4]}...{key[-4:]}" if len(key) > 8 else key
logger.info(f"Verifying key: {log_key}...")
try:
# 构造测试请求
gemini_request = GeminiRequest(
contents=[
GeminiContent(
role="user",
parts=[{"text": "hi"}], # 使用简单的文本进行验证
)
]
)
# 调用 generate_content 进行验证
await chat_service.generate_content(
settings.TEST_MODEL, gemini_request, key # 使用配置中定义的测试模型
)
# 如果没有抛出异常,说明 key 有效
logger.info(
f"Key {log_key} verification successful. Resetting failure count."
)
await key_manager.reset_key_failure_count(key)
except Exception as e:
# 验证失败,增加失败计数
logger.warning(
f"Key {log_key} verification failed: {str(e)}. Incrementing failure count."
)
# 直接操作计数器,需要加锁
async with key_manager.failure_count_lock:
# 再次检查 key 是否存在且失败次数未达上限
if (
key in key_manager.key_failure_counts
and key_manager.key_failure_counts[key]
< key_manager.MAX_FAILURES
):
key_manager.key_failure_counts[key] += 1
logger.info(
f"Failure count for key {log_key} incremented to {key_manager.key_failure_counts[key]}."
)
elif key in key_manager.key_failure_counts:
logger.warning(
f"Key {log_key} reached MAX_FAILURES ({key_manager.MAX_FAILURES}). Not incrementing further."
)
except Exception as e:
logger.error(
f"An error occurred during the scheduled key check: {str(e)}", exc_info=True
)
def setup_scheduler():
"""设置并启动 APScheduler"""
scheduler = AsyncIOScheduler(timezone=str(settings.TIMEZONE)) # 从配置读取时区
# 添加检查失败密钥的定时任务
scheduler.add_job(
check_failed_keys,
"interval",
hours=settings.CHECK_INTERVAL_HOURS,
id="check_failed_keys_job",
name="Check Failed API Keys",
)
logger.info(
f"Key check job scheduled to run every {settings.CHECK_INTERVAL_HOURS} hour(s)."
)
# 新增添加自动删除错误日志的定时任务每天凌晨3点执行
scheduler.add_job(
delete_old_error_logs,
"cron",
hour=3,
minute=0,
id="delete_old_error_logs_job",
name="Delete Old Error Logs",
)
logger.info("Auto-delete error logs job scheduled to run daily at 3:00 AM.")
# 新增添加自动删除请求日志的定时任务每天凌晨3点05分执行
scheduler.add_job(
delete_old_request_logs_task,
"cron",
hour=3,
minute=5,
id="delete_old_request_logs_job",
name="Delete Old Request Logs",
)
logger.info(
f"Auto-delete request logs job scheduled to run daily at 3:05 AM, if enabled and AUTO_DELETE_REQUEST_LOGS_DAYS is set to {settings.AUTO_DELETE_REQUEST_LOGS_DAYS} days."
)
scheduler.start()
logger.info("Scheduler started with all jobs.")
return scheduler
# 可以在这里添加一个全局的 scheduler 实例,以便在应用关闭时优雅地停止
scheduler_instance = None
def start_scheduler():
global scheduler_instance
if scheduler_instance is None or not scheduler_instance.running:
logger.info("Starting scheduler...")
scheduler_instance = setup_scheduler()
logger.info("Scheduler is already running.")
def stop_scheduler():
global scheduler_instance
if scheduler_instance and scheduler_instance.running:
scheduler_instance.shutdown()
logger.info("Scheduler stopped.")

View File

@@ -0,0 +1,155 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from sqlalchemy import delete, func, select
from app.config.config import settings
from app.database import services as db_services
from app.database.connection import database
from app.database.models import ErrorLog
from app.log.logger import get_error_log_logger
logger = get_error_log_logger()
async def delete_old_error_logs():
"""
Deletes error logs older than a specified number of days,
based on the AUTO_DELETE_ERROR_LOGS_ENABLED and AUTO_DELETE_ERROR_LOGS_DAYS settings.
"""
if not settings.AUTO_DELETE_ERROR_LOGS_ENABLED:
logger.info("Auto-deletion of error logs is disabled. Skipping.")
return
days_to_keep = settings.AUTO_DELETE_ERROR_LOGS_DAYS
if not isinstance(days_to_keep, int) or days_to_keep <= 0:
logger.error(
f"Invalid AUTO_DELETE_ERROR_LOGS_DAYS value: {days_to_keep}. Must be a positive integer. Skipping deletion."
)
return
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_to_keep)
logger.info(
f"Attempting to delete error logs older than {days_to_keep} days (before {cutoff_date.strftime('%Y-%m-%d %H:%M:%S %Z')})."
)
try:
if not database.is_connected:
await database.connect()
logger.info("Database connection established for deleting error logs.")
# First, count how many logs will be deleted (optional, for logging)
count_query = select(func.count(ErrorLog.id)).where(
ErrorLog.request_time < cutoff_date
)
num_logs_to_delete = await database.fetch_val(count_query)
if num_logs_to_delete == 0:
logger.info(
"No error logs found older than the specified period. No deletion needed."
)
return
logger.info(f"Found {num_logs_to_delete} error logs to delete.")
# Perform the deletion
query = delete(ErrorLog).where(ErrorLog.request_time < cutoff_date)
await database.execute(query)
logger.info(
f"Successfully deleted {num_logs_to_delete} error logs older than {days_to_keep} days."
)
except Exception as e:
logger.error(
f"Error during automatic deletion of error logs: {e}", exc_info=True
)
async def process_get_error_logs(
limit: int,
offset: int,
key_search: Optional[str],
error_search: Optional[str],
error_code_search: Optional[str],
start_date: Optional[datetime],
end_date: Optional[datetime],
sort_by: str,
sort_order: str,
) -> Dict[str, Any]:
"""
处理错误日志的检索,支持分页和过滤。
"""
try:
logs_data = await db_services.get_error_logs(
limit=limit,
offset=offset,
key_search=key_search,
error_search=error_search,
error_code_search=error_code_search,
start_date=start_date,
end_date=end_date,
sort_by=sort_by,
sort_order=sort_order,
)
total_count = await db_services.get_error_logs_count(
key_search=key_search,
error_search=error_search,
error_code_search=error_code_search,
start_date=start_date,
end_date=end_date,
)
return {"logs": logs_data, "total": total_count}
except Exception as e:
logger.error(f"Service error in process_get_error_logs: {e}", exc_info=True)
raise
async def process_get_error_log_details(log_id: int) -> Optional[Dict[str, Any]]:
"""
处理特定错误日志详细信息的检索。
如果未找到,则返回 None。
"""
try:
log_details = await db_services.get_error_log_details(log_id=log_id)
return log_details
except Exception as e:
logger.error(
f"Service error in process_get_error_log_details for ID {log_id}: {e}",
exc_info=True,
)
raise
async def process_delete_error_logs_by_ids(log_ids: List[int]) -> int:
"""
按 ID 批量删除错误日志。
返回尝试删除的日志数量。
"""
if not log_ids:
return 0
try:
deleted_count = await db_services.delete_error_logs_by_ids(log_ids)
return deleted_count
except Exception as e:
logger.error(
f"Service error in process_delete_error_logs_by_ids for IDs {log_ids}: {e}",
exc_info=True,
)
raise
async def process_delete_error_log_by_id(log_id: int) -> bool:
"""
按 ID 删除单个错误日志。
如果删除成功(或找到日志并尝试删除),则返回 True否则返回 False。
"""
try:
success = await db_services.delete_error_log_by_id(log_id)
return success
except Exception as e:
logger.error(
f"Service error in process_delete_error_log_by_id for ID {log_id}: {e}",
exc_info=True,
)
raise

View File

@@ -0,0 +1,50 @@
"""
Service for request log operations.
"""
from datetime import datetime, timedelta
from sqlalchemy import delete
from app import database
from app.config.config import settings
from app.database.models import RequestLog
from app.log.logger import Logger
logger = Logger.setup_logger("request_log_service")
async def delete_old_request_logs_task():
"""
定时删除旧的请求日志。
"""
if not settings.AUTO_DELETE_REQUEST_LOGS_ENABLED:
logger.info(
"Auto-delete for request logs is disabled by settings. Skipping task."
)
return
days_to_keep = settings.AUTO_DELETE_REQUEST_LOGS_DAYS
logger.info(
f"Starting scheduled task to delete old request logs older than {days_to_keep} days."
)
try:
cutoff_date = datetime.now(datetime.timezone.utc) - timedelta(days=days_to_keep)
query = delete(RequestLog).where(RequestLog.request_time < cutoff_date)
if not database.is_connected:
logger.info("Connecting to database for request log deletion.")
await database.connect()
result = await database.execute(query)
logger.info(
f"Request logs older than {cutoff_date} potentially deleted. Rows affected: {result.rowcount if result else 'N/A'}"
)
except Exception as e:
logger.error(
f"An error occurred during the scheduled request log deletion: {str(e)}",
exc_info=True,
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff