Files
gemini-balance/app/database/services.py
snaily 169488851f feat: 集成数据库配置管理并添加错误日志查看器
主要变更:

1.  **数据库集成**:
    *   引入 MySQL 数据库支持,使用 SQLAlchemy 和 `databases` 库持久化存储应用程序设置。
    *   添加了 `app/database` 目录,包含数据库连接、模型和初始化逻辑。
    *   更新 `requirements.txt` 添加数据库相关依赖 (`pymysql`, `sqlalchemy`, `aiomysql`, `databases`, `python-dotenv`)。

2.  **配置管理重构**:
    *   重构 `ConfigService` (`app/service/config/config_service.py`),使其从数据库加载和保存设置,并支持从 `.env` 文件同步初始配置到数据库。
    *   修改 `Settings` 模型 (`app/config/config.py`) 以包含数据库连接信息,并添加了从数据库加载/同步配置的逻辑。
    *   配置相关的路由 (`app/router/config_routes.py`) 更新为异步,并调用新的 `ConfigService` 方法。
    *   `KeyManager` (`app/service/key/key_manager.py`) 现在可以在配置更新后重置和重新初始化。

3.  **错误日志查看器**:
    *   新增 `/logs` 页面 (`app/templates/error_logs.html`) 用于展示应用程序错误日志。
    *   添加了相应的路由 (`app/router/log_routes.py`)、静态资源 (`app/static/css/error_logs.css`, `app/static/js/error_logs.js`) 和日志记录器 (`app/log/logger.py`)。
    *   在配置页面和密钥管理页面的导航栏中添加了指向日志页面的链接。

4.  **异步操作**:
    *   将配置服务和相关路由转换为异步 (`async def`) 以支持异步数据库操作。

5.  **其他**:
    *   更新了应用程序初始化逻辑 (`app/core/application.py`, `app/core/initialization.py`) 以包含数据库连接的建立和关闭。
2025-04-09 15:04:29 +08:00

166 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库服务模块
"""
import datetime
import json
from typing import Dict, List, Optional, Any, Union
from sqlalchemy import select, insert, update
from app.database.connection import database
from app.database.models import Settings, ErrorLog
from app.log.logger import get_database_logger
logger = get_database_logger()
async def get_all_settings() -> List[Dict[str, Any]]:
"""
获取所有设置
Returns:
List[Dict[str, Any]]: 设置列表
"""
try:
query = select(Settings)
result = await database.fetch_all(query)
return [dict(row) for row in result]
except Exception as e:
logger.error(f"Failed to get all settings: {str(e)}")
raise
async def get_setting(key: str) -> Optional[Dict[str, Any]]:
"""
获取指定键的设置
Args:
key: 设置键名
Returns:
Optional[Dict[str, Any]]: 设置信息如果不存在则返回None
"""
try:
query = select(Settings).where(Settings.key == key)
result = await database.fetch_one(query)
return dict(result) if result else None
except Exception as e:
logger.error(f"Failed to get setting {key}: {str(e)}")
raise
async def update_setting(key: str, value: str, description: Optional[str] = None) -> bool:
"""
更新设置
Args:
key: 设置键名
value: 设置值
description: 设置描述
Returns:
bool: 是否更新成功
"""
try:
# 检查设置是否存在
setting = await get_setting(key)
if setting:
# 更新设置
query = (
update(Settings)
.where(Settings.key == key)
.values(
value=value,
description=description if description else setting["description"],
updated_at=datetime.datetime.now()
)
)
await database.execute(query)
logger.info(f"Updated setting: {key}")
return True
else:
# 插入设置
query = (
insert(Settings)
.values(
key=key,
value=value,
description=description,
created_at=datetime.datetime.now(),
updated_at=datetime.datetime.now()
)
)
await database.execute(query)
logger.info(f"Inserted setting: {key}")
return True
except Exception as e:
logger.error(f"Failed to update setting {key}: {str(e)}")
return False
async def add_error_log(
gemini_key: Optional[str] = None,
error_log: Optional[str] = None,
request_msg: Optional[Union[Dict[str, Any], str]] = None
) -> bool:
"""
添加错误日志
Args:
gemini_key: Gemini API密钥
error_log: 错误日志
request_msg: 请求消息
Returns:
bool: 是否添加成功
"""
try:
# 如果request_msg是字典则转换为JSON字符串
if isinstance(request_msg, dict):
request_msg_json = request_msg
elif isinstance(request_msg, str):
try:
request_msg_json = json.loads(request_msg)
except json.JSONDecodeError:
request_msg_json = {"message": request_msg}
else:
request_msg_json = None
# 插入错误日志
query = (
insert(ErrorLog)
.values(
gemini_key=gemini_key,
error_log=error_log,
request_msg=request_msg_json,
request_time=datetime.datetime.now()
)
)
await database.execute(query)
logger.info(f"Added error log for key: {gemini_key}")
return True
except Exception as e:
logger.error(f"Failed to add error log: {str(e)}")
return False
async def get_error_logs(limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]:
"""
获取错误日志
Args:
limit: 限制数量
offset: 偏移量
Returns:
List[Dict[str, Any]]: 错误日志列表
"""
try:
query = select(ErrorLog).order_by(ErrorLog.request_time.desc()).limit(limit).offset(offset)
result = await database.fetch_all(query)
return [dict(row) for row in result]
except Exception as e:
logger.error(f"Failed to get error logs: {str(e)}")
raise