Files
gemini-balance/app/service/stats/stats_service.py
snaily e9601ca76c feat(api,ui): 新增按Key调用详情与错误日志查找并联动前端
引入按密钥维度的请求详情及错误日志关联,新增错误日志精确
查找接口,并扩展统计时间维度,提升故障定位与可观测性。

- 新增 /api/logs/errors/lookup 接口:支持按 gemini_key / timestamp /
  status_code 与时间窗口查找最接近的错误日志;ErrorLogDetailResponse
  增加 error_code 字段
- Stats 接口增强:get_api_call_details 返回 status_code、latency_ms,
  并在失败时尝试匹配 error_log_id;新增 /api/stats/key-details 获取指
  定密钥调用详情;新增 8h 时间段
- DB 层:add_error_log 支持传入 request_datetime(默认使用 UTC);新增
  find_error_log_by_info 封装按 key/时间窗口/状态码的查询
- 前端 keys_status:趋势图支持 8 小时区间;调用详情表新增状态码/耗时与
  失败详情按钮;可按 key 查看期内调用详情并查看匹配错误日志;优化统计
  摘要展示与模态层级(z-index)
- OpenAIChatService:错误记录携带请求时间;改进日志与健壮性处理
2025-08-18 05:19:29 +08:00

366 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/service/stats/stats_service.py
import datetime
from typing import Union
from sqlalchemy import and_, case, func, or_, select
from app.database.connection import database
from app.database.models import RequestLog
from app.log.logger import get_stats_logger
# Module-level logger used by every method in this file; obtained from the
# project's get_stats_logger() helper imported above.
logger = get_stats_logger()
class StatsService:
    """Service class for handling statistics related operations.

    Every time window is computed from ``datetime.datetime.now()``, i.e. naive
    local time.
    NOTE(review): confirm this matches how ``request_time`` values are stored
    for both request logs and error logs.
    """

    # Supported period identifiers and the look-back window each one stands for.
    _PERIODS = {
        "1m": datetime.timedelta(minutes=1),
        "1h": datetime.timedelta(hours=1),
        "8h": datetime.timedelta(hours=8),
        "24h": datetime.timedelta(hours=24),
    }

    # Half-width of the window used to pair a failed request with an error log.
    _ERROR_LOG_MATCH_WINDOW = datetime.timedelta(minutes=5)

    @staticmethod
    def _empty_counts() -> dict[str, int]:
        """Return a fresh all-zero counter dict (a new object on every call)."""
        return {"total": 0, "success": 0, "failure": 0}

    @staticmethod
    def _mask_key(key) -> str:
        """Return only the last four characters of *key* for logging ('' if falsy)."""
        return key[-4:] if key else ""

    @classmethod
    def _period_start(cls, period: str) -> datetime.datetime:
        """Translate a period identifier into the start time of its window.

        Raises:
            ValueError: if *period* is not one of the supported identifiers.
        """
        try:
            window = cls._PERIODS[period]
        except (KeyError, TypeError):
            # TypeError covers unhashable input; it is still just an invalid period.
            raise ValueError(f"无效的时间段标识: {period}") from None
        return datetime.datetime.now() - window

    async def _count_calls_since(self, start_time: datetime.datetime) -> dict[str, int]:
        """Count total / successful / failed requests logged at or after *start_time*.

        A request is successful when its status code is in [200, 300). Any other
        status code, including a missing (NULL) one, counts as a failure.
        """
        is_success = and_(
            RequestLog.status_code >= 200,
            RequestLog.status_code < 300,
        )
        is_failure = or_(
            RequestLog.status_code < 200,
            RequestLog.status_code >= 300,
            # Must be a SQL-level IS NULL test. A Python `is None` on the column
            # is evaluated immediately to False and never reaches the database.
            RequestLog.status_code.is_(None),
        )
        query = select(
            func.count(RequestLog.id).label("total"),
            func.sum(case((is_success, 1), else_=0)).label("success"),
            func.sum(case((is_failure, 1), else_=0)).label("failure"),
        ).where(RequestLog.request_time >= start_time)

        result = await database.fetch_one(query)
        if result:
            # SUM over zero rows yields NULL, hence the `or 0` fallbacks.
            return {
                "total": result["total"] or 0,
                "success": result["success"] or 0,
                "failure": result["failure"] or 0,
            }
        return self._empty_counts()

    async def _find_error_log_id(self, error_log_model, key, timestamp):
        """Best-effort lookup of an error log for a failed request.

        Searches error logs with the same key whose request time lies within
        ``_ERROR_LOG_MATCH_WINDOW`` on either side of *timestamp* (both bounds
        inclusive) and returns the id of the most recent one. Note this is the
        latest entry in the window, not necessarily the closest to *timestamp*.

        Returns:
            The matching error log id, or None when nothing matched or the
            lookup itself failed (failures are only logged at debug level).
        """
        try:
            window = self._ERROR_LOG_MATCH_WINDOW
            query = (
                select(error_log_model.id)
                .where(
                    error_log_model.gemini_key == key,
                    error_log_model.request_time >= timestamp - window,
                    error_log_model.request_time <= timestamp + window,
                )
                .order_by(error_log_model.request_time.desc())
                .limit(1)
            )
            match = await database.fetch_one(query)
            return match["id"] if match else None
        except Exception as exc:
            logger.debug(
                f"No matching error log found for key ending ...{self._mask_key(key)}: {exc}"
            )
            return None

    async def _fetch_call_details(self, *conditions) -> list[dict]:
        """Fetch request logs matching *conditions*, newest first, as detail records.

        Each record has timestamp (ISO string), key, model, status
        ('success' / 'failure'), status_code and latency_ms. Failed records with
        a non-empty key also get error_log_id when a matching error log exists.
        One extra query is issued per failed record.
        """
        # Imported lazily, as the original code did, to avoid a circular import.
        from app.database.models import ErrorLog

        query = (
            select(
                RequestLog.request_time.label("timestamp"),
                RequestLog.api_key.label("key"),
                RequestLog.model_name.label("model"),
                RequestLog.status_code.label("status_code"),
                RequestLog.latency_ms.label("latency_ms"),
            )
            .where(*conditions)
            .order_by(RequestLog.request_time.desc())
        )
        rows = await database.fetch_all(query)

        details: list[dict] = []
        for row in rows:
            status_code = row["status_code"]
            # A missing status code is treated as a failure.
            succeeded = status_code is not None and 200 <= status_code < 300
            record = {
                "timestamp": row["timestamp"].isoformat(),
                "key": row["key"],
                "model": row["model"],
                "status": "success" if succeeded else "failure",
                "status_code": status_code,
                "latency_ms": row["latency_ms"],
            }
            if not succeeded and row["key"]:
                # Attach a related error log id so the UI can fetch the details.
                error_log_id = await self._find_error_log_id(
                    ErrorLog, row["key"], row["timestamp"]
                )
                if error_log_id is not None:
                    record["error_log_id"] = error_log_id
            details.append(record)
        return details

    async def get_calls_in_last_seconds(self, seconds: int) -> dict[str, int]:
        """Return call counts (total, success, failure) for the last N seconds.

        Any error is logged and an all-zero result is returned instead.
        """
        try:
            cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=seconds)
            return await self._count_calls_since(cutoff_time)
        except Exception as e:
            logger.error(f"Failed to get calls in last {seconds} seconds: {e}")
            return self._empty_counts()

    async def get_calls_in_last_minutes(self, minutes: int) -> dict[str, int]:
        """Return call counts (total, success, failure) for the last N minutes."""
        return await self.get_calls_in_last_seconds(minutes * 60)

    async def get_calls_in_last_hours(self, hours: int) -> dict[str, int]:
        """Return call counts (total, success, failure) for the last N hours."""
        return await self.get_calls_in_last_seconds(hours * 3600)

    async def get_calls_in_current_month(self) -> dict[str, int]:
        """Return call counts (total, success, failure) for the current calendar month.

        Any error is logged and an all-zero result is returned instead.
        """
        try:
            start_of_month = datetime.datetime.now().replace(
                day=1, hour=0, minute=0, second=0, microsecond=0
            )
            return await self._count_calls_since(start_of_month)
        except Exception as e:
            logger.error(f"Failed to get calls in current month: {e}")
            return self._empty_counts()

    async def get_api_usage_stats(self) -> dict:
        """Return the usage counters for the last minute, hour, 24 hours and month.

        Returns:
            A dict with keys calls_1m, calls_1h, calls_24h and calls_month, each
            mapping to a {total, success, failure} dict. All-zero counters are
            returned if gathering the statistics fails.
        """
        try:
            stats_1m = await self.get_calls_in_last_minutes(1)
            stats_1h = await self.get_calls_in_last_hours(1)
            stats_24h = await self.get_calls_in_last_hours(24)
            stats_month = await self.get_calls_in_current_month()
            return {
                "calls_1m": stats_1m,
                "calls_1h": stats_1h,
                "calls_24h": stats_24h,
                "calls_month": stats_month,
            }
        except Exception as e:
            logger.error(f"Failed to get API usage stats: {e}")
            return {
                "calls_1m": self._empty_counts(),
                "calls_1h": self._empty_counts(),
                "calls_24h": self._empty_counts(),
                "calls_month": self._empty_counts(),
            }

    async def get_api_call_details(self, period: str) -> list[dict]:
        """Return the details of every API call made within the given period.

        Args:
            period: period identifier, one of '1m', '1h', '8h', '24h'.

        Returns:
            A list of dicts, newest first, each containing timestamp, key, model,
            status, status_code, latency_ms and, for failures where a matching
            error log was found, error_log_id.

        Raises:
            ValueError: if *period* is not a supported identifier.
            Exception: query errors are logged and re-raised.
        """
        start_time = self._period_start(period)
        try:
            details = await self._fetch_call_details(
                RequestLog.request_time >= start_time
            )
            logger.info(
                f"Retrieved {len(details)} API call details for period '{period}'"
            )
            return details
        except Exception as e:
            logger.error(f"Failed to get API call details for period '{period}': {e}")
            raise

    async def get_key_call_details(self, key: str, period: str) -> list[dict]:
        """Return the call details of one API key within the given period.

        The records have the same structure as those of get_api_call_details.

        Args:
            key: the API key whose calls are requested.
            period: period identifier, one of '1m', '1h', '8h', '24h'.

        Raises:
            ValueError: if *period* is not a supported identifier.
            Exception: query errors are logged and re-raised.
        """
        start_time = self._period_start(period)
        try:
            details = await self._fetch_call_details(
                RequestLog.request_time >= start_time,
                RequestLog.api_key == key,
            )
            logger.info(
                f"Retrieved {len(details)} key call details for key=...{self._mask_key(key)} period '{period}'"
            )
            return details
        except Exception as e:
            logger.error(
                f"Failed to get key call details for key=...{self._mask_key(key)} period '{period}': {e}"
            )
            raise

    async def get_key_usage_details_last_24h(self, key: str) -> Union[dict, None]:
        """Return per-model call counts of one API key over the last 24 hours.

        Args:
            key: the API key to query.

        Returns:
            A dict mapping model name to call count, ordered by count descending,
            e.g. {"gemini-pro": 10, "gemini-1.5-pro-latest": 5}. An empty dict is
            returned when there are no records.

        Raises:
            Exception: query errors are logged (with traceback) and re-raised.
        """
        masked_key = self._mask_key(key)
        logger.info(
            f"Fetching usage details for key ending in ...{masked_key} for the last 24h."
        )
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=24)
        try:
            query = (
                select(
                    RequestLog.model_name, func.count(RequestLog.id).label("call_count")
                )
                .where(
                    RequestLog.api_key == key,
                    RequestLog.request_time >= cutoff_time,
                    RequestLog.model_name.isnot(None),
                )
                .group_by(RequestLog.model_name)
                .order_by(func.count(RequestLog.id).desc())
            )
            results = await database.fetch_all(query)
            if not results:
                logger.info(
                    f"No usage details found for key ending in ...{masked_key} in the last 24h."
                )
                return {}
            usage_details = {row["model_name"]: row["call_count"] for row in results}
            logger.info(
                f"Successfully fetched usage details for key ending in ...{masked_key}: {usage_details}"
            )
            return usage_details
        except Exception as e:
            logger.error(
                f"Failed to get key usage details for key ending in ...{masked_key}: {e}",
                exc_info=True,
            )
            raise