mirror of
https://github.com/cnlimiter/codex-register.git
synced 2026-05-07 06:22:56 +08:00
620 lines
20 KiB
Python
620 lines
20 KiB
Python
import argparse
|
||
import base64
|
||
import hashlib
|
||
import json
|
||
import random
|
||
import re
|
||
import secrets
|
||
import string
|
||
import time
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from typing import Any, Dict, Optional
|
||
|
||
from curl_cffi import requests
|
||
|
||
# ==========================================
|
||
# Tempmail.lol API (v2)
|
||
# ==========================================
|
||
|
||
TEMPMAIL_BASE = "https://api.tempmail.lol/v2"
|
||
|
||
|
||
def get_email_and_token(proxies: Any = None) -> tuple[str, str]:
|
||
"""创建 Tempmail.lol 邮箱并获取 token"""
|
||
try:
|
||
# 创建新的 inbox
|
||
resp = requests.post(
|
||
f"{TEMPMAIL_BASE}/inbox/create",
|
||
headers={
|
||
"Accept": "application/json",
|
||
"Content-Type": "application/json",
|
||
},
|
||
json={},
|
||
proxies=proxies,
|
||
impersonate="chrome",
|
||
timeout=15,
|
||
)
|
||
|
||
if resp.status_code not in (200, 201):
|
||
print(f"[Error] Tempmail.lol 请求失败,状态码: {resp.status_code}")
|
||
return "", ""
|
||
|
||
data = resp.json()
|
||
email = str(data.get("address", "")).strip()
|
||
token = str(data.get("token", "")).strip()
|
||
|
||
if not email or not token:
|
||
print("[Error] Tempmail.lol 返回数据不完整")
|
||
return "", ""
|
||
|
||
return email, token
|
||
|
||
except Exception as e:
|
||
print(f"[Error] 创建 Tempmail.lol 邮箱出错: {e}")
|
||
return "", ""
|
||
|
||
|
||
def get_oai_code(token: str, email: str, proxies: Any = None) -> str:
|
||
"""使用 Tempmail.lol token 轮询获取验证码"""
|
||
regex = r"(?<!\d)(\d{6})(?!\d)"
|
||
seen_ids: set[int] = set()
|
||
|
||
print(f"[*] 正在等待邮箱 {email} 的验证码...", end="", flush=True)
|
||
|
||
for _ in range(40):
|
||
print(".", end="", flush=True)
|
||
try:
|
||
# 获取邮件列表
|
||
resp = requests.get(
|
||
f"{TEMPMAIL_BASE}/inbox",
|
||
params={"token": token},
|
||
headers={"Accept": "application/json"},
|
||
proxies=proxies,
|
||
impersonate="chrome",
|
||
timeout=15,
|
||
)
|
||
|
||
if resp.status_code != 200:
|
||
time.sleep(3)
|
||
continue
|
||
|
||
data = resp.json()
|
||
|
||
# 检查 inbox 是否过期
|
||
if data is None or (isinstance(data, dict) and not data):
|
||
print(" 邮箱已过期")
|
||
return ""
|
||
|
||
email_list = data.get("emails", []) if isinstance(data, dict) else []
|
||
|
||
if not isinstance(email_list, list):
|
||
time.sleep(3)
|
||
continue
|
||
|
||
for msg in email_list:
|
||
if not isinstance(msg, dict):
|
||
continue
|
||
|
||
# 使用 date 作为唯一标识(因为可能没有 id)
|
||
msg_date = msg.get("date", 0)
|
||
if not msg_date or msg_date in seen_ids:
|
||
continue
|
||
seen_ids.add(msg_date)
|
||
|
||
sender = str(msg.get("from", "")).lower()
|
||
subject = str(msg.get("subject", ""))
|
||
body = str(msg.get("body", ""))
|
||
html = str(msg.get("html") or "")
|
||
|
||
content = "\n".join([sender, subject, body, html])
|
||
|
||
# 检查是否是目标邮件
|
||
if "openai" not in sender and "openai" not in content.lower():
|
||
continue
|
||
|
||
# 提取验证码
|
||
m = re.search(regex, content)
|
||
if m:
|
||
print(" 抓到啦! 验证码:", m.group(1))
|
||
return m.group(1)
|
||
|
||
except Exception as e:
|
||
pass
|
||
|
||
time.sleep(3)
|
||
|
||
print(" 超时,未收到验证码")
|
||
return ""
|
||
|
||
|
||
# ==========================================
|
||
# OAuth 授权与辅助函数
|
||
# ==========================================
|
||
|
||
AUTH_URL = "https://auth.openai.com/oauth/authorize"
|
||
TOKEN_URL = "https://auth.openai.com/oauth/token"
|
||
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
|
||
|
||
DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback"
|
||
DEFAULT_SCOPE = "openid email profile offline_access"
|
||
|
||
|
||
def _b64url_no_pad(raw: bytes) -> str:
|
||
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
|
||
|
||
|
||
def _sha256_b64url_no_pad(s: str) -> str:
|
||
return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
|
||
|
||
|
||
def _random_state(nbytes: int = 16) -> str:
|
||
return secrets.token_urlsafe(nbytes)
|
||
|
||
|
||
def _pkce_verifier() -> str:
|
||
return secrets.token_urlsafe(64)
|
||
|
||
|
||
def _parse_callback_url(callback_url: str) -> Dict[str, str]:
|
||
candidate = callback_url.strip()
|
||
if not candidate:
|
||
return {"code": "", "state": "", "error": "", "error_description": ""}
|
||
|
||
if "://" not in candidate:
|
||
if candidate.startswith("?"):
|
||
candidate = f"http://localhost{candidate}"
|
||
elif any(ch in candidate for ch in "/?#") or ":" in candidate:
|
||
candidate = f"http://{candidate}"
|
||
elif "=" in candidate:
|
||
candidate = f"http://localhost/?{candidate}"
|
||
|
||
parsed = urllib.parse.urlparse(candidate)
|
||
query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
|
||
fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
|
||
|
||
for key, values in fragment.items():
|
||
if key not in query or not query[key] or not (query[key][0] or "").strip():
|
||
query[key] = values
|
||
|
||
def get1(k: str) -> str:
|
||
v = query.get(k, [""])
|
||
return (v[0] or "").strip()
|
||
|
||
code = get1("code")
|
||
state = get1("state")
|
||
error = get1("error")
|
||
error_description = get1("error_description")
|
||
|
||
if code and not state and "#" in code:
|
||
code, state = code.split("#", 1)
|
||
|
||
if not error and error_description:
|
||
error, error_description = error_description, ""
|
||
|
||
return {
|
||
"code": code,
|
||
"state": state,
|
||
"error": error,
|
||
"error_description": error_description,
|
||
}
|
||
|
||
|
||
def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
|
||
if not id_token or id_token.count(".") < 2:
|
||
return {}
|
||
payload_b64 = id_token.split(".")[1]
|
||
pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
|
||
try:
|
||
payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
|
||
return json.loads(payload.decode("utf-8"))
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
|
||
raw = (seg or "").strip()
|
||
if not raw:
|
||
return {}
|
||
pad = "=" * ((4 - (len(raw) % 4)) % 4)
|
||
try:
|
||
decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii"))
|
||
return json.loads(decoded.decode("utf-8"))
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _to_int(v: Any) -> int:
|
||
try:
|
||
return int(v)
|
||
except (TypeError, ValueError):
|
||
return 0
|
||
|
||
|
||
def _generate_password(length: int = 12) -> str:
|
||
"""生成指定长度的随机密码(包含大小写字母和数字)"""
|
||
chars = string.ascii_letters + string.digits
|
||
return ''.join(secrets.choice(chars) for _ in range(length))
|
||
|
||
|
||
def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
|
||
body = urllib.parse.urlencode(data).encode("utf-8")
|
||
req = urllib.request.Request(
|
||
url,
|
||
data=body,
|
||
method="POST",
|
||
headers={
|
||
"Content-Type": "application/x-www-form-urlencoded",
|
||
"Accept": "application/json",
|
||
},
|
||
)
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||
raw = resp.read()
|
||
if resp.status != 200:
|
||
raise RuntimeError(
|
||
f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
|
||
)
|
||
return json.loads(raw.decode("utf-8"))
|
||
except urllib.error.HTTPError as exc:
|
||
raw = exc.read()
|
||
raise RuntimeError(
|
||
f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
|
||
) from exc
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class OAuthStart:
|
||
auth_url: str
|
||
state: str
|
||
code_verifier: str
|
||
redirect_uri: str
|
||
|
||
|
||
def generate_oauth_url(
|
||
*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE
|
||
) -> OAuthStart:
|
||
state = _random_state()
|
||
code_verifier = _pkce_verifier()
|
||
code_challenge = _sha256_b64url_no_pad(code_verifier)
|
||
|
||
params = {
|
||
"client_id": CLIENT_ID,
|
||
"response_type": "code",
|
||
"redirect_uri": redirect_uri,
|
||
"scope": scope,
|
||
"state": state,
|
||
"code_challenge": code_challenge,
|
||
"code_challenge_method": "S256",
|
||
"prompt": "login",
|
||
"id_token_add_organizations": "true",
|
||
"codex_cli_simplified_flow": "true",
|
||
}
|
||
auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
|
||
return OAuthStart(
|
||
auth_url=auth_url,
|
||
state=state,
|
||
code_verifier=code_verifier,
|
||
redirect_uri=redirect_uri,
|
||
)
|
||
|
||
|
||
def submit_callback_url(
|
||
*,
|
||
callback_url: str,
|
||
expected_state: str,
|
||
code_verifier: str,
|
||
redirect_uri: str = DEFAULT_REDIRECT_URI,
|
||
) -> str:
|
||
cb = _parse_callback_url(callback_url)
|
||
if cb["error"]:
|
||
desc = cb["error_description"]
|
||
raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
|
||
|
||
if not cb["code"]:
|
||
raise ValueError("callback url missing ?code=")
|
||
if not cb["state"]:
|
||
raise ValueError("callback url missing ?state=")
|
||
if cb["state"] != expected_state:
|
||
raise ValueError("state mismatch")
|
||
|
||
token_resp = _post_form(
|
||
TOKEN_URL,
|
||
{
|
||
"grant_type": "authorization_code",
|
||
"client_id": CLIENT_ID,
|
||
"code": cb["code"],
|
||
"redirect_uri": redirect_uri,
|
||
"code_verifier": code_verifier,
|
||
},
|
||
)
|
||
|
||
access_token = (token_resp.get("access_token") or "").strip()
|
||
refresh_token = (token_resp.get("refresh_token") or "").strip()
|
||
id_token = (token_resp.get("id_token") or "").strip()
|
||
expires_in = _to_int(token_resp.get("expires_in"))
|
||
|
||
claims = _jwt_claims_no_verify(id_token)
|
||
email = str(claims.get("email") or "").strip()
|
||
auth_claims = claims.get("https://api.openai.com/auth") or {}
|
||
account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
|
||
|
||
now = int(time.time())
|
||
expired_rfc3339 = time.strftime(
|
||
"%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))
|
||
)
|
||
now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
|
||
|
||
config = {
|
||
"id_token": id_token,
|
||
"access_token": access_token,
|
||
"refresh_token": refresh_token,
|
||
"account_id": account_id,
|
||
"last_refresh": now_rfc3339,
|
||
"email": email,
|
||
"type": "codex",
|
||
"expired": expired_rfc3339,
|
||
}
|
||
|
||
return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
|
||
|
||
|
||
# ==========================================
|
||
# 核心注册逻辑
|
||
# ==========================================
|
||
|
||
|
||
def run(proxy: Optional[str]) -> Optional[str]:
|
||
proxies: Any = None
|
||
if proxy:
|
||
proxies = {"http": proxy, "https": proxy}
|
||
|
||
s = requests.Session(proxies=proxies, impersonate="chrome")
|
||
|
||
try:
|
||
trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
|
||
trace = trace.text
|
||
loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
|
||
loc = loc_re.group(1) if loc_re else None
|
||
print(f"[*] 当前 IP 所在地: {loc}")
|
||
if loc == "CN" or loc == "HK":
|
||
raise RuntimeError("检查代理哦w - 所在地不支持")
|
||
except Exception as e:
|
||
print(f"[Error] 网络连接检查失败: {e}")
|
||
return None
|
||
|
||
email, dev_token = get_email_and_token(proxies)
|
||
if not email or not dev_token:
|
||
return None
|
||
print(f"[*] 成功获取 Tempmail.lol 邮箱与授权: {email}")
|
||
|
||
oauth = generate_oauth_url()
|
||
url = oauth.auth_url
|
||
|
||
try:
|
||
resp = s.get(url, timeout=15)
|
||
did = s.cookies.get("oai-did")
|
||
print(f"[*] Device ID: {did}")
|
||
|
||
signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
|
||
sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
|
||
|
||
sen_resp = requests.post(
|
||
"https://sentinel.openai.com/backend-api/sentinel/req",
|
||
headers={
|
||
"origin": "https://sentinel.openai.com",
|
||
"referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
|
||
"content-type": "text/plain;charset=UTF-8",
|
||
},
|
||
data=sen_req_body,
|
||
proxies=proxies,
|
||
impersonate="chrome",
|
||
timeout=15,
|
||
)
|
||
|
||
if sen_resp.status_code != 200:
|
||
print(f"[Error] Sentinel 异常拦截,状态码: {sen_resp.status_code}")
|
||
return None
|
||
|
||
sen_token = sen_resp.json()["token"]
|
||
sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}'
|
||
|
||
signup_resp = s.post(
|
||
"https://auth.openai.com/api/accounts/authorize/continue",
|
||
headers={
|
||
"referer": "https://auth.openai.com/create-account",
|
||
"accept": "application/json",
|
||
"content-type": "application/json",
|
||
"openai-sentinel-token": sentinel,
|
||
},
|
||
data=signup_body,
|
||
)
|
||
print(f"[*] 提交注册表单状态: {signup_resp.status_code}")
|
||
|
||
# 生成密码
|
||
password = _generate_password()
|
||
print(f"[*] 生成密码: {password}")
|
||
|
||
# 提交密码和邮箱
|
||
register_body = json.dumps({
|
||
"password": password,
|
||
"username": email
|
||
})
|
||
|
||
pwd_resp = s.post(
|
||
"https://auth.openai.com/api/accounts/user/register",
|
||
headers={
|
||
"referer": "https://auth.openai.com/create-account/password",
|
||
"accept": "application/json",
|
||
"content-type": "application/json",
|
||
},
|
||
data=register_body,
|
||
)
|
||
print(f"[*] 提交密码状态: {pwd_resp.status_code}")
|
||
|
||
# 发送邮箱验证码
|
||
otp_resp = s.get(
|
||
"https://auth.openai.com/api/accounts/email-otp/send",
|
||
headers={
|
||
"referer": "https://auth.openai.com/create-account/password",
|
||
"accept": "application/json",
|
||
},
|
||
)
|
||
print(f"[*] 验证码发送状态: {otp_resp.status_code}")
|
||
|
||
code = get_oai_code(dev_token, email, proxies)
|
||
if not code:
|
||
return None
|
||
|
||
code_body = f'{{"code":"{code}"}}'
|
||
code_resp = s.post(
|
||
"https://auth.openai.com/api/accounts/email-otp/validate",
|
||
headers={
|
||
"referer": "https://auth.openai.com/email-verification",
|
||
"accept": "application/json",
|
||
"content-type": "application/json",
|
||
},
|
||
data=code_body,
|
||
)
|
||
print(f"[*] 验证码校验状态: {code_resp.status_code}")
|
||
|
||
create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
|
||
create_account_resp = s.post(
|
||
"https://auth.openai.com/api/accounts/create_account",
|
||
headers={
|
||
"referer": "https://auth.openai.com/about-you",
|
||
"accept": "application/json",
|
||
"content-type": "application/json",
|
||
},
|
||
data=create_account_body,
|
||
)
|
||
create_account_status = create_account_resp.status_code
|
||
print(f"[*] 账户创建状态: {create_account_status}")
|
||
|
||
if create_account_status != 200:
|
||
print(create_account_resp.text)
|
||
return None
|
||
|
||
auth_cookie = s.cookies.get("oai-client-auth-session")
|
||
if not auth_cookie:
|
||
print("[Error] 未能获取到授权 Cookie")
|
||
return None
|
||
|
||
auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
|
||
workspaces = auth_json.get("workspaces") or []
|
||
if not workspaces:
|
||
print("[Error] 授权 Cookie 里没有 workspace 信息")
|
||
return None
|
||
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
|
||
if not workspace_id:
|
||
print("[Error] 无法解析 workspace_id")
|
||
return None
|
||
|
||
select_body = f'{{"workspace_id":"{workspace_id}"}}'
|
||
select_resp = s.post(
|
||
"https://auth.openai.com/api/accounts/workspace/select",
|
||
headers={
|
||
"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
|
||
"content-type": "application/json",
|
||
},
|
||
data=select_body,
|
||
)
|
||
|
||
if select_resp.status_code != 200:
|
||
print(f"[Error] 选择 workspace 失败,状态码: {select_resp.status_code}")
|
||
print(select_resp.text)
|
||
return None
|
||
|
||
continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
|
||
if not continue_url:
|
||
print("[Error] workspace/select 响应里缺少 continue_url")
|
||
return None
|
||
|
||
current_url = continue_url
|
||
for _ in range(6):
|
||
final_resp = s.get(current_url, allow_redirects=False, timeout=15)
|
||
location = final_resp.headers.get("Location") or ""
|
||
|
||
if final_resp.status_code not in [301, 302, 303, 307, 308]:
|
||
break
|
||
if not location:
|
||
break
|
||
|
||
next_url = urllib.parse.urljoin(current_url, location)
|
||
if "code=" in next_url and "state=" in next_url:
|
||
return submit_callback_url(
|
||
callback_url=next_url,
|
||
code_verifier=oauth.code_verifier,
|
||
redirect_uri=oauth.redirect_uri,
|
||
expected_state=oauth.state,
|
||
)
|
||
current_url = next_url
|
||
|
||
print("[Error] 未能在重定向链中捕获到最终 Callback URL")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"[Error] 运行时发生错误: {e}")
|
||
return None
|
||
|
||
|
||
def main() -> None:
|
||
parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (Tempmail.lol 版本)")
|
||
parser.add_argument(
|
||
"--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890"
|
||
)
|
||
parser.add_argument("--once", action="store_true", help="只运行一次")
|
||
parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
|
||
parser.add_argument(
|
||
"--sleep-max", type=int, default=30, help="循环模式最长等待秒数"
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
sleep_min = max(1, args.sleep_min)
|
||
sleep_max = max(sleep_min, args.sleep_max)
|
||
|
||
count = 0
|
||
print("[Info] Yasal's Seamless OpenAI Auto-Registrar Started for ZJH (Tempmail.lol Edition)")
|
||
|
||
while True:
|
||
count += 1
|
||
print(
|
||
f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<"
|
||
)
|
||
|
||
try:
|
||
token_json = run(args.proxy)
|
||
|
||
if token_json:
|
||
try:
|
||
t_data = json.loads(token_json)
|
||
fname_email = t_data.get("email", "unknown").replace("@", "_")
|
||
except Exception:
|
||
fname_email = "unknown"
|
||
|
||
file_name = f"token_{fname_email}_{int(time.time())}.json"
|
||
|
||
with open(file_name, "w", encoding="utf-8") as f:
|
||
f.write(token_json)
|
||
|
||
print(f"[*] 成功! Token 已保存至: {file_name}")
|
||
else:
|
||
print("[-] 本次注册失败。")
|
||
|
||
except Exception as e:
|
||
print(f"[Error] 发生未捕获异常: {e}")
|
||
|
||
if args.once:
|
||
break
|
||
|
||
wait_time = random.randint(sleep_min, sleep_max)
|
||
print(f"[*] 休息 {wait_time} 秒...")
|
||
time.sleep(wait_time)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|