Files
gemini-balance/app/database/services.py
snaily 7b4652c802 feat(monitoring): 添加 API 请求统计和监控面板
本次提交引入了 API 请求统计功能,并将原“密钥状态”页面重构为功能更全面的“监控面板”。

主要变更包括:

- **数据库与服务层:**
    - 新增 `RequestLog` 数据模型 (`app/database/models.py`),用于存储 API 请求的详细信息(时间、模型、密钥、成功状态、状态码、耗时)。
    - 在 `app/database/services.py` 中添加 `add_request_log` 和 `get_request_stats` 函数,分别用于记录单次请求和获取时间窗口内的统计数据。
    - 新增 `app/service/stats_service.py`,封装了获取 API 调用统计逻辑。

- **API 请求日志记录:**
    - 在 Gemini (`gemini_chat_service.py`) 和 OpenAI (`openai_chat_service.py`) 聊天服务中,于 API 调用前后添加了 `add_request_log` 调用,以记录请求的成功与否及耗时。

- **前端监控面板:**
    - 将 `/keys` 路由对应的页面 (`keys_status.html`) 从“密钥状态”重构为“监控面板”。
    - 页面顶部新增统计卡片区域,展示:
        - 密钥统计:总数、有效数、无效数。
        - API 调用统计:1分钟内、1小时内、24小时内、本月调用次数。
    - 密钥列表(有效/无效)采用响应式网格布局 (`grid`),并增加了悬停动效和边框高亮。
    - 优化了有效密钥列表的筛选逻辑,在无匹配项时显示提示信息。
    - 为新的统计卡片和列表项添加了相应的 CSS 样式。
    - 更新了 `keys_status.js` 以支持筛选无结果时的提示。

- **路由与导航:**
    - 在 `app/router/routes.py` 中添加了 `/stats` 端点,用于获取 API 统计数据。
    - 更新了 `config_editor.html` 和 `error_logs.html` 中的导航链接,使其指向新的“监控面板”。

- **日志配置:**
    - 在 `app/log/logger.py` 中,为 `sqlalchemy.exc` 设置了 WARNING 日志级别。

这些更改旨在提供更好的系统可观测性,方便用户监控 API 密钥状态和请求频率。
2025-04-11 14:45:03 +08:00

