Files
gemini-balance/app/service/stats/stats_service.py
snaily 1aa3d267bb feat(api,ui): 新增24h错误码最高Key统计与面板
- 新增 GET /api/stats/attention-keys 接口,统计最近24小时指定
  状态码(默认429)错误次数最多的 Key,仅统计内存中的 Key,
  支持 limit 与 status_code 参数
- StatsService 新增 get_attention_keys_last_24h,按 api_key 分组计数并
  降序返回
- UI 新增“值得注意的Key”卡片:支持 429/403/400 快捷切换、自定义状态码
  与数量限制,默认展示 429 前 10
- 列表项支持验证、查看 24h 详情、复制、删除等快捷操作
- 将 Chart.js 与页面脚本改为 defer,保证 DOM 就绪与执行顺序
- 修复:补充获取数量输入框引用,避免初始化未声明变量报错
- 其他:微调日志输出格式
2025-08-18 06:28:48 +08:00

401 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/service/stats_service.py
import datetime
from typing import Union
from sqlalchemy import and_, case, func, or_, select
from app.database.connection import database
from app.database.models import RequestLog
from app.log.logger import get_stats_logger
logger = get_stats_logger()
class StatsService:
"""Service class for handling statistics related operations."""
async def get_calls_in_last_seconds(self, seconds: int) -> dict[str, int]:
"""获取过去 N 秒内的调用次数 (总数、成功、失败)"""
try:
cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=seconds)
query = select(
func.count(RequestLog.id).label("total"),
func.sum(
case(
(
and_(
RequestLog.status_code >= 200,
RequestLog.status_code < 300,
),
1,
),
else_=0,
)
).label("success"),
func.sum(
case(
(
or_(
RequestLog.status_code < 200,
RequestLog.status_code >= 300,
),
1,
),
(RequestLog.status_code is None, 1),
else_=0,
)
).label("failure"),
).where(RequestLog.request_time >= cutoff_time)
result = await database.fetch_one(query)
if result:
return {
"total": result["total"] or 0,
"success": result["success"] or 0,
"failure": result["failure"] or 0,
}
return {"total": 0, "success": 0, "failure": 0}
except Exception as e:
logger.error(f"Failed to get calls in last {seconds} seconds: {e}")
return {"total": 0, "success": 0, "failure": 0}
async def get_calls_in_last_minutes(self, minutes: int) -> dict[str, int]:
"""获取过去 N 分钟内的调用次数 (总数、成功、失败)"""
return await self.get_calls_in_last_seconds(minutes * 60)
async def get_calls_in_last_hours(self, hours: int) -> dict[str, int]:
"""获取过去 N 小时内的调用次数 (总数、成功、失败)"""
return await self.get_calls_in_last_seconds(hours * 3600)
async def get_calls_in_current_month(self) -> dict[str, int]:
"""获取当前自然月内的调用次数 (总数、成功、失败)"""
try:
now = datetime.datetime.now()
start_of_month = now.replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
query = select(
func.count(RequestLog.id).label("total"),
func.sum(
case(
(
and_(
RequestLog.status_code >= 200,
RequestLog.status_code < 300,
),
1,
),
else_=0,
)
).label("success"),
func.sum(
case(
(
or_(
RequestLog.status_code < 200,
RequestLog.status_code >= 300,
),
1,
),
(RequestLog.status_code is None, 1),
else_=0,
)
).label("failure"),
).where(RequestLog.request_time >= start_of_month)
result = await database.fetch_one(query)
if result:
return {
"total": result["total"] or 0,
"success": result["success"] or 0,
"failure": result["failure"] or 0,
}
return {"total": 0, "success": 0, "failure": 0}
except Exception as e:
logger.error(f"Failed to get calls in current month: {e}")
return {"total": 0, "success": 0, "failure": 0}
async def get_api_usage_stats(self) -> dict:
"""获取所有需要的 API 使用统计数据 (总数、成功、失败)"""
try:
stats_1m = await self.get_calls_in_last_minutes(1)
stats_1h = await self.get_calls_in_last_hours(1)
stats_24h = await self.get_calls_in_last_hours(24)
stats_month = await self.get_calls_in_current_month()
return {
"calls_1m": stats_1m,
"calls_1h": stats_1h,
"calls_24h": stats_24h,
"calls_month": stats_month,
}
except Exception as e:
logger.error(f"Failed to get API usage stats: {e}")
default_stat = {"total": 0, "success": 0, "failure": 0}
return {
"calls_1m": default_stat.copy(),
"calls_1h": default_stat.copy(),
"calls_24h": default_stat.copy(),
"calls_month": default_stat.copy(),
}
async def get_api_call_details(self, period: str) -> list[dict]:
"""
获取指定时间段内的 API 调用详情
Args:
period: 时间段标识 ('1m', '1h', '24h')
Returns:
包含调用详情的字典列表,每个字典包含 timestamp, key, model, status, status_code, latency_ms, error_log_id(可选)
Raises:
ValueError: 如果 period 无效
"""
now = datetime.datetime.now()
if period == "1m":
start_time = now - datetime.timedelta(minutes=1)
elif period == "1h":
start_time = now - datetime.timedelta(hours=1)
elif period == "8h":
start_time = now - datetime.timedelta(hours=8)
elif period == "24h":
start_time = now - datetime.timedelta(hours=24)
else:
raise ValueError(f"无效的时间段标识: {period}")
try:
query = (
select(
RequestLog.request_time.label("timestamp"),
RequestLog.api_key.label("key"),
RequestLog.model_name.label("model"),
RequestLog.status_code.label("status_code"),
RequestLog.latency_ms.label("latency_ms"),
)
.where(RequestLog.request_time >= start_time)
.order_by(RequestLog.request_time.desc())
)
results = await database.fetch_all(query)
# 为失败调用尝试查找匹配的错误日志ID时间窗口 +/- 5 分钟)
from app.database.models import ErrorLog # 延迟导入避免循环依赖
details: list[dict] = []
for row in results:
status = "failure"
if row["status_code"] is not None:
status = "success" if 200 <= row["status_code"] < 300 else "failure"
record = {
"timestamp": row["timestamp"].isoformat(),
"key": row["key"],
"model": row["model"],
"status": status,
"status_code": row["status_code"],
"latency_ms": row["latency_ms"],
}
# 如果失败尝试附带一个相关的错误日志ID便于前端拉取详情
if status == "failure" and row["key"]:
try:
ts = row["timestamp"]
start_win = ts - datetime.timedelta(minutes=5)
end_win = ts + datetime.timedelta(minutes=5)
err_query = (
select(ErrorLog.id)
.where(
ErrorLog.gemini_key == row["key"],
ErrorLog.request_time >= start_win,
ErrorLog.request_time <= end_win,
)
.order_by(ErrorLog.request_time.desc())
.limit(1)
)
err = await database.fetch_one(err_query)
if err:
record["error_log_id"] = err["id"]
except Exception as _e:
logger.debug(
f"No matching error log found for key ending ...{row['key'][-4:] if row['key'] else ''}: {_e}"
)
details.append(record)
logger.info(
f"Retrieved {len(details)} API call details for period '{period}'"
)
return details
except Exception as e:
logger.error(f"Failed to get API call details for period '{period}': {e}")
raise
async def get_key_call_details(self, key: str, period: str) -> list[dict]:
"""获取指定密钥在指定时间段内的调用详情 (与 get_api_call_details 结构一致)"""
now = datetime.datetime.now()
if period == "1m":
start_time = now - datetime.timedelta(minutes=1)
elif period == "1h":
start_time = now - datetime.timedelta(hours=1)
elif period == "8h":
start_time = now - datetime.timedelta(hours=8)
elif period == "24h":
start_time = now - datetime.timedelta(hours=24)
else:
raise ValueError(f"无效的时间段标识: {period}")
try:
query = (
select(
RequestLog.request_time.label("timestamp"),
RequestLog.api_key.label("key"),
RequestLog.model_name.label("model"),
RequestLog.status_code.label("status_code"),
RequestLog.latency_ms.label("latency_ms"),
)
.where(RequestLog.request_time >= start_time, RequestLog.api_key == key)
.order_by(RequestLog.request_time.desc())
)
results = await database.fetch_all(query)
from app.database.models import ErrorLog
details: list[dict] = []
for row in results:
status = "failure"
if row["status_code"] is not None:
status = "success" if 200 <= row["status_code"] < 300 else "failure"
record = {
"timestamp": row["timestamp"].isoformat(),
"key": row["key"],
"model": row["model"],
"status": status,
"status_code": row["status_code"],
"latency_ms": row["latency_ms"],
}
if status == "failure" and row["key"]:
try:
ts = row["timestamp"]
start_win = ts - datetime.timedelta(minutes=5)
end_win = ts + datetime.timedelta(minutes=5)
err_query = (
select(ErrorLog.id)
.where(
ErrorLog.gemini_key == row["key"],
ErrorLog.request_time >= start_win,
ErrorLog.request_time < end_win,
)
.order_by(ErrorLog.request_time.desc())
.limit(1)
)
err = await database.fetch_one(err_query)
if err:
record["error_log_id"] = err["id"]
except Exception as _e:
logger.debug(
f"No matching error log found for key ending ...{row['key'][-4:] if row['key'] else ''}: {_e}"
)
details.append(record)
logger.info(
f"Retrieved {len(details)} key call details for key=...{key[-4:] if key else ''} period '{period}'"
)
return details
except Exception as e:
logger.error(
f"Failed to get key call details for key=...{key[-4:] if key else ''} period '{period}': {e}"
)
raise
async def get_attention_keys_last_24h(self, include_keys: set[str], limit: int = 20, status_code: int = 429) -> list[dict]:
"""返回最近24小时内指定状态码(默认429)最多的Key列表仅包含include_keys中的Key。
Returns: [{"key": str, "count": int, "status_code": int}, ...] 按次数降序
"""
try:
now = datetime.datetime.now()
start_time = now - datetime.timedelta(hours=24)
if not include_keys:
return []
query = (
select(
RequestLog.api_key.label("key"),
func.count(RequestLog.id).label("count"),
)
.where(
RequestLog.request_time >= start_time,
RequestLog.status_code == status_code,
RequestLog.api_key.isnot(None),
RequestLog.api_key.in_(list(include_keys)),
)
.group_by(RequestLog.api_key)
.order_by(func.count(RequestLog.id).desc())
.limit(limit)
)
rows = await database.fetch_all(query)
return [
{"key": row["key"], "count": row["count"], "status_code": status_code}
for row in rows
if row["key"]
]
except Exception as e:
logger.error(f"Failed to get attention keys ({status_code}) in last 24h: {e}")
return []
async def get_key_usage_details_last_24h(self, key: str) -> Union[dict, None]:
"""
获取指定 API 密钥在过去 24 小时内按模型统计的调用次数。
Args:
key: 要查询的 API 密钥。
Returns:
一个字典,其中键是模型名称,值是调用次数。
如果查询出错或没有找到记录,可能返回 None 或空字典。
Example: {"gemini-pro": 10, "gemini-1.5-pro-latest": 5}
"""
logger.info(
f"Fetching usage details for key ending in ...{key[-4:]} for the last 24h."
)
cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=24)
try:
query = (
select(
RequestLog.model_name, func.count(RequestLog.id).label("call_count")
)
.where(
RequestLog.api_key == key,
RequestLog.request_time >= cutoff_time,
RequestLog.model_name.isnot(None),
)
.group_by(RequestLog.model_name)
.order_by(func.count(RequestLog.id).desc())
)
results = await database.fetch_all(query)
if not results:
logger.info(
f"No usage details found for key ending in ...{key[-4:]} in the last 24h."
)
return {}
usage_details = {row["model_name"]: row["call_count"] for row in results}
logger.info(
f"Successfully fetched usage details for key ending in ...{key[-4:]}: {usage_details}"
)
return usage_details
except Exception as e:
logger.error(
f"Failed to get key usage details for key ending in ...{key[-4:]}: {e}",
exc_info=True,
)
raise