refactor(core): 优化CPA上传使用curl_cffi的multipart方式

This commit is contained in:
cnlimiter
2026-03-16 11:56:05 +08:00
parent 07f0a2cca0
commit 8b27e7bb28

View File

@@ -8,6 +8,7 @@ from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime
from curl_cffi import requests as cffi_requests
from curl_cffi import CurlMime
from ..database.session import get_db
from ..database.models import Account
@@ -40,11 +41,11 @@ def generate_token_json(account: Account) -> dict:
def upload_to_cpa(token_data: dict, proxy: str = None) -> Tuple[bool, str]:
"""
上传单个账号到 CPA 管理平台
上传单个账号到 CPA 管理平台(不走代理)
Args:
token_data: Token JSON 数据
proxy: 可选的代理 URL
proxy: 保留参数不使用CPA 上传始终直连)
Returns:
(成功标志, 消息或错误信息)
@@ -57,45 +58,45 @@ def upload_to_cpa(token_data: dict, proxy: str = None) -> Tuple[bool, str]:
if not settings.cpa_api_url:
return False, "CPA API URL 未配置"
api_url = settings.cpa_api_url.rstrip('/')
api_url = settings.cpa_api_url.rstrip("/")
upload_url = f"{api_url}/v0/management/auth-files"
filename = f"{token_data['email']}.json"
file_content = json.dumps(token_data, ensure_ascii=False, indent=2).encode('utf-8')
file_content = json.dumps(token_data, ensure_ascii=False, indent=2).encode("utf-8")
files = {"file": (filename, file_content, "application/json")}
headers = {
"Authorization": f"Bearer {settings.cpa_api_token.get_secret_value()}"
"Authorization": f"Bearer {settings.cpa_api_token.get_secret_value()}",
}
try:
proxies = None
if proxy:
proxies = {
"http": proxy,
"https": proxy
}
mime = CurlMime()
mime.addpart(
name="file",
data=file_content,
filename=filename,
content_type="application/json",
)
response = cffi_requests.post(
upload_url,
files=files,
multipart=mime,
headers=headers,
proxies=proxies,
proxies=None,
timeout=30,
impersonate="chrome110"
impersonate="chrome110",
)
if response.status_code in (200, 201):
return True, "上传成功"
else:
error_msg = f"上传失败: HTTP {response.status_code}"
try:
error_detail = response.json()
if isinstance(error_detail, dict):
error_msg = error_detail.get("message", error_msg)
except:
error_msg = f"{error_msg} - {response.text[:200]}"
return False, error_msg
error_msg = f"上传失败: HTTP {response.status_code}"
try:
error_detail = response.json()
if isinstance(error_detail, dict):
error_msg = error_detail.get("message", error_msg)
except Exception:
error_msg = f"{error_msg} - {response.text[:200]}"
return False, error_msg
except Exception as e:
logger.error(f"CPA 上传异常: {e}")
@@ -178,12 +179,12 @@ def batch_upload_to_cpa(account_ids: List[int], proxy: str = None) -> dict:
def test_cpa_connection(api_url: str, api_token: str, proxy: str = None) -> Tuple[bool, str]:
"""
测试 CPA 连接
测试 CPA 连接(不走代理)
Args:
api_url: CPA API URL
api_token: CPA API Token
proxy: 可选的代理 URL
proxy: 保留参数不使用CPA 始终直连)
Returns:
(成功标志, 消息)
@@ -194,45 +195,29 @@ def test_cpa_connection(api_url: str, api_token: str, proxy: str = None) -> Tupl
if not api_token:
return False, "API Token 不能为空"
# 清理 URL
api_url = api_url.rstrip('/')
# 尝试访问健康检查或 API 信息端点
api_url = api_url.rstrip("/")
test_url = f"{api_url}/v0/management/auth-files"
headers = {
"Authorization": f"Bearer {api_token}"
}
headers = {"Authorization": f"Bearer {api_token}"}
try:
proxies = None
if proxy:
proxies = {
"http": proxy,
"https": proxy
}
# 发送一个简单的请求测试连接GET 列表或 OPTIONS
response = cffi_requests.options(
test_url,
headers=headers,
proxies=proxies,
proxies=None,
timeout=10,
impersonate="chrome110"
impersonate="chrome110",
)
if response.status_code in (200, 204, 401, 403, 405):
# 401/403 表示服务器可达但认证问题
# 405 表示 OPTIONS 方法不被允许,但服务器可达
if response.status_code == 401:
return False, "连接成功,但 API Token 无效"
return True, "CPA 连接测试成功"
else:
return False, f"服务器返回异常状态码: {response.status_code}"
return False, f"服务器返回异常状态码: {response.status_code}"
except cffi_requests.exceptions.ConnectionError as e:
return False, f"无法连接到服务器: {str(e)}"
except cffi_requests.exceptions.Timeout:
return False, "连接超时,请检查网络或代理配置"
return False, "连接超时,请检查网络配置"
except Exception as e:
return False, f"连接测试失败: {str(e)}"