Files
gemini-balance/app/service/stats/stats_service.py
snaily c65d5244d6 fix(stats): 修复状态码检查条件的比较方式
本次提交主要更改了 `StatsService` 中对 `RequestLog.status_code` 的比较方式,将 `== None` 修改为 `is None`,以符合 Python 的最佳实践。这一修复旨在提高代码的可读性和准确性。
2025-05-08 19:08:03 +08:00

256 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/service/stats/stats_service.py
import datetime
from sqlalchemy import and_, case, func, or_, select
from app.database.connection import database
from app.database.models import RequestLog
from app.log.logger import get_stats_logger
# Module-level logger obtained from get_stats_logger(); every StatsService
# method in this module reports through it.
logger = get_stats_logger()
class StatsService:
"""Service class for handling statistics related operations."""
async def get_calls_in_last_seconds(self, seconds: int) -> dict[str, int]:
"""获取过去 N 秒内的调用次数 (总数、成功、失败)"""
try:
cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=seconds)
query = select(
func.count(RequestLog.id).label("total"),
func.sum(
case(
(
and_(
RequestLog.status_code >= 200,
RequestLog.status_code < 300,
),
1,
),
else_=0,
)
).label("success"),
func.sum(
case(
(
or_(
RequestLog.status_code < 200,
RequestLog.status_code >= 300,
),
1,
),
(RequestLog.status_code is None, 1), # type: ignore
else_=0,
)
).label("failure"),
).where(RequestLog.request_time >= cutoff_time)
result = await database.fetch_one(query)
if result:
return {
"total": result["total"] or 0,
"success": result["success"] or 0,
"failure": result["failure"] or 0,
}
return {"total": 0, "success": 0, "failure": 0}
except Exception as e:
logger.error(f"Failed to get calls in last {seconds} seconds: {e}")
return {"total": 0, "success": 0, "failure": 0}
async def get_calls_in_last_minutes(self, minutes: int) -> dict[str, int]:
"""获取过去 N 分钟内的调用次数 (总数、成功、失败)"""
return await self.get_calls_in_last_seconds(minutes * 60)
async def get_calls_in_last_hours(self, hours: int) -> dict[str, int]:
"""获取过去 N 小时内的调用次数 (总数、成功、失败)"""
return await self.get_calls_in_last_seconds(hours * 3600)
async def get_calls_in_current_month(self) -> dict[str, int]:
"""获取当前自然月内的调用次数 (总数、成功、失败)"""
try:
now = datetime.datetime.now()
start_of_month = now.replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
query = select(
func.count(RequestLog.id).label("total"),
func.sum(
case(
(
and_(
RequestLog.status_code >= 200,
RequestLog.status_code < 300,
),
1,
),
else_=0,
)
).label("success"),
func.sum(
case(
(
or_(
RequestLog.status_code < 200,
RequestLog.status_code >= 300,
),
1,
),
(RequestLog.status_code is None, 1), # type: ignore
else_=0,
)
).label("failure"),
).where(RequestLog.request_time >= start_of_month)
result = await database.fetch_one(query)
if result:
return {
"total": result["total"] or 0,
"success": result["success"] or 0,
"failure": result["failure"] or 0,
}
return {"total": 0, "success": 0, "failure": 0}
except Exception as e:
logger.error(f"Failed to get calls in current month: {e}")
return {"total": 0, "success": 0, "failure": 0}
async def get_api_usage_stats(self) -> dict:
"""获取所有需要的 API 使用统计数据 (总数、成功、失败)"""
try:
stats_1m = await self.get_calls_in_last_minutes(1)
stats_1h = await self.get_calls_in_last_hours(1)
stats_24h = await self.get_calls_in_last_hours(24)
stats_month = await self.get_calls_in_current_month()
return {
"calls_1m": stats_1m,
"calls_1h": stats_1h,
"calls_24h": stats_24h,
"calls_month": stats_month,
}
except Exception as e:
logger.error(f"Failed to get API usage stats: {e}")
default_stat = {"total": 0, "success": 0, "failure": 0}
return {
"calls_1m": default_stat.copy(),
"calls_1h": default_stat.copy(),
"calls_24h": default_stat.copy(),
"calls_month": default_stat.copy(),
}
async def get_api_call_details(self, period: str) -> list[dict]:
"""
获取指定时间段内的 API 调用详情
Args:
period: 时间段标识 ('1m', '1h', '24h')
Returns:
包含调用详情的字典列表,每个字典包含 timestamp, key, model, status
Raises:
ValueError: 如果 period 无效
"""
now = datetime.datetime.now()
if period == "1m":
start_time = now - datetime.timedelta(minutes=1)
elif period == "1h":
start_time = now - datetime.timedelta(hours=1)
elif period == "24h":
start_time = now - datetime.timedelta(hours=24)
else:
raise ValueError(f"无效的时间段标识: {period}")
try:
query = (
select(
RequestLog.request_time.label("timestamp"),
RequestLog.api_key.label("key"),
RequestLog.model_name.label("model"),
RequestLog.status_code, # We might need to map this to 'success'/'failure' later
)
.where(RequestLog.request_time >= start_time)
.order_by(RequestLog.request_time.desc())
) # Order by most recent first
results = await database.fetch_all(query)
# Convert results to list of dicts and map status_code
details = []
for row in results:
status = "failure" # 默认状态为 failure如果 status_code 有效且在 200-299 范围内则更新为 success
if row["status_code"] is not None: # 检查 status_code 是否为空
status = "success" if 200 <= row["status_code"] < 300 else "failure"
details.append(
{
"timestamp": row[
"timestamp"
].isoformat(), # Use ISO format for JS compatibility
"key": row["key"],
"model": row["model"],
"status": status,
}
)
logger.info(
f"Retrieved {len(details)} API call details for period '{period}'"
)
return details
except Exception as e:
logger.error(f"Failed to get API call details for period '{period}': {e}")
# Re-raise the exception to be handled by the route
raise
async def get_key_usage_details_last_24h(self, key: str) -> dict | None:
"""
获取指定 API 密钥在过去 24 小时内按模型统计的调用次数。
Args:
key: 要查询的 API 密钥。
Returns:
一个字典,其中键是模型名称,值是调用次数。
如果查询出错或没有找到记录,可能返回 None 或空字典。
Example: {"gemini-pro": 10, "gemini-1.5-pro-latest": 5}
"""
logger.info(
f"Fetching usage details for key ending in ...{key[-4:]} for the last 24h."
)
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=24)
try:
query = (
select(
RequestLog.model_name, func.count(RequestLog.id).label("call_count")
)
.where(
RequestLog.api_key == key,
RequestLog.request_time >= cutoff_time,
RequestLog.model_name.isnot(None), # Ensure model_name is not null
)
.group_by(RequestLog.model_name)
.order_by(func.count(RequestLog.id).desc()) # Order by count descending
)
results = await database.fetch_all(query)
if not results:
logger.info(
f"No usage details found for key ending in ...{key[-4:]} in the last 24h."
)
return {} # Return empty dict if no records found
usage_details = {row["model_name"]: row["call_count"] for row in results}
logger.info(
f"Successfully fetched usage details for key ending in ...{key[-4:]}: {usage_details}"
)
return usage_details
except Exception as e:
logger.error(
f"Failed to get key usage details for key ending in ...{key[-4:]}: {e}",
exc_info=True,
)
# Depending on requirements, you might return None or raise the exception
# Raising allows the route handler to return a 500 error.
raise # Re-raise the exception