Files
gemini-balance/app/service/client/api_client.py
snaily 83ce50975a feat: 实现 OpenAI 兼容 API 端点和批量代理删除
新增与 OpenAI 规范兼容的 API 端点:
- `/openai/v1/models`
- `/openai/v1/chat/completions` (支持流式传输、重试和密钥切换)
- `/openai/v1/embeddings`
- `/openai/v1/images/generations`

包含:
- 在 `app/router/openai_compatiable_routes.py` 中新增路由。
- `OpenAICompatiableService` 用于处理请求逻辑、日志记录和错误管理。
- 更新 `OpenaiApiClient` 以支持新方法和代理使用。
- 修改 `app/domain/openai_models.py` 以实现兼容性。
- 为新 API 添加专用日志记录器 (`openai_compatible`)。
- 为新路由 (`/openai`, `/api/version/check`) 添加认证中间件豁免。

增强配置编辑器 UI:
- 在 `app/static/js/config_editor.js` 和 `app/templates/config_editor.html` 中添加批量代理删除功能。
2025-04-30 20:39:47 +08:00

170 lines
7.5 KiB
Python

# app/service/client/api_client.py
from typing import Dict, Any, AsyncGenerator
import httpx
import random
from abc import ABC, abstractmethod
from app.config.config import settings
from app.log.logger import get_api_client_logger
from app.core.constants import DEFAULT_TIMEOUT
logger = get_api_client_logger()
class ApiClient(ABC):
    """Abstract contract for chat API clients (Gemini, OpenAI-compatible)."""

    @abstractmethod
    async def generate_content(
        self, payload: Dict[str, Any], model: str, api_key: str
    ) -> Dict[str, Any]:
        """Perform a single non-streaming generation request and return parsed JSON."""
        ...

    @abstractmethod
    async def stream_generate_content(
        self, payload: Dict[str, Any], model: str, api_key: str
    ) -> AsyncGenerator[str, None]:
        """Perform a streaming generation request, yielding raw response lines."""
        ...
class GeminiApiClient(ApiClient):
    """Gemini API client.

    Sends requests to the Gemini REST endpoints, optionally routing each
    request through a proxy chosen at random from ``settings.PROXIES``.
    """

    # Virtual model-name suffixes that must be removed before calling the API.
    _VIRTUAL_SUFFIXES = ("-search", "-image", "-non-thinking")

    def __init__(self, base_url: str, timeout: int = DEFAULT_TIMEOUT):
        self.base_url = base_url
        self.timeout = timeout

    def _get_real_model(self, model: str) -> str:
        """Strip every virtual suffix from *model*, in any order or combination.

        Fixes a defect in the previous implementation, which stripped each
        suffix at most once and relied on a dead ``model[:-20]`` branch for
        combined suffixes: after ``endswith("-non-thinking")`` stripping, the
        ``"-non-thinking" in model`` condition could no longer be true, so e.g.
        ``"gemini-pro-search-non-thinking"`` was reduced only to
        ``"gemini-pro-search"``. Looping until no suffix matches handles all
        combinations.
        """
        stripped = True
        while stripped:
            stripped = False
            for suffix in self._VIRTUAL_SUFFIXES:
                if model.endswith(suffix):
                    model = model[: -len(suffix)]
                    stripped = True
        return model

    def _pick_proxy(self):
        """Return a random proxy URL from settings.PROXIES, or None if none configured."""
        if settings.PROXIES:
            proxy = random.choice(settings.PROXIES)
            logger.info(f"Using proxy: {proxy}")
            return proxy
        return None

    async def generate_content(self, payload: Dict[str, Any], model: str, api_key: str) -> Dict[str, Any]:
        """POST a non-streaming ``generateContent`` request.

        Returns:
            The parsed JSON response body.

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        model = self._get_real_model(model)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/models/{model}:generateContent?key={api_key}"
            response = await client.post(url, json=payload)
            if response.status_code != 200:
                error_content = response.text
                raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
            return response.json()

    async def stream_generate_content(self, payload: Dict[str, Any], model: str, api_key: str) -> AsyncGenerator[str, None]:
        """POST a streaming ``streamGenerateContent`` request (SSE).

        Yields:
            Raw response lines as they arrive.

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        model = self._get_real_model(model)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/models/{model}:streamGenerateContent?alt=sse&key={api_key}"
            async with client.stream(method="POST", url=url, json=payload) as response:
                if response.status_code != 200:
                    error_content = await response.aread()
                    error_msg = error_content.decode("utf-8")
                    raise Exception(f"API call failed with status code {response.status_code}, {error_msg}")
                async for line in response.aiter_lines():
                    yield line