285 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库服务模块
"""
import json
from typing import Dict, List, Optional, Any, Union
from datetime import datetime # Keep this import
from sqlalchemy import select, insert, update, func
from app.database.connection import database
from app.database.models import Settings, ErrorLog, RequestLog # Import RequestLog
from app.log.logger import get_database_logger
logger = get_database_logger()
async def get_all_settings() -> List[Dict[str, Any]]:
"""
获取所有设置
Returns:
List[Dict[str, Any]]: 设置列表
"""
try:
query = select(Settings)
result = await database.fetch_all(query)
return [dict(row) for row in result]
except Exception as e:
logger.error(f"Failed to get all settings: {str(e)}")
raise
async def get_setting(key: str) -> Optional[Dict[str, Any]]:
"""
获取指定键的设置
Args:
key: 设置键名
Returns:
Optional[Dict[str, Any]]: 设置信息如果不存在则返回None
"""
try:
query = select(Settings).where(Settings.key == key)
result = await database.fetch_one(query)
return dict(result) if result else None
except Exception as e:
logger.error(f"Failed to get setting {key}: {str(e)}")
raise
async def update_setting(key: str, value: str, description: Optional[str] = None) -> bool:
"""
更新设置
Args:
key: 设置键名
value: 设置值
description: 设置描述
Returns:
bool: 是否更新成功
"""
try:
# 检查设置是否存在
setting = await get_setting(key)
if setting:
# 更新设置
query = (
update(Settings)
.where(Settings.key == key)
.values(
value=value,
description=description if description else setting["description"],
updated_at=datetime.now() # Use datetime.now()
)
)
await database.execute(query)
logger.info(f"Updated setting: {key}")
return True
else:
# 插入设置
query = (
insert(Settings)
.values(
key=key,
value=value,
description=description,
created_at=datetime.now(), # Use datetime.now()
updated_at=datetime.now() # Use datetime.now()
)
)
await database.execute(query)
logger.info(f"Inserted setting: {key}")
return True
except Exception as e:
logger.error(f"Failed to update setting {key}: {str(e)}")
return False
async def add_error_log(
gemini_key: Optional[str] = None,
model_name: Optional[str] = None,
error_type: Optional[str] = None,
error_log: Optional[str] = None,
error_code: Optional[int] = None,
request_msg: Optional[Union[Dict[str, Any], str]] = None
) -> bool:
"""
添加错误日志
Args:
gemini_key: Gemini API密钥
error_log: 错误日志
error_code: 错误代码 (例如 HTTP 状态码)
request_msg: 请求消息
Returns:
bool: 是否添加成功
"""
try:
# 如果request_msg是字典则转换为JSON字符串
if isinstance(request_msg, dict):
request_msg_json = request_msg
elif isinstance(request_msg, str):
try:
request_msg_json = json.loads(request_msg)
except json.JSONDecodeError:
request_msg_json = {"message": request_msg}
else:
request_msg_json = None
# 插入错误日志
query = (
insert(ErrorLog)
.values(
gemini_key=gemini_key,
error_type=error_type,
error_log=error_log,
model_name=model_name,
error_code=error_code,
request_msg=request_msg_json,
request_time=datetime.now()
)
)
await database.execute(query)
logger.info(f"Added error log for key: {gemini_key}")
return True
except Exception as e:
logger.error(f"Failed to add error log: {str(e)}")
return False
async def get_error_logs(
limit: int = 20,
offset: int = 0,
key_search: Optional[str] = None,
error_search: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""
获取错误日志,支持搜索和日期过滤
Args:
limit (int): 限制数量
offset (int): 偏移量
key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
start_date (Optional[datetime]): 开始日期时间
end_date (Optional[datetime]): 结束日期时间
Returns:
List[Dict[str, Any]]: 错误日志列表
"""
try:
query = select(ErrorLog)
# Apply filters
if key_search:
query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
if error_search:
query = query.where(
(ErrorLog.error_type.ilike(f"%{error_search}%")) |
(ErrorLog.error_log.ilike(f"%{error_search}%"))
)
if start_date:
query = query.where(ErrorLog.request_time >= start_date)
if end_date:
# Use the datetime object directly for comparison
query = query.where(ErrorLog.request_time < end_date)
# Apply ordering, limit, and offset
query = query.order_by(ErrorLog.request_time.desc()).limit(limit).offset(offset)
result = await database.fetch_all(query)
return [dict(row) for row in result]
except Exception as e:
logger.exception(f"Failed to get error logs with filters: {str(e)}") # Use exception for stack trace
raise
async def get_error_logs_count(
key_search: Optional[str] = None,
error_search: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> int:
"""
获取符合条件的错误日志总数
Args:
key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
start_date (Optional[datetime]): 开始日期时间
end_date (Optional[datetime]): 结束日期时间
Returns:
int: 日志总数
"""
try:
query = select(func.count()).select_from(ErrorLog)
# Apply the same filters as get_error_logs
if key_search:
query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
if error_search:
query = query.where(
(ErrorLog.error_type.ilike(f"%{error_search}%")) |
(ErrorLog.error_log.ilike(f"%{error_search}%"))
)
if start_date:
query = query.where(ErrorLog.request_time >= start_date)
if end_date:
# Use the datetime object directly for comparison
query = query.where(ErrorLog.request_time < end_date)
count_result = await database.fetch_one(query)
return count_result[0] if count_result else 0
except Exception as e:
logger.exception(f"Failed to count error logs with filters: {str(e)}") # Use exception for stack trace
raise
# 新增函数:添加请求日志
async def add_request_log(
model_name: Optional[str],
api_key: Optional[str],
is_success: bool,
status_code: Optional[int] = None,
latency_ms: Optional[int] = None,
request_time: Optional[datetime] = None
) -> bool:
"""
添加 API 请求日志
Args:
model_name: 模型名称
api_key: 使用的 API 密钥
is_success: 请求是否成功
status_code: API 响应状态码
latency_ms: 请求耗时(毫秒)
request_time: 请求发生时间 (如果为 None, 则使用当前时间)
Returns:
bool: 是否添加成功
"""
try:
log_time = request_time if request_time else datetime.now()
query = insert(RequestLog).values(
request_time=log_time,
model_name=model_name,
api_key=api_key,
is_success=is_success,
status_code=status_code,
latency_ms=latency_ms
)
await database.execute(query)
# logger.debug(f"Added request log: key={api_key[:4]}..., success={is_success}, model={model_name}") # Use debug level
return True
except Exception as e:
logger.error(f"Failed to add request log: {str(e)}")
return False