feat(pay): 支付跳转功能

- 账号管理:补充订阅状态管理、TeamManager上传说明
 - 新增「支付升级」功能模块描述
 - 系统设置:补充 CPA配置和 TeamManager配置项
This commit is contained in:
cnlimiter
2026-03-16 17:04:54 +08:00
parent c5ab1747c6
commit 19eb172eee
18 changed files with 1174 additions and 3 deletions

172
src/core/payment.py Normal file
View File

@@ -0,0 +1,172 @@
"""
支付核心逻辑 — 生成 Plus/Team 支付链接、无痕打开浏览器、检测订阅状态
"""
import logging
import subprocess
import sys
from typing import Optional
from curl_cffi import requests as cffi_requests
from ..database.models import Account
logger = logging.getLogger(__name__)
PAYMENT_CHECKOUT_URL = "https://chatgpt.com/backend-api/payments/checkout"
TEAM_CHECKOUT_BASE_URL = "https://chatgpt.com/checkout/openai_llc/"
def _build_proxies(proxy: Optional[str]) -> Optional[dict]:
if proxy:
return {"http": proxy, "https": proxy}
return None
def generate_plus_link(account: Account, proxy: Optional[str] = None) -> str:
"""生成 Plus 支付链接"""
if not account.access_token:
raise ValueError("账号缺少 access_token")
headers = {
"Authorization": f"Bearer {account.access_token}",
"Content-Type": "application/json",
}
payload = {
"plan_type": "plus",
"checkout_ui_mode": "hosted",
"cancel_url": "https://chatgpt.com/",
"success_url": "https://chatgpt.com/",
}
resp = cffi_requests.post(
PAYMENT_CHECKOUT_URL,
headers=headers,
json=payload,
proxies=_build_proxies(proxy),
timeout=30,
impersonate="chrome110",
)
resp.raise_for_status()
data = resp.json()
if "url" in data:
return data["url"]
raise ValueError(data.get("detail", "API 未返回支付链接"))
def generate_team_link(
account: Account,
workspace_name: str = "MyTeam",
price_interval: str = "month",
seat_quantity: int = 5,
proxy: Optional[str] = None,
) -> str:
"""生成 Team 支付链接"""
if not account.access_token:
raise ValueError("账号缺少 access_token")
headers = {
"Authorization": f"Bearer {account.access_token}",
"Content-Type": "application/json",
}
payload = {
"plan_name": "chatgptteamplan",
"team_plan_data": {
"workspace_name": workspace_name,
"price_interval": price_interval,
"seat_quantity": seat_quantity,
},
"promo_campaign": {
"promo_campaign_id": "team-1-month-free",
"is_coupon_from_query_param": True,
},
"checkout_ui_mode": "custom",
}
resp = cffi_requests.post(
PAYMENT_CHECKOUT_URL,
headers=headers,
json=payload,
proxies=_build_proxies(proxy),
timeout=30,
impersonate="chrome110",
)
resp.raise_for_status()
data = resp.json()
if "checkout_session_id" in data:
return TEAM_CHECKOUT_BASE_URL + data["checkout_session_id"]
raise ValueError(data.get("detail", "API 未返回 checkout_session_id"))
def open_url_incognito(url: str) -> bool:
"""调用本机浏览器以无痕模式打开 URL"""
platform = sys.platform
try:
if platform == "win32":
# 依次尝试 Chrome、Edge
for browser, flag in [("chrome", "--incognito"), ("msedge", "--inprivate")]:
try:
subprocess.Popen(
f'start {browser} {flag} "{url}"',
shell=True,
)
return True
except Exception:
continue
elif platform == "darwin":
subprocess.Popen(
["open", "-a", "Google Chrome", "--args", "--incognito", url]
)
return True
else:
for binary in ["google-chrome", "chromium-browser", "chromium"]:
try:
subprocess.Popen([binary, "--incognito", url])
return True
except FileNotFoundError:
continue
except Exception as e:
logger.warning(f"无痕打开浏览器失败: {e}")
return False
def check_subscription_status(account: Account, proxy: Optional[str] = None) -> str:
"""
检测账号当前订阅状态。
Returns:
'free' / 'plus' / 'team'
"""
if not account.access_token:
raise ValueError("账号缺少 access_token")
headers = {
"Authorization": f"Bearer {account.access_token}",
"Content-Type": "application/json",
}
resp = cffi_requests.get(
"https://chatgpt.com/backend-api/me",
headers=headers,
proxies=_build_proxies(proxy),
timeout=20,
impersonate="chrome110",
)
resp.raise_for_status()
data = resp.json()
# 解析订阅类型
plan = data.get("plan_type") or ""
if "team" in plan.lower():
return "team"
if "plus" in plan.lower():
return "plus"
# 尝试从 orgs 或 workspace 信息判断
orgs = data.get("orgs", {}).get("data", [])
for org in orgs:
settings_ = org.get("settings", {})
if settings_.get("workspace_plan_type") in ("team", "enterprise"):
return "team"
return "free"

