mirror of
https://github.com/snailyp/gemini-balance.git
synced 2026-05-11 18:09:55 +08:00
将 `delete_all_error_logs` 函数的实现从一次性删除所有记录改为分批删除。这可以防止在处理大量日志时因数据库事务过长而导致的超时或性能问题。 - 每次从数据库中获取一批日志ID,然后根据ID进行删除。 - 在每个批次处理后,使用 `asyncio.sleep(0)` 将控制权交还给事件循环,避免长时间阻塞。 - 批次大小设置为500,以兼容不同数据库(如SQLite)对SQL参数数量的限制。 - 函数现在返回实际删除的日志总数,而不是一个固定的成功指示符。
803 lines
26 KiB
Python
803 lines
26 KiB
Python
"""
|
||
数据库服务模块
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any, Dict, List, Optional, Union
|
||
|
||
from sqlalchemy import asc, delete, desc, func, insert, select, update
|
||
|
||
from app.database.connection import database
|
||
from app.database.models import ErrorLog, FileRecord, FileState, RequestLog, Settings
|
||
from app.log.logger import get_database_logger
|
||
from app.utils.helpers import redact_key_for_logging
|
||
|
||
logger = get_database_logger()
|
||
|
||
|
||
async def get_all_settings() -> List[Dict[str, Any]]:
|
||
"""
|
||
获取所有设置
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 设置列表
|
||
"""
|
||
try:
|
||
query = select(Settings)
|
||
result = await database.fetch_all(query)
|
||
return [dict(row) for row in result]
|
||
except Exception as e:
|
||
logger.error(f"Failed to get all settings: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def get_setting(key: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取指定键的设置
|
||
|
||
Args:
|
||
key: 设置键名
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 设置信息,如果不存在则返回None
|
||
"""
|
||
try:
|
||
query = select(Settings).where(Settings.key == key)
|
||
result = await database.fetch_one(query)
|
||
return dict(result) if result else None
|
||
except Exception as e:
|
||
logger.error(f"Failed to get setting {key}: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def update_setting(
|
||
key: str, value: str, description: Optional[str] = None
|
||
) -> bool:
|
||
"""
|
||
更新设置
|
||
|
||
Args:
|
||
key: 设置键名
|
||
value: 设置值
|
||
description: 设置描述
|
||
|
||
Returns:
|
||
bool: 是否更新成功
|
||
"""
|
||
try:
|
||
# 检查设置是否存在
|
||
setting = await get_setting(key)
|
||
|
||
if setting:
|
||
# 更新设置
|
||
query = (
|
||
update(Settings)
|
||
.where(Settings.key == key)
|
||
.values(
|
||
value=value,
|
||
description=description if description else setting["description"],
|
||
updated_at=datetime.now(),
|
||
)
|
||
)
|
||
await database.execute(query)
|
||
logger.info(f"Updated setting: {key}")
|
||
return True
|
||
else:
|
||
# 插入设置
|
||
query = insert(Settings).values(
|
||
key=key,
|
||
value=value,
|
||
description=description,
|
||
created_at=datetime.now(),
|
||
updated_at=datetime.now(),
|
||
)
|
||
await database.execute(query)
|
||
logger.info(f"Inserted setting: {key}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Failed to update setting {key}: {str(e)}")
|
||
return False
|
||
|
||
|
||
async def add_error_log(
|
||
gemini_key: Optional[str] = None,
|
||
model_name: Optional[str] = None,
|
||
error_type: Optional[str] = None,
|
||
error_log: Optional[str] = None,
|
||
error_code: Optional[int] = None,
|
||
request_msg: Optional[Union[Dict[str, Any], str]] = None,
|
||
request_datetime: Optional[datetime] = None,
|
||
) -> bool:
|
||
"""
|
||
添加错误日志
|
||
|
||
Args:
|
||
gemini_key: Gemini API密钥
|
||
error_log: 错误日志
|
||
error_code: 错误代码 (例如 HTTP 状态码)
|
||
request_msg: 请求消息
|
||
|
||
Returns:
|
||
bool: 是否添加成功
|
||
"""
|
||
try:
|
||
# 如果request_msg是字典,则转换为JSON字符串
|
||
if isinstance(request_msg, dict):
|
||
request_msg_json = request_msg
|
||
elif isinstance(request_msg, str):
|
||
try:
|
||
request_msg_json = json.loads(request_msg)
|
||
except json.JSONDecodeError:
|
||
request_msg_json = {"message": request_msg}
|
||
else:
|
||
request_msg_json = None
|
||
|
||
# 插入错误日志
|
||
query = insert(ErrorLog).values(
|
||
gemini_key=gemini_key,
|
||
error_type=error_type,
|
||
error_log=error_log,
|
||
model_name=model_name,
|
||
error_code=error_code,
|
||
request_msg=request_msg_json,
|
||
request_time=(request_datetime if request_datetime else datetime.now()),
|
||
)
|
||
await database.execute(query)
|
||
logger.info(f"Added error log for key: {redact_key_for_logging(gemini_key)}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Failed to add error log: {str(e)}")
|
||
return False
|
||
|
||
|
||
async def get_error_logs(
|
||
limit: int = 20,
|
||
offset: int = 0,
|
||
key_search: Optional[str] = None,
|
||
error_search: Optional[str] = None,
|
||
error_code_search: Optional[str] = None,
|
||
start_date: Optional[datetime] = None,
|
||
end_date: Optional[datetime] = None,
|
||
sort_by: str = "id",
|
||
sort_order: str = "desc",
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取错误日志,支持搜索、日期过滤和排序
|
||
|
||
Args:
|
||
limit (int): 限制数量
|
||
offset (int): 偏移量
|
||
key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
|
||
error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
|
||
error_code_search (Optional[str]): 错误码搜索词 (精确匹配)
|
||
start_date (Optional[datetime]): 开始日期时间
|
||
end_date (Optional[datetime]): 结束日期时间
|
||
sort_by (str): 排序字段 (例如 'id', 'request_time')
|
||
sort_order (str): 排序顺序 ('asc' or 'desc')
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 错误日志列表
|
||
"""
|
||
try:
|
||
query = select(
|
||
ErrorLog.id,
|
||
ErrorLog.gemini_key,
|
||
ErrorLog.model_name,
|
||
ErrorLog.error_type,
|
||
ErrorLog.error_log,
|
||
ErrorLog.error_code,
|
||
ErrorLog.request_time,
|
||
)
|
||
|
||
if key_search:
|
||
query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
|
||
if error_search:
|
||
query = query.where(
|
||
(ErrorLog.error_type.ilike(f"%{error_search}%"))
|
||
| (ErrorLog.error_log.ilike(f"%{error_search}%"))
|
||
)
|
||
if start_date:
|
||
query = query.where(ErrorLog.request_time >= start_date)
|
||
if end_date:
|
||
query = query.where(ErrorLog.request_time < end_date)
|
||
if error_code_search:
|
||
try:
|
||
error_code_int = int(error_code_search)
|
||
query = query.where(ErrorLog.error_code == error_code_int)
|
||
except ValueError:
|
||
logger.warning(
|
||
f"Invalid format for error_code_search: '{error_code_search}'. Expected an integer. Skipping error code filter."
|
||
)
|
||
|
||
sort_column = getattr(ErrorLog, sort_by, ErrorLog.id)
|
||
if sort_order.lower() == "asc":
|
||
query = query.order_by(asc(sort_column))
|
||
else:
|
||
query = query.order_by(desc(sort_column))
|
||
|
||
query = query.limit(limit).offset(offset)
|
||
|
||
result = await database.fetch_all(query)
|
||
return [dict(row) for row in result]
|
||
except Exception as e:
|
||
logger.exception(f"Failed to get error logs with filters: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def get_error_logs_count(
|
||
key_search: Optional[str] = None,
|
||
error_search: Optional[str] = None,
|
||
error_code_search: Optional[str] = None,
|
||
start_date: Optional[datetime] = None,
|
||
end_date: Optional[datetime] = None,
|
||
) -> int:
|
||
"""
|
||
获取符合条件的错误日志总数
|
||
|
||
Args:
|
||
key_search (Optional[str]): Gemini密钥搜索词 (模糊匹配)
|
||
error_search (Optional[str]): 错误类型或日志内容搜索词 (模糊匹配)
|
||
error_code_search (Optional[str]): 错误码搜索词 (精确匹配)
|
||
start_date (Optional[datetime]): 开始日期时间
|
||
end_date (Optional[datetime]): 结束日期时间
|
||
|
||
Returns:
|
||
int: 日志总数
|
||
"""
|
||
try:
|
||
query = select(func.count()).select_from(ErrorLog)
|
||
|
||
if key_search:
|
||
query = query.where(ErrorLog.gemini_key.ilike(f"%{key_search}%"))
|
||
if error_search:
|
||
query = query.where(
|
||
(ErrorLog.error_type.ilike(f"%{error_search}%"))
|
||
| (ErrorLog.error_log.ilike(f"%{error_search}%"))
|
||
)
|
||
if start_date:
|
||
query = query.where(ErrorLog.request_time >= start_date)
|
||
if end_date:
|
||
query = query.where(ErrorLog.request_time < end_date)
|
||
if error_code_search:
|
||
try:
|
||
error_code_int = int(error_code_search)
|
||
query = query.where(ErrorLog.error_code == error_code_int)
|
||
except ValueError:
|
||
logger.warning(
|
||
f"Invalid format for error_code_search in count: '{error_code_search}'. Expected an integer. Skipping error code filter."
|
||
)
|
||
|
||
count_result = await database.fetch_one(query)
|
||
return count_result[0] if count_result else 0
|
||
except Exception as e:
|
||
logger.exception(f"Failed to count error logs with filters: {str(e)}")
|
||
raise
|
||
|
||
|
||
# 新增函数:获取单条错误日志详情
|
||
async def get_error_log_details(log_id: int) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据 ID 获取单个错误日志的详细信息
|
||
|
||
Args:
|
||
log_id (int): 错误日志的 ID
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 包含日志详细信息的字典,如果未找到则返回 None
|
||
"""
|
||
try:
|
||
query = select(ErrorLog).where(ErrorLog.id == log_id)
|
||
result = await database.fetch_one(query)
|
||
if result:
|
||
# 将 request_msg (JSONB) 转换为字符串以便在 API 中返回
|
||
log_dict = dict(result)
|
||
if "request_msg" in log_dict and log_dict["request_msg"] is not None:
|
||
# 确保即使是 None 或非 JSON 数据也能处理
|
||
try:
|
||
log_dict["request_msg"] = json.dumps(
|
||
log_dict["request_msg"], ensure_ascii=False, indent=2
|
||
)
|
||
except TypeError:
|
||
log_dict["request_msg"] = str(log_dict["request_msg"])
|
||
return log_dict
|
||
else:
|
||
return None
|
||
except Exception as e:
|
||
logger.exception(f"Failed to get error log details for ID {log_id}: {str(e)}")
|
||
raise
|
||
|
||
|
||
# 新增函数:通过 gemini_key / error_code / 时间窗口 查找最接近的错误日志
|
||
async def find_error_log_by_info(
|
||
gemini_key: str,
|
||
timestamp: datetime,
|
||
status_code: Optional[int] = None,
|
||
window_seconds: int = 1,
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
在给定时间窗口内,根据 gemini_key(精确匹配)及可选的 status_code 查找最接近 timestamp 的错误日志。
|
||
|
||
假设错误日志的 error_code 存储的是 HTTP 状态码或等价错误码。
|
||
|
||
Args:
|
||
gemini_key: 完整的 Gemini key 字符串。
|
||
timestamp: 目标时间(UTC 或本地,与存储一致)。
|
||
status_code: 可选的错误码,若提供则优先匹配该错误码。
|
||
window_seconds: 允许的时间偏差窗口,单位秒,默认为 1 秒。
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 最匹配的一条错误日志的完整详情(字段与 get_error_log_details 一致),若未找到则返回 None。
|
||
"""
|
||
try:
|
||
start_time = timestamp - timedelta(seconds=window_seconds)
|
||
end_time = timestamp + timedelta(seconds=window_seconds)
|
||
|
||
base_query = select(ErrorLog).where(
|
||
ErrorLog.gemini_key == gemini_key,
|
||
ErrorLog.request_time >= start_time,
|
||
ErrorLog.request_time <= end_time,
|
||
)
|
||
|
||
# 若提供了状态码,先尝试按状态码过滤
|
||
if status_code is not None:
|
||
query = base_query.where(ErrorLog.error_code == status_code).order_by(
|
||
ErrorLog.request_time.desc()
|
||
)
|
||
candidates = await database.fetch_all(query)
|
||
if not candidates:
|
||
# 回退:不按状态码,仅按时间窗口
|
||
query2 = base_query.order_by(ErrorLog.request_time.desc())
|
||
candidates = await database.fetch_all(query2)
|
||
else:
|
||
query = base_query.order_by(ErrorLog.request_time.desc())
|
||
candidates = await database.fetch_all(query)
|
||
|
||
if not candidates:
|
||
return None
|
||
|
||
# 在 Python 中选择与 timestamp 最接近的一条
|
||
def _to_dict(row: Any) -> Dict[str, Any]:
|
||
d = dict(row)
|
||
if "request_msg" in d and d["request_msg"] is not None:
|
||
try:
|
||
d["request_msg"] = json.dumps(
|
||
d["request_msg"], ensure_ascii=False, indent=2
|
||
)
|
||
except TypeError:
|
||
d["request_msg"] = str(d["request_msg"])
|
||
return d
|
||
|
||
best = min(
|
||
candidates,
|
||
key=lambda r: abs((r["request_time"] - timestamp).total_seconds()),
|
||
)
|
||
return _to_dict(best)
|
||
except Exception as e:
|
||
logger.exception(
|
||
f"Failed to find error log by info (key=***{gemini_key[-4:] if gemini_key else ''}, code={status_code}, ts={timestamp}, window={window_seconds}s): {str(e)}"
|
||
)
|
||
raise
|
||
|
||
|
||
async def delete_error_logs_by_ids(log_ids: List[int]) -> int:
|
||
"""
|
||
根据提供的 ID 列表批量删除错误日志 (异步)。
|
||
|
||
Args:
|
||
log_ids: 要删除的错误日志 ID 列表。
|
||
|
||
Returns:
|
||
int: 实际删除的日志数量。
|
||
"""
|
||
if not log_ids:
|
||
return 0
|
||
try:
|
||
# 使用 databases 执行删除
|
||
query = delete(ErrorLog).where(ErrorLog.id.in_(log_ids))
|
||
# execute 返回受影响的行数,但 databases 库的 execute 不直接返回 rowcount
|
||
# 我们需要先查询是否存在,或者依赖数据库约束/触发器(如果适用)
|
||
# 或者,我们可以执行删除并假设成功,除非抛出异常
|
||
# 为了简单起见,我们执行删除并记录日志,不精确返回删除数量
|
||
# 如果需要精确数量,需要先执行 SELECT COUNT(*)
|
||
await database.execute(query)
|
||
# 注意:databases 的 execute 不返回 rowcount,所以我们不能直接返回删除的数量
|
||
# 返回 log_ids 的长度作为尝试删除的数量,或者返回 0/1 表示操作尝试
|
||
logger.info(f"Attempted bulk deletion for error logs with IDs: {log_ids}")
|
||
return len(log_ids) # 返回尝试删除的数量
|
||
except Exception as e:
|
||
# 数据库连接或执行错误
|
||
logger.error(
|
||
f"Error during bulk deletion of error logs {log_ids}: {e}", exc_info=True
|
||
)
|
||
raise
|
||
|
||
|
||
async def delete_error_log_by_id(log_id: int) -> bool:
|
||
"""
|
||
根据 ID 删除单个错误日志 (异步)。
|
||
|
||
Args:
|
||
log_id: 要删除的错误日志 ID。
|
||
|
||
Returns:
|
||
bool: 如果成功删除返回 True,否则返回 False。
|
||
"""
|
||
try:
|
||
# 先检查是否存在 (可选,但更明确)
|
||
check_query = select(ErrorLog.id).where(ErrorLog.id == log_id)
|
||
exists = await database.fetch_one(check_query)
|
||
|
||
if not exists:
|
||
logger.warning(
|
||
f"Attempted to delete non-existent error log with ID: {log_id}"
|
||
)
|
||
return False
|
||
|
||
# 执行删除
|
||
delete_query = delete(ErrorLog).where(ErrorLog.id == log_id)
|
||
await database.execute(delete_query)
|
||
logger.info(f"Successfully deleted error log with ID: {log_id}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Error deleting error log with ID {log_id}: {e}", exc_info=True)
|
||
raise
|
||
|
||
|
||
async def delete_all_error_logs() -> int:
|
||
"""
|
||
分批删除所有错误日志,以避免大数据量下的超时和性能问题。
|
||
|
||
Returns:
|
||
int: 被删除的错误日志总数。
|
||
"""
|
||
total_deleted_count = 0
|
||
# SQLite 对 SQL 参数数量有上限(常见为 999),IN 子句中过多参数会报错
|
||
# 统一使用 500,兼容 SQLite/MySQL,必要时可在配置中暴露该值
|
||
batch_size = 500
|
||
|
||
try:
|
||
while True:
|
||
# 1) 读取一批待删除的ID,仅选择ID列以提升效率
|
||
id_query = select(ErrorLog.id).order_by(ErrorLog.id).limit(batch_size)
|
||
rows = await database.fetch_all(id_query)
|
||
if not rows:
|
||
break
|
||
|
||
ids = [row["id"] for row in rows]
|
||
|
||
# 2) 按ID批量删除
|
||
delete_query = delete(ErrorLog).where(ErrorLog.id.in_(ids))
|
||
await database.execute(delete_query)
|
||
|
||
deleted_in_batch = len(ids)
|
||
total_deleted_count += deleted_in_batch
|
||
|
||
logger.debug(f"Deleted a batch of {deleted_in_batch} error logs.")
|
||
|
||
# 若不足一个批次,说明已删除完成
|
||
if deleted_in_batch < batch_size:
|
||
break
|
||
|
||
# 3) 将控制权交还事件循环,缓解长时间占用
|
||
await asyncio.sleep(0)
|
||
|
||
logger.info(
|
||
f"Successfully deleted all error logs in batches. Total deleted: {total_deleted_count}"
|
||
)
|
||
return total_deleted_count
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Failed to delete all error logs in batches: {str(e)}", exc_info=True
|
||
)
|
||
raise
|
||
|
||
|
||
# 新增函数:添加请求日志
|
||
async def add_request_log(
|
||
model_name: Optional[str],
|
||
api_key: Optional[str],
|
||
is_success: bool,
|
||
status_code: Optional[int] = None,
|
||
latency_ms: Optional[int] = None,
|
||
request_time: Optional[datetime] = None,
|
||
) -> bool:
|
||
"""
|
||
添加 API 请求日志
|
||
|
||
Args:
|
||
model_name: 模型名称
|
||
api_key: 使用的 API 密钥
|
||
is_success: 请求是否成功
|
||
status_code: API 响应状态码
|
||
latency_ms: 请求耗时(毫秒)
|
||
request_time: 请求发生时间 (如果为 None, 则使用当前时间)
|
||
|
||
Returns:
|
||
bool: 是否添加成功
|
||
"""
|
||
try:
|
||
log_time = request_time if request_time else datetime.now()
|
||
|
||
query = insert(RequestLog).values(
|
||
request_time=log_time,
|
||
model_name=model_name,
|
||
api_key=api_key,
|
||
is_success=is_success,
|
||
status_code=status_code,
|
||
latency_ms=latency_ms,
|
||
)
|
||
await database.execute(query)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Failed to add request log: {str(e)}")
|
||
return False
|
||
|
||
|
||
# ==================== 文件记录相关函数 ====================
|
||
|
||
|
||
async def create_file_record(
|
||
name: str,
|
||
mime_type: str,
|
||
size_bytes: int,
|
||
api_key: str,
|
||
uri: str,
|
||
create_time: datetime,
|
||
update_time: datetime,
|
||
expiration_time: datetime,
|
||
state: FileState = FileState.PROCESSING,
|
||
display_name: Optional[str] = None,
|
||
sha256_hash: Optional[str] = None,
|
||
upload_url: Optional[str] = None,
|
||
user_token: Optional[str] = None,
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
创建文件记录
|
||
|
||
Args:
|
||
name: 文件名称(格式: files/{file_id})
|
||
mime_type: MIME 类型
|
||
size_bytes: 文件大小(字节)
|
||
api_key: 上传时使用的 API Key
|
||
uri: 文件访问 URI
|
||
create_time: 创建时间
|
||
update_time: 更新时间
|
||
expiration_time: 过期时间
|
||
display_name: 显示名称
|
||
sha256_hash: SHA256 哈希值
|
||
upload_url: 临时上传 URL
|
||
user_token: 上传用户的 token
|
||
|
||
Returns:
|
||
Dict[str, Any]: 创建的文件记录
|
||
"""
|
||
try:
|
||
query = insert(FileRecord).values(
|
||
name=name,
|
||
display_name=display_name,
|
||
mime_type=mime_type,
|
||
size_bytes=size_bytes,
|
||
sha256_hash=sha256_hash,
|
||
state=state,
|
||
create_time=create_time,
|
||
update_time=update_time,
|
||
expiration_time=expiration_time,
|
||
uri=uri,
|
||
api_key=api_key,
|
||
upload_url=upload_url,
|
||
user_token=user_token,
|
||
)
|
||
await database.execute(query)
|
||
|
||
# 返回创建的记录
|
||
return await get_file_record_by_name(name)
|
||
except Exception as e:
|
||
logger.error(f"Failed to create file record: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def get_file_record_by_name(name: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
根据文件名获取文件记录
|
||
|
||
Args:
|
||
name: 文件名称(格式: files/{file_id})
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 文件记录,如果不存在则返回 None
|
||
"""
|
||
try:
|
||
query = select(FileRecord).where(FileRecord.name == name)
|
||
result = await database.fetch_one(query)
|
||
return dict(result) if result else None
|
||
except Exception as e:
|
||
logger.error(f"Failed to get file record by name {name}: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def update_file_record_state(
|
||
file_name: str,
|
||
state: FileState,
|
||
update_time: Optional[datetime] = None,
|
||
upload_completed: Optional[datetime] = None,
|
||
sha256_hash: Optional[str] = None,
|
||
) -> bool:
|
||
"""
|
||
更新文件记录状态
|
||
|
||
Args:
|
||
file_name: 文件名
|
||
state: 新状态
|
||
update_time: 更新时间
|
||
upload_completed: 上传完成时间
|
||
sha256_hash: SHA256 哈希值
|
||
|
||
Returns:
|
||
bool: 是否更新成功
|
||
"""
|
||
try:
|
||
values = {"state": state}
|
||
if update_time:
|
||
values["update_time"] = update_time
|
||
if upload_completed:
|
||
values["upload_completed"] = upload_completed
|
||
if sha256_hash:
|
||
values["sha256_hash"] = sha256_hash
|
||
|
||
query = update(FileRecord).where(FileRecord.name == file_name).values(**values)
|
||
result = await database.execute(query)
|
||
|
||
if result:
|
||
logger.info(f"Updated file record state for {file_name} to {state}")
|
||
return True
|
||
|
||
logger.warning(f"File record not found for update: {file_name}")
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"Failed to update file record state: {str(e)}")
|
||
return False
|
||
|
||
|
||
async def list_file_records(
|
||
user_token: Optional[str] = None,
|
||
api_key: Optional[str] = None,
|
||
page_size: int = 10,
|
||
page_token: Optional[str] = None,
|
||
) -> tuple[List[Dict[str, Any]], Optional[str]]:
|
||
"""
|
||
列出文件记录
|
||
|
||
Args:
|
||
user_token: 用户 token(如果提供,只返回该用户的文件)
|
||
api_key: API Key(如果提供,只返回使用该 key 的文件)
|
||
page_size: 每页大小
|
||
page_token: 分页标记(偏移量)
|
||
|
||
Returns:
|
||
tuple[List[Dict[str, Any]], Optional[str]]: (文件列表, 下一页标记)
|
||
"""
|
||
try:
|
||
logger.debug(
|
||
f"list_file_records called with page_size={page_size}, page_token={page_token}"
|
||
)
|
||
query = select(FileRecord).where(
|
||
FileRecord.expiration_time > datetime.now(timezone.utc)
|
||
)
|
||
|
||
if user_token:
|
||
query = query.where(FileRecord.user_token == user_token)
|
||
if api_key:
|
||
query = query.where(FileRecord.api_key == api_key)
|
||
|
||
# 使用偏移量进行分页
|
||
offset = 0
|
||
if page_token:
|
||
try:
|
||
offset = int(page_token)
|
||
except ValueError:
|
||
logger.warning(f"Invalid page token: {page_token}")
|
||
offset = 0
|
||
|
||
# 按ID升序排列,使用 OFFSET 和 LIMIT
|
||
query = query.order_by(FileRecord.id).offset(offset).limit(page_size + 1)
|
||
|
||
results = await database.fetch_all(query)
|
||
|
||
logger.debug(f"Query returned {len(results)} records")
|
||
if results:
|
||
logger.debug(
|
||
f"First record ID: {results[0]['id']}, Last record ID: {results[-1]['id']}"
|
||
)
|
||
|
||
# 处理分页
|
||
has_next = len(results) > page_size
|
||
if has_next:
|
||
results = results[:page_size]
|
||
# 下一页的偏移量是当前偏移量加上本页返回的记录数
|
||
next_offset = offset + page_size
|
||
next_page_token = str(next_offset)
|
||
logger.debug(
|
||
f"Has next page, offset={offset}, page_size={page_size}, next_page_token={next_page_token}"
|
||
)
|
||
else:
|
||
next_page_token = None
|
||
logger.debug(f"No next page, returning {len(results)} results")
|
||
|
||
return [dict(row) for row in results], next_page_token
|
||
except Exception as e:
|
||
logger.error(f"Failed to list file records: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def delete_file_record(name: str) -> bool:
|
||
"""
|
||
删除文件记录
|
||
|
||
Args:
|
||
name: 文件名称
|
||
|
||
Returns:
|
||
bool: 是否删除成功
|
||
"""
|
||
try:
|
||
query = delete(FileRecord).where(FileRecord.name == name)
|
||
await database.execute(query)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Failed to delete file record: {str(e)}")
|
||
return False
|
||
|
||
|
||
async def delete_expired_file_records() -> List[Dict[str, Any]]:
|
||
"""
|
||
删除已过期的文件记录
|
||
|
||
Returns:
|
||
List[Dict[str, Any]]: 删除的记录列表
|
||
"""
|
||
try:
|
||
# 先获取要删除的记录
|
||
query = select(FileRecord).where(
|
||
FileRecord.expiration_time <= datetime.now(timezone.utc)
|
||
)
|
||
expired_records = await database.fetch_all(query)
|
||
|
||
if not expired_records:
|
||
return []
|
||
|
||
# 执行删除
|
||
delete_query = delete(FileRecord).where(
|
||
FileRecord.expiration_time <= datetime.now(timezone.utc)
|
||
)
|
||
await database.execute(delete_query)
|
||
|
||
logger.info(f"Deleted {len(expired_records)} expired file records")
|
||
return [dict(record) for record in expired_records]
|
||
except Exception as e:
|
||
logger.error(f"Failed to delete expired file records: {str(e)}")
|
||
raise
|
||
|
||
|
||
async def get_file_api_key(name: str) -> Optional[str]:
|
||
"""
|
||
获取文件对应的 API Key
|
||
|
||
Args:
|
||
name: 文件名称
|
||
|
||
Returns:
|
||
Optional[str]: API Key,如果文件不存在或已过期则返回 None
|
||
"""
|
||
try:
|
||
query = select(FileRecord.api_key).where(
|
||
(FileRecord.name == name)
|
||
& (FileRecord.expiration_time > datetime.now(timezone.utc))
|
||
)
|
||
result = await database.fetch_one(query)
|
||
return result["api_key"] if result else None
|
||
except Exception as e:
|
||
logger.error(f"Failed to get file API key: {str(e)}")
|
||
raise
|