diff --git a/cli.py b/cli.py deleted file mode 100644 index c58bfa9..0000000 --- a/cli.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -命令行入口 - 保持向后兼容性 -""" - -import argparse -import json -import random -import time -import logging -from datetime import datetime -from typing import Optional - -from src.core.utils import setup_logging, get_data_dir -from src.core.register import RegistrationEngine -from src.services import EmailServiceFactory, EmailServiceType -from src.database.init_db import initialize_database -from src.config.settings import get_settings - - -def setup_database(): - """初始化数据库""" - try: - initialize_database() - print("[Info] 数据库初始化完成") - return True - except Exception as e: - print(f"[Error] 数据库初始化失败: {e}") - return False - - -def create_tempmail_service(proxy_url: Optional[str] = None): - """创建 Tempmail 服务""" - config = { - "base_url": "https://api.tempmail.lol/v2", - "timeout": 30, - "max_retries": 3, - "proxy_url": proxy_url, - } - - try: - service = EmailServiceFactory.create( - EmailServiceType.TEMPMAIL, - config, - name="tempmail_cli" - ) - print("[Info] Tempmail 服务创建成功") - return service - except Exception as e: - print(f"[Error] 创建 Tempmail 服务失败: {e}") - return None - - -def run_registration(proxy: Optional[str] = None) -> Optional[dict]: - """ - 执行一次注册流程 - - Args: - proxy: 代理地址 - - Returns: - 注册结果字典,如果失败返回 None - """ - # 创建邮箱服务 - email_service = create_tempmail_service(proxy) - if not email_service: - return None - - # 创建注册引擎 - engine = RegistrationEngine( - email_service=email_service, - proxy_url=proxy, - callback_logger=lambda msg: print(msg) - ) - - # 执行注册 - result = engine.run() - - if result.success: - # 保存到数据库 - engine.save_to_database(result) - - # 保存到文件(保持向后兼容) - try: - t_data = { - "id_token": result.id_token, - "access_token": result.access_token, - "refresh_token": result.refresh_token, - "account_id": result.account_id, - "last_refresh": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), - "email": result.email, - "type": "codex", - "expired": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") # 简化处理 - } - - fname_email = result.email.replace("@", "_") - file_name = f"token_{fname_email}_{int(time.time())}.json" - - with open(file_name, "w", encoding="utf-8") as f: - json.dump(t_data, f, ensure_ascii=False, separators=(",", ":")) - - print(f"[*] 成功! Token 已保存至: {file_name}") - - except Exception as e: - print(f"[Warning] 保存 Token 文件失败: {e}") - - return result.to_dict() - else: - print(f"[-] 本次注册失败: {result.error_message}") - return None - - -def main() -> None: - """主函数""" - parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (重构版本)") - parser.add_argument( - "--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890" - ) - parser.add_argument("--once", action="store_true", help="只运行一次") - parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数") - parser.add_argument( - "--sleep-max", type=int, default=30, help="循环模式最长等待秒数" - ) - parser.add_argument("--log-level", default="INFO", help="日志级别") - parser.add_argument("--log-file", help="日志文件路径") - args = parser.parse_args() - - # 配置日志 - setup_logging( - log_level=args.log_level, - log_file=args.log_file - ) - - # 初始化数据库 - if not setup_database(): - return - - # 参数验证 - sleep_min = max(1, args.sleep_min) - sleep_max = max(sleep_min, args.sleep_max) - - count = 0 - print("[Info] OpenAI Auto-Registrar") - - while True: - count += 1 - print( - f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<" - ) - - try: - result = run_registration(args.proxy) - - if result: - print(f"[*] 注册成功! 邮箱: {result.get('email')}") - else: - print("[-] 本次注册失败。") - - except Exception as e: - print(f"[Error] 发生未捕获异常: {e}") - - if args.once: - break - - wait_time = random.randint(sleep_min, sleep_max) - print(f"[*] 休息 {wait_time} 秒...") - time.sleep(wait_time) - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 0ff6f70..0000000 --- a/main.py +++ /dev/null @@ -1,619 +0,0 @@ -import argparse -import base64 -import hashlib -import json -import random -import re -import secrets -import string -import time -import urllib.error -import urllib.parse -import urllib.request -from dataclasses import dataclass -from datetime import datetime -from typing import Any, Dict, Optional - -from curl_cffi import requests - -# ========================================== -# Tempmail.lol API (v2) -# ========================================== - -TEMPMAIL_BASE = "https://api.tempmail.lol/v2" - - -def get_email_and_token(proxies: Any = None) -> tuple[str, str]: - """创建 Tempmail.lol 邮箱并获取 token""" - try: - # 创建新的 inbox - resp = requests.post( - f"{TEMPMAIL_BASE}/inbox/create", - headers={ - "Accept": "application/json", - "Content-Type": "application/json", - }, - json={}, - proxies=proxies, - impersonate="chrome", - timeout=15, - ) - - if resp.status_code not in (200, 201): - print(f"[Error] Tempmail.lol 请求失败,状态码: {resp.status_code}") - return "", "" - - data = resp.json() - email = str(data.get("address", "")).strip() - token = str(data.get("token", "")).strip() - - if not email or not token: - print("[Error] Tempmail.lol 返回数据不完整") - return "", "" - - return email, token - - except Exception as e: - print(f"[Error] 创建 Tempmail.lol 邮箱出错: {e}") - return "", "" - - -def get_oai_code(token: str, email: str, proxies: Any = None) -> str: - """使用 Tempmail.lol token 轮询获取验证码""" - regex = r"(? str: - return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=") - - -def _sha256_b64url_no_pad(s: str) -> str: - return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest()) - - -def _random_state(nbytes: int = 16) -> str: - return secrets.token_urlsafe(nbytes) - - -def _pkce_verifier() -> str: - return secrets.token_urlsafe(64) - - -def _parse_callback_url(callback_url: str) -> Dict[str, str]: - candidate = callback_url.strip() - if not candidate: - return {"code": "", "state": "", "error": "", "error_description": ""} - - if "://" not in candidate: - if candidate.startswith("?"): - candidate = f"http://localhost{candidate}" - elif any(ch in candidate for ch in "/?#") or ":" in candidate: - candidate = f"http://{candidate}" - elif "=" in candidate: - candidate = f"http://localhost/?{candidate}" - - parsed = urllib.parse.urlparse(candidate) - query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True) - fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True) - - for key, values in fragment.items(): - if key not in query or not query[key] or not (query[key][0] or "").strip(): - query[key] = values - - def get1(k: str) -> str: - v = query.get(k, [""]) - return (v[0] or "").strip() - - code = get1("code") - state = get1("state") - error = get1("error") - error_description = get1("error_description") - - if code and not state and "#" in code: - code, state = code.split("#", 1) - - if not error and error_description: - error, error_description = error_description, "" - - return { - "code": code, - "state": state, - "error": error, - "error_description": error_description, - } - - -def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]: - if not id_token or id_token.count(".") < 2: - return {} - payload_b64 = id_token.split(".")[1] - pad = "=" * ((4 - (len(payload_b64) % 4)) % 4) - try: - payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii")) - return json.loads(payload.decode("utf-8")) - except Exception: - return {} - - -def _decode_jwt_segment(seg: str) -> Dict[str, Any]: - raw = (seg or "").strip() - if not raw: - return {} - pad = "=" * ((4 - (len(raw) % 4)) % 4) - try: - decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii")) - return json.loads(decoded.decode("utf-8")) - except Exception: - return {} - - -def _to_int(v: Any) -> int: - try: - return int(v) - except (TypeError, ValueError): - return 0 - - -def _generate_password(length: int = 12) -> str: - """生成指定长度的随机密码(包含大小写字母和数字)""" - chars = string.ascii_letters + string.digits - return ''.join(secrets.choice(chars) for _ in range(length)) - - -def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]: - body = urllib.parse.urlencode(data).encode("utf-8") - req = urllib.request.Request( - url, - data=body, - method="POST", - headers={ - "Content-Type": "application/x-www-form-urlencoded", - "Accept": "application/json", - }, - ) - try: - with urllib.request.urlopen(req, timeout=timeout) as resp: - raw = resp.read() - if resp.status != 200: - raise RuntimeError( - f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}" - ) - return json.loads(raw.decode("utf-8")) - except urllib.error.HTTPError as exc: - raw = exc.read() - raise RuntimeError( - f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}" - ) from exc - - -@dataclass(frozen=True) -class OAuthStart: - auth_url: str - state: str - code_verifier: str - redirect_uri: str - - -def generate_oauth_url( - *, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE -) -> OAuthStart: - state = _random_state() - code_verifier = _pkce_verifier() - code_challenge = _sha256_b64url_no_pad(code_verifier) - - params = { - "client_id": CLIENT_ID, - "response_type": "code", - "redirect_uri": redirect_uri, - "scope": scope, - "state": state, - "code_challenge": code_challenge, - "code_challenge_method": "S256", - "prompt": "login", - "id_token_add_organizations": "true", - "codex_cli_simplified_flow": "true", - } - auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}" - return OAuthStart( - auth_url=auth_url, - state=state, - code_verifier=code_verifier, - redirect_uri=redirect_uri, - ) - - -def submit_callback_url( - *, - callback_url: str, - expected_state: str, - code_verifier: str, - redirect_uri: str = DEFAULT_REDIRECT_URI, -) -> str: - cb = _parse_callback_url(callback_url) - if cb["error"]: - desc = cb["error_description"] - raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip()) - - if not cb["code"]: - raise ValueError("callback url missing ?code=") - if not cb["state"]: - raise ValueError("callback url missing ?state=") - if cb["state"] != expected_state: - raise ValueError("state mismatch") - - token_resp = _post_form( - TOKEN_URL, - { - "grant_type": "authorization_code", - "client_id": CLIENT_ID, - "code": cb["code"], - "redirect_uri": redirect_uri, - "code_verifier": code_verifier, - }, - ) - - access_token = (token_resp.get("access_token") or "").strip() - refresh_token = (token_resp.get("refresh_token") or "").strip() - id_token = (token_resp.get("id_token") or "").strip() - expires_in = _to_int(token_resp.get("expires_in")) - - claims = _jwt_claims_no_verify(id_token) - email = str(claims.get("email") or "").strip() - auth_claims = claims.get("https://api.openai.com/auth") or {} - account_id = str(auth_claims.get("chatgpt_account_id") or "").strip() - - now = int(time.time()) - expired_rfc3339 = time.strftime( - "%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0)) - ) - now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)) - - config = { - "id_token": id_token, - "access_token": access_token, - "refresh_token": refresh_token, - "account_id": account_id, - "last_refresh": now_rfc3339, - "email": email, - "type": "codex", - "expired": expired_rfc3339, - } - - return json.dumps(config, ensure_ascii=False, separators=(",", ":")) - - -# ========================================== -# 核心注册逻辑 -# ========================================== - - -def run(proxy: Optional[str]) -> Optional[str]: - proxies: Any = None - if proxy: - proxies = {"http": proxy, "https": proxy} - - s = requests.Session(proxies=proxies, impersonate="chrome") - - try: - trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10) - trace = trace.text - loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE) - loc = loc_re.group(1) if loc_re else None - print(f"[*] 当前 IP 所在地: {loc}") - if loc == "CN" or loc == "HK": - raise RuntimeError("检查代理哦w - 所在地不支持") - except Exception as e: - print(f"[Error] 网络连接检查失败: {e}") - return None - - email, dev_token = get_email_and_token(proxies) - if not email or not dev_token: - return None - print(f"[*] 成功获取 Tempmail.lol 邮箱与授权: {email}") - - oauth = generate_oauth_url() - url = oauth.auth_url - - try: - resp = s.get(url, timeout=15) - did = s.cookies.get("oai-did") - print(f"[*] Device ID: {did}") - - signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}' - sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}' - - sen_resp = requests.post( - "https://sentinel.openai.com/backend-api/sentinel/req", - headers={ - "origin": "https://sentinel.openai.com", - "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", - "content-type": "text/plain;charset=UTF-8", - }, - data=sen_req_body, - proxies=proxies, - impersonate="chrome", - timeout=15, - ) - - if sen_resp.status_code != 200: - print(f"[Error] Sentinel 异常拦截,状态码: {sen_resp.status_code}") - return None - - sen_token = sen_resp.json()["token"] - sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}' - - signup_resp = s.post( - "https://auth.openai.com/api/accounts/authorize/continue", - headers={ - "referer": "https://auth.openai.com/create-account", - "accept": "application/json", - "content-type": "application/json", - "openai-sentinel-token": sentinel, - }, - data=signup_body, - ) - print(f"[*] 提交注册表单状态: {signup_resp.status_code}") - - # 生成密码 - password = _generate_password() - print(f"[*] 生成密码: {password}") - - # 提交密码和邮箱 - register_body = json.dumps({ - "password": password, - "username": email - }) - - pwd_resp = s.post( - "https://auth.openai.com/api/accounts/user/register", - headers={ - "referer": "https://auth.openai.com/create-account/password", - "accept": "application/json", - "content-type": "application/json", - }, - data=register_body, - ) - print(f"[*] 提交密码状态: {pwd_resp.status_code}") - - # 发送邮箱验证码 - otp_resp = s.get( - "https://auth.openai.com/api/accounts/email-otp/send", - headers={ - "referer": "https://auth.openai.com/create-account/password", - "accept": "application/json", - }, - ) - print(f"[*] 验证码发送状态: {otp_resp.status_code}") - - code = get_oai_code(dev_token, email, proxies) - if not code: - return None - - code_body = f'{{"code":"{code}"}}' - code_resp = s.post( - "https://auth.openai.com/api/accounts/email-otp/validate", - headers={ - "referer": "https://auth.openai.com/email-verification", - "accept": "application/json", - "content-type": "application/json", - }, - data=code_body, - ) - print(f"[*] 验证码校验状态: {code_resp.status_code}") - - create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}' - create_account_resp = s.post( - "https://auth.openai.com/api/accounts/create_account", - headers={ - "referer": "https://auth.openai.com/about-you", - "accept": "application/json", - "content-type": "application/json", - }, - data=create_account_body, - ) - create_account_status = create_account_resp.status_code - print(f"[*] 账户创建状态: {create_account_status}") - - if create_account_status != 200: - print(create_account_resp.text) - return None - - auth_cookie = s.cookies.get("oai-client-auth-session") - if not auth_cookie: - print("[Error] 未能获取到授权 Cookie") - return None - - auth_json = _decode_jwt_segment(auth_cookie.split(".")[0]) - workspaces = auth_json.get("workspaces") or [] - if not workspaces: - print("[Error] 授权 Cookie 里没有 workspace 信息") - return None - workspace_id = str((workspaces[0] or {}).get("id") or "").strip() - if not workspace_id: - print("[Error] 无法解析 workspace_id") - return None - - select_body = f'{{"workspace_id":"{workspace_id}"}}' - select_resp = s.post( - "https://auth.openai.com/api/accounts/workspace/select", - headers={ - "referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", - "content-type": "application/json", - }, - data=select_body, - ) - - if select_resp.status_code != 200: - print(f"[Error] 选择 workspace 失败,状态码: {select_resp.status_code}") - print(select_resp.text) - return None - - continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip() - if not continue_url: - print("[Error] workspace/select 响应里缺少 continue_url") - return None - - current_url = continue_url - for _ in range(6): - final_resp = s.get(current_url, allow_redirects=False, timeout=15) - location = final_resp.headers.get("Location") or "" - - if final_resp.status_code not in [301, 302, 303, 307, 308]: - break - if not location: - break - - next_url = urllib.parse.urljoin(current_url, location) - if "code=" in next_url and "state=" in next_url: - return submit_callback_url( - callback_url=next_url, - code_verifier=oauth.code_verifier, - redirect_uri=oauth.redirect_uri, - expected_state=oauth.state, - ) - current_url = next_url - - print("[Error] 未能在重定向链中捕获到最终 Callback URL") - return None - - except Exception as e: - print(f"[Error] 运行时发生错误: {e}") - return None - - -def main() -> None: - parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (Tempmail.lol 版本)") - parser.add_argument( - "--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890" - ) - parser.add_argument("--once", action="store_true", help="只运行一次") - parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数") - parser.add_argument( - "--sleep-max", type=int, default=30, help="循环模式最长等待秒数" - ) - args = parser.parse_args() - - sleep_min = max(1, args.sleep_min) - sleep_max = max(sleep_min, args.sleep_max) - - count = 0 - print("[Info] Yasal's Seamless OpenAI Auto-Registrar Started for ZJH (Tempmail.lol Edition)") - - while True: - count += 1 - print( - f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<" - ) - - try: - token_json = run(args.proxy) - - if token_json: - try: - t_data = json.loads(token_json) - fname_email = t_data.get("email", "unknown").replace("@", "_") - except Exception: - fname_email = "unknown" - - file_name = f"token_{fname_email}_{int(time.time())}.json" - - with open(file_name, "w", encoding="utf-8") as f: - f.write(token_json) - - print(f"[*] 成功! Token 已保存至: {file_name}") - else: - print("[-] 本次注册失败。") - - except Exception as e: - print(f"[Error] 发生未捕获异常: {e}") - - if args.once: - break - - wait_time = random.randint(sleep_min, sleep_max) - print(f"[*] 休息 {wait_time} 秒...") - time.sleep(wait_time) - - -if __name__ == "__main__": - main()