Compare commits

..

10 Commits

13 changed files with 719 additions and 86 deletions

204
README.md
View File

@@ -4,11 +4,11 @@
## 📝 项目简介
本项目是一个基于 FastAPI 框架开发的高性能、易于部署的 OpenAI 和 Gemini API 代理服务。它不仅兼容 OpenAI 的 API 接口,还支持 Google 的 Gemini 模型,为用户提供灵活的模型选择。该代理服务内置了多 API Key 轮询、负载均衡、自动重试、访问控制Bearer Token 认证)、流式响应等功能,旨在简化 AI 应用的开发和部署流程。
本项目是一个基于 FastAPI 框架开发的高性能、易于部署的Gemini OpenAI兼容 和 Gemini API 代理服务。它不仅兼容 OpenAI 的 API 接口,还支持 Google 的 Gemini 原生接口。该代理服务内置了多 API Key 轮询、负载均衡、自动重试、访问控制Bearer Token 认证)、流式响应等功能,旨在简化 AI 应用的开发和部署流程。
**核心功能与优势:**
- **多模型支持**: 无缝切换 OpenAI 和 Gemini 模型
- **多协议支持**: 无缝切换 OpenAI兼容 和 Gemini 协议
- **智能 API Key 管理**: 自动轮询多个 API Key实现负载均衡和故障转移。
- **安全访问控制**: 使用 Bearer Token 进行身份验证,保护 API 访问。
- **流式响应支持**: 提供实时的流式数据传输,提升用户体验。
@@ -51,29 +51,90 @@
3. **配置**:
创建 `.env` 文件,并配置以下环境变量:
创建 `.env` 文件,并按以下分类配置环境变量:
```env
API_KEYS=["your-gemini-api-key-1", "your-gemini-api-key-2"] # 你的 Gemini API 密钥列表
ALLOWED_TOKENS=["your-access-token-1", "your-access-token-2"] # 允许访问的 Token 列表
BASE_URL="https://generativelanguage.googleapis.com/v1beta" # Gemini API 基础 URL, 保持默认即可
MODEL_SEARCH=["gemini-2.0-flash-exp"] # 启用搜索功能的模型列表
TOOLS_CODE_EXECUTION_ENABLED=false # 是否启用代码执行工具, 默认为 false
SHOW_SEARCH_LINK=true # 是否显示搜索链接
SHOW_THINKING_PROCESS=true # 是否显示思考过程
AUTH_TOKEN="" # 备用token, 如果不设置, 默认为 ALLOWED_TOKENS 的第一个
MAX_FAILURES=3 # 允许单个key失败的次数
# 基础配置
BASE_URL="https://generativelanguage.googleapis.com/v1beta" # Gemini API 基础 URL默认无需修改
MAX_FAILURES=3 # 允许单个key失败的次数默认3次
# 认证与安全配置
API_KEYS=["your-gemini-api-key-1", "your-gemini-api-key-2"] # Gemini API 密钥列表,用于负载均衡
ALLOWED_TOKENS=["your-access-token-1", "your-access-token-2"] # 允许访问的 Token 列表
AUTH_TOKEN="" # 超级管理员token具有所有权限默认使用 ALLOWED_TOKENS 的第一个
# 模型功能配置
MODEL_SEARCH=["gemini-2.0-flash-exp"] # 支持搜索功能的模型列表
TOOLS_CODE_EXECUTION_ENABLED=false # 是否启用代码执行工具默认false
SHOW_SEARCH_LINK=true # 是否在响应中显示搜索结果链接默认true
SHOW_THINKING_PROCESS=true # 是否显示模型思考过程默认true
# 图片生成配置
PAID_KEY="your-paid-api-key" # 付费版API Key用于图片生成等高级功能
CREATE_IMAGE_MODEL="imagen-3.0-generate-002" # 图片生成模型默认使用imagen-3.0
# 图片上传配置
UPLOAD_PROVIDER="smms" # 图片上传提供商目前支持smms
SMMS_SECRET_TOKEN="your-smms-token" # SM.MS图床的API Token
```
- `API_KEYS`: 你的 Gemini API 密钥列表,支持多个 Key 轮询。
- `ALLOWED_TOKENS`: 允许访问的 Token 列表,用于 API 认证。
- `BASE_URL`: Gemini API 的基础 URL通常不需要修改。
- `MODEL_SEARCH`: 启用搜索功能的模型列表。
- `TOOLS_CODE_EXECUTION_ENABLED`: 是否启用代码执行工具, 默认为 `false`。
- `SHOW_SEARCH_LINK`: 是否显示搜索结果链接(当使用搜索模型时)。
- `SHOW_THINKING_PROCESS`: 是否显示模型的"思考"过程(对于某些模型)。
- `AUTH_TOKEN`: 主鉴权token(权限较大,注意保管), 如果不设置, 默认为 `ALLOWED_TOKENS` 的第一个。
- `MAX_FAILURES`: 允许单个 API Key 失败的次数,超过此次数后该 Key 将被标记为无效。
### 配置说明
#### 基础配置
- `BASE_URL`: Gemini API 的基础 URL
- 默认值: `https://generativelanguage.googleapis.com/v1beta`
- 说明: 通常无需修改,除非 API 地址发生变化
- `MAX_FAILURES`: API Key 允许的最大失败次数
- 默认值: `3`
- 说明: 超过此次数后Key 将被暂时标记为无效
#### 认证与安全配置
- `API_KEYS`: Gemini API 密钥列表
- 格式: JSON 数组字符串
- 用途: 支持多个 Key 轮询,实现负载均衡
- 建议: 至少配置 2 个 Key 以保证服务可用性
- `ALLOWED_TOKENS`: 访问令牌列表
- 格式: JSON 数组字符串
- 用途: 用于客户端认证
- 安全提示: 请使用足够复杂的令牌
- `AUTH_TOKEN`: 超级管理员令牌
- 可选配置,留空则使用 ALLOWED_TOKENS 的第一个
- 具有查看 API Key 状态等特权操作权限
#### 模型功能配置
- `MODEL_SEARCH`: 搜索功能支持的模型
- 默认值: `["gemini-2.0-flash-exp"]`
- 说明: 仅列表中的模型可使用搜索功能
- `TOOLS_CODE_EXECUTION_ENABLED`: 代码执行功能
- 默认值: `false`
- 安全提示: 生产环境建议禁用
- `SHOW_SEARCH_LINK`: 搜索结果链接显示
- 默认值: `true`
- 用途: 控制搜索结果中是否包含原始链接
- `SHOW_THINKING_PROCESS`: 思考过程显示
- 默认值: `true`
- 用途: 显示模型的推理过程,便于调试
#### 图片生成配置
- `PAID_KEY`: 付费版 API Key
- 用途: 用于图片生成等高级功能
- 说明: 需要单独申请的付费版 Key
- `CREATE_IMAGE_MODEL`: 图片生成模型
- 默认值: `imagen-3.0-generate-002`
- 说明: 当前支持的最新图片生成模型
#### 图片上传配置
- `UPLOAD_PROVIDER`: 图片上传服务提供商
- 默认值: `smms`
- 说明: 目前支持 SM.MS 图床
- `SMMS_SECRET_TOKEN`: SM.MS API Token
- 用途: 用于图片上传到 SM.MS 图床
- 获取方式: 需要在 SM.MS 官网注册并获取
### ▶️ 运行
@@ -109,13 +170,30 @@ uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
所有 API 请求都需要在 Header 中添加 `Authorization` 字段,值为 `Bearer <your-token>`,其中 `<your-token>` 需要替换为你在 `.env` 文件中配置的 `ALLOWED_TOKENS` 中的一个或者 `AUTH_TOKEN`。
### 获取模型列表
### API 路由
本服务提供两种API路由
1. **OpenAI 兼容路由** (推荐)
- 基础路径: `/v1`
- 完全兼容OpenAI API格式
- 支持所有Gemini模型
2. **Gemini 原生路由**
- 基础路径: `/gemini/v1beta` 或 `/v1beta`
- 遵循Google原生API格式
- 适用于需要直接使用Gemini API的场景
### OpenAI兼容路由
#### 获取模型列表
- **URL**: `/v1/models`
- **Method**: `GET`
- **Header**: `Authorization: Bearer <your-token>`
- **Response**: 返回支持的所有模型列表,包括最新的`gemini-2.0-flash-exp-search`等模型
### 聊天补全 (Chat Completions)
#### 聊天补全 (Chat Completions)
- **URL**: `/v1/chat/completions`
- **Method**: `POST`
@@ -141,11 +219,34 @@ uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
}
```
- `messages`: 消息列表,格式与 OpenAI API 相同
- `model`: 模型名称,例如 `gemini-1.5-flash-002`。
- `stream`: 是否开启流式响应,`true` 或 `false`。
- `tools`: 使用的工具列表。
- 其他参数:与 OpenAI API 兼容的参数,如 `temperature`, `max_tokens` 等。
- `messages`: 消息列表,格式与 OpenAI API 相同
- `model`: 模型名称,支持所有Gemini模型包括:
- `gemini-1.5-flash-002`: 快速响应模型
- `gemini-2.0-flash-exp`: 实验性快速响应模型
- `gemini-2.0-flash-exp-search`: 支持搜索功能的实验性模型
- `stream`: 是否开启流式响应,`true` 或 `false`
- `tools`: 使用的工具列表
- 其他参数:与 OpenAI API 兼容的参数,如 `temperature`, `max_tokens` 等
### Gemini原生路由
#### 获取模型列表
- **URL**: `/gemini/v1beta/models` 或 `/v1beta/models`
- **Method**: `GET`
- **Header**: `Authorization: Bearer <your-token>`
#### 生成内容
- **URL**: `/gemini/v1beta/models/{model_name}:generateContent`
- **Method**: `POST`
- **Header**: `Authorization: Bearer <your-token>`
#### 流式生成内容
- **URL**: `/gemini/v1beta/models/{model_name}:streamGenerateContent`
- **Method**: `POST`
- **Header**: `Authorization: Bearer <your-token>`
### 获取词向量 (Embeddings)
@@ -169,12 +270,30 @@ uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
- **URL**: `/health`
- **Method**: `GET`
### 获取 API Key 列表
### Web界面功能
#### 验证页面
- **URL**: `/auth`
- **说明**: 提供了一个简洁的Web界面用于验证访问令牌
- **功能**:
- 美观的用户界面,支持响应式设计
- 安全的令牌验证机制
- 错误提示功能
- 支持移动端访问
#### API密钥状态管理
- **URL**: `/v1/keys/list`
- **Method**: `GET`
- **Header**: `Authorization: Bearer <your-auth-token>`
- **说明**: 只有使用 `AUTH_TOKEN` 才能访问此接口, 用于获取有效和无效的 API Key 列表。
- **说明**:
- 只有使用 `AUTH_TOKEN` 才能访问此接口
- 提供了可视化的Web界面展示API密钥状态
- 支持查看有效和无效的API密钥列表
- 显示每个密钥的失败次数统计
- 提供一键复制功能(支持复制单个密钥或批量复制)
- 实时显示密钥总数统计
### 图片生成 (Image Generation)
@@ -186,12 +305,34 @@ uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
```json
{
"model": "dall-e-3",
"prompt": "汉服美女",
"prompt": "{n:2} {ratio:16:9} 汉服美女",
"n": 1,
"size": "1024x1024"
}
```
**Prompt参数说明:**
prompt支持通过特殊标记来控制生成参数
1. 图片数量控制:
- 格式: `{n:数量}`
- 示例: `{n:2} 一只可爱的猫` - 生成2张图片
- 取值范围: 1-4
- 说明: 如果在prompt中指定了n将覆盖请求body中的n参数
2. 图片比例控制:
- 格式: `{ratio:宽:高}`
- 示例: `{ratio:16:9} 一片森林` - 生成16:9比例的图片
- 支持的比例: "1:1"、"3:4"、"4:3"、"9:16"、"16:9"
- 说明: 如果指定了size参数将优先使用size对应的比例
3. 参数组合:
- 示例: `{n:2} {ratio:16:9} 一片美丽的森林` - 生成2张16:9比例的图片
- 说明: 这些参数标记会自动从prompt中移除不会影响实际的图片生成提示词
> 注意n的取值范围[1,4], ratio取值范围"1:1"、"3:4"、"4:3"、"9:16" 和 "16:9"
## 📚 代码结构
```plaintext
@@ -267,6 +408,7 @@ A: 请检查以下几点:
A: 在请求的 Body 中,将 `stream` 参数设置为 `true` 即可。
**Q: 如何启用代码执行工具?**
A: 在 `.env` 文件的 `TOOLS_CODE_EXECUTION_ENABLED` 变量中, 设置为 `true` 即可。
## 📄 许可证