158
src/core/team_manager.py Normal file
View File

@@ -0,0 +1,158 @@
"""
Team Manager 上传功能
参照 CPA 上传模式,直连不走代理
"""
import logging
from typing import List, Tuple
from datetime import datetime
from curl_cffi import requests as cffi_requests
from ..database.session import get_db
from ..database.models import Account
from ..config.settings import get_settings
logger = logging.getLogger(__name__)
def upload_to_team_manager(
account: Account,
api_url: str,
api_key: str,
) -> Tuple[bool, str]:
"""
上传单账号到 Team Manager直连不走代理
Returns:
(成功标志, 消息)
"""
if not api_url:
return False, "Team Manager API URL 未配置"
if not api_key:
return False, "Team Manager API Key 未配置"
if not account.access_token:
return False, "账号缺少 access_token"
url = api_url.rstrip("/") + "/api/accounts/import"
headers = {
"X-API-Key": api_key,
"Content-Type": "application/json",
}
payload = {
"import_type": "single",
"email": account.email,
"access_token": account.access_token or "",
"session_token": account.session_token or "",
"refresh_token": account.refresh_token or "",
"client_id": account.client_id or "",
}
try:
resp = cffi_requests.post(
url,
headers=headers,
json=payload,
proxies=None,
timeout=30,
impersonate="chrome110",
)
if resp.status_code in (200, 201):
return True, "上传成功"
error_msg = f"上传失败: HTTP {resp.status_code}"
try:
detail = resp.json()
if isinstance(detail, dict):
error_msg = detail.get("message", error_msg)
except Exception:
error_msg = f"{error_msg} - {resp.text[:200]}"
return False, error_msg
except Exception as e:
logger.error(f"Team Manager 上传异常: {e}")
return False, f"上传异常: {str(e)}"
def batch_upload_to_team_manager(
account_ids: List[int],
api_url: str,
api_key: str,
) -> dict:
"""
批量上传账号到 Team Manager
Returns:
包含成功/失败统计和详情的字典
"""
results = {
"success_count": 0,
"failed_count": 0,
"skipped_count": 0,
"details": [],
}
with get_db() as db:
for account_id in account_ids:
account = db.query(Account).filter(Account.id == account_id).first()
if not account:
results["failed_count"] += 1
results["details"].append(
{"id": account_id, "email": None, "success": False, "error": "账号不存在"}
)
continue
if not account.access_token:
results["skipped_count"] += 1
results["details"].append(
{"id": account_id, "email": account.email, "success": False, "error": "缺少 Token"}
)
continue
success, message = upload_to_team_manager(account, api_url, api_key)
if success:
results["success_count"] += 1
results["details"].append(
{"id": account_id, "email": account.email, "success": True, "message": message}
)
else:
results["failed_count"] += 1
results["details"].append(
{"id": account_id, "email": account.email, "success": False, "error": message}
)
return results
def test_team_manager_connection(api_url: str, api_key: str) -> Tuple[bool, str]:
"""
测试 Team Manager 连接(直连)
Returns:
(成功标志, 消息)
"""
if not api_url:
return False, "API URL 不能为空"
if not api_key:
return False, "API Key 不能为空"
url = api_url.rstrip("/") + "/api/accounts/import"
headers = {"X-API-Key": api_key}
try:
resp = cffi_requests.options(
url,
headers=headers,
proxies=None,
timeout=10,
impersonate="chrome110",
)
if resp.status_code in (200, 204, 401, 403, 405):
if resp.status_code == 401:
return False, "连接成功,但 API Key 无效"
return True, "Team Manager 连接测试成功"
return False, f"服务器返回异常状态码: {resp.status_code}"
except cffi_requests.exceptions.ConnectionError as e:
return False, f"无法连接到服务器: {str(e)}"
except cffi_requests.exceptions.Timeout:
return False, "连接超时,请检查网络配置"
except Exception as e:
return False, f"连接测试失败: {str(e)}"