Files
codex-register/src/services/freemail.py

333 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Freemail 邮箱服务实现
基于自部署 Cloudflare Worker 临时邮箱服务 (https://github.com/idinging/freemail)
"""
import re
import time
import logging
import random
import string
from typing import Optional, Dict, Any, List
from .base import BaseEmailService, EmailServiceError, EmailServiceType, RateLimitedEmailServiceError
from ..core.http_client import HTTPClient, RequestConfig
from ..config.constants import OTP_CODE_PATTERN
logger = logging.getLogger(__name__)
class FreemailService(BaseEmailService):
    """
    Freemail mailbox service.

    Temporary mailboxes backed by a self-hosted Cloudflare Worker.
    """
def __init__(self, config: Dict[str, Any] = None, name: str = None):
    """
    Initialise the Freemail service.

    Args:
        config: Configuration dictionary. Recognised keys:
            - base_url: Worker base URL (required)
            - admin_token: admin token, matching the Worker's JWT_TOKEN (required)
            - domain: mailbox domain, e.g. example.com
            - timeout: request timeout, defaults to 30
            - max_retries: maximum number of retries, defaults to 3
        name: Service name.

    Raises:
        ValueError: If a required key is absent or has a falsy value.
    """
    super().__init__(EmailServiceType.FREEMAIL, name)

    supplied = config or {}
    missing_keys = [
        key for key in ("base_url", "admin_token") if not supplied.get(key)
    ]
    if missing_keys:
        raise ValueError(f"缺少必需配置: {missing_keys}")

    # Caller-supplied values win over the built-in defaults.
    self.config = {"timeout": 30, "max_retries": 3, **supplied}
    # Paths passed to _make_request start with "/", so drop any trailing slash.
    self.config["base_url"] = self.config["base_url"].rstrip("/")

    self.http_client = HTTPClient(
        proxy_url=None,
        config=RequestConfig(
            timeout=self.config["timeout"],
            max_retries=self.config["max_retries"],
        ),
    )

    # Cached domain list, filled lazily by _ensure_domains().
    self._domains = []
def _get_headers(self) -> Dict[str, str]:
"""构造 admin 请求头"""
return {
"Authorization": f"Bearer {self.config['admin_token']}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _make_request(self, method: str, path: str, **kwargs) -> Any:
"""
发送请求并返回 JSON 数据
Args:
method: HTTP 方法
path: 请求路径(以 / 开头)
**kwargs: 传递给 http_client.request 的额外参数
Returns:
响应 JSON 数据
Raises:
EmailServiceError: 请求失败
"""
url = f"{self.config['base_url']}{path}"
kwargs.setdefault("headers", {})
kwargs["headers"].update(self._get_headers())
try:
response = self.http_client.request(method, url, **kwargs)
if response.status_code >= 400:
error_msg = f"请求失败: {response.status_code}"
try:
error_data = response.json()
error_msg = f"{error_msg} - {error_data}"
except Exception:
error_msg = f"{error_msg} - {response.text[:200]}"
retry_after = None
if response.status_code == 429:
retry_after_header = response.headers.get("Retry-After")
if retry_after_header:
try:
retry_after = max(1, int(retry_after_header))
except ValueError:
retry_after = None
error = RateLimitedEmailServiceError(error_msg, retry_after=retry_after)
else:
error = EmailServiceError(error_msg)
self.update_status(False, error)
raise error
try:
return response.json()
except Exception:
return {"raw_response": response.text}
except Exception as e:
self.update_status(False, e)
if isinstance(e, EmailServiceError):
raise
raise EmailServiceError(f"请求失败: {method} {path} - {e}")
def _ensure_domains(self):
"""获取并缓存可用域名列表"""
if not self._domains:
try:
domains = self._make_request("GET", "/api/domains")
if isinstance(domains, list):
self._domains = domains
except Exception as e:
logger.warning(f"获取 Freemail 域名列表失败: {e}")
def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Create a temporary mailbox through the API.

    Args:
        config: Optional per-call options. Keys read here: ``domain``
            (falls back to the service-level ``domain``), ``name`` (local
            part; when set the mailbox is created via ``/api/create``) and
            ``length`` (forwarded to ``/api/generate`` when no name is given).

    Returns:
        A dictionary describing the mailbox:
            - email: the mailbox address
            - service_id: same as email, used as the identifier
            - id: same as email
            - created_at: ``time.time()`` at creation

    Raises:
        EmailServiceError: If the request fails or no address is returned.
    """
    self._ensure_domains()

    options = config or {}
    wanted_domain = options.get("domain") or self.config.get("domain")

    # Position of the wanted domain in the cached list; 0 when it is not
    # requested, not cached, or not found.
    domain_index = 0
    if wanted_domain and self._domains:
        domain_index = next(
            (
                position
                for position, candidate in enumerate(self._domains)
                if candidate == wanted_domain
            ),
            0,
        )

    local_part = options.get("name")

    try:
        if local_part:
            resp = self._make_request(
                "POST",
                "/api/create",
                json={"local": local_part, "domainIndex": domain_index},
            )
        else:
            query = {"domainIndex": domain_index}
            requested_length = options.get("length")
            if requested_length:
                query["length"] = requested_length
            resp = self._make_request("GET", "/api/generate", params=query)

        email = resp.get("email")
        if not email:
            raise EmailServiceError(f"创建邮箱失败,未返回邮箱地址: {resp}")

        email_info = dict(
            email=email,
            service_id=email,
            id=email,
            created_at=time.time(),
        )
        logger.info(f"成功创建 Freemail 邮箱: {email}")
        self.update_status(True)
        return email_info

    except Exception as e:
        self.update_status(False, e)
        if not isinstance(e, EmailServiceError):
            raise EmailServiceError(f"创建邮箱失败: {e}")
        raise
def get_verification_code(
    self,
    email: str,
    email_id: str = None,
    timeout: int = 120,
    pattern: str = OTP_CODE_PATTERN,
    otp_sent_at: Optional[float] = None,
) -> Optional[str]:
    """
    Poll the Freemail mailbox until a verification code is found.

    Each poll lists up to 20 mails for the mailbox. A mail is only examined
    when its sender, subject or preview mentions "openai". The code is looked
    for, in order, in: sender/subject/preview, the mail detail body
    (``content`` + ``html_content``), and finally the mail's
    ``verification_code`` field when it is exactly six digits.

    Args:
        email: Mailbox address.
        email_id: Unused; kept for interface compatibility.
        timeout: Maximum time to wait, in seconds.
        pattern: Regular expression used to extract the code.
        otp_sent_at: Timestamp at which the OTP was sent (currently unused).

    Returns:
        The verification code, or ``None`` when the timeout expires.
    """
    logger.info(f"正在从 Freemail 邮箱 {email} 获取验证码...")

    poll_interval = 3
    deadline = time.time() + timeout
    seen_mail_ids: set = set()

    while time.time() < deadline:
        try:
            mails = self._make_request(
                "GET", "/api/emails", params={"mailbox": email, "limit": 20}
            )

            # A non-list payload is treated as "nothing to inspect this round".
            for mail in mails if isinstance(mails, list) else []:
                mail_id = mail.get("id")
                if not mail_id or mail_id in seen_mail_ids:
                    continue
                seen_mail_ids.add(mail_id)

                sender = str(mail.get("sender", "")).lower()
                subject = str(mail.get("subject", ""))
                preview = str(mail.get("preview", ""))
                content = f"{sender}\n{subject}\n{preview}"

                if "openai" not in content.lower():
                    continue

                code = self._extract_otp_from_text(content, pattern)
                if code:
                    logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
                    self.update_status(True)
                    return code

                v_code = str(mail.get("verification_code") or "").strip()

                # Still nothing: fetch the full mail and match against its body.
                try:
                    detail = self._make_request("GET", f"/api/email/{mail_id}")
                    full_content = (
                        str(detail.get("content", ""))
                        + "\n"
                        + str(detail.get("html_content", ""))
                    )
                    code = self._extract_otp_from_text(full_content, pattern)
                    if code:
                        logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
                        self.update_status(True)
                        return code
                except Exception as e:
                    logger.debug(f"获取 Freemail 邮件详情失败: {e}")
                    # The body was never inspected, so forget the id; otherwise
                    # one transient failure would hide this mail for the rest
                    # of the wait.
                    seen_mail_ids.discard(mail_id)

                if re.fullmatch(r"\d{6}", v_code):
                    logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
                    self.update_status(True)
                    return v_code

        except Exception as e:
            logger.debug(f"检查 Freemail 邮件时出错: {e}")

        # Never sleep past the deadline.
        remaining = deadline - time.time()
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    logger.warning(f"等待 Freemail 验证码超时: {email}")
    return None
def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
    """
    List the mailboxes known to the service.

    Args:
        **kwargs: Extra query options; ``limit`` (default 100) and ``offset``
            (default 0) are read.

    Returns:
        One dictionary per mailbox that has an ``address``. An empty list is
        returned when the request fails or the payload is not a list.
    """
    query = {
        "limit": kwargs.get("limit", 100),
        "offset": kwargs.get("offset", 0),
    }

    try:
        payload = self._make_request("GET", "/api/mailboxes", params=query)
        rows = payload if isinstance(payload, list) else []

        # Pair each row with its address so the lookup happens only once.
        addressed = ((row.get("address"), row) for row in rows)
        mailboxes = [
            {
                "id": address,
                "service_id": address,
                "email": address,
                "created_at": row.get("created_at"),
                "raw_data": row,
            }
            for address, row in addressed
            if address
        ]

        self.update_status(True)
        return mailboxes
    except Exception as e:
        logger.warning(f"列出 Freemail 邮箱失败: {e}")
        self.update_status(False, e)
        return []
def delete_email(self, email_id: str) -> bool:
    """
    Delete a mailbox.

    Args:
        email_id: Mailbox address, sent as the ``address`` query parameter.

    Returns:
        ``True`` when the request succeeded, ``False`` when it failed (the
        failure is logged and recorded, not raised).
    """
    deleted = False
    try:
        self._make_request(
            "DELETE", "/api/mailboxes", params={"address": email_id}
        )
        logger.info(f"已删除 Freemail 邮箱: {email_id}")
        self.update_status(True)
        deleted = True
    except Exception as e:
        logger.warning(f"删除 Freemail 邮箱失败: {e}")
        self.update_status(False, e)
    return deleted
def check_health(self) -> bool:
    """
    Check whether the service is reachable.

    Returns:
        ``True`` when ``GET /api/domains`` succeeds, ``False`` otherwise (the
        failure is logged and recorded, not raised).
    """
    healthy = False
    try:
        # The domain listing doubles as a cheap liveness probe.
        self._make_request("GET", "/api/domains")
        self.update_status(True)
        healthy = True
    except Exception as e:
        logger.warning(f"Freemail 健康检查失败: {e}")
        self.update_status(False, e)
    return healthy