refactor(core): 移除旧版CLI和主程序文件

本次提交删除了两个旧版本的核心入口文件:
1. cli.py (170行) - 旧版命令行接口,包含数据库初始化、邮箱服务创建和注册循环逻辑
2. main.py (619行) - 旧版主程序,包含完整的Tempmail.lol集成、OAuth授权流程和注册逻辑

这些文件已被重构后的模块化架构替代,新架构将功能拆分为独立的服务层、核心引擎和数据库模块,提高了代码的可维护性和可测试性。
This commit is contained in:
cnlimiter
2026-03-15 19:27:13 +08:00
parent 2f2fb51764
commit 45503102a6
2 changed files with 0 additions and 789 deletions

170
cli.py
View File

@@ -1,170 +0,0 @@
"""
命令行入口 - 保持向后兼容性
"""
import argparse
import json
import random
import time
import logging
from datetime import datetime
from typing import Optional
from src.core.utils import setup_logging, get_data_dir
from src.core.register import RegistrationEngine
from src.services import EmailServiceFactory, EmailServiceType
from src.database.init_db import initialize_database
from src.config.settings import get_settings
def setup_database():
"""初始化数据库"""
try:
initialize_database()
print("[Info] 数据库初始化完成")
return True
except Exception as e:
print(f"[Error] 数据库初始化失败: {e}")
return False
def create_tempmail_service(proxy_url: Optional[str] = None):
"""创建 Tempmail 服务"""
config = {
"base_url": "https://api.tempmail.lol/v2",
"timeout": 30,
"max_retries": 3,
"proxy_url": proxy_url,
}
try:
service = EmailServiceFactory.create(
EmailServiceType.TEMPMAIL,
config,
name="tempmail_cli"
)
print("[Info] Tempmail 服务创建成功")
return service
except Exception as e:
print(f"[Error] 创建 Tempmail 服务失败: {e}")
return None
def run_registration(proxy: Optional[str] = None) -> Optional[dict]:
"""
执行一次注册流程
Args:
proxy: 代理地址
Returns:
注册结果字典,如果失败返回 None
"""
# 创建邮箱服务
email_service = create_tempmail_service(proxy)
if not email_service:
return None
# 创建注册引擎
engine = RegistrationEngine(
email_service=email_service,
proxy_url=proxy,
callback_logger=lambda msg: print(msg)
)
# 执行注册
result = engine.run()
if result.success:
# 保存到数据库
engine.save_to_database(result)
# 保存到文件(保持向后兼容)
try:
t_data = {
"id_token": result.id_token,
"access_token": result.access_token,
"refresh_token": result.refresh_token,
"account_id": result.account_id,
"last_refresh": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
"email": result.email,
"type": "codex",
"expired": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") # 简化处理
}
fname_email = result.email.replace("@", "_")
file_name = f"token_{fname_email}_{int(time.time())}.json"
with open(file_name, "w", encoding="utf-8") as f:
json.dump(t_data, f, ensure_ascii=False, separators=(",", ":"))
print(f"[*] 成功! Token 已保存至: {file_name}")
except Exception as e:
print(f"[Warning] 保存 Token 文件失败: {e}")
return result.to_dict()
else:
print(f"[-] 本次注册失败: {result.error_message}")
return None
def main() -> None:
"""主函数"""
parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (重构版本)")
parser.add_argument(
"--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890"
)
parser.add_argument("--once", action="store_true", help="只运行一次")
parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
parser.add_argument(
"--sleep-max", type=int, default=30, help="循环模式最长等待秒数"
)
parser.add_argument("--log-level", default="INFO", help="日志级别")
parser.add_argument("--log-file", help="日志文件路径")
args = parser.parse_args()
# 配置日志
setup_logging(
log_level=args.log_level,
log_file=args.log_file
)
# 初始化数据库
if not setup_database():
return
# 参数验证
sleep_min = max(1, args.sleep_min)
sleep_max = max(sleep_min, args.sleep_max)
count = 0
print("[Info] OpenAI Auto-Registrar")
while True:
count += 1
print(
f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<"
)
try:
result = run_registration(args.proxy)
if result:
print(f"[*] 注册成功! 邮箱: {result.get('email')}")
else:
print("[-] 本次注册失败。")
except Exception as e:
print(f"[Error] 发生未捕获异常: {e}")
if args.once:
break
wait_time = random.randint(sleep_min, sleep_max)
print(f"[*] 休息 {wait_time} 秒...")
time.sleep(wait_time)
if __name__ == "__main__":
main()

619
main.py
View File

@@ -1,619 +0,0 @@
import argparse
import base64
import hashlib
import json
import random
import re
import secrets
import string
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from curl_cffi import requests
# ==========================================
# Tempmail.lol API (v2)
# ==========================================
TEMPMAIL_BASE = "https://api.tempmail.lol/v2"
def get_email_and_token(proxies: Any = None) -> tuple[str, str]:
"""创建 Tempmail.lol 邮箱并获取 token"""
try:
# 创建新的 inbox
resp = requests.post(
f"{TEMPMAIL_BASE}/inbox/create",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json={},
proxies=proxies,
impersonate="chrome",
timeout=15,
)
if resp.status_code not in (200, 201):
print(f"[Error] Tempmail.lol 请求失败,状态码: {resp.status_code}")
return "", ""
data = resp.json()
email = str(data.get("address", "")).strip()
token = str(data.get("token", "")).strip()
if not email or not token:
print("[Error] Tempmail.lol 返回数据不完整")
return "", ""
return email, token
except Exception as e:
print(f"[Error] 创建 Tempmail.lol 邮箱出错: {e}")
return "", ""
def get_oai_code(token: str, email: str, proxies: Any = None) -> str:
"""使用 Tempmail.lol token 轮询获取验证码"""
regex = r"(?<!\d)(\d{6})(?!\d)"
seen_ids: set[int] = set()
print(f"[*] 正在等待邮箱 {email} 的验证码...", end="", flush=True)
for _ in range(40):
print(".", end="", flush=True)
try:
# 获取邮件列表
resp = requests.get(
f"{TEMPMAIL_BASE}/inbox",
params={"token": token},
headers={"Accept": "application/json"},
proxies=proxies,
impersonate="chrome",
timeout=15,
)
if resp.status_code != 200:
time.sleep(3)
continue
data = resp.json()
# 检查 inbox 是否过期
if data is None or (isinstance(data, dict) and not data):
print(" 邮箱已过期")
return ""
email_list = data.get("emails", []) if isinstance(data, dict) else []
if not isinstance(email_list, list):
time.sleep(3)
continue
for msg in email_list:
if not isinstance(msg, dict):
continue
# 使用 date 作为唯一标识(因为可能没有 id
msg_date = msg.get("date", 0)
if not msg_date or msg_date in seen_ids:
continue
seen_ids.add(msg_date)
sender = str(msg.get("from", "")).lower()
subject = str(msg.get("subject", ""))
body = str(msg.get("body", ""))
html = str(msg.get("html") or "")
content = "\n".join([sender, subject, body, html])
# 检查是否是目标邮件
if "openai" not in sender and "openai" not in content.lower():
continue
# 提取验证码
m = re.search(regex, content)
if m:
print(" 抓到啦! 验证码:", m.group(1))
return m.group(1)
except Exception as e:
pass
time.sleep(3)
print(" 超时,未收到验证码")
return ""
# ==========================================
# OAuth 授权与辅助函数
# ==========================================
AUTH_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
DEFAULT_REDIRECT_URI = f"http://localhost:1455/auth/callback"
DEFAULT_SCOPE = "openid email profile offline_access"
def _b64url_no_pad(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def _sha256_b64url_no_pad(s: str) -> str:
return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
def _random_state(nbytes: int = 16) -> str:
return secrets.token_urlsafe(nbytes)
def _pkce_verifier() -> str:
return secrets.token_urlsafe(64)
def _parse_callback_url(callback_url: str) -> Dict[str, str]:
candidate = callback_url.strip()
if not candidate:
return {"code": "", "state": "", "error": "", "error_description": ""}
if "://" not in candidate:
if candidate.startswith("?"):
candidate = f"http://localhost{candidate}"
elif any(ch in candidate for ch in "/?#") or ":" in candidate:
candidate = f"http://{candidate}"
elif "=" in candidate:
candidate = f"http://localhost/?{candidate}"
parsed = urllib.parse.urlparse(candidate)
query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
for key, values in fragment.items():
if key not in query or not query[key] or not (query[key][0] or "").strip():
query[key] = values
def get1(k: str) -> str:
v = query.get(k, [""])
return (v[0] or "").strip()
code = get1("code")
state = get1("state")
error = get1("error")
error_description = get1("error_description")
if code and not state and "#" in code:
code, state = code.split("#", 1)
if not error and error_description:
error, error_description = error_description, ""
return {
"code": code,
"state": state,
"error": error,
"error_description": error_description,
}
def _jwt_claims_no_verify(id_token: str) -> Dict[str, Any]:
if not id_token or id_token.count(".") < 2:
return {}
payload_b64 = id_token.split(".")[1]
pad = "=" * ((4 - (len(payload_b64) % 4)) % 4)
try:
payload = base64.urlsafe_b64decode((payload_b64 + pad).encode("ascii"))
return json.loads(payload.decode("utf-8"))
except Exception:
return {}
def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
raw = (seg or "").strip()
if not raw:
return {}
pad = "=" * ((4 - (len(raw) % 4)) % 4)
try:
decoded = base64.urlsafe_b64decode((raw + pad).encode("ascii"))
return json.loads(decoded.decode("utf-8"))
except Exception:
return {}
def _to_int(v: Any) -> int:
try:
return int(v)
except (TypeError, ValueError):
return 0
def _generate_password(length: int = 12) -> str:
"""生成指定长度的随机密码(包含大小写字母和数字)"""
chars = string.ascii_letters + string.digits
return ''.join(secrets.choice(chars) for _ in range(length))
def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
body = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(
url,
data=body,
method="POST",
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
if resp.status != 200:
raise RuntimeError(
f"token exchange failed: {resp.status}: {raw.decode('utf-8', 'replace')}"
)
return json.loads(raw.decode("utf-8"))
except urllib.error.HTTPError as exc:
raw = exc.read()
raise RuntimeError(
f"token exchange failed: {exc.code}: {raw.decode('utf-8', 'replace')}"
) from exc
@dataclass(frozen=True)
class OAuthStart:
auth_url: str
state: str
code_verifier: str
redirect_uri: str
def generate_oauth_url(
*, redirect_uri: str = DEFAULT_REDIRECT_URI, scope: str = DEFAULT_SCOPE
) -> OAuthStart:
state = _random_state()
code_verifier = _pkce_verifier()
code_challenge = _sha256_b64url_no_pad(code_verifier)
params = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": redirect_uri,
"scope": scope,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "login",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
}
auth_url = f"{AUTH_URL}?{urllib.parse.urlencode(params)}"
return OAuthStart(
auth_url=auth_url,
state=state,
code_verifier=code_verifier,
redirect_uri=redirect_uri,
)
def submit_callback_url(
*,
callback_url: str,
expected_state: str,
code_verifier: str,
redirect_uri: str = DEFAULT_REDIRECT_URI,
) -> str:
cb = _parse_callback_url(callback_url)
if cb["error"]:
desc = cb["error_description"]
raise RuntimeError(f"oauth error: {cb['error']}: {desc}".strip())
if not cb["code"]:
raise ValueError("callback url missing ?code=")
if not cb["state"]:
raise ValueError("callback url missing ?state=")
if cb["state"] != expected_state:
raise ValueError("state mismatch")
token_resp = _post_form(
TOKEN_URL,
{
"grant_type": "authorization_code",
"client_id": CLIENT_ID,
"code": cb["code"],
"redirect_uri": redirect_uri,
"code_verifier": code_verifier,
},
)
access_token = (token_resp.get("access_token") or "").strip()
refresh_token = (token_resp.get("refresh_token") or "").strip()
id_token = (token_resp.get("id_token") or "").strip()
expires_in = _to_int(token_resp.get("expires_in"))
claims = _jwt_claims_no_verify(id_token)
email = str(claims.get("email") or "").strip()
auth_claims = claims.get("https://api.openai.com/auth") or {}
account_id = str(auth_claims.get("chatgpt_account_id") or "").strip()
now = int(time.time())
expired_rfc3339 = time.strftime(
"%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))
)
now_rfc3339 = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
config = {
"id_token": id_token,
"access_token": access_token,
"refresh_token": refresh_token,
"account_id": account_id,
"last_refresh": now_rfc3339,
"email": email,
"type": "codex",
"expired": expired_rfc3339,
}
return json.dumps(config, ensure_ascii=False, separators=(",", ":"))
# ==========================================
# 核心注册逻辑
# ==========================================
def run(proxy: Optional[str]) -> Optional[str]:
proxies: Any = None
if proxy:
proxies = {"http": proxy, "https": proxy}
s = requests.Session(proxies=proxies, impersonate="chrome")
try:
trace = s.get("https://cloudflare.com/cdn-cgi/trace", timeout=10)
trace = trace.text
loc_re = re.search(r"^loc=(.+)$", trace, re.MULTILINE)
loc = loc_re.group(1) if loc_re else None
print(f"[*] 当前 IP 所在地: {loc}")
if loc == "CN" or loc == "HK":
raise RuntimeError("检查代理哦w - 所在地不支持")
except Exception as e:
print(f"[Error] 网络连接检查失败: {e}")
return None
email, dev_token = get_email_and_token(proxies)
if not email or not dev_token:
return None
print(f"[*] 成功获取 Tempmail.lol 邮箱与授权: {email}")
oauth = generate_oauth_url()
url = oauth.auth_url
try:
resp = s.get(url, timeout=15)
did = s.cookies.get("oai-did")
print(f"[*] Device ID: {did}")
signup_body = f'{{"username":{{"value":"{email}","kind":"email"}},"screen_hint":"signup"}}'
sen_req_body = f'{{"p":"","id":"{did}","flow":"authorize_continue"}}'
sen_resp = requests.post(
"https://sentinel.openai.com/backend-api/sentinel/req",
headers={
"origin": "https://sentinel.openai.com",
"referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
"content-type": "text/plain;charset=UTF-8",
},
data=sen_req_body,
proxies=proxies,
impersonate="chrome",
timeout=15,
)
if sen_resp.status_code != 200:
print(f"[Error] Sentinel 异常拦截,状态码: {sen_resp.status_code}")
return None
sen_token = sen_resp.json()["token"]
sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}'
signup_resp = s.post(
"https://auth.openai.com/api/accounts/authorize/continue",
headers={
"referer": "https://auth.openai.com/create-account",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": sentinel,
},
data=signup_body,
)
print(f"[*] 提交注册表单状态: {signup_resp.status_code}")
# 生成密码
password = _generate_password()
print(f"[*] 生成密码: {password}")
# 提交密码和邮箱
register_body = json.dumps({
"password": password,
"username": email
})
pwd_resp = s.post(
"https://auth.openai.com/api/accounts/user/register",
headers={
"referer": "https://auth.openai.com/create-account/password",
"accept": "application/json",
"content-type": "application/json",
},
data=register_body,
)
print(f"[*] 提交密码状态: {pwd_resp.status_code}")
# 发送邮箱验证码
otp_resp = s.get(
"https://auth.openai.com/api/accounts/email-otp/send",
headers={
"referer": "https://auth.openai.com/create-account/password",
"accept": "application/json",
},
)
print(f"[*] 验证码发送状态: {otp_resp.status_code}")
code = get_oai_code(dev_token, email, proxies)
if not code:
return None
code_body = f'{{"code":"{code}"}}'
code_resp = s.post(
"https://auth.openai.com/api/accounts/email-otp/validate",
headers={
"referer": "https://auth.openai.com/email-verification",
"accept": "application/json",
"content-type": "application/json",
},
data=code_body,
)
print(f"[*] 验证码校验状态: {code_resp.status_code}")
create_account_body = '{"name":"Neo","birthdate":"2000-02-20"}'
create_account_resp = s.post(
"https://auth.openai.com/api/accounts/create_account",
headers={
"referer": "https://auth.openai.com/about-you",
"accept": "application/json",
"content-type": "application/json",
},
data=create_account_body,
)
create_account_status = create_account_resp.status_code
print(f"[*] 账户创建状态: {create_account_status}")
if create_account_status != 200:
print(create_account_resp.text)
return None
auth_cookie = s.cookies.get("oai-client-auth-session")
if not auth_cookie:
print("[Error] 未能获取到授权 Cookie")
return None
auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
workspaces = auth_json.get("workspaces") or []
if not workspaces:
print("[Error] 授权 Cookie 里没有 workspace 信息")
return None
workspace_id = str((workspaces[0] or {}).get("id") or "").strip()
if not workspace_id:
print("[Error] 无法解析 workspace_id")
return None
select_body = f'{{"workspace_id":"{workspace_id}"}}'
select_resp = s.post(
"https://auth.openai.com/api/accounts/workspace/select",
headers={
"referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent",
"content-type": "application/json",
},
data=select_body,
)
if select_resp.status_code != 200:
print(f"[Error] 选择 workspace 失败,状态码: {select_resp.status_code}")
print(select_resp.text)
return None
continue_url = str((select_resp.json() or {}).get("continue_url") or "").strip()
if not continue_url:
print("[Error] workspace/select 响应里缺少 continue_url")
return None
current_url = continue_url
for _ in range(6):
final_resp = s.get(current_url, allow_redirects=False, timeout=15)
location = final_resp.headers.get("Location") or ""
if final_resp.status_code not in [301, 302, 303, 307, 308]:
break
if not location:
break
next_url = urllib.parse.urljoin(current_url, location)
if "code=" in next_url and "state=" in next_url:
return submit_callback_url(
callback_url=next_url,
code_verifier=oauth.code_verifier,
redirect_uri=oauth.redirect_uri,
expected_state=oauth.state,
)
current_url = next_url
print("[Error] 未能在重定向链中捕获到最终 Callback URL")
return None
except Exception as e:
print(f"[Error] 运行时发生错误: {e}")
return None
def main() -> None:
parser = argparse.ArgumentParser(description="OpenAI 自动注册脚本 (Tempmail.lol 版本)")
parser.add_argument(
"--proxy", default=None, help="代理地址,如 http://127.0.0.1:7890"
)
parser.add_argument("--once", action="store_true", help="只运行一次")
parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
parser.add_argument(
"--sleep-max", type=int, default=30, help="循环模式最长等待秒数"
)
args = parser.parse_args()
sleep_min = max(1, args.sleep_min)
sleep_max = max(sleep_min, args.sleep_max)
count = 0
print("[Info] Yasal's Seamless OpenAI Auto-Registrar Started for ZJH (Tempmail.lol Edition)")
while True:
count += 1
print(
f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<"
)
try:
token_json = run(args.proxy)
if token_json:
try:
t_data = json.loads(token_json)
fname_email = t_data.get("email", "unknown").replace("@", "_")
except Exception:
fname_email = "unknown"
file_name = f"token_{fname_email}_{int(time.time())}.json"
with open(file_name, "w", encoding="utf-8") as f:
f.write(token_json)
print(f"[*] 成功! Token 已保存至: {file_name}")
else:
print("[-] 本次注册失败。")
except Exception as e:
print(f"[Error] 发生未捕获异常: {e}")
if args.once:
break
wait_time = random.randint(sleep_min, sleep_max)
print(f"[*] 休息 {wait_time} 秒...")
time.sleep(wait_time)
if __name__ == "__main__":
main()