解决cpa测试通过实际上传失败的问题

This commit is contained in:
rockxsj
2026-03-19 17:57:40 +08:00
parent 15ea00fcd8
commit bd348854b5
4 changed files with 241 additions and 35 deletions

View File

@@ -6,6 +6,7 @@ import json
import logging
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime
from urllib.parse import quote
from curl_cffi import requests as cffi_requests
from curl_cffi import CurlMime
@@ -17,6 +18,77 @@ from ...config.settings import get_settings
logger = logging.getLogger(__name__)
def _normalize_cpa_auth_files_url(api_url: str) -> str:
"""将用户填写的 CPA 地址规范化为 auth-files 接口地址。"""
normalized = (api_url or "").strip().rstrip("/")
lower_url = normalized.lower()
if not normalized:
return ""
if lower_url.endswith("/auth-files"):
return normalized
if lower_url.endswith("/v0/management") or lower_url.endswith("/management"):
return f"{normalized}/auth-files"
if lower_url.endswith("/v0"):
return f"{normalized}/management/auth-files"
return f"{normalized}/v0/management/auth-files"
def _build_cpa_headers(api_token: str, content_type: Optional[str] = None) -> dict:
headers = {
"Authorization": f"Bearer {api_token}",
}
if content_type:
headers["Content-Type"] = content_type
return headers
def _extract_cpa_error(response) -> str:
error_msg = f"上传失败: HTTP {response.status_code}"
try:
error_detail = response.json()
if isinstance(error_detail, dict):
error_msg = error_detail.get("message", error_msg)
except Exception:
error_msg = f"{error_msg} - {response.text[:200]}"
return error_msg
def _post_cpa_auth_file_multipart(upload_url: str, filename: str, file_content: bytes, api_token: str):
mime = CurlMime()
mime.addpart(
name="file",
data=file_content,
filename=filename,
content_type="application/json",
)
return cffi_requests.post(
upload_url,
multipart=mime,
headers=_build_cpa_headers(api_token),
proxies=None,
timeout=30,
impersonate="chrome110",
)
def _post_cpa_auth_file_raw_json(upload_url: str, filename: str, file_content: bytes, api_token: str):
raw_upload_url = f"{upload_url}?name={quote(filename)}"
return cffi_requests.post(
raw_upload_url,
data=file_content,
headers=_build_cpa_headers(api_token, content_type="application/json"),
proxies=None,
timeout=30,
impersonate="chrome110",
)
def generate_token_json(account: Account) -> dict:
"""
生成 CPA 格式的 Token JSON
@@ -73,45 +145,35 @@ def upload_to_cpa(
if not effective_token:
return False, "CPA API Token 未配置"
api_url = effective_url.rstrip("/")
upload_url = f"{api_url}/v0/management/auth-files"
upload_url = _normalize_cpa_auth_files_url(effective_url)
filename = f"{token_data['email']}.json"
file_content = json.dumps(token_data, ensure_ascii=False, indent=2).encode("utf-8")
headers = {
"Authorization": f"Bearer {effective_token}",
}
try:
mime = CurlMime()
mime.addpart(
name="file",
data=file_content,
filename=filename,
content_type="application/json",
)
response = cffi_requests.post(
response = _post_cpa_auth_file_multipart(
upload_url,
multipart=mime,
headers=headers,
proxies=None,
timeout=30,
impersonate="chrome110",
filename,
file_content,
effective_token,
)
if response.status_code in (200, 201):
return True, "上传成功"
error_msg = f"上传失败: HTTP {response.status_code}"
try:
error_detail = response.json()
if isinstance(error_detail, dict):
error_msg = error_detail.get("message", error_msg)
except Exception:
error_msg = f"{error_msg} - {response.text[:200]}"
return False, error_msg
if response.status_code in (404, 405, 415):
logger.warning("CPA multipart 上传失败,尝试原始 JSON 回退: %s", response.status_code)
fallback_response = _post_cpa_auth_file_raw_json(
upload_url,
filename,
file_content,
effective_token,
)
if fallback_response.status_code in (200, 201):
return True, "上传成功"
response = fallback_response
return False, _extract_cpa_error(response)
except Exception as e:
logger.error(f"CPA 上传异常: {e}")
@@ -217,12 +279,11 @@ def test_cpa_connection(api_url: str, api_token: str, proxy: str = None) -> Tupl
if not api_token:
return False, "API Token 不能为空"
api_url = api_url.rstrip("/")
test_url = f"{api_url}/v0/management/auth-files"
headers = {"Authorization": f"Bearer {api_token}"}
test_url = _normalize_cpa_auth_files_url(api_url)
headers = _build_cpa_headers(api_token)
try:
response = cffi_requests.options(
response = cffi_requests.get(
test_url,
headers=headers,
proxies=None,
@@ -230,10 +291,16 @@ def test_cpa_connection(api_url: str, api_token: str, proxy: str = None) -> Tupl
impersonate="chrome110",
)
if response.status_code in (200, 204, 401, 403, 405):
if response.status_code == 401:
return False, "连接成功,但 API Token 无效"
if response.status_code == 200:
return True, "CPA 连接测试成功"
if response.status_code == 401:
return False, "连接成功,但 API Token 无效"
if response.status_code == 403:
return False, "连接成功,但服务端未启用远程管理或当前 Token 无权限"
if response.status_code == 404:
return False, "未找到 CPA auth-files 接口,请检查 API URL 是否填写为根地址、/v0/management 或完整 auth-files 地址"
if response.status_code == 503:
return False, "连接成功,但服务端认证管理器不可用"
return False, f"服务器返回异常状态码: {response.status_code}"