class OpenaiApiClient(ApiClient):
    """OpenAI-compatible API client.

    Talks to the Gemini ``/openai`` compatibility endpoints using Bearer-token
    auth, optionally routing each request through a proxy chosen at random
    from ``settings.PROXIES``.

    NOTE(review): ``generate_content`` / ``stream_generate_content`` drop the
    ``model`` parameter declared on ``ApiClient`` (the model travels inside the
    payload instead), so their signatures do not match the ABC — confirm
    against callers before tightening.
    """

    def __init__(self, base_url: str, timeout: int = DEFAULT_TIMEOUT):
        self.base_url = base_url
        self.timeout = timeout

    def _pick_proxy(self):
        """Return a random proxy URL from settings.PROXIES, or None if none configured."""
        if settings.PROXIES:
            proxy = random.choice(settings.PROXIES)
            logger.info(f"Using proxy: {proxy}")
            return proxy
        return None

    def _auth_headers(self, api_key: str) -> Dict[str, str]:
        """Build the Bearer authorization header for *api_key*."""
        return {"Authorization": f"Bearer {api_key}"}

    async def get_models(self, api_key: str) -> Dict[str, Any]:
        """GET the model list.

        Now routed through the configured proxy like every other method
        (previously this was the only call that bypassed the proxy).

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/openai/models"
            response = await client.get(url, headers=self._auth_headers(api_key))
            if response.status_code != 200:
                error_content = response.text
                raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
            return response.json()

    async def generate_content(self, payload: Dict[str, Any], api_key: str) -> Dict[str, Any]:
        """POST a non-streaming chat-completions request; returns parsed JSON.

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/openai/chat/completions"
            response = await client.post(url, json=payload, headers=self._auth_headers(api_key))
            if response.status_code != 200:
                error_content = response.text
                raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
            return response.json()

    async def stream_generate_content(self, payload: Dict[str, Any], api_key: str) -> AsyncGenerator[str, None]:
        """POST a streaming chat-completions request, yielding raw SSE lines.

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/openai/chat/completions"
            async with client.stream(
                method="POST", url=url, json=payload, headers=self._auth_headers(api_key)
            ) as response:
                if response.status_code != 200:
                    error_content = await response.aread()
                    error_msg = error_content.decode("utf-8")
                    raise Exception(f"API call failed with status code {response.status_code}, {error_msg}")
                async for line in response.aiter_lines():
                    yield line

    async def create_embeddings(self, input: str, model: str, api_key: str) -> Dict[str, Any]:
        """POST an embeddings request for *input* with *model*; returns parsed JSON.

        ``input`` shadows the builtin but is kept: callers may pass it by keyword.

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/openai/embeddings"
            payload = {
                "input": input,
                "model": model,
            }
            response = await client.post(url, json=payload, headers=self._auth_headers(api_key))
            if response.status_code != 200:
                error_content = response.text
                raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
            return response.json()

    async def generate_images(self, payload: Dict[str, Any], api_key: str) -> Dict[str, Any]:
        """POST an image-generations request; returns parsed JSON.

        Raises:
            Exception: if the upstream returns a non-200 status code.
        """
        timeout = httpx.Timeout(self.timeout, read=self.timeout)
        async with httpx.AsyncClient(timeout=timeout, proxy=self._pick_proxy()) as client:
            url = f"{self.base_url}/openai/images/generations"
            response = await client.post(url, json=payload, headers=self._auth_headers(api_key))
            if response.status_code != 200:
                error_content = response.text
                raise Exception(f"API call failed with status code {response.status_code}, {error_content}")
            return response.json()