Files
gemini-balance/app/router/error_log_routes.py
snaily 4becc8d4d4 feat: 改进错误日志功能并优化应用初始化流程
本次提交主要包含以下更新:

- **错误日志页面增强**:
    - 重构了 [`app/static/js/error_logs.js`](app/static/js/error_logs.js) 中的分页逻辑,将样式控制移至 CSS,简化了 JavaScript 代码。
    - 更新了 [`app/templates/error_logs.html`](app/templates/error_logs.html) 中的分页样式,使其与 `keys_status.html` 保持一致,提升了视觉统一性。
    - 在错误日志页面新增了“清空全部”按钮,方便用户一键清除所有错误记录。
    - 调整了错误日志表格头部的文本颜色为白色,以改善深色主题下的可读性。

- **应用初始化与配置优化**:
    - 调整了 [`app/config/config.py`](app/config/config.py) 中日志记录器的获取方式,确保在配置加载早期即可用。
    - 在 [`app/core/application.py`](app/core/application.py) 中引入了更明确的数据库连接管理(连接、断开、初始化)逻辑。
    - 优化了 [`app/utils/helpers.py`](app/utils/helpers.py) 中项目路径和版本文件路径的定义方式,使其在模块级别初始化。

- **依赖清理**:
    - 从 [`requirements.txt`](requirements.txt) 中移除了不必要的注释。

这些更改旨在提升错误日志模块的用户体验和功能性,并优化应用程序的启动和配置管理流程。
2025-05-14 14:25:04 +08:00

234 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
日志路由模块
"""
from datetime import datetime
from typing import Dict, List, Optional
from fastapi import (
APIRouter,
Body,
HTTPException,
Path,
Query,
Request,
Response,
status,
)
from pydantic import BaseModel
from app.core.security import verify_auth_token
from app.log.logger import get_log_routes_logger
from app.service.error_log import error_log_service
router = APIRouter(prefix="/api/logs", tags=["logs"])
logger = get_log_routes_logger()
class ErrorLogListItem(BaseModel):
id: int
gemini_key: Optional[str] = None
error_type: Optional[str] = None
error_code: Optional[int] = None
model_name: Optional[str] = None
request_time: Optional[datetime] = None
class ErrorLogListResponse(BaseModel):
logs: List[ErrorLogListItem]
total: int
@router.get("/errors", response_model=ErrorLogListResponse)
async def get_error_logs_api(
request: Request,
limit: int = Query(10, ge=1, le=1000),
offset: int = Query(0, ge=0),
key_search: Optional[str] = Query(
None, description="Search term for Gemini key (partial match)"
),
error_search: Optional[str] = Query(
None, description="Search term for error type or log message"
),
error_code_search: Optional[str] = Query(
None, description="Search term for error code"
),
start_date: Optional[datetime] = Query(
None, description="Start datetime for filtering"
),
end_date: Optional[datetime] = Query(
None, description="End datetime for filtering"
),
sort_by: str = Query(
"id", description="Field to sort by (e.g., 'id', 'request_time')"
),
sort_order: str = Query("desc", description="Sort order ('asc' or 'desc')"),
):
"""
获取错误日志列表 (返回错误码),支持过滤和排序
Args:
request: 请求对象
limit: 限制数量
offset: 偏移量
key_search: 密钥搜索
error_search: 错误搜索 (可能搜索类型或日志内容由DB层决定)
error_code_search: 错误码搜索
start_date: 开始日期
end_date: 结束日期
sort_by: 排序字段
sort_order: 排序顺序
Returns:
ErrorLogListResponse: An object containing the list of logs (with error_code) and the total count.
"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to error logs list")
raise HTTPException(status_code=401, detail="Not authenticated")
try:
result = await error_log_service.process_get_error_logs(
limit=limit,
offset=offset,
key_search=key_search,
error_search=error_search,
error_code_search=error_code_search,
start_date=start_date,
end_date=end_date,
sort_by=sort_by,
sort_order=sort_order,
)
logs_data = result["logs"]
total_count = result["total"]
validated_logs = [ErrorLogListItem(**log) for log in logs_data]
return ErrorLogListResponse(logs=validated_logs, total=total_count)
except Exception as e:
logger.exception(f"Failed to get error logs list: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Failed to get error logs list: {str(e)}"
)
class ErrorLogDetailResponse(BaseModel):
id: int
gemini_key: Optional[str] = None
error_type: Optional[str] = None
error_log: Optional[str] = None
request_msg: Optional[str] = None
model_name: Optional[str] = None
request_time: Optional[datetime] = None
@router.get("/errors/{log_id}/details", response_model=ErrorLogDetailResponse)
async def get_error_log_detail_api(request: Request, log_id: int = Path(..., ge=1)):
"""
根据日志 ID 获取错误日志的详细信息 (包括 error_log 和 request_msg)
"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning(
f"Unauthorized access attempt to error log details for ID: {log_id}"
)
raise HTTPException(status_code=401, detail="Not authenticated")
try:
log_details = await error_log_service.process_get_error_log_details(
log_id=log_id
)
if not log_details:
raise HTTPException(status_code=404, detail="Error log not found")
return ErrorLogDetailResponse(**log_details)
except HTTPException as http_exc:
raise http_exc
except Exception as e:
logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}")
raise HTTPException(
status_code=500, detail=f"Failed to get error log details: {str(e)}"
)
@router.delete("/errors", status_code=status.HTTP_204_NO_CONTENT)
async def delete_error_logs_bulk_api(
request: Request, payload: Dict[str, List[int]] = Body(...)
):
"""
批量删除错误日志 (异步)
"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to bulk delete error logs")
raise HTTPException(status_code=401, detail="Not authenticated")
log_ids = payload.get("ids")
if not log_ids:
raise HTTPException(status_code=400, detail="No log IDs provided for deletion.")
try:
deleted_count = await error_log_service.process_delete_error_logs_by_ids(
log_ids
)
# 注意:异步函数返回的是尝试删除的数量,可能不是精确值
logger.info(
f"Attempted bulk deletion for {deleted_count} error logs with IDs: {log_ids}"
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
except Exception as e:
logger.exception(f"Error bulk deleting error logs with IDs {log_ids}: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal server error during bulk deletion"
)
@router.delete("/errors/{log_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_error_log_api(request: Request, log_id: int = Path(..., ge=1)):
"""
删除单个错误日志 (异步)
"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning(f"Unauthorized access attempt to delete error log ID: {log_id}")
raise HTTPException(status_code=401, detail="Not authenticated")
try:
success = await error_log_service.process_delete_error_log_by_id(log_id)
if not success:
# 服务层现在在未找到时返回 False我们在这里转换为 404
raise HTTPException(
status_code=404, detail=f"Error log with ID {log_id} not found"
)
logger.info(f"Successfully deleted error log with ID: {log_id}")
return Response(status_code=status.HTTP_204_NO_CONTENT)
except HTTPException as http_exc:
raise http_exc
except Exception as e:
logger.exception(f"Error deleting error log with ID {log_id}: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal server error during deletion"
)
@router.delete("/errors/all", status_code=status.HTTP_204_NO_CONTENT)
async def delete_all_error_logs_api(request: Request):
"""
删除所有错误日志 (异步)
"""
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to delete all error logs")
raise HTTPException(status_code=401, detail="Not authenticated")
try:
deleted_count = await error_log_service.process_delete_all_error_logs()
logger.info(f"Successfully deleted all {deleted_count} error logs.")
# No body needed for 204 response
return Response(status_code=status.HTTP_204_NO_CONTENT)
except Exception as e:
logger.exception(f"Error deleting all error logs: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal server error during deletion of all logs"
)