Fix temp mail registration flow

This commit is contained in:
王剑锋
2026-03-18 21:36:40 +08:00
parent 23336e26b3
commit 5a935ccc59
3 changed files with 248 additions and 37 deletions

View File

@@ -214,22 +214,42 @@ class RegistrationEngine:
def _get_device_id(self) -> Optional[str]:
"""获取 Device ID"""
try:
if not self.oauth_start:
return None
response = self.session.get(
self.oauth_start.auth_url,
timeout=15
)
did = self.session.cookies.get("oai-did")
self._log(f"Device ID: {did}")
return did
except Exception as e:
self._log(f"获取 Device ID 失败: {e}", "error")
if not self.oauth_start:
return None
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
if not self.session:
self.session = self.http_client.session
response = self.session.get(
self.oauth_start.auth_url,
timeout=20
)
did = self.session.cookies.get("oai-did")
if did:
self._log(f"Device ID: {did}")
return did
self._log(
f"获取 Device ID 失败: 未返回 oai-did Cookie (HTTP {response.status_code}, 第 {attempt}/{max_attempts} 次)",
"warning" if attempt < max_attempts else "error"
)
except Exception as e:
self._log(
f"获取 Device ID 失败: {e} (第 {attempt}/{max_attempts} 次)",
"warning" if attempt < max_attempts else "error"
)
if attempt < max_attempts:
time.sleep(attempt)
self.http_client.close()
self.session = self.http_client.session
return None
def _check_sentinel(self, did: str) -> Optional[str]:
"""检查 Sentinel 拦截"""
try:
@@ -860,4 +880,4 @@ class RegistrationEngine:
except Exception as e:
self._log(f"保存到数据库失败: {e}", "error")
return False
return False

View File

@@ -8,7 +8,12 @@ import re
import time
import json
import logging
from typing import Optional, Dict, Any
from email import message_from_string
from email.header import decode_header, make_header
from email.message import Message
from email.policy import default as email_policy
from html import unescape
from typing import Optional, Dict, Any, List
from .base import BaseEmailService, EmailServiceError, EmailServiceType
from ..core.http_client import HTTPClient, RequestConfig
@@ -63,6 +68,97 @@ class TempMailService(BaseEmailService):
# 邮箱缓存email -> {jwt, address}
self._email_cache: Dict[str, Dict[str, Any]] = {}
def _decode_mime_header(self, value: str) -> str:
"""解码 MIME 头,兼容 RFC 2047 编码主题。"""
if not value:
return ""
try:
return str(make_header(decode_header(value)))
except Exception:
return value
def _extract_body_from_message(self, message: Message) -> str:
"""从 MIME 邮件对象中提取可读正文。"""
parts: List[str] = []
if message.is_multipart():
for part in message.walk():
if part.get_content_maintype() == "multipart":
continue
content_type = (part.get_content_type() or "").lower()
if content_type not in ("text/plain", "text/html"):
continue
try:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
text = payload.decode(charset, errors="replace") if payload else ""
except Exception:
try:
text = part.get_content()
except Exception:
text = ""
if content_type == "text/html":
text = re.sub(r"<[^>]+>", " ", text)
parts.append(text)
else:
try:
payload = message.get_payload(decode=True)
charset = message.get_content_charset() or "utf-8"
body = payload.decode(charset, errors="replace") if payload else ""
except Exception:
try:
body = message.get_content()
except Exception:
body = str(message.get_payload() or "")
if "html" in (message.get_content_type() or "").lower():
body = re.sub(r"<[^>]+>", " ", body)
parts.append(body)
return unescape("\n".join(part for part in parts if part).strip())
def _extract_mail_fields(self, mail: Dict[str, Any]) -> Dict[str, str]:
"""统一提取邮件字段,兼容 raw MIME 和不同 Worker 返回格式。"""
sender = str(
mail.get("source")
or mail.get("from")
or mail.get("from_address")
or mail.get("fromAddress")
or ""
).strip()
subject = str(mail.get("subject") or mail.get("title") or "").strip()
body_text = str(
mail.get("text")
or mail.get("body")
or mail.get("content")
or mail.get("html")
or ""
).strip()
raw = str(mail.get("raw") or "").strip()
if raw:
try:
message = message_from_string(raw, policy=email_policy)
sender = sender or self._decode_mime_header(message.get("From", ""))
subject = subject or self._decode_mime_header(message.get("Subject", ""))
parsed_body = self._extract_body_from_message(message)
if parsed_body:
body_text = f"{body_text}\n{parsed_body}".strip() if body_text else parsed_body
except Exception as e:
logger.debug(f"解析 TempMail raw 邮件失败: {e}")
body_text = f"{body_text}\n{raw}".strip() if body_text else raw
body_text = unescape(re.sub(r"<[^>]+>", " ", body_text))
return {
"sender": sender,
"subject": subject,
"body": body_text,
"raw": raw,
}
def _admin_headers(self) -> Dict[str, str]:
"""构造 admin 请求头"""
return {
@@ -224,14 +320,12 @@ class TempMailService(BaseEmailService):
seen_mail_ids.add(mail_id)
sender = str(mail.get("source", "")).lower()
subject = str(mail.get("subject", ""))
body_text = str(mail.get("text", "") or mail.get("html", "") or "")
# 去除简单 HTML 标签
body_clean = re.sub(r"<[^>]+>", " ", body_text)
content = f"{sender} {subject} {body_clean}"
parsed = self._extract_mail_fields(mail)
sender = parsed["sender"].lower()
subject = parsed["subject"]
body_text = parsed["body"]
raw_text = parsed["raw"]
content = f"{sender}\n{subject}\n{body_text}\n{raw_text}".strip()
# 只处理 OpenAI 邮件
if "openai" not in sender and "openai" not in content.lower():
@@ -252,6 +346,88 @@ class TempMailService(BaseEmailService):
logger.warning(f"等待 TempMail 验证码超时: {email}")
return None
def list_emails(self, limit: int = 100, offset: int = 0, **kwargs) -> List[Dict[str, Any]]:
"""
列出邮箱
Args:
limit: 返回数量上限
offset: 分页偏移
**kwargs: 额外查询参数,透传给 admin API
Returns:
邮箱列表
"""
params = {
"limit": limit,
"offset": offset,
}
params.update({k: v for k, v in kwargs.items() if v is not None})
try:
response = self._make_request("GET", "/admin/mails", params=params)
mails = response.get("results", [])
if not isinstance(mails, list):
raise EmailServiceError(f"API 返回数据格式错误: {response}")
emails: List[Dict[str, Any]] = []
for mail in mails:
address = (mail.get("address") or "").strip()
mail_id = mail.get("id") or address
email_info = {
"id": mail_id,
"service_id": mail_id,
"email": address,
"subject": mail.get("subject"),
"from": mail.get("source"),
"created_at": mail.get("createdAt") or mail.get("created_at"),
"raw_data": mail,
}
emails.append(email_info)
if address:
cached = self._email_cache.get(address, {})
self._email_cache[address] = {**cached, **email_info}
self.update_status(True)
return emails
except Exception as e:
logger.warning(f"列出 TempMail 邮箱失败: {e}")
self.update_status(False, e)
return list(self._email_cache.values())
def delete_email(self, email_id: str) -> bool:
"""
删除邮箱
Note:
当前 TempMail admin API 文档未见删除地址接口,这里先从本地缓存移除,
以满足统一接口并避免服务实例化失败。
"""
removed = False
emails_to_delete = []
for address, info in self._email_cache.items():
candidate_ids = {
address,
info.get("id"),
info.get("service_id"),
}
if email_id in candidate_ids:
emails_to_delete.append(address)
for address in emails_to_delete:
self._email_cache.pop(address, None)
removed = True
if removed:
logger.info(f"已从 TempMail 缓存移除邮箱: {email_id}")
self.update_status(True)
else:
logger.info(f"TempMail 缓存中未找到邮箱: {email_id}")
return removed
def check_health(self) -> bool:
"""检查服务健康状态"""
try:

View File

@@ -182,6 +182,30 @@ def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse:
)
def _normalize_email_service_config(
service_type: EmailServiceType,
config: Optional[dict],
proxy_url: Optional[str] = None
) -> dict:
"""按服务类型兼容旧字段名,避免不同服务的配置键互相污染。"""
normalized = config.copy() if config else {}
if 'api_url' in normalized and 'base_url' not in normalized:
normalized['base_url'] = normalized.pop('api_url')
if service_type == EmailServiceType.CUSTOM_DOMAIN:
if 'domain' in normalized and 'default_domain' not in normalized:
normalized['default_domain'] = normalized.pop('domain')
elif service_type == EmailServiceType.TEMP_MAIL:
if 'default_domain' in normalized and 'domain' not in normalized:
normalized['domain'] = normalized.pop('default_domain')
if proxy_url and 'proxy_url' not in normalized:
normalized['proxy_url'] = proxy_url
return normalized
def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = "", auto_upload_cpa: bool = False, cpa_service_id: Optional[int] = None):
"""
在线程池中执行的同步注册任务
@@ -236,15 +260,11 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
).first()
if db_service:
config = db_service.config.copy() if db_service.config else {}
# 兼容旧版字段名 api_url -> base_url
if 'api_url' in config and 'base_url' not in config:
config['base_url'] = config.pop('api_url')
if 'domain' in config and 'default_domain' not in config:
config['default_domain'] = config.pop('domain')
service_type = EmailServiceType(db_service.service_type)
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
# 更新任务关联的邮箱服务
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库邮箱服务: {db_service.name} (ID: {db_service.id})")
logger.info(f"使用数据库邮箱服务: {db_service.name} (ID: {db_service.id}, 类型: {service_type.value})")
else:
raise ValueError(f"邮箱服务不存在或已禁用: {email_service_id}")
else:
@@ -265,12 +285,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
).order_by(EmailServiceModel.priority.asc()).first()
if db_service and db_service.config:
config = db_service.config.copy()
# 兼容旧版字段名 api_url -> base_url
if 'api_url' in config and 'base_url' not in config:
config['base_url'] = config.pop('api_url')
if 'domain' in config and 'default_domain' not in config:
config['default_domain'] = config.pop('domain')
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库自定义域名服务: {db_service.name}")
elif settings.custom_domain_base_url and settings.custom_domain_api_key: