Files
codex-register/src/core/upload/newapi_upload.py
Jay Hsueh 05e480a756 feat(newapi): 添加 NEWAPI 上传功能及服务管理接口
- 新增 `newapi_upload.py` 文件,包含上传到 NEWAPI 的功能。
- 在数据库模型中添加 `NewapiService` 表及相关字段。
- 更新 CRUD 操作以支持 NEWAPI 服务的创建、更新、查询和删除。
- 添加新的 API 路由以管理 NEWAPI 服务。
- 前端实现批量上传和单个账号上传到 NEWAPI 的功能。
- 更新相关页面以支持 NEWAPI 服务的选择和管理。
2026-03-24 17:46:33 +08:00

127 lines
4.2 KiB
Python

"""
NEWAPI upload support — adds a channel via POST /api/channel/.
"""
import json
import logging
from datetime import datetime
from typing import List, Tuple
from curl_cffi import requests as cffi_requests
from ...database.models import Account
from ...database.session import get_db
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Sent as the channel "type" field of the upload payload.
# NOTE(review): presumably the server-side channel-type id for OpenAI-style
# channels — confirm against the target NEWAPI version.
NEWAPI_TYPE_OPENAI = 57
# Sent as the channel "base_url" field; empty by default.
DEFAULT_BASE_URL = ""
# Comma-separated model names sent as the channel "models" field.
DEFAULT_MODELS = "gpt-5.4,gpt-5,gpt-5-codex,gpt-5-codex-mini,gpt-5.1,gpt-5.1-codex,gpt-5.1-codex-max,gpt-5.1-codex-mini,gpt-5.2,gpt-5.2-codex,gpt-5.3-codex,gpt-5-openai-compact,gpt-5-codex-openai-compact,gpt-5-codex-mini-openai-compact,gpt-5.1-openai-compact,gpt-5.1-codex-openai-compact,gpt-5.1-codex-max-openai-compact,gpt-5.1-codex-mini-openai-compact,gpt-5.2-openai-compact,gpt-5.2-codex-openai-compact,gpt-5.3-codex-openai-compact"
def _normalize_base(api_url: str) -> str:
return (api_url or "").strip().rstrip("/")
def _build_headers(api_key: str) -> dict:
return {
"Authorization": f"Bearer {api_key}",
"New-Api-User": "1",
"Content-Type": "application/json",
}
def _extract_error(resp) -> str:
error_msg = f"上传失败: HTTP {resp.status_code}"
try:
detail = resp.json()
if isinstance(detail, dict):
error_msg = detail.get("message", error_msg)
except Exception:
error_msg = f"{error_msg} - {resp.text[:200]}"
return error_msg
def upload_to_newapi(
    account: Account,
    api_url: str,
    api_key: str,
) -> Tuple[bool, str]:
    """Create a NEWAPI channel for *account* and report the outcome.

    Sends ``POST {api_url}/api/channel/`` with a single-channel payload whose
    ``key`` is a JSON document holding the account's access token.

    Args:
        account: Account to upload; its ``email`` and ``access_token`` are
            read.
        api_url: Base URL of the NEWAPI instance (surrounding whitespace and
            trailing slashes are ignored).
        api_key: Token sent as the ``Authorization: Bearer`` credential.

    Returns:
        ``(True, "上传成功")`` on success, otherwise ``(False, reason)``.
        Errors raised while sending the request or reading the response are
        logged and returned as the reason instead of propagating.
    """
    base = _normalize_base(api_url)
    if not base:
        return False, "NEWAPI API URL 未配置"
    if not api_key:
        return False, "NEWAPI API Key 未配置"
    if not account.access_token:
        return False, "账号缺少 access_token"

    url = f"{base}/api/channel/"
    account_name = account.email or ""
    # NOTE(review): the e-mail doubles as "account_id" inside the key —
    # confirm this is what the server expects.
    channel_key = json.dumps(
        {"access_token": account.access_token, "account_id": account_name},
        ensure_ascii=False,
    )
    channel = {
        "auto_ban": 1,
        "name": account_name,
        "type": NEWAPI_TYPE_OPENAI,
        "key": channel_key,
        "base_url": DEFAULT_BASE_URL,
        "models": DEFAULT_MODELS,
        "multi_key_mode": "random",
        "group": "default",
        "groups": ["default"],
        "priority": 0,
        "weight": 0,
    }

    try:
        resp = cffi_requests.post(
            url,
            headers=_build_headers(api_key),
            json={"mode": "single", "channel": channel},
            proxies=None,
            timeout=30,
            impersonate="chrome110",
        )
        if resp.status_code not in (200, 201):
            return False, _extract_error(resp)

        # one-api/new-api style servers commonly answer HTTP 200 and signal
        # failure in the body as {"success": false, "message": ...}; the
        # status code alone is therefore not proof that the channel exists.
        try:
            body = resp.json()
        except ValueError:
            # A non-JSON 2xx body carries no failure flag; keep treating it
            # as success, as before.
            body = None
        if isinstance(body, dict) and body.get("success") is False:
            return False, _extract_error(resp)

        return True, "上传成功"
    except Exception as e:
        # Boundary handler: callers rely on a (bool, str) result, so the
        # error is logged with its traceback and returned as the reason.
        logger.exception("NEWAPI 上传异常: %s", e)
        return False, f"上传异常: {str(e)}"
def batch_upload_to_newapi(
    account_ids: List[int],
    api_url: str,
    api_key: str,
) -> dict:
    """Upload several accounts to NEWAPI and summarise the outcome.

    Each id is looked up in the database and handled on its own: unknown ids
    count as failed, accounts without an access token count as skipped, and
    the rest are sent through ``upload_to_newapi``. A successful upload flags
    the account as uploaded, stamps the upload time and commits immediately.

    Args:
        account_ids: Primary keys of the accounts to upload.
        api_url: Base URL of the NEWAPI instance.
        api_key: Token used to authenticate against NEWAPI.

    Returns:
        A dict with ``success_count``, ``failed_count``, ``skipped_count`` and
        ``details`` — one entry per id, in input order.
    """
    summary = {
        "success_count": 0,
        "failed_count": 0,
        "skipped_count": 0,
        "details": [],
    }
    details = summary["details"]

    with get_db() as db:
        for account_id in account_ids:
            account = db.query(Account).filter(Account.id == account_id).first()

            if not account:
                summary["failed_count"] += 1
                details.append(
                    {"id": account_id, "email": None, "success": False, "error": "账号不存在"}
                )
                continue

            if not account.access_token:
                summary["skipped_count"] += 1
                details.append(
                    {"id": account_id, "email": account.email, "success": False, "error": "缺少 Token"}
                )
                continue

            uploaded, note = upload_to_newapi(account, api_url, api_key)
            if not uploaded:
                summary["failed_count"] += 1
                details.append(
                    {"id": account_id, "email": account.email, "success": False, "error": note}
                )
                continue

            # Committed per account, right after each successful upload.
            account.newapi_uploaded = True
            account.newapi_uploaded_at = datetime.utcnow()  # naive UTC timestamp
            db.commit()
            summary["success_count"] += 1
            details.append(
                {"id": account_id, "email": account.email, "success": True, "message": note}
            )

    return summary