fix(register): 修复新版本注册流程

This commit is contained in:
cnlimiter
2026-03-21 20:01:44 +08:00
committed by Mison
parent e116011e67
commit a53fe50a0c
8 changed files with 143 additions and 410 deletions

View File

@@ -65,8 +65,10 @@ OPENAI_API_ENDPOINTS = {
"signup": "https://auth.openai.com/api/accounts/authorize/continue",
"register": "https://auth.openai.com/api/accounts/user/register",
"send_otp": "https://auth.openai.com/api/accounts/email-otp/send",
"passwordless_send_otp": "https://auth.openai.com/api/accounts/passwordless/send-otp",
"validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate",
"create_account": "https://auth.openai.com/api/accounts/create_account",
"add_phone" : "https://auth.openai.com/add-phone",
"select_workspace": "https://auth.openai.com/api/accounts/workspace/select",
"password_verify" : "https://auth.openai.com/api/accounts/password/verify"
}

View File

@@ -12,6 +12,7 @@ from .http_client import (
create_openai_client,
)
from .register import RegistrationEngine, RegistrationResult
from .login import LoginEngine
from .utils import setup_logging, get_data_dir
__all__ = [
@@ -27,6 +28,7 @@ __all__ = [
'create_openai_client',
'RegistrationEngine',
'RegistrationResult',
'LoginEngine',
'setup_logging',
'get_data_dir',
]

View File

@@ -12,7 +12,6 @@ import string
from typing import Optional, Dict, Any, Tuple, Callable
from dataclasses import dataclass
from datetime import datetime
from curl_cffi import requests as cffi_requests
from .openai.oauth import OAuthManager, OAuthStart
@@ -392,46 +391,7 @@ class RegistrationEngine:
self._log(f"密码注册失败: {e}", "error")
return False, None
def _login_password(self, did: str, sen_token: Optional[str]) -> bool:
"""注册密码"""
try:
password = ""
with get_db() as db:
# 检查是否已存在该邮箱的记录
existing = crud.get_account_by_email(db, self.email)
# 生成密码
password = existing.password
# 提交密码登录
register_body = json.dumps({
"password": password
})
if sen_token:
sentinel = f'{{"p": "", "id": "{did}", "flow": "password_verify"}}'
response = self.session.post(
OPENAI_API_ENDPOINTS["password_verify"],
headers={
"referer": "https://auth.openai.com/log-in/password",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": sentinel
},
data=register_body,
)
self._log(f"提交密码状态: {response.status_code}")
if response.status_code != 200:
error_text = response.text[:500]
self._log(f"密码登录失败: {error_text}", "warning")
return False
return True
except Exception as e:
self._log(f"密码登录失败: {e}", "error")
return False, None
def _mark_email_as_registered(self):
"""标记邮箱为已注册状态OpenAI 侧已存在该账号)"""
@@ -560,331 +520,18 @@ class RegistrationEngine:
self._log(f"创建账户失败: {e}", "error")
return False
def _get_workspace_id(self) -> Optional[str]:
"""获取 Workspace ID"""
try:
auth_cookie = self.session.cookies.get("oai-client-auth-session")
if not auth_cookie:
self._log("未能获取到授权 Cookie", "error")
return None
# 解码 JWT
import base64
import json as json_module
try:
segments = auth_cookie.split(".")
if len(segments) < 1:
self._log("授权 Cookie 格式错误", "error")
return None
# 解码第一个 segment
payload = segments[0]
pad = "=" * ((4 - (len(payload) % 4)) % 4)
decoded = base64.urlsafe_b64decode((payload + pad).encode("ascii"))
auth_json = json_module.loads(decoded.decode("utf-8"))
workspaces = auth_json.get("workspaces") or []
if not workspaces:
self._log("授权 Cookie 里没有 workspace 信息", "error")
return None
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
if not workspace_id:
self._log("无法解析 workspace_id", "error")
return None
self._log(f"Workspace ID: {workspace_id}")
return workspace_id
except Exception as e:
self._log(f"解析授权 Cookie 失败: {e}", "error")
return None
except Exception as e:
self._log(f"获取 Workspace ID 失败: {e}", "error")
return None
def _select_workspace(self, workspace_id: str) -> Optional[str]:
"""选择 Workspace"""
try:
select_body = f'{{"workspace_id":"{workspace_id}"}}'
response = self.session.post(
OPENAI_API_ENDPOINTS["select_workspace"],
headers={
"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
"content-type": "application/json",
},
data=select_body,
)
if response.status_code != 200:
self._log(f"选择 workspace 失败: {response.status_code}", "error")
self._log(f"响应: {response.text[:200]}", "warning")
return None
continue_url = str((response.json() or {}).get("continue_url") or "").strip()
if not continue_url:
self._log("workspace/select 响应里缺少 continue_url", "error")
return None
self._log(f"Continue URL: {continue_url[:100]}...")
return continue_url
except Exception as e:
self._log(f"选择 Workspace 失败: {e}", "error")
return None
def _follow_redirects(self, start_url: str) -> Optional[str]:
"""跟随重定向链,寻找回调 URL"""
try:
current_url = start_url
max_redirects = 6
for i in range(max_redirects):
self._log(f"重定向 {i+1}/{max_redirects}: {current_url[:100]}...")
response = self.session.get(
current_url,
allow_redirects=False,
timeout=15
)
location = response.headers.get("Location") or ""
# 如果不是重定向状态码,停止
if response.status_code not in [301, 302, 303, 307, 308]:
self._log(f"非重定向状态码: {response.status_code}")
break
if not location:
self._log("重定向响应缺少 Location 头")
break
# 构建下一个 URL
import urllib.parse
next_url = urllib.parse.urljoin(current_url, location)
# 检查是否包含回调参数
if "code=" in next_url and "state=" in next_url:
self._log(f"找到回调 URL: {next_url[:100]}...")
return next_url
current_url = next_url
self._log("未能在重定向链中找到回调 URL", "error")
return None
except Exception as e:
self._log(f"跟随重定向失败: {e}", "error")
return None
def _handle_oauth_callback(self, callback_url: str) -> Optional[Dict[str, Any]]:
"""处理 OAuth 回调"""
try:
if not self.oauth_start:
self._log("OAuth 流程未初始化", "error")
return None
self._log("处理 OAuth 回调...")
token_info = self.oauth_manager.handle_callback(
callback_url=callback_url,
expected_state=self.oauth_start.state,
code_verifier=self.oauth_start.code_verifier
)
self._log("OAuth 授权成功")
return token_info
except Exception as e:
self._log(f"处理 OAuth 回调失败: {e}", "error")
return None
def run(self) -> RegistrationResult:
"""
执行完整的注册流程
支持已注册账号自动登录:
- 如果检测到邮箱已注册,自动切换到登录流程
- 已注册账号跳过:设置密码、发送验证码、创建用户账户
- 共用步骤获取验证码、验证验证码、Workspace 和 OAuth 回调
Returns:
RegistrationResult: 注册结果
"""
result = RegistrationResult(success=False, logs=self.logs)
try:
self._log("=" * 60)
self._log("开始注册流程")
self._log("=" * 60)
# 1. 检查 IP 地理位置
self._log("1. 检查 IP 地理位置...")
ip_ok, location = self._check_ip_location()
if not ip_ok:
result.error_message = f"IP 地理位置不支持: {location}"
self._log(f"IP 检查失败: {location}", "error")
return result
self._log(f"IP 位置: {location}")
# 2. 创建邮箱
self._log("2. 创建邮箱...")
if not self._create_email():
result.error_message = "创建邮箱失败"
return result
result.email = self.email
# 3. 初始化会话
self._log("3. 初始化会话...")
if not self._init_session():
result.error_message = "初始化会话失败"
return result
# 4. 开始 OAuth 流程
self._log("4. 开始 OAuth 授权流程...")
if not self._start_oauth():
result.error_message = "开始 OAuth 流程失败"
return result
# 5. 获取 Device ID
self._log("5. 获取 Device ID...")
did = self._get_device_id()
if not did:
result.error_message = "获取 Device ID 失败"
return result
# 6. 检查 Sentinel 拦截
self._log("6. 检查 Sentinel 拦截...")
sen_token = self._check_sentinel(did)
if sen_token:
self._log("Sentinel 检查通过")
else:
self._log("Sentinel 检查失败或未启用", "warning")
# 7. 提交注册表单 + 解析响应判断账号状态
self._log("7. 提交注册表单...")
signup_result = self._submit_signup_form(did, sen_token)
if not signup_result.success:
result.error_message = f"提交注册表单失败: {signup_result.error_message}"
return result
# 8. 检测到已注册账号 → 直接终止任务
if self._is_existing_account:
self._log(f"8. 邮箱 {self.email} 在 OpenAI 已注册,跳过注册流程", "warning")
result.error_message = f"邮箱 {self.email} 已在 OpenAI 注册"
return result
elif self.email_service.service_type.value == "outlook":
# Outlook 邮箱注册跳过密码提交环节,直接生成密码备用
self._log("8. Outlook 邮箱,跳过密码注册环节")
self.password = self._generate_password()
else:
self._log("8. 注册密码...")
password_ok, password = self._register_password()
if not password_ok:
result.error_message = "注册密码失败"
return result
# 9. 发送验证码Outlook 邮箱跳过,由邮箱本身接收)
if self.email_service.service_type.value != "outlook":
self._log("9. 发送验证码...")
if not self._send_verification_code():
result.error_message = "发送验证码失败"
return result
else:
self._log("9. Outlook 邮箱,跳过发送验证码环节")
self._otp_sent_at = time.time() # 记录时间戳,供后续按时间范围搜索邮件
# 10. 获取验证码
self._log("10. 等待验证码...")
code = self._get_verification_code()
if not code:
result.error_message = "获取验证码失败"
return result
# 11. 验证验证码
self._log("11. 验证验证码...")
if not self._validate_verification_code(code):
result.error_message = "验证验证码失败"
return result
# 12. 创建用户账户
self._log("12. 创建用户账户...")
if not self._create_user_account():
result.error_message = "创建用户账户失败"
return result
# 13. 获取 Workspace ID
self._log("13. 获取 Workspace ID...")
workspace_id = self._get_workspace_id()
if not workspace_id:
result.error_message = "获取 Workspace ID 失败"
return result
result.workspace_id = workspace_id
# 14. 选择 Workspace
self._log("14. 选择 Workspace...")
continue_url = self._select_workspace(workspace_id)
if not continue_url:
result.error_message = "选择 Workspace 失败"
return result
# 15. 跟随重定向链
self._log("15. 跟随重定向链...")
callback_url = self._follow_redirects(continue_url)
if not callback_url:
result.error_message = "跟随重定向链失败"
return result
# 16. 处理 OAuth 回调
self._log("16. 处理 OAuth 回调...")
token_info = self._handle_oauth_callback(callback_url)
if not token_info:
result.error_message = "处理 OAuth 回调失败"
return result
# 提取账户信息
result.account_id = token_info.get("account_id", "")
result.access_token = token_info.get("access_token", "")
result.refresh_token = token_info.get("refresh_token", "")
result.id_token = token_info.get("id_token", "")
result.password = self.password or "" # 保存密码(已注册账号为空)
# 设置来源标记
result.source = "register"
# 尝试获取 session_token 从 cookie
session_cookie = self.session.cookies.get("__Secure-next-auth.session-token")
if session_cookie:
self.session_token = session_cookie
result.session_token = session_cookie
self._log(f"获取到 Session Token")
# 17. 完成
self._log("=" * 60)
self._log("注册成功!")
self._log(f"邮箱: {result.email}")
self._log(f"Account ID: {result.account_id}")
self._log(f"Workspace ID: {result.workspace_id}")
self._log("=" * 60)
result.success = True
result.metadata = {
"email_service": self.email_service.service_type.value,
"proxy_used": self.proxy_url,
"registered_at": datetime.now().isoformat(),
}
return result
except Exception as e:
self._log(f"注册过程中发生未预期错误: {e}", "error")
result.error_message = str(e)
return result
def _add_phone(self) -> bool:
"""获取 手机验证码"""
phone_body = f'{{"code":"{code}"}}'
response = self.session.post(
OPENAI_API_ENDPOINTS["add_phone"],
headers={
"referer": "https://auth.openai.com/api/accounts/create_account",
"accept": "application/json",
"content-type": "application/json",
},
data=create_account_body,
)
def save_to_database(self, result: RegistrationResult) -> bool:
"""

View File

@@ -15,6 +15,7 @@ import base64
import re
import uuid
from datetime import datetime, timedelta
from html.parser import HTMLParser
from typing import Any, Dict, List, Optional, Union, Callable
from pathlib import Path
@@ -568,3 +569,49 @@ class Timer:
if self.start_time is not None:
return time.time() - self.start_time
return 0.0
class BootstrapExtractor(HTMLParser):
"""内部解析器,专门提取 id="client-bootstrap" 的 script 内容"""
def __init__(self):
super().__init__()
self._in_target = False
self.json_text = None
def handle_starttag(self, tag, attrs):
if tag == 'script':
attrs_dict = dict(attrs)
if attrs_dict.get('id') == 'client-bootstrap':
self._in_target = True
def handle_endtag(self, tag):
if tag == 'script' and self._in_target:
self._in_target = False
def handle_data(self, data):
if self._in_target and self.json_text is None:
self.json_text = data.strip()
def extract_client_bootstrap_json(html: str):
"""
从 HTML 字符串中提取 id="client-bootstrap" 的 script 标签内容并解析为 JSON。
返回 dict 或 None未找到或解析失败
"""
parser = BootstrapExtractor()
parser.feed(html)
if parser.json_text:
try:
return json.loads(parser.json_text)
except json.JSONDecodeError:
return None
return None
def base64_payload_decode(payload_b64):
import base64
import json as json_module
padding = 4 - (len(payload_b64) % 4)
if padding != 4:
payload_b64 += '=' * padding
# 解码Base64URL 使用 - 和 _ 替代 + 和 /
payload_bytes = base64.urlsafe_b64decode(payload_b64)
return json_module.loads(payload_bytes)

View File

@@ -193,56 +193,69 @@ class IMAPNewProvider(OutlookProvider):
count: int = 20,
only_unseen: bool = True,
since_minutes: Optional[int] = None,
folders: Optional[List[str]] = None,
) -> List[EmailMessage]:
"""
获取最近的邮件。
获取最近的邮件,支持多文件夹搜索(合并去重)
搜索策略:
- since_minutes 指定时:用 SINCE 日期 + ALL 搜索最近N分钟内的邮件不受已读/未读限制)
- only_unseen=True 且未指定 since_minutes搜索 UNSEEN
- only_unseen=False 且未指定 since_minutes搜索全部取最近 count 封)
- folders 默认为 ["INBOX"],可传入多个文件夹(如 ["INBOX", "Junk Email"]
"""
if not self._connected:
if not self.connect():
return []
try:
self._conn.select("INBOX", readonly=True)
if folders is None:
folders = ["INBOX"]
if since_minutes is not None:
# 按时间范围搜索SINCE 某天IMAP 只支持按天,精度为天)
since_dt = datetime.now(timezone.utc) - timedelta(minutes=since_minutes)
since_str = since_dt.strftime("%d-%b-%Y")
status, data = self._conn.search(None, f"SINCE {since_str}")
elif only_unseen:
status, data = self._conn.search(None, "UNSEEN")
else:
status, data = self._conn.search(None, "ALL")
all_emails: List[EmailMessage] = []
seen_ids: set = set()
if status != "OK" or not data or not data[0]:
return []
for folder in folders:
try:
status, _ = self._conn.select(folder, readonly=True)
if status != "OK":
logger.debug(f"[{self.account.email}] 文件夹 {folder} 不存在或无法访问,跳过")
continue
ids = data[0].split()
recent_ids = ids[-count:][::-1] # 取最新的 count 封,倒序(最新在前)
if since_minutes is not None:
since_dt = datetime.now(timezone.utc) - timedelta(minutes=since_minutes)
since_str = since_dt.strftime("%d-%b-%Y")
status, data = self._conn.search(None, f"SINCE {since_str}")
elif only_unseen:
status, data = self._conn.search(None, "UNSEEN")
else:
status, data = self._conn.search(None, "ALL")
emails = []
for msg_id in recent_ids:
try:
msg = self._fetch_email(msg_id)
if msg:
emails.append(msg)
except Exception as e:
logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}")
if status != "OK" or not data or not data[0]:
continue
return emails
ids = data[0].split()
recent_ids = ids[-count:][::-1] # 取最新的 count 封,倒序(最新在前)
except Exception as e:
self.record_failure(str(e))
logger.error(f"[{self.account.email}] 获取邮件失败: {e}")
_imap_pool.invalidate(self.account.email)
self._connected = False
self._conn = None
return []
for msg_id in recent_ids:
try:
msg = self._fetch_email(msg_id)
if msg and msg.id not in seen_ids:
seen_ids.add(msg.id)
all_emails.append(msg)
except Exception as e:
logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}, folder: {folder}): {e}")
except Exception as e:
self.record_failure(str(e))
logger.warning(f"[{self.account.email}] 搜索文件夹 {folder} 失败: {e}")
_imap_pool.invalidate(self.account.email)
self._connected = False
self._conn = None
break
# 按收信时间降序排列,截取 count 封
all_emails.sort(key=lambda m: m.received_timestamp, reverse=True)
return all_emails[:count]
def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]:
"""获取并解析单封邮件"""

View File

@@ -21,6 +21,9 @@ from .providers.imap_new import IMAPNewProvider
logger = logging.getLogger(__name__)
# 验证码搜索的文件夹列表(同时搜索收件箱和垃圾箱)
_OUTLOOK_SEARCH_FOLDERS = ["INBOX", "Junk Email"]
def _get_code_settings() -> dict:
settings = get_settings()
@@ -157,6 +160,7 @@ class OutlookService(BaseEmailService):
only_unseen: bool = True,
since_minutes: Optional[int] = None,
use_cache: bool = False,
folders: Optional[List[str]] = None,
) -> List[EmailMessage]:
"""通过 IMAP_NEW Provider 获取邮件,可选使用内存缓存"""
if use_cache:
@@ -173,7 +177,7 @@ class OutlookService(BaseEmailService):
with self._imap_semaphore:
with provider:
emails = provider.get_recent_emails(
count, only_unseen, since_minutes=since_minutes
count, only_unseen, since_minutes=since_minutes, folders=folders
)
if emails:
@@ -269,20 +273,21 @@ class OutlookService(BaseEmailService):
start_time = time.time()
poll_count = 0
# 计算 since_minutes从发送时间前2分钟开始搜索最多查180分钟
since_minutes: Optional[int] = None
if otp_sent_at:
elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2
since_minutes = min(elapsed_since_send, 180)
while time.time() - start_time < timeout:
poll_count += 1
# since_minutes 时用 SINCE 搜索(覆盖已读/未读否则前3次用 UNSEEN
only_unseen = (since_minutes is None) and (poll_count <= 3)
# 每次动态计算 since_minutes,确保时间窗口随轮询推进而更新
if otp_sent_at:
elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2
since_minutes: Optional[int] = min(elapsed_since_send, 180)
only_unseen = False
else:
since_minutes = None
only_unseen = poll_count <= 3
try:
emails = self._fetch_emails(
account, count=15, only_unseen=only_unseen,
since_minutes=since_minutes,
folders=_OUTLOOK_SEARCH_FOLDERS,
)
if emails:
code = self.email_parser.find_verification_code_in_emails(
@@ -334,7 +339,8 @@ class OutlookService(BaseEmailService):
with provider:
# 先做一次即时检查
emails = provider.get_recent_emails(
15, only_unseen=(since_minutes is None), since_minutes=since_minutes
15, only_unseen=(since_minutes is None), since_minutes=since_minutes,
folders=_OUTLOOK_SEARCH_FOLDERS,
)
code = self.email_parser.find_verification_code_in_emails(
emails,
@@ -361,7 +367,8 @@ class OutlookService(BaseEmailService):
# 没有 otp_sent_at 时用距当前时间2分钟内的邮件
fetch_since = 2
emails = provider.get_recent_emails(
15, only_unseen=False, since_minutes=fetch_since
15, only_unseen=False, since_minutes=fetch_since,
folders=_OUTLOOK_SEARCH_FOLDERS,
)
code = self.email_parser.find_verification_code_in_emails(
emails,

View File

@@ -15,7 +15,8 @@ from pydantic import BaseModel, Field
from ...database import crud
from ...database.session import get_db
from ...database.models import RegistrationTask, Proxy
from ...core.register import RegistrationEngine, RegistrationResult
from ...core.login import LoginEngine
from ...core.register import RegistrationResult
from ...services import EmailServiceFactory, EmailServiceType
from ...config.settings import get_settings
from ..task_manager import task_manager
@@ -395,7 +396,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
# 创建注册引擎 - 使用 TaskManager 的日志回调
log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id)
engine = RegistrationEngine(
engine = LoginEngine(
email_service=email_service,
proxy_url=actual_proxy_url,
callback_logger=log_callback,

14
tests/test_decode.py Normal file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @Time : 2026/3/21 14:48
from src.core.utils import base64_payload_decode, base64_decode
if __name__ == '__main__':
print(base64_payload_decode("eyJzZXNzaW9uX2lkIjoiYXV0aHNlc3NfcUE5eFByY3RaZmtHWXJnSlJGdUpxRXBPIiwiY291bnRyeV9jb2RlX2hpbnQiOiJVUyIsImF1dGhfc2Vzc2lvbl9sb2dnaW5nX2lkIjoiMTk0ZDg5OGQtM2Q0ZC00MzU5LWI1NTQtYmJjMjc1YTJlYjU1IiwicHJvbW8iOiIiLCJzaWdudXBfc291cmNlIjoiIiwib3BlbmFpX2NsaWVudF9pZCI6ImFwcF9FTW9hbUVFWjczZjBDa1hhWHA3aHJhbm4iLCJhcHBfbmFtZV9lbnVtIjoib2FpY2xpIiwiYWFzX2VuYWJsZWQiOmZhbHNlLCJvcmlnaW5hbF9zY3JlZW5faGludCI6ImxvZ2luIiwicGFzc3dvcmRsZXNzX2Rpc2FibGVkIjpmYWxzZSwicGFzc3dvcmRsZXNzX290cF9mcm9tX3Bhc3N3b3JkX3JlZGlyZWN0IjpmYWxzZSwiZW1haWxfdmVyaWZpY2F0aW9uX21vZGUiOiJwYXNzd29yZGxlc3NfbG9naW4iLCJlbWFpbCI6Imxob2xsYW5kNTcwQGdzb2xleWZveWxlLm9yZy51ayIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJuYW1lIjoiZXJyIiwid29ya3NwYWNlcyI6W3siaWQiOiI4NjhmZGNmYi1kNjI3LTRhZTItYTQ4Mi1jMTQxMjA1MGZhYTYiLCJuYW1lIjpudWxsLCJraW5kIjoicGVyc29uYWwiLCJwcm9maWxlX3BpY3R1cmVfYWx0X3RleHQiOiJlcnIifV19"))