View File

@@ -6,7 +6,7 @@ from app.core.logger import get_gemini_logger
from app.core.security import SecurityService
from app.schemas.gemini_models import GeminiRequest
from app.services.gemini_chat_service import GeminiChatService
from app.services.key_manager import KeyManager
from app.services.key_manager import KeyManager, get_key_manager_instance
from app.services.model_service import ModelService
from app.services.chat.retry_handler import RetryHandler
@@ -16,13 +16,20 @@ logger = get_gemini_logger()
# 初始化服务
security_service = SecurityService(settings.ALLOWED_TOKENS, settings.AUTH_TOKEN)
key_manager = KeyManager(settings.API_KEYS)
async def get_key_manager():
return await get_key_manager_instance()
async def get_next_working_key_wrapper(key_manager: KeyManager = Depends(get_key_manager)):
return await key_manager.get_next_working_key()
model_service = ModelService(settings.MODEL_SEARCH)
@router.get("/models")
@router_v1beta.get("/models")
async def list_models(_=Depends(security_service.verify_key)):
async def list_models(_=Depends(security_service.verify_key),
key_manager: KeyManager = Depends(get_key_manager)):
"""获取可用的Gemini模型列表"""
logger.info("-" * 50 + "list_gemini_models" + "-" * 50)
logger.info("Handling Gemini models list request")
@@ -40,12 +47,13 @@ async def list_models(_=Depends(security_service.verify_key)):
@router.post("/models/{model_name}:generateContent")
@router_v1beta.post("/models/{model_name}:generateContent")
@RetryHandler(max_retries=3, key_manager=key_manager, key_arg="api_key")
@RetryHandler(max_retries=3, key_manager=Depends(get_key_manager), key_arg="api_key")
async def generate_content(
model_name: str,
request: GeminiRequest,
_=Depends(security_service.verify_goog_api_key),
api_key: str = Depends(key_manager.get_next_working_key),
api_key: str = Depends(get_next_working_key_wrapper),
key_manager: KeyManager = Depends(get_key_manager)
):
chat_service = GeminiChatService(settings.BASE_URL, key_manager)
"""非流式生成内容"""
@@ -69,12 +77,13 @@ async def generate_content(
@router.post("/models/{model_name}:streamGenerateContent")
@router_v1beta.post("/models/{model_name}:streamGenerateContent")
@RetryHandler(max_retries=3, key_manager=key_manager, key_arg="api_key")
@RetryHandler(max_retries=3, key_manager=Depends(get_key_manager), key_arg="api_key")
async def stream_generate_content(
model_name: str,
request: GeminiRequest,
_=Depends(security_service.verify_goog_api_key),
api_key: str = Depends(key_manager.get_next_working_key),
api_key: str = Depends(get_next_working_key_wrapper),
key_manager: KeyManager = Depends(get_key_manager)
):
chat_service = GeminiChatService(settings.BASE_URL, key_manager)
"""流式生成内容"""

View File

@@ -8,7 +8,7 @@ from app.schemas.openai_models import ChatRequest, EmbeddingRequest, ImageGenera
from app.services.chat.retry_handler import RetryHandler
from app.services.embedding_service import EmbeddingService
from app.services.image_create_service import ImageCreateService
from app.services.key_manager import KeyManager
from app.services.key_manager import KeyManager, get_key_manager_instance
from app.services.model_service import ModelService
from app.services.openai_chat_service import OpenAIChatService
@@ -17,15 +17,22 @@ logger = get_openai_logger()
# 初始化服务
security_service = SecurityService(settings.ALLOWED_TOKENS, settings.AUTH_TOKEN)
key_manager = KeyManager(settings.API_KEYS)
model_service = ModelService(settings.MODEL_SEARCH)
embedding_service = EmbeddingService(settings.BASE_URL)
image_create_service = ImageCreateService()
async def get_key_manager():
return await get_key_manager_instance()
async def get_next_working_key_wrapper(key_manager: KeyManager = Depends(get_key_manager)):
return await key_manager.get_next_working_key()
@router.get("/v1/models")
@router.get("/hf/v1/models")
async def list_models(_=Depends(security_service.verify_authorization)):
async def list_models(
_=Depends(security_service.verify_authorization),
key_manager: KeyManager = Depends(get_key_manager)
):
logger.info("-" * 50 + "list_models" + "-" * 50)
logger.info("Handling models list request")
api_key = await key_manager.get_next_working_key()
@@ -39,11 +46,12 @@ async def list_models(_=Depends(security_service.verify_authorization)):
@router.post("/v1/chat/completions")
@router.post("/hf/v1/chat/completions")
@RetryHandler(max_retries=3, key_manager=key_manager, key_arg="api_key")
@RetryHandler(max_retries=3, key_manager=Depends(get_key_manager), key_arg="api_key")
async def chat_completion(
request: ChatRequest,
_=Depends(security_service.verify_authorization),
api_key: str = Depends(key_manager.get_next_working_key),
request: ChatRequest,
_=Depends(security_service.verify_authorization),
api_key: str = Depends(get_next_working_key_wrapper),
key_manager: KeyManager = Depends(get_key_manager)
):
# 如果model是imagen3,使用paid_key
if request.model == f"{settings.CREATE_IMAGE_MODEL}-chat":
@@ -54,23 +62,25 @@ async def chat_completion(
logger.info(f"Request: \n{request.model_dump_json(indent=2)}")
logger.info(f"Using API key: {api_key}")
try:
response = await chat_service.create_image_chat_completion(request=request)
# 如果model是imagen3,使用paid_key
if request.model == f"{settings.CREATE_IMAGE_MODEL}-chat":
response = await chat_service.create_image_chat_completion(request=request)
else:
response = await chat_service.create_chat_completion(request, api_key)
# 处理流式响应
if request.stream:
return StreamingResponse(response, media_type="text/event-stream")
logger.info("Chat completion request successful")
return response
except Exception as e:
logger.error(f"Chat completion failed after retries: {str(e)}")
raise HTTPException(status_code=500, detail="Chat completion failed") from e
@router.post("/v1/images/generations")
@router.post("/hf/v1/images/generations")
async def generate_image(
request: ImageGenerationRequest,
_=Depends(security_service.verify_authorization),
request: ImageGenerationRequest,
_=Depends(security_service.verify_authorization),
):
logger.info("-" * 50 + "generate_image" + "-" * 50)
logger.info(f"Handling image generation request for prompt: {request.prompt}")
@@ -79,17 +89,16 @@ async def generate_image(
response = image_create_service.generate_images(request)
logger.info("Image generation request successful")
return response
except Exception as e:
logger.error(f"Image generation request failed: {str(e)}")
raise HTTPException(status_code=500, detail="Image generation request failed") from e
@router.post("/v1/embeddings")
@router.post("/hf/v1/embeddings")
async def embedding(
request: EmbeddingRequest,
_=Depends(security_service.verify_authorization),
request: EmbeddingRequest,
_=Depends(security_service.verify_authorization),
key_manager: KeyManager = Depends(get_key_manager)
):
logger.info("-" * 50 + "embedding" + "-" * 50)
logger.info(f"Handling embedding request for model: {request.model}")
@@ -105,11 +114,11 @@ async def embedding(
logger.error(f"Embedding request failed: {str(e)}")
raise HTTPException(status_code=500, detail="Embedding request failed") from e
@router.get("/v1/keys/list")
@router.get("/hf/v1/keys/list")
async def get_keys_list(
_=Depends(security_service.verify_auth_token),
_=Depends(security_service.verify_auth_token),
key_manager: KeyManager = Depends(get_key_manager)
):
"""获取有效和无效的API key列表"""
logger.info("-" * 50 + "get_keys_list" + "-" * 50)

View File

@@ -1,9 +1,12 @@
from fastapi import HTTPException, Header
from typing import Optional
from app.core.logger import get_security_logger
from app.core.config import settings
logger = get_security_logger()
def verify_auth_token(token: str) -> bool:
return token == settings.AUTH_TOKEN
class SecurityService:
def __init__(self, allowed_tokens: list, auth_token: str):

View File

@@ -1,17 +1,57 @@
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from app.core.logger import get_main_logger
from app.core.security import verify_auth_token
from app.services.key_manager import get_key_manager_instance
from app.core.config import settings
from app.api import gemini_routes, openai_routes
import uvicorn
from app.middleware.request_logging_middleware import RequestLoggingMiddleware
# 配置日志
logger = get_main_logger()
app = FastAPI()
# 配置Jinja2模板
templates = Jinja2Templates(directory="app/templates")
# 创建 KeyManager 实例
key_manager = None
@app.on_event("startup")
async def startup_event():
global key_manager
logger.info("Application starting up...")
try:
key_manager = await get_key_manager_instance(settings.API_KEYS)
logger.info("KeyManager initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize KeyManager: {str(e)}")
raise
# 添加中间件来处理未经身份验证的请求
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
# 允许 gemini_routes 和 openai_routes 中的端点绕过身份验证
if (request.url.path not in ["/", "/auth"] and
not request.url.path.startswith("/static") and
not request.url.path.startswith("/gemini") and
not request.url.path.startswith("/v1") and
not request.url.path.startswith("/v1beta") and
not request.url.path.startswith("/health") and
not request.url.path.startswith("/hf")):
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning(f"Unauthorized access attempt to {request.url.path}")
return RedirectResponse(url="/")
logger.debug("Request authenticated successfully")
response = await call_next(request)
return response
# 添加请求日志中间件
# app.add_middleware(RequestLoggingMiddleware)
@@ -32,13 +72,59 @@ app.include_router(gemini_routes.router)
app.include_router(gemini_routes.router_v1beta)
@app.get("/", response_class=HTMLResponse)
async def auth_page(request: Request):
return templates.TemplateResponse("auth.html", {"request": request})
@app.post("/auth")
async def authenticate(request: Request):
try:
form = await request.form()
auth_token = form.get("auth_token")
if not auth_token:
logger.warning("Authentication attempt with empty token")
return RedirectResponse(url="/", status_code=302)
if verify_auth_token(auth_token):
logger.info("Successful authentication")
response = RedirectResponse(url="/keys", status_code=302)
response.set_cookie(key="auth_token", value=auth_token, httponly=True, max_age=3600)
return response
logger.warning("Failed authentication attempt with invalid token")
return RedirectResponse(url="/", status_code=302)
except Exception as e:
logger.error(f"Authentication error: {str(e)}")
return RedirectResponse(url="/", status_code=302)
@app.get("/keys", response_class=HTMLResponse)
async def keys_page(request: Request):
try:
auth_token = request.cookies.get("auth_token")
if not auth_token or not verify_auth_token(auth_token):
logger.warning("Unauthorized access attempt to keys page")
return RedirectResponse(url="/", status_code=302)
keys_status = await key_manager.get_keys_by_status()
total = len(keys_status["valid_keys"]) + len(keys_status["invalid_keys"])
logger.info(f"Keys status retrieved successfully. Total keys: {total}")
return templates.TemplateResponse("keys_status.html", {
"request": request,
"valid_keys": keys_status["valid_keys"],
"invalid_keys": keys_status["invalid_keys"],
"total": total
})
except Exception as e:
logger.error(f"Error retrieving keys status: {str(e)}")
raise
@app.get("/health")
@app.get("/")
async def health_check():
async def health_check(request: Request):
logger.info("Health check endpoint called")
return {"status": "healthy"}
if __name__ == "__main__":
logger.info("Starting application server...")
uvicorn.run(app, host="0.0.0.0", port=8001)

View File

@@ -27,4 +27,4 @@ class ImageGenerationRequest(BaseModel):
size: Optional[str] = "1024x1024"
quality: Optional[str] = ""
style: Optional[str] = ""
response_format: Optional[str] = "b64_json"
response_format: Optional[str] = "url"

View File

@@ -19,8 +19,42 @@ class ImageCreateService:
self.paid_key = settings.PAID_KEY
self.aspect_ratio = aspect_ratio
def parse_prompt_parameters(self, prompt: str) -> tuple:
"""从prompt中解析参数
支持的格式:
- {n:数量} 例如: {n:2} 生成2张图片
- {ratio:比例} 例如: {ratio:16:9} 使用16:9比例
"""
import re
# 默认值
n = 1
aspect_ratio = self.aspect_ratio
# 解析n参数
n_match = re.search(r'{n:(\d+)}', prompt)
if n_match:
n = int(n_match.group(1))
if n < 1 or n > 4:
raise ValueError(f"Invalid n value: {n}. Must be between 1 and 4.")
prompt = prompt.replace(n_match.group(0), '').strip()
# 解析ratio参数
ratio_match = re.search(r'{ratio:(\d+:\d+)}', prompt)
if ratio_match:
aspect_ratio = ratio_match.group(1)
valid_ratios = ["1:1", "3:4", "4:3", "9:16", "16:9"]
if aspect_ratio not in valid_ratios:
raise ValueError(
f"Invalid ratio: {aspect_ratio}. Must be one of: {', '.join(valid_ratios)}"
)
prompt = prompt.replace(ratio_match.group(0), '').strip()
return prompt, n, aspect_ratio
def generate_images(self, request: ImageGenerationRequest):
client = genai.Client(api_key=self.paid_key)
if request.size == "1024x1024":
self.aspect_ratio = "1:1"
elif request.size == "1792x1024":
@@ -32,6 +66,18 @@ class ImageCreateService:
f"Invalid size: {request.size}. Supported sizes are 1024x1024, 1792x1024, and 1024x1792."
)
# 解析prompt中的参数
cleaned_prompt, prompt_n, prompt_ratio = self.parse_prompt_parameters(request.prompt)
request.prompt = cleaned_prompt
# 如果prompt中指定了n则覆盖请求中的n
if prompt_n > 1:
request.n = prompt_n
# 如果prompt中指定了ratio则覆盖默认的aspect_ratio
if prompt_ratio != self.aspect_ratio:
self.aspect_ratio = prompt_ratio
response = client.models.generate_images(
model=self.image_model,
prompt=request.prompt,
@@ -56,11 +102,17 @@ class ImageCreateService:
filename = f"{current_date}/{uuid.uuid4().hex[:8]}.png"
upload_response = image_uploader.upload(image_data,filename)
# base64_image = base64.b64encode(image_data).decode('utf-8')
images_data.append({
"url": f"{upload_response.data.url}",
"revised_prompt": request.prompt
})
if request.response_format == "b64_json":
base64_image = base64.b64encode(image_data).decode('utf-8')
images_data.append({
"b64_json": base64_image,
"revised_prompt": request.prompt
})
else:
images_data.append({
"url": f"{upload_response.data.url}",
"revised_prompt": request.prompt
})
response_data = {
"created": int(time.time()), # Current timestamp
@@ -76,6 +128,9 @@ class ImageCreateService:
if image_datas:
markdown_images = []
for index, image_data in enumerate(image_datas):
markdown_images.append(f"![Generated Image {index+1}]({image_data['url']})")
if 'url' in image_data:
markdown_images.append(f"![Generated Image {index+1}]({image_data['url']})")
else:
# 如果是base64格式创建data URL
markdown_images.append(f"![Generated Image {index+1}](data:image/png;base64,{image_data['b64_json']})")
return "\n".join(markdown_images)

View File

@@ -4,6 +4,7 @@ from typing import Dict
from app.core.logger import get_key_manager_logger
from app.core.config import settings
logger = get_key_manager_logger()
@@ -61,20 +62,44 @@ class KeyManager:
return await self.get_next_working_key()
def get_fail_count(self, key: str) -> int:
"""获取指定密钥的失败次数"""
return self.key_failure_counts.get(key, 0)
async def get_keys_by_status(self) -> dict:
"""获取分类后的API key列表"""
valid_keys = []
invalid_keys = []
"""获取分类后的API key列表,包括失败次数"""
valid_keys = {}
invalid_keys = {}
async with self.failure_count_lock:
for key in self.api_keys:
masked_key = f"{key}"
if self.key_failure_counts[key] < self.MAX_FAILURES:
valid_keys.append(masked_key)
fail_count = self.key_failure_counts[key]
if fail_count < self.MAX_FAILURES:
valid_keys[key] = fail_count
else:
invalid_keys.append(masked_key)
invalid_keys[key] = fail_count
return {
"valid_keys": valid_keys,
"invalid_keys": invalid_keys
}
_singleton_instance = None
_singleton_lock = asyncio.Lock()
async def get_key_manager_instance(api_keys: list = None) -> KeyManager:
"""
获取 KeyManager 单例实例。
如果尚未创建实例,将使用提供的 api_keys 初始化 KeyManager。
如果已创建实例,则忽略 api_keys 参数,返回现有单例。
"""
global _singleton_instance
async with _singleton_lock:
if _singleton_instance is None:
if api_keys is None:
raise ValueError("API keys are required to initialize the KeyManager")
_singleton_instance = KeyManager(api_keys)
return _singleton_instance

View File

@@ -52,15 +52,14 @@ class ModelService:
"parent": None,
}
openai_format["data"].append(openai_model)
if settings.CREATE_IMAGE_MODEL:
image_model = openai_model.copy()
image_model["id"] = f"{settings.CREATE_IMAGE_MODEL}-chat"
openai_format["data"].append(image_model)
if model_id in self.model_search:
search_model = openai_model.copy()
search_model["id"] = f"{model_id}-search"
openai_format["data"].append(search_model)
if settings.CREATE_IMAGE_MODEL:
image_model = openai_model.copy()
image_model["id"] = f"{settings.CREATE_IMAGE_MODEL}-chat"
openai_format["data"].append(image_model)
return openai_format

View File

@@ -3,6 +3,7 @@
import json
from typing import Dict, Any, AsyncGenerator, List, Union
from app.core.logger import get_openai_logger
from app.services.chat.message_converter import OpenAIMessageConverter
from app.services.chat.response_handler import OpenAIResponseHandler
from app.services.chat.api_client import GeminiApiClient
from app.schemas.openai_models import ChatRequest, ImageGenerationRequest
@@ -85,9 +86,8 @@ def _build_payload(
class OpenAIChatService:
"""聊天服务"""
def __init__(self, base_url: str, key_manager: KeyManager = None):
self.message_converter = OpenAIMessageConverter()
self.response_handler = OpenAIResponseHandler(config=None)
self.api_client = GeminiApiClient(base_url)
self.key_manager = key_manager

89
app/templates/auth.html Normal file
View File

@@ -0,0 +1,89 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>验证页面</title>
<link href="https://fonts.googleapis.com/css2?family=Roboto:wght@300;400;700&display=swap" rel="stylesheet">
<style>
body {
font-family: 'Roboto', sans-serif;
line-height: 1.6;
margin: 0;
padding: 0;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
}
.container {
max-width: 400px;
width: 90%;
background: white;
padding: 40px;
border-radius: 15px;
box-shadow: 0 10px 30px rgba(0,0,0,0.1);
transition: all 0.3s ease;
}
.container:hover {
transform: translateY(-5px);
box-shadow: 0 15px 35px rgba(0,0,0,0.15);
}
h2 {
color: #2c3e50;
text-align: center;
margin-bottom: 30px;
font-weight: 700;
}
form {
display: flex;
flex-direction: column;
}
input {
margin: 10px 0;
padding: 12px;
border: 1px solid #e0e0e0;
border-radius: 5px;
font-size: 16px;
transition: all 0.3s ease;
}
input:focus {
border-color: #3498db;
box-shadow: 0 0 5px rgba(52, 152, 219, 0.5);
}
button {
background-color: #3498db;
color: white;
border: none;
padding: 12px;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
font-weight: bold;
transition: background-color 0.3s ease;
}
button:hover {
background-color: #2980b9;
}
.error-message {
color: #e74c3c;
margin-top: 15px;
text-align: center;
font-weight: bold;
}
</style>
</head>
<body>
<div class="container">
<h2>请输入验证令牌</h2>
<form id="auth-form" action="/auth" method="post">
<input type="password" id="auth-token" name="auth_token" required placeholder="输入验证令牌">
<button type="submit">提交</button>
</form>
{% if error %}
<p class="error-message">{{ error }}</p>
{% endif %}
</div>
</body>
</html>

View File

@@ -0,0 +1,214 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>API密钥状态</title>
<link href="https://fonts.googleapis.com/css2?family=Roboto:wght@300;400;700&display=swap" rel="stylesheet">
<style>
body {
font-family: 'Roboto', sans-serif;
line-height: 1.6;
margin: 0;
padding: 0;
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
}
.container {
max-width: 900px;
width: 90%;
background: white;
padding: 40px;
border-radius: 15px;
box-shadow: 0 10px 30px rgba(0,0,0,0.1);
transition: all 0.3s ease;
}
.container:hover {
transform: translateY(-5px);
box-shadow: 0 15px 35px rgba(0,0,0,0.15);
}
h1 {
color: #2c3e50;
text-align: center;
margin-bottom: 30px;
font-weight: 700;
}
.key-list {
margin-bottom: 30px;
background: #f8f9fa;
padding: 20px;
border-radius: 10px;
transition: all 0.3s ease;
}
.key-list:hover {
background: #e9ecef;
}
.key-list h2 {
color: #2c3e50;
margin-bottom: 15px;
display: flex;
justify-content: space-between;
align-items: center;
font-size: 1.5em;
}
ul {
list-style-type: none;
padding: 0;
}
li {
background: white;
border: 1px solid #e0e0e0;
margin-bottom: 10px;
padding: 12px;
border-radius: 5px;
transition: all 0.2s ease;
display: flex;
justify-content: space-between;
align-items: center;
}
li:hover {
transform: translateX(5px);
box-shadow: 2px 2px 5px rgba(0,0,0,0.1);
}
.total {
font-weight: bold;
margin-top: 20px;
text-align: center;
font-size: 1.2em;
color: #2c3e50;
}
.copy-btn {
background-color: #3498db;
color: white;
border: none;
padding: 8px 15px;
border-radius: 5px;
cursor: pointer;
font-size: 14px;
font-weight: bold;
transition: background-color 0.3s ease;
}
.copy-btn:hover {
background-color: #2980b9;
}
#copyStatus {
text-align: center;
margin-top: 20px;
color: #27ae60;
font-weight: bold;
opacity: 0;
transition: opacity 0.3s ease;
}
.fail-count {
color: #e74c3c;
font-size: 0.9em;
margin-left: 10px;
}
</style>
</head>
<body>
<div class="container">
<h1>API密钥状态</h1>
<div class="key-list">
<h2>
有效密钥
<button class="copy-btn" onclick="copyKeys('valid')">一键复制</button>
</h2>
<ul id="validKeys">
{% for key, fail_count in valid_keys.items() %}
<li>
<span>{{ key }}</span>
<span class="fail-count">(失败次数: {{ fail_count }})</span>
<button class="copy-btn" onclick="copyKey('{{ key }}')">复制</button>
</li>
{% endfor %}
</ul>
</div>
<div class="key-list">
<h2>
无效密钥
<button class="copy-btn" onclick="copyKeys('invalid')">一键复制</button>
</h2>
<ul id="invalidKeys">
{% for key, fail_count in invalid_keys.items() %}
<li>
<span>{{ key }}</span>
<span class="fail-count">(失败次数: {{ fail_count }})</span>
<button class="copy-btn" onclick="copyKey('{{ key }}')">复制</button>
</li>
{% endfor %}
</ul>
</div>
<div class="total">
总密钥数:{{ total }}
</div>
<div id="copyStatus"></div>
</div>
<script>
function copyToClipboard(text) {
if (navigator.clipboard && navigator.clipboard.writeText) {
return navigator.clipboard.writeText(text);
} else {
return new Promise((resolve, reject) => {
const textArea = document.createElement("textarea");
textArea.value = text;
textArea.style.position = "fixed";
document.body.appendChild(textArea);
textArea.focus();
textArea.select();
try {
const successful = document.execCommand('copy');
document.body.removeChild(textArea);
if (successful) {
resolve();
} else {
reject(new Error('复制失败'));
}
} catch (err) {
document.body.removeChild(textArea);
reject(err);
}
});
}
}
function copyKeys(type) {
const keys = Array.from(document.querySelectorAll(`#${type}Keys li span:first-child`)).map(span => span.textContent.trim());
const jsonKeys = JSON.stringify(keys);
copyToClipboard(jsonKeys)
.then(() => {
showCopyStatus(`已成功复制 ${type === 'valid' ? '有效' : '无效'} 密钥到剪贴板`);
})
.catch((err) => {
console.error('无法复制文本: ', err);
showCopyStatus('复制失败,请重试');
});
}
function copyKey(key) {
copyToClipboard(key)
.then(() => {
showCopyStatus(`已成功复制密钥到剪贴板`);
})
.catch((err) => {
console.error('无法复制文本: ', err);
showCopyStatus('复制失败,请重试');
});
}
function showCopyStatus(message) {
const statusElement = document.getElementById('copyStatus');
statusElement.textContent = message;
statusElement.style.opacity = 1;
setTimeout(() => {
statusElement.style.opacity = 0;
}, 2000);
}
</script>
</body>
</html>

View File

@@ -6,4 +6,6 @@ pydantic_settings
requests
starlette
uvicorn
google-genai
google-genai
jinja2
python-multipart