From dc1334fbab509f309112e6aaafd32ee88d0d9300 Mon Sep 17 00:00:00 2001 From: cnlimiter Date: Sat, 14 Mar 2026 01:27:40 +0800 Subject: [PATCH] 1 --- main.py | 619 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | Bin 0 -> 138 bytes 2 files changed, 619 insertions(+) create mode 100644 main.py create mode 100644 requirements.txt diff --git a/main.py b/main.py new file mode 100644 index 0000000..0ff6f70 --- /dev/null +++ b/main.py @@ -0,0 +1,619 @@ +import argparse +import base64 +import hashlib +import json +import random +import re +import secrets +import string +import time +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, Optional + +from curl_cffi import requests + +# ========================================== +# Tempmail.lol API (v2) +# ========================================== + +TEMPMAIL_BASE = "https://api.tempmail.lol/v2" + + +def get_email_and_token(proxies: Any = None) -> tuple[str, str]: + """创建 Tempmail.lol 邮箱并获取 token""" + try: + # 创建新的 inbox + resp = requests.post( + f"{TEMPMAIL_BASE}/inbox/create", + headers={ + "Accept": "application/json", + "Content-Type": "application/json", + }, + json={}, + proxies=proxies, + impersonate="chrome", + timeout=15, + ) + + if resp.status_code not in (200, 201): + print(f"[Error] Tempmail.lol 请求失败,状态码: {resp.status_code}") + return "", "" + + data = resp.json() + email = str(data.get("address", "")).strip() + token = str(data.get("token", "")).strip() + + if not email or not token: + print("[Error] Tempmail.lol 返回数据不完整") + return "", "" + + return email, token + + except Exception as e: + print(f"[Error] 创建 Tempmail.lol 邮箱出错: {e}") + return "", "" + + +def get_oai_code(token: str, email: str, proxies: Any = None) -> str: + """使用 Tempmail.lol token 轮询获取验证码""" + regex = r"(? str: + return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=") + + +def _sha256_b64url_no_pad(s: str) -> str: + return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest()) + + +def _random_state(nbytes: int = 16) -> str: + return secrets.token_urlsafe(nbytes) + + +def _pkce_verifier() -> str: + return secrets.token_urlsafe(64) + + +def _parse_callback_url(callback_url: str) -> Dict[str, str]: + candidate = callback_url.strip() + if not candidate: + return {"code": "", "state": "", "error": "", "error_description": ""} + + if "://" not in candidate: + if candidate.startswith("?"): + candidate = f"http://localhost{candidate}" + elif any(ch in candidate for ch in "/?#") or ":" in candidate: + candidate = f"http://{candidate}" + elif "=" in candidate: + candidate = f"http://localhost/?{candidate}" + + parsed = urllib.parse.urlparse(candidate) + query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True) + fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True) + + for key, values in fragment.items(): + if key not in query or not query[key] or not (query[key][0] or "").strip(): + query[key] = values + + def get1(k: str) -> str: + v = query.get(k, [""]) + return (v[0] or "").strip() + + code = get1("code") + state = get1("state") + error = get1("error") + error_description = get1("error_description") + + if code and not state and "#" in code: + code, state = code.split("#", 1) + + if not error and error_description: + error, error_description = error_description, "" + + return { + "code": code, + "state": state, + "error": error, + "error_description": error_description, + } + + +def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]: + if not id_token or id_token.count(".") < 2: + return {} + payload_b64 = id_token.split(".")[1] + pad = "=" * ((4 - (len(payload_b64) % 4)) % 4) + try: + payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii")) + return json.loads(payload.decode("utf-8")) + except Exception: + return {} + + +def _decode_jwt_segment(seg: str) -> Dict[str, Any]: + raw = (seg or "").strip() + if not raw: + return {} + pad = "=" * ((4 - (len(raw) % 4)) % 4) + try: + decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii")) + return json.loads(decoded.decode("utf-8")) + except Exception: + return {} + + +def _to_int(v: Any) -> int: + try: + return int(v) + except (TypeError, ValueError): + return 0 + + +def _generate_password(length: int = 12) -> str: + """生成指定长度的随机密码(包含大小写字母和数字)""" + chars = string.ascii_letters + string.digits + return ''.join(secrets.choice(chars) for _ in range(length)) + + +def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]: + body = urllib.parse.urlencode(data).encode("utf-8") + req = urllib.request.Request( + url, + data=body, + method="POST", + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = resp.read() + if resp.status != 200: + raise RuntimeError( + f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}" + ) + return json.loads(raw.decode("utf-8")) + except urllib.error.HTTPError as exc: + raw = exc.read() + raise RuntimeError( + f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}" + ) from exc + + +@dataclass(frozen=True) +class OAuthStart: + auth_url: str + state: str + code_verifier: str + redirect_uri: str + + +def generate_oauth_url( + *, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE +) -> OAuthStart: + state = _random_state() + code_verifier = _pkce_verifier() + code_challenge = _sha256_b64url_no_pad(code_verifier) + + params = { + "client_id": CLIENT_ID, + "response_type": "code", + "redirect_uri": redirect_uri, + "scope": scope, + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "prompt": "login", + "id_token_add_organizations": "true", + "codex_cli_simplified_flow": "true", + } + auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}" + return OAuthStart( + auth_url=auth_url, + state=state, + code_verifier=code_verifier, + redirect_uri=redirect_uri, + ) + + +def submit_callback_url( + *, + callback_url: str, + expected_state: str, + code_verifier: str, + redirect_uri: str = DEFAULT_REDIRECT_URI, +) -> str: + cb = _parse_callback_url(callback_url) + if cb["error"]: + desc = cb["error_description"] + raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip()) + + if not cb["code"]: + raise ValueError("callback url missing ?code=") + if not cb["state"]: + raise ValueError("callback url missing ?state=") + if cb["state"] != expected_state: + raise ValueError("state mismatch") + + token_resp = _post_form( + TOKEN_URL, + { + "grant_type": "authorization_code", + "client_id": CLIENT_ID, + "code": cb["code"], + "redirect_uri": redirect_uri, + "code_verifier": code_verifier, + }, + ) + + access_token = (token_resp.get("access_token") or "").strip() + refresh_token = (token_resp.get("refresh_token") or "").strip() + id_token = (token_resp.get("id_token") or "").strip() + expires_in = _to_int(token_resp.get("expires_in")) + + claims = _jwt_claims_no_verify(id_token) + email = str(claims.get("email") or "").strip() + auth_claims = claims.get("https://api.openai.com/auth") or {} + account_id = str(auth_claims.get("chatgpt_account_id") or "").strip() + + now = int(time.time()) + expired_rfc3339 = time.strftime( + "%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0)) + ) + now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)) + + config = { + "id_token": id_token, + "access_token": access_token, + "refresh_token": refresh_token, + "account_id": account_id, + "last_refresh": now_rfc3339, + "email": email, + "type": "codex", + "expired": expired_rfc3339, + } + + return json.dumps(config, ensure_ascii=False, separators=(",", ":")) + + +# ========================================== +# 核心注册逻辑 +# ========================================== + + +def run(proxy: Optional[str]) -> Optional[str]: + proxies: Any = None + if proxy: + proxies = {"http": proxy, "https": proxy} + + s = requests.Session(proxies=proxies, impersonate="chrome") + + try: + trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10) + trace = trace.text + loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE) + loc = loc_re.group(1) if loc_re else None + print(f"[*] 当前 IP 所在地: {loc}") + if loc == "CN" or loc == "HK": + raise RuntimeError("检查代理哦w - 所在地不支持") + except Exception as e: + print(f"[Error] 网络连接检查失败: {e}") + return None + + email, dev_token = get_email_and_token(proxies) + if not email or not dev_token: + return None + print(f"[*] 成功获取 Tempmail.lol 邮箱与授权: {email}") + + oauth = generate_oauth_url() + url = oauth.auth_url + + try: + resp = s.get(url, timeout=15) + did = s.cookies.get("oai-did") + print(f"[*] Device ID: {did}") + + signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}' + sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}' + + sen_resp = requests.post( + "https://sentinel.openai.com/backend-api/sentinel/req", + headers={ + "origin": "https://sentinel.openai.com", + "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", + "content-type": "text/plain;charset=UTF-8", + }, + data=sen_req_body, + proxies=proxies, + impersonate="chrome", + timeout=15, + ) + + if sen_resp.status_code != 200: + print(f"[Error] Sentinel 异常拦截,状态码: {sen_resp.status_code}") + return None + + sen_token = sen_resp.json()["token"] + sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}' + + signup_resp = s.post( + "https://auth.openai.com/api/accounts/authorize/continue", + headers={ + "referer": "https://auth.openai.com/create-account", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": sentinel, + }, + data=signup_body, + ) + print(f"[*] 提交注册表单状态: {signup_resp.status_code}") + + # 生成密码 + password = _generate_password() + print(f"[*] 生成密码: {password}") + + # 提交密码和邮箱 + register_body = json.dumps({ + "password": password, + "username": email + }) + + pwd_resp = s.post( + "https://auth.openai.com/api/accounts/user/register", + headers={ + "referer": "https://auth.openai.com/create-account/password", + "accept": "application/json", + "content-type": "application/json", + }, + data=register_body, + ) + print(f"[*] 提交密码状态: {pwd_resp.status_code}") + + # 发送邮箱验证码 + otp_resp = s.get( + "https://auth.openai.com/api/accounts/email-otp/send", + headers={ + "referer": "https://auth.openai.com/create-account/password", + "accept": "application/json", + }, + ) + print(f"[*] 验证码发送状态: {otp_resp.status_code}") + + code = get_oai_code(dev_token, email, proxies) + if not code: + return None + + code_body = f'{{"code":"{code}"}}' + code_resp = s.post( + "https://auth.openai.com/api/accounts/email-otp/validate", + headers={ + "referer": "https://auth.openai.com/email-verification", + "accept": "application/json", + "content-type": "application/json", + }, + data=code_body, + ) + print(f"[*] 验证码校验状态: {code_resp.status_code}") + + create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}' + create_account_resp = s.post( + "https://auth.openai.com/api/accounts/create_account", + headers={ + "referer": "https://auth.openai.com/about-you", + "accept": "application/json", + "content-type": "application/json", + }, + data=create_account_body, + ) + create_account_status = create_account_resp.status_code + print(f"[*] 账户创建状态: {create_account_status}") + + if create_account_status != 200: + print(create_account_resp.text) + return None + + auth_cookie = s.cookies.get("oai-client-auth-session") + if not auth_cookie: + print("[Error] 未能获取到授权 Cookie") + return None + + auth_json = _decode_jwt_segment(auth_cookie.split(".")[0]) + workspaces = auth_json.get("workspaces") or [] + if not workspaces: + print("[Error] 授权 Cookie 里没有 workspace 信息") + return None + workspace_id = str((workspaces[0] or {}).get("id") or "").strip() + if not workspace_id: + print("[Error] 无法解析 workspace_id") + return None + + select_body = f'{{"workspace_id":"{workspace_id}"}}' + select_resp = s.post( + "https://auth.openai.com/api/accounts/workspace/select", + headers={ + "referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", + "content-type": "application/json", + }, + data=select_body, + ) + + if select_resp.status_code != 200: + print(f"[Error] 选择 workspace 失败,状态码: {select_resp.status_code}") + print(select_resp.text) + return None + + continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip() + if not continue_url: + print("[Error] workspace/select 响应里缺少 continue_url") + return None + + current_url = continue_url + for _ in range(6): + final_resp = s.get(current_url, allow_redirects=False, timeout=15) + location = final_resp.headers.get("Location") or "" + + if final_resp.status_code not in [301, 302, 303, 307, 308]: + break + if not location: + break + + next_url = urllib.parse.urljoin(current_url, location) + if "code=" in next_url and "state=" in next_url: + return submit_callback_url( + callback_url=next_url, + code_verifier=oauth.code_verifier, + redirect_uri=oauth.redirect_uri, + expected_state=oauth.state, + ) + current_url = next_url + + print("[Error] 未能在重定向链中捕获到最终 Callback URL") + return None + + except Exception as e: + print(f"[Error] 运行时发生错误: {e}") + return None + + +def main() -> None: + parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (Tempmail.lol 版本)") + parser.add_argument( + "--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890" + ) + parser.add_argument("--once", action="store_true", help="只运行一次") + parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数") + parser.add_argument( + "--sleep-max", type=int, default=30, help="循环模式最长等待秒数" + ) + args = parser.parse_args() + + sleep_min = max(1, args.sleep_min) + sleep_max = max(sleep_min, args.sleep_max) + + count = 0 + print("[Info] Yasal's Seamless OpenAI Auto-Registrar Started for ZJH (Tempmail.lol Edition)") + + while True: + count += 1 + print( + f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<" + ) + + try: + token_json = run(args.proxy) + + if token_json: + try: + t_data = json.loads(token_json) + fname_email = t_data.get("email", "unknown").replace("@", "_") + except Exception: + fname_email = "unknown" + + file_name = f"token_{fname_email}_{int(time.time())}.json" + + with open(file_name, "w", encoding="utf-8") as f: + f.write(token_json) + + print(f"[*] 成功! Token 已保存至: {file_name}") + else: + print("[-] 本次注册失败。") + + except Exception as e: + print(f"[Error] 发生未捕获异常: {e}") + + if args.once: + break + + wait_time = random.randint(sleep_min, sleep_max) + print(f"[*] 休息 {wait_time} 秒...") + time.sleep(wait_time) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..5d1a46cb1d3ec1cc3dcc6d8d3173a09ff7a45bb7 GIT binary patch literal 138 zcmezWFPR~gp@^Y`A(J5uh;4z;h`|7e%^386BnX=_@G@{QBm?CUD)fMAKo}xZ3N#^y hAs$5yNZyda1g@%pp%N%xz>o-3TMV`nWS22q9so1|6r}(F literal 0 HcmV?d00001