Files
codex-register/main.py
cnlimiter dc1334fbab 1
2026-03-14 01:27:40 +08:00

620 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import base64
import hashlib
import json
import random
import re
import secrets
import string
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from curl_cffi import requests
# ==========================================
# Tempmail.lol API (v2)
# ==========================================
TEMPMAIL_BASE = "https://api.tempmail.lol/v2"
def get_email_and_token(proxies: Any = None) -> tuple[str, str]:
"""创建 Tempmail.lol 邮箱并获取 token"""
try:
# 创建新的 inbox
resp = requests.post(
f"{TEMPMAIL_BASE}/inbox/create",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json={},
proxies=proxies,
impersonate="chrome",
timeout=15,
)
if resp.status_code not in (200, 201):
print(f"[Error] Tempmail.lol 请求失败,状态码: {resp.status_code}")
return "", ""
data = resp.json()
email = str(data.get("address", "")).strip()
token = str(data.get("token", "")).strip()
if not email or not token:
print("[Error] Tempmail.lol 返回数据不完整")
return "", ""
return email, token
except Exception as e:
print(f"[Error] 创建 Tempmail.lol 邮箱出错: {e}")
return "", ""
def get_oai_code(token: str, email: str, proxies: Any = None) -> str:
"""使用 Tempmail.lol token 轮询获取验证码"""
regex = r"(?<!\d)(\d{6})(?!\d)"
seen_ids: set[int] = set()
print(f"[*] 正在等待邮箱 {email} 的验证码...", end="", flush=True)
for _ in range(40):
print(".", end="", flush=True)
try:
# 获取邮件列表
resp = requests.get(
f"{TEMPMAIL_BASE}/inbox",
params={"token": token},
headers={"Accept": "application/json"},
proxies=proxies,
impersonate="chrome",
timeout=15,
)
if resp.status_code != 200:
time.sleep(3)
continue
data = resp.json()
# 检查 inbox 是否过期
if data is None or (isinstance(data, dict) and not data):
print(" 邮箱已过期")
return ""
email_list = data.get("emails", []) if isinstance(data, dict) else []
if not isinstance(email_list, list):
time.sleep(3)
continue
for msg in email_list:
if not isinstance(msg, dict):
continue
# 使用 date 作为唯一标识(因为可能没有 id
msg_date = msg.get("date", 0)
if not msg_date or msg_date in seen_ids:
continue
seen_ids.add(msg_date)
sender = str(msg.get("from", "")).lower()
subject = str(msg.get("subject", ""))
body = str(msg.get("body", ""))
html = str(msg.get("html") or "")
content = "\n".join([sender, subject, body, html])
# 检查是否是目标邮件
if "openai" not in sender and "openai" not in content.lower():
continue
# 提取验证码
m = re.search(regex, content)
if m:
print(" 抓到啦! 验证码:", m.group(1))
return m.group(1)
except Exception as e:
pass
time.sleep(3)
print(" 超时,未收到验证码")
return ""
# ==========================================
# OAuth 授权与辅助函数
# ==========================================
# OAuth endpoints and client settings: AUTH_URL is used to build the
# authorization URL, TOKEN_URL receives the authorization-code exchange.
AUTH_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
# Plain literal: there is nothing to interpolate, so no f-string prefix.
DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
DEFAULT_SCOPE = "openid email profile offline_access"
def _b64url_no_pad(raw: bytes) -> str:
    """Encode *raw* as URL-safe base64 text with trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.rstrip(b"=").decode("ascii")
def _sha256_b64url_no_pad(s: str) -> str:
    """Return the unpadded URL-safe base64 of the SHA-256 digest of *s*.

    *s* is encoded as ASCII before hashing, so non-ASCII input raises
    ``UnicodeEncodeError``.
    """
    hasher = hashlib.sha256()
    hasher.update(s.encode("ascii"))
    return _b64url_no_pad(hasher.digest())
def _random_state(nbytes: int = 16) -> str:
    """Return a URL-safe random string built from *nbytes* random bytes."""
    state_value = secrets.token_urlsafe(nbytes)
    return state_value
def _pkce_verifier() -> str:
    """Return a fresh PKCE code verifier: URL-safe text from 64 random bytes."""
    entropy_bytes = 64
    return secrets.token_urlsafe(entropy_bytes)
def _parse_callback_url(callback_url: str) -> Dict[str, str]:
    """Extract the OAuth callback parameters from a URL or URL fragment.

    Accepts a full URL, a host/path without a scheme, a bare ``?query`` or a
    bare ``key=value&...`` string. Query parameters win over fragment
    parameters unless the query value is missing or blank.

    Returns a dict with exactly the keys ``code``, ``state``, ``error`` and
    ``error_description``; absent values are empty strings.
    """
    fields = ("code", "state", "error", "error_description")

    text = callback_url.strip()
    if not text:
        return dict.fromkeys(fields, "")

    # Coerce scheme-less input into something urlparse splits correctly.
    if "://" not in text:
        if text.startswith("?"):
            text = "http://localhost" + text
        elif ":" in text or any(mark in text for mark in "/?#"):
            text = "http://" + text
        elif "=" in text:
            text = "http://localhost/?" + text

    parts = urllib.parse.urlparse(text)
    merged = urllib.parse.parse_qs(parts.query, keep_blank_values=True)
    from_fragment = urllib.parse.parse_qs(parts.fragment, keep_blank_values=True)

    # Fragment values only fill in parameters the query lacks or leaves blank.
    for name, vals in from_fragment.items():
        current = merged.get(name)
        if not current or not (current[0] or "").strip():
            merged[name] = vals

    result = {name: (merged.get(name, [""])[0] or "").strip() for name in fields}

    # A percent-encoded '#' can leave "code#state" glued together in ``code``.
    if result["code"] and not result["state"] and "#" in result["code"]:
        result["code"], result["state"] = result["code"].split("#", 1)

    # Promote a lone description to the error slot so callers see an error.
    if result["error_description"] and not result["error"]:
        result["error"] = result["error_description"]
        result["error_description"] = ""

    return result
def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
    """Decode the payload (second segment) of a JWT without verifying it.

    The signature is NOT checked; the result must only be used for
    informational fields, never for authorization decisions.

    Returns the claims as a dict, or ``{}`` when the token is empty, has fewer
    than three dot-separated segments, cannot be decoded, or decodes to JSON
    that is not an object.
    """
    if not id_token or id_token.count(".") < 2:
        return {}
    payload_b64 = id_token.split(".")[1]
    # JWT segments drop base64 padding; restore it to a multiple of four.
    padding = "=" * (-len(payload_b64) % 4)
    try:
        decoded = base64.urlsafe_b64decode((payload_b64 + padding).encode("ascii"))
        claims = json.loads(decoded.decode("utf-8"))
    except Exception:
        # Deliberately best-effort: any malformed token yields no claims.
        return {}
    # Valid JSON may still be a list/number/string; callers rely on ``.get``.
    return claims if isinstance(claims, dict) else {}
def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
    """Decode one unpadded base64url JSON segment into a dict.

    Surrounding whitespace is ignored. Returns ``{}`` when *seg* is empty or
    ``None``, cannot be base64/UTF-8/JSON decoded, or decodes to JSON that is
    not an object.
    """
    raw = (seg or "").strip()
    if not raw:
        return {}
    # Restore the base64 padding that JWT-style segments omit.
    padding = "=" * (-len(raw) % 4)
    try:
        decoded = base64.urlsafe_b64decode((raw + padding).encode("ascii"))
        value = json.loads(decoded.decode("utf-8"))
    except Exception:
        # Deliberately best-effort: a malformed segment yields an empty dict.
        return {}
    # Valid JSON may still be a list/number/string; callers rely on ``.get``.
    return value if isinstance(value, dict) else {}
def _to_int(v: Any) -> int:
    """Convert *v* to ``int``, returning 0 when it cannot be converted.

    Handles ``None`` and unsupported types (``TypeError``), non-numeric text
    and NaN (``ValueError``), and infinite floats (``OverflowError``) — the
    last can reach here because ``json.loads`` accepts ``Infinity``.
    """
    try:
        return int(v)
    except (TypeError, ValueError, OverflowError):
        return 0
def _generate_password(length: int = 12) -> str:
    """Return a random string of *length* ASCII letters and digits.

    Characters are drawn independently with ``secrets.choice``; no particular
    mix of upper case, lower case and digits is guaranteed.
    """
    alphabet = string.ascii_letters + string.digits
    picks = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picks)
def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
    """POST *data* as a URL-encoded form and return the decoded JSON reply.

    Args:
        url: Endpoint to post to.
        data: Form fields; URL-encoded and sent as UTF-8.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        RuntimeError: The response status is not 200, or the server answered
            with an HTTP error; the message carries the status and body text.
    """
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    encoded_form = urllib.parse.urlencode(data).encode("utf-8")
    request = urllib.request.Request(
        url, data=encoded_form, headers=request_headers, method="POST"
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            payload = response.read()
            if response.status != 200:
                detail = payload.decode("utf-8", "replace")
                raise RuntimeError(
                    f"token exchange failed: {response.status}: {detail}"
                )
            return json.loads(payload.decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # Error responses still carry a body worth surfacing to the caller.
        detail = exc.read().decode("utf-8", "replace")
        raise RuntimeError(f"token exchange failed: {exc.code}: {detail}") from exc
@dataclass(frozen=True)
class OAuthStart:
    """Immutable bundle of the values created when an OAuth request is started."""

    # Full authorization URL, including its query string.
    auth_url: str
    # Random ``state`` value sent in the request, to be matched on callback.
    state: str
    # PKCE code verifier; its SHA-256 challenge is embedded in ``auth_url``.
    code_verifier: str
    # Redirect URI that was sent in the authorization request.
    redirect_uri: str
def generate_oauth_url(
    *, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE
) -> OAuthStart:
    """Build an authorization URL using PKCE (S256) and a random ``state``.

    Args:
        redirect_uri: Redirect URI placed in the request.
        scope: Space-separated scopes placed in the request.

    Returns:
        An ``OAuthStart`` holding the URL plus the ``state`` and code verifier
        needed later to validate the callback and exchange the code.
    """
    verifier = _pkce_verifier()
    nonce = _random_state()
    # Ordered pairs keep the query-string parameter order fixed.
    query_pairs = [
        ("client_id", CLIENT_ID),
        ("response_type", "code"),
        ("redirect_uri", redirect_uri),
        ("scope", scope),
        ("state", nonce),
        ("code_challenge", _sha256_b64url_no_pad(verifier)),
        ("code_challenge_method", "S256"),
        ("prompt", "login"),
        ("id_token_add_organizations", "true"),
        ("codex_cli_simplified_flow", "true"),
    ]
    query_string = urllib.parse.urlencode(query_pairs)
    return OAuthStart(
        auth_url=f"{AUTH_URL}?{query_string}",
        state=nonce,
        code_verifier=verifier,
        redirect_uri=redirect_uri,
    )
def submit_callback_url(
    *,
    callback_url: str,
    expected_state: str,
    code_verifier: str,
    redirect_uri: str = DEFAULT_REDIRECT_URI,
) -> str:
    """Validate an OAuth callback and exchange its code for tokens.

    Args:
        callback_url: Callback URL (or query/fragment) carrying code and state.
        expected_state: The ``state`` value sent in the authorization request.
        code_verifier: PKCE verifier matching the challenge that was sent.
        redirect_uri: Redirect URI used in the authorization request.

    Returns:
        A compact JSON string with the tokens, account id, email, refresh time
        and expiry time (both formatted as UTC ``YYYY-MM-DDTHH:MM:SSZ``).

    Raises:
        RuntimeError: The callback reports an OAuth error, or the token
            exchange fails.
        ValueError: The callback lacks a code or state, or the state differs
            from *expected_state*.
    """
    parsed = _parse_callback_url(callback_url)

    oauth_error = parsed["error"]
    if oauth_error:
        detail = parsed["error_description"]
        raise RuntimeError(f"oauth error: {oauth_error}: {detail}".strip())

    auth_code = parsed["code"]
    returned_state = parsed["state"]
    if not auth_code:
        raise ValueError("callback url missing ?code=")
    if not returned_state:
        raise ValueError("callback url missing ?state=")
    if returned_state != expected_state:
        raise ValueError("state mismatch")

    exchange_form = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "code": auth_code,
        "redirect_uri": redirect_uri,
        "code_verifier": code_verifier,
    }
    tokens = _post_form(TOKEN_URL, exchange_form)

    def _text(key: str) -> str:
        # Missing or null token fields collapse to an empty string.
        return (tokens.get(key) or "").strip()

    access_token = _text("access_token")
    refresh_token = _text("refresh_token")
    id_token = _text("id_token")
    lifetime = max(_to_int(tokens.get("expires_in")), 0)

    # Claims are read without signature verification; informational only.
    claims = _jwt_claims_no_verify(id_token)
    email = str(claims.get("email") or "").strip()
    auth_section = claims.get("https://api.openai.com/auth") or {}
    account_id = str(auth_section.get("chatgpt_account_id") or "").strip()

    issued_at = int(time.time())
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"
    refreshed_text = time.strftime(stamp_format, time.gmtime(issued_at))
    expires_text = time.strftime(stamp_format, time.gmtime(issued_at + lifetime))

    config = {
        "id_token": id_token,
        "access_token": access_token,
        "refresh_token": refresh_token,
        "account_id": account_id,
        "last_refresh": refreshed_text,
        "email": email,
        "type": "codex",
        "expired": expires_text,
    }
    return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
# ==========================================
# 核心注册逻辑
# ==========================================
def run(proxy: Optional[str]) -> Optional[str]:
proxies: Any = None
if proxy:
proxies = {"http": proxy, "https": proxy}
s = requests.Session(proxies=proxies, impersonate="chrome")
try:
trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
trace = trace.text
loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
loc = loc_re.group(1) if loc_re else None
print(f"[*] 当前 IP 所在地: {loc}")
if loc == "CN" or loc == "HK":
raise RuntimeError("检查代理哦w - 所在地不支持")
except Exception as e:
print(f"[Error] 网络连接检查失败: {e}")
return None
email, dev_token = get_email_and_token(proxies)
if not email or not dev_token:
return None
print(f"[*] 成功获取 Tempmail.lol 邮箱与授权: {email}")
oauth = generate_oauth_url()
url = oauth.auth_url
try:
resp = s.get(url, timeout=15)
did = s.cookies.get("oai-did")
print(f"[*] Device ID: {did}")
signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
sen_resp = requests.post(
"https://sentinel.openai.com/backend-api/sentinel/req",
headers={
"origin": "https://sentinel.openai.com",
"referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
"content-type": "text/plain;charset=UTF-8",
},
data=sen_req_body,
proxies=proxies,
impersonate="chrome",
timeout=15,
)
if sen_resp.status_code != 200:
print(f"[Error] Sentinel 异常拦截,状态码: {sen_resp.status_code}")
return None
sen_token = sen_resp.json()["token"]
sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}'
signup_resp = s.post(
"https://auth.openai.com/api/accounts/authorize/continue",
headers={
"referer": "https://auth.openai.com/create-account",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": sentinel,
},
data=signup_body,
)
print(f"[*] 提交注册表单状态: {signup_resp.status_code}")
# 生成密码
password = _generate_password()
print(f"[*] 生成密码: {password}")
# 提交密码和邮箱
register_body = json.dumps({
"password": password,
"username": email
})
pwd_resp = s.post(
"https://auth.openai.com/api/accounts/user/register",
headers={
"referer": "https://auth.openai.com/create-account/password",
"accept": "application/json",
"content-type": "application/json",
},
data=register_body,
)
print(f"[*] 提交密码状态: {pwd_resp.status_code}")
# 发送邮箱验证码
otp_resp = s.get(
"https://auth.openai.com/api/accounts/email-otp/send",
headers={
"referer": "https://auth.openai.com/create-account/password",
"accept": "application/json",
},
)
print(f"[*] 验证码发送状态: {otp_resp.status_code}")
code = get_oai_code(dev_token, email, proxies)
if not code:
return None
code_body = f'{{"code":"{code}"}}'
code_resp = s.post(
"https://auth.openai.com/api/accounts/email-otp/validate",
headers={
"referer": "https://auth.openai.com/email-verification",
"accept": "application/json",
"content-type": "application/json",
},
data=code_body,
)
print(f"[*] 验证码校验状态: {code_resp.status_code}")
create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
create_account_resp = s.post(
"https://auth.openai.com/api/accounts/create_account",
headers={
"referer": "https://auth.openai.com/about-you",
"accept": "application/json",
"content-type": "application/json",
},
data=create_account_body,
)
create_account_status = create_account_resp.status_code
print(f"[*] 账户创建状态: {create_account_status}")
if create_account_status != 200:
print(create_account_resp.text)
return None
auth_cookie = s.cookies.get("oai-client-auth-session")
if not auth_cookie:
print("[Error] 未能获取到授权 Cookie")
return None
auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
workspaces = auth_json.get("workspaces") or []
if not workspaces:
print("[Error] 授权 Cookie 里没有 workspace 信息")
return None
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
if not workspace_id:
print("[Error] 无法解析 workspace_id")
return None
select_body = f'{{"workspace_id":"{workspace_id}"}}'
select_resp = s.post(
"https://auth.openai.com/api/accounts/workspace/select",
headers={
"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
"content-type": "application/json",
},
data=select_body,
)
if select_resp.status_code != 200:
print(f"[Error] 选择 workspace 失败,状态码: {select_resp.status_code}")
print(select_resp.text)
return None
continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
if not continue_url:
print("[Error] workspace/select 响应里缺少 continue_url")
return None
current_url = continue_url
for _ in range(6):
final_resp = s.get(current_url, allow_redirects=False, timeout=15)
location = final_resp.headers.get("Location") or ""
if final_resp.status_code not in [301, 302, 303, 307, 308]:
break
if not location:
break
next_url = urllib.parse.urljoin(current_url, location)
if "code=" in next_url and "state=" in next_url:
return submit_callback_url(
callback_url=next_url,
code_verifier=oauth.code_verifier,
redirect_uri=oauth.redirect_uri,
expected_state=oauth.state,
)
current_url = next_url
print("[Error] 未能在重定向链中捕获到最终 Callback URL")
return None
except Exception as e:
print(f"[Error] 运行时发生错误: {e}")
return None
def main() -> None:
parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (Tempmail.lol 版本)")
parser.add_argument(
"--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890"
)
parser.add_argument("--once", action="store_true", help="只运行一次")
parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
parser.add_argument(
"--sleep-max", type=int, default=30, help="循环模式最长等待秒数"
)
args = parser.parse_args()
sleep_min = max(1, args.sleep_min)
sleep_max = max(sleep_min, args.sleep_max)
count = 0
print("[Info] Yasal's Seamless OpenAI Auto-Registrar Started for ZJH (Tempmail.lol Edition)")
while True:
count += 1
print(
f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<"
)
try:
token_json = run(args.proxy)
if token_json:
try:
t_data = json.loads(token_json)
fname_email = t_data.get("email", "unknown").replace("@", "_")
except Exception:
fname_email = "unknown"
file_name = f"token_{fname_email}_{int(time.time())}.json"
with open(file_name, "w", encoding="utf-8") as f:
f.write(token_json)
print(f"[*] 成功! Token 已保存至: {file_name}")
else:
print("[-] 本次注册失败。")
except Exception as e:
print(f"[Error] 发生未捕获异常: {e}")
if args.once:
break
wait_time = random.randint(sleep_min, sleep_max)
print(f"[*] 休息 {wait_time} 秒...")
time.sleep(wait_time)
if __name__ == "__main__":
main()