mirror of
https://github.com/cnlimiter/codex-register.git
synced 2026-05-07 04:12:43 +08:00
104 lines
3.5 KiB
Python
104 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Tempmail.lol API 探针。
|
||
|
||
用途:
|
||
1. 创建测试收件箱或复用现有 token。
|
||
2. 拉取 /inbox 原始 JSON 并原样打印。
|
||
3. 检查邮件对象里是否存在 received_at/date 等时间字段。
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import sys
|
||
import time
|
||
from typing import Any, Dict, Iterable
|
||
|
||
import httpx
|
||
|
||
|
||
# Root URL of the Tempmail.lol API; used as the default value of the --base-url option.
DEFAULT_BASE_URL = "https://api.tempmail.lol/v2"
|
||
# Candidate timestamp keys looked up on every email object when reporting which ones are present.
TIME_FIELDS = ("received_at", "date", "created_at", "createdAt", "timestamp")
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Parse the probe's command-line options.

    Options are declared in a table and registered in order, so the generated
    help output lists them in the same sequence as the table.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()`` with the
        attributes ``base_url``, ``token``, ``poll_count``, ``poll_interval``
        and ``timeout``.
    """
    option_table = (
        ("--base-url", {"default": DEFAULT_BASE_URL, "help": "Tempmail API 基础地址"}),
        ("--token", {"help": "已有 inbox token;未提供时自动创建新邮箱"}),
        ("--poll-count", {"type": int, "default": 1, "help": "轮询次数"}),
        ("--poll-interval", {"type": float, "default": 3.0, "help": "轮询间隔秒数"}),
        ("--timeout", {"type": float, "default": 20.0, "help": "HTTP 超时时间"}),
    )

    cli = argparse.ArgumentParser(description="抓取 Tempmail.lol 收件箱原始 JSON")
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
||
|
||
|
||
def dump_json(title: str, payload: Dict[str, Any]) -> None:
    """Print ``payload`` as pretty JSON under a titled banner line.

    Args:
        title: Label placed inside the ``===== ... =====`` banner.
        payload: JSON-serialisable object to print.

    The JSON is rendered with two-space indentation, sorted keys, and
    non-ASCII characters left unescaped.
    """
    banner = "\n===== {} =====".format(title)
    print(banner)
    # Serialise only after the banner is out, so the banner is still printed
    # even if serialisation raises.
    rendered = json.dumps(payload, indent=2, sort_keys=True, ensure_ascii=False)
    print(rendered)
|
||
|
||
|
||
def summarize_time_fields(emails: Iterable[Dict[str, Any]]) -> None:
    """Print, for each email, which of the ``TIME_FIELDS`` keys it carries.

    Args:
        emails: Email objects (mappings) to inspect, in display order.

    One line is printed per email, numbered from 1, containing a JSON object
    of the time-like keys found and their values. Values that are not
    JSON-serialisable are rendered through ``str``.
    """
    position = 0
    for message in emails:
        position += 1
        found = {}
        for field in TIME_FIELDS:
            if field in message:
                found[field] = message.get(field)
        encoded = json.dumps(found, ensure_ascii=False, default=str)
        print(f"email[{position}] 时间字段: {encoded}")
|
||
|
||
|
||
def create_inbox(client: httpx.Client, base_url: str) -> Dict[str, Any]:
    """Create a new inbox via ``POST {base_url}/inbox/create``.

    Args:
        client: HTTP client used to send the request.
        base_url: API root; ``/inbox/create`` is appended to it verbatim.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status
        (``httpx.HTTPStatusError`` per the httpx documentation). The status
        code is printed before that check, so it is visible on failure too.
    """
    endpoint = f"{base_url}/inbox/create"
    request_headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }

    # An empty JSON object is sent as the request body.
    reply = client.post(endpoint, headers=request_headers, json={})
    print(f"CREATE_STATUS {reply.status_code}")
    reply.raise_for_status()

    created = reply.json()
    dump_json("CREATE_RESPONSE", created)
    return created
|
||
|
||
|
||
def fetch_inbox(client: httpx.Client, base_url: str, token: str) -> Dict[str, Any]:
    """Fetch ``GET {base_url}/inbox`` for ``token`` and report on its emails.

    Args:
        client: HTTP client used to send the request.
        base_url: API root; ``/inbox`` is appended to it verbatim.
        token: Inbox token, sent as the ``token`` query parameter.

    Returns:
        The decoded JSON body, unmodified (returned as-is even when it is not
        a dict).

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status
        (``httpx.HTTPStatusError`` per the httpx documentation).
    """
    reply = client.get(
        f"{base_url}/inbox",
        params={"token": token},
        headers={"Accept": "application/json"},
    )
    print(f"INBOX_STATUS {reply.status_code}")
    reply.raise_for_status()

    payload = reply.json()
    dump_json("INBOX_RESPONSE", payload)

    # A non-dict payload is treated as having no emails at all.
    if isinstance(payload, dict):
        emails = payload.get("emails", [])
    else:
        emails = []

    if not isinstance(emails, list):
        print(f"emails 字段不是列表: {type(emails).__name__}")
        return payload

    # Only dict entries are summarised; anything else in the list is skipped.
    dict_mails = []
    for mail in emails:
        if isinstance(mail, dict):
            dict_mails.append(mail)
    summarize_time_fields(dict_mails)
    return payload
|
||
|
||
|
||
def main() -> int:
    """Run the probe: obtain an inbox token, then poll the inbox.

    When ``--token`` is not supplied, a fresh inbox is created and its address
    and token are printed. The inbox is then fetched ``--poll-count`` times,
    sleeping ``--poll-interval`` seconds between consecutive polls (no sleep
    after the last one).

    Returns:
        0 once the polling loop has finished; 1 when no usable token could be
        obtained from the create-inbox response.

    Raises:
        Exceptions raised by the HTTP calls (for example from
        ``raise_for_status()``) are not caught here and propagate to the caller.
    """
    args = parse_args()
    with httpx.Client(timeout=args.timeout) as client:
        token = args.token
        if not token:
            inbox = create_inbox(client, args.base_url)
            # A non-object response has no fields to read; treat it as empty
            # instead of failing with AttributeError on ``.get``.
            fields = inbox if isinstance(inbox, dict) else {}

            # A JSON null must count as "missing": str(None) would produce the
            # truthy string "None" and slip past the token check below.
            raw_token = fields.get("token")
            token = "" if raw_token is None else str(raw_token).strip()
            raw_address = fields.get("address")
            address = "" if raw_address is None else str(raw_address).strip()

            print(f"ADDRESS {address}")
            print(f"TOKEN {token}")

        if not token:
            print("未拿到 token,无法继续拉取 inbox", file=sys.stderr)
            return 1

        for attempt in range(1, args.poll_count + 1):
            print(f"\n----- poll {attempt}/{args.poll_count} -----")
            fetch_inbox(client, args.base_url, token)
            # Wait only between polls, not after the final one.
            if attempt < args.poll_count:
                time.sleep(args.poll_interval)
    return 0
|
||
|
||
|
||
# Script entry point: run the probe and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|