Files
gemini-balance/app/service/client/api_client.py
snaily 2225a40bbe feat: 增加 Gemini 安全设置支持
- 新增 `SAFETY_SETTINGS` 配置项,允许用户通过环境变量或数据库配置 Gemini 模型的安全过滤级别。
- 更新后端服务 (`config.py`, `constants.py`, `gemini_routes.py`, `openai_routes.py`, `openai_chat_service.py`, `api_client.py`, `model_service.py`) 以支持和传递 `safety_settings` 参数。
- 在配置编辑器前端 (`config_editor.js`, `config_editor.html`) 添加了用于管理安全设置的用户界面。
- 将模型获取逻辑 (`model_service.py`, `api_client.py`) 改为异步。
- 优化 Service Worker (`service-worker.js`) 的缓存策略为 "cache then network"。

Bump version to 2.1.2
2025-05-02 22:49:36 +08:00

195 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app/services/chat/api_client.py
from typing import Dict, Any, AsyncGenerator, Optional
import httpx
import random
from abc import ABC, abstractmethod
from app.config.config import settings
from app.log.logger import get_api_client_logger
from app.core.constants import DEFAULT_TIMEOUT
logger = get_api_client_logger()
class ApiClient(ABC):
"""API客户端基类"""
@abstractmethod
async def generate_content(self, payload: Dict[str, Any], model: str, api_key: str) -> Dict[str, Any]:
pass
@abstractmethod
async def stream_generate_content(self, payload: Dict[str, Any], model: str, api_key: str) -> AsyncGenerator[str, None]:
pass
class GeminiApiClient(ApiClient):
"""Gemini API客户端"""
def __init__(self, base_url: str, timeout: int = DEFAULT_TIMEOUT):
self.base_url = base_url
self.timeout = timeout
def _get_real_model(self, model: str) -> str:
if model.endswith("-search"):
model = model[:-7]
if model.endswith("-image"):
model = model[:-6]
if model.endswith("-non-thinking"):
model = model[:-13]
if "-search" in model and "-non-thinking" in model:
model = model[:-20]
return model
async def get_models(self, api_key: str) -> Optional[Dict[str, Any]]:
"""获取可用的 Gemini 模型列表"""
timeout = httpx.Timeout(timeout=5)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy for getting models: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/models?key={api_key}"
try:
response = await client.get(url)
response.raise_for_status() # 如果状态码不是 2xx则引发 HTTPStatusError
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"获取模型列表失败: {e.response.status_code}")
logger.error(e.response.text)
# 返回 None 而不是抛出异常,以便上层处理
return None
except httpx.RequestError as e:
logger.error(f"请求模型列表失败: {e}")
# 返回 None 而不是抛出异常
return None
async def generate_content(self, payload: Dict[str, Any], model: str, api_key: str) -> Dict[str, Any]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
model = self._get_real_model(model)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/models/{model}:generateContent?key={api_key}"
response = await client.post(url, json=payload)
if response.status_code != 200:
error_content = response.text
raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
return response.json()
async def stream_generate_content(self, payload: Dict[str, Any], model: str, api_key: str) -> AsyncGenerator[str, None]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
model = self._get_real_model(model)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/models/{model}:streamGenerateContent?alt=sse&key={api_key}"
async with client.stream(method="POST", url=url, json=payload) as response:
if response.status_code != 200:
error_content = await response.aread()
error_msg = error_content.decode("utf-8")
raise Exception(f"API call failed with status code {response.status_code}, {error_msg}")
async for line in response.aiter_lines():
yield line
class OpenaiApiClient(ApiClient):
"""OpenAI API客户端"""
def __init__(self, base_url: str, timeout: int = DEFAULT_TIMEOUT):
self.base_url = base_url
self.timeout = timeout
async def get_models(self, api_key: str) -> Dict[str, Any]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
async with httpx.AsyncClient(timeout=timeout) as client:
url = f"{self.base_url}/openai/models"
headers = {"Authorization": f"Bearer {api_key}"}
response = await client.get(url, headers=headers)
if response.status_code != 200:
error_content = response.text
raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
return response.json()
async def generate_content(self, payload: Dict[str, Any], api_key: str) -> Dict[str, Any]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/openai/chat/completions"
headers = {"Authorization": f"Bearer {api_key}"}
response = await client.post(url, json=payload, headers=headers)
if response.status_code != 200:
error_content = response.text
raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
return response.json()
async def stream_generate_content(self, payload: Dict[str, Any], api_key: str) -> AsyncGenerator[str, None]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/openai/chat/completions"
headers = {"Authorization": f"Bearer {api_key}"}
async with client.stream(method="POST", url=url, json=payload, headers=headers) as response:
if response.status_code != 200:
error_content = await response.aread()
error_msg = error_content.decode("utf-8")
raise Exception(f"API call failed with status code {response.status_code}, {error_msg}")
async for line in response.aiter_lines():
yield line
async def create_embeddings(self, input: str, model: str, api_key: str) -> Dict[str, Any]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/openai/embeddings"
headers = {"Authorization": f"Bearer {api_key}"}
payload = {
"input": input,
"model": model,
}
response = await client.post(url, json=payload, headers=headers)
if response.status_code != 200:
error_content = response.text
raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
return response.json()
async def generate_images(self, payload: Dict[str, Any], api_key: str) -> Dict[str, Any]:
timeout = httpx.Timeout(self.timeout, read=self.timeout)
proxy_to_use = None
if settings.PROXIES:
proxy_to_use = random.choice(settings.PROXIES)
logger.info(f"Using proxy: {proxy_to_use}")
async with httpx.AsyncClient(timeout=timeout, proxy=proxy_to_use) as client:
url = f"{self.base_url}/openai/images/generations"
headers = {"Authorization": f"Bearer {api_key}"}
response = await client.post(url, json=payload, headers=headers)
if response.status_code != 200:
error_content = response.text
raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
return response.json()