feat(core): 实现了 Outlook 邮箱验证码获取改进方案

1. 增强邮件识别逻辑 (_is_openai_verification_mail)
    - 严格验证发件人必须是 OpenAI
    - 验证主题/正文包含验证关键词
    - 验证收件人匹配目标邮箱
  2. 邮件时间戳过滤
    - 基于 otp_sent_at 过滤发送前的旧邮件
    - 预留 60 秒时钟偏差容忍
  3. 验证码提取优化 (_extract_code_from_mail)
    - 优先从主题提取 6 位数字
    - 语义正则匹配("code is", "验证码")
    - 兜底任意 6 位数字
  4. 验证码去重机制
    - 新增 _used_codes 实例变量
    - 避免重复使用同一验证码
  5. 渐进式邮件检查
    - 前 3 次轮询只检查未读邮件
    - 之后检查所有邮件(避免已读邮件被忽略)
  6. 可配置超时时间
    - 新增配置项 email_code_timeout(默认 120 秒)
    - 新增配置项 email_code_poll_interval(默认 3 秒)
  7. 详细时间戳日志
    - 记录 IMAP 连接耗时
    - 记录邮件搜索耗时
    - 记录总耗时和轮询次数
This commit is contained in:
cnlimiter
2026-03-15 02:43:00 +08:00
parent bf13756e6d
commit e70c99f205
7 changed files with 202 additions and 38 deletions

View File

@@ -118,6 +118,29 @@ OTP_WAIT_TIMEOUT = 120 # 秒
OTP_POLL_INTERVAL = 3 # 秒
OTP_MAX_ATTEMPTS = 40 # 最大轮询次数
# 验证码提取正则(增强版)
# 简单匹配:任意 6 位数字
OTP_CODE_SIMPLE_PATTERN = r"(?<!\d)(\d{6})(?!\d)"
# 语义匹配:带上下文的验证码(如 "code is 123456", "验证码 123456"
OTP_CODE_SEMANTIC_PATTERN = r'(?:code\s+is|验证码[是为]?\s*[:]?\s*)(\d{6})'
# OpenAI 验证邮件发件人
OPENAI_EMAIL_SENDERS = [
"noreply@openai.com",
"no-reply@openai.com",
"@openai.com", # 通配符匹配
]
# OpenAI 验证邮件关键词
OPENAI_VERIFICATION_KEYWORDS = [
"verify your email",
"verification code",
"验证码",
"your openai code",
"code is",
"one-time code",
]
# 密码生成
PASSWORD_CHARSET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
DEFAULT_PASSWORD_LENGTH = 12

View File

@@ -110,6 +110,10 @@ class Settings(BaseSettings):
tempmail_timeout: int = Field(default=30)
tempmail_max_retries: int = Field(default=3)
# 验证码等待配置
email_code_timeout: int = Field(default=120) # 验证码等待超时(秒)
email_code_poll_interval: int = Field(default=3) # 验证码轮询间隔(秒)
# 自定义域名邮箱配置
custom_domain_base_url: str = Field(default="")
custom_domain_api_key: Optional[SecretStr] = Field(default=None)

View File

@@ -118,6 +118,7 @@ class RegistrationEngine:
self.session: Optional[cffi_requests.Session] = None
self.session_token: Optional[str] = None # 会话令牌
self.logs: list = []
self._otp_sent_at: Optional[float] = None # OTP 发送时间戳
def _log(self, message: str, level: str = "info"):
"""记录日志"""
@@ -346,6 +347,9 @@ class RegistrationEngine:
def _send_verification_code(self) -> bool:
"""发送验证码"""
try:
# 记录发送时间戳
self._otp_sent_at = time.time()
response = self.session.get(
OPENAI_API_ENDPOINTS["send_otp"],
headers={
@@ -371,7 +375,8 @@ class RegistrationEngine:
email=self.email,
email_id=email_id,
timeout=120,
pattern=OTP_CODE_PATTERN
pattern=OTP_CODE_PATTERN,
otp_sent_at=self._otp_sent_at,
)
if code:

View File

@@ -81,7 +81,8 @@ class BaseEmailService(abc.ABC):
email: str,
email_id: str = None,
timeout: int = 120,
pattern: str = r"(?<!\d)(\d{6})(?!\d)"
pattern: str = r"(?<!\d)(\d{6})(?!\d)",
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""
获取验证码
@@ -91,6 +92,7 @@ class BaseEmailService(abc.ABC):
email_id: 邮箱服务中的 ID如果需要
timeout: 超时时间(秒)
pattern: 验证码正则表达式
otp_sent_at: OTP 发送时间戳,用于过滤旧邮件
Returns:
验证码字符串,如果超时或未找到返回 None

View File

@@ -235,7 +235,8 @@ class CustomDomainEmailService(BaseEmailService):
email: str,
email_id: str = None,
timeout: int = 120,
pattern: str = OTP_CODE_PATTERN
pattern: str = OTP_CODE_PATTERN,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""
从自定义域名邮箱获取验证码
@@ -245,6 +246,7 @@ class CustomDomainEmailService(BaseEmailService):
email_id: 邮箱 ID如果不提供从缓存中查找
timeout: 超时时间(秒)
pattern: 验证码正则表达式
otp_sent_at: OTP 发送时间戳(自定义域名服务暂不使用此参数)
Returns:
验证码字符串,如果超时或未找到返回 None

View File

@@ -21,7 +21,14 @@ from email.utils import parsedate_to_datetime
from urllib.error import HTTPError
from .base import BaseEmailService, EmailServiceError, EmailServiceType
from ..config.constants import OTP_CODE_PATTERN
from ..config.constants import (
OTP_CODE_PATTERN,
OTP_CODE_SIMPLE_PATTERN,
OTP_CODE_SEMANTIC_PATTERN,
OPENAI_EMAIL_SENDERS,
OPENAI_VERIFICATION_KEYWORDS,
)
from ..config import get_settings
logger = logging.getLogger(__name__)
@@ -397,6 +404,9 @@ class OutlookService(BaseEmailService):
# IMAP 连接限制(防止限流)
self._imap_semaphore = threading.Semaphore(5)
# 验证码去重机制email -> set of used codes
self._used_codes: Dict[str, set] = {}
def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
"""
选择可用的 Outlook 账户
@@ -436,8 +446,9 @@ class OutlookService(BaseEmailService):
self,
email: str,
email_id: str = None,
timeout: int = 120,
pattern: str = OTP_CODE_PATTERN
timeout: int = None,
pattern: str = OTP_CODE_PATTERN,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""
从 Outlook 邮箱获取验证码
@@ -445,8 +456,9 @@ class OutlookService(BaseEmailService):
Args:
email: 邮箱地址
email_id: 未使用(对于 Outlookemail 就是标识)
timeout: 超时时间(秒)
timeout: 超时时间(秒),默认使用配置值
pattern: 验证码正则表达式
otp_sent_at: OTP 发送时间戳,用于过滤旧邮件
Returns:
验证码字符串,如果超时或未找到返回 None
@@ -462,21 +474,33 @@ class OutlookService(BaseEmailService):
self.update_status(False, EmailServiceError(f"未找到邮箱对应的账户: {email}"))
return None
logger.info(f"正在从 Outlook 邮箱 {email} 获取验证码...")
# 使用配置的超时时间
settings = get_settings()
actual_timeout = timeout or settings.email_code_timeout
poll_interval = settings.email_code_poll_interval
logger.info(f"[{email}] 开始获取验证码,超时 {actual_timeout}sOTP发送时间: {otp_sent_at}")
# 初始化验证码去重集合
if email not in self._used_codes:
self._used_codes[email] = set()
used_codes = self._used_codes[email]
# 计算最小时间戳(留出 60 秒时钟偏差)
min_timestamp = (otp_sent_at - 60) if otp_sent_at else 0
start_time = time.time()
last_check_time = 0
check_count = 0
poll_count = 0
while time.time() - start_time < timeout:
check_count += 1
while time.time() - start_time < actual_timeout:
poll_count += 1
loop_start = time.time()
# 控制检查频率
if time.time() - last_check_time < 3:
time.sleep(1)
continue
# 渐进式邮件检查:前 3 次只检查未读,之后检查全部
only_unseen = poll_count <= 3
try:
connect_start = time.time()
with self._imap_semaphore:
with OutlookIMAPClient(
account,
@@ -484,38 +508,49 @@ class OutlookService(BaseEmailService):
port=self.config["imap_port"],
timeout=10
) as client:
emails = client.get_recent_emails(count=10, only_unseen=True)
connect_elapsed = time.time() - connect_start
logger.debug(f"[{email}] IMAP 连接耗时 {connect_elapsed:.2f}s")
# 搜索邮件
search_start = time.time()
emails = client.get_recent_emails(count=15, only_unseen=only_unseen)
search_elapsed = time.time() - search_start
logger.debug(f"[{email}] 搜索到 {len(emails)} 封邮件(未读={only_unseen}),耗时 {search_elapsed:.2f}s")
for mail in emails:
# 检查是否是 OpenAI 相关邮件
if not self._is_oai_mail(mail):
# 时间戳过滤
mail_ts = mail.get("date_timestamp", 0)
if min_timestamp > 0 and mail_ts > 0 and mail_ts < min_timestamp:
logger.debug(f"[{email}] 跳过旧邮件: {mail.get('subject', '')[:50]}")
continue
# 检查是否是 OpenAI 验证邮件
if not self._is_openai_verification_mail(mail, email):
continue
# 提取验证码
content = f"{mail.get('from', '')} {mail.get('subject', '')} {mail.get('body', '')}"
match = re.search(pattern, content)
if match:
code = match.group(1)
logger.info(f"从 Outlook 邮箱 {email} 找到验证码: {code}")
# 可选:标记邮件为已读(避免重复获取)
# 注意:这需要修改 IMAP 客户端的实现
code = self._extract_code_from_mail(mail, pattern)
if code:
# 去重检查
if code in used_codes:
logger.debug(f"[{email}] 跳过已使用的验证码: {code}")
continue
used_codes.add(code)
elapsed = int(time.time() - start_time)
logger.info(f"[{email}] 找到验证码: {code},总耗时 {elapsed}s轮询 {poll_count}")
self.update_status(True)
return code
last_check_time = time.time()
if check_count % 5 == 0:
logger.debug(f"检查 {email} 的验证码,已检查 {check_count}")
except Exception as e:
logger.warning(f"检查 Outlook 邮箱 {email} 时出错: {e}")
last_check_time = time.time()
loop_elapsed = time.time() - loop_start
logger.warning(f"[{email}] 检查出错: {e},循环耗时 {loop_elapsed:.2f}s")
time.sleep(3)
# 等待下次轮询
time.sleep(poll_interval)
logger.warning(f"等待验证码超时: {email}")
elapsed = int(time.time() - start_time)
logger.warning(f"[{email}] 验证码超时 ({actual_timeout}s),共轮询 {poll_count}")
return None
def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
@@ -574,11 +609,102 @@ class OutlookService(BaseEmailService):
return False
def _is_oai_mail(self, mail: Dict[str, Any]) -> bool:
"""判断是否为 OpenAI 相关邮件"""
"""判断是否为 OpenAI 相关邮件(旧方法,保留兼容)"""
combined = f"{mail.get('from', '')} {mail.get('subject', '')} {mail.get('body', '')}".lower()
keywords = ["openai", "chatgpt", "verification", "验证码", "code"]
return any(keyword in combined for keyword in keywords)
def _is_openai_verification_mail(
self,
mail: Dict[str, Any],
target_email: str = None
) -> bool:
"""
严格判断是否为 OpenAI 验证邮件
Args:
mail: 邮件信息字典
target_email: 目标邮箱地址(用于验证收件人)
Returns:
是否为 OpenAI 验证邮件
"""
sender = mail.get("from", "").lower()
# 1. 发件人必须是 OpenAI
valid_senders = OPENAI_EMAIL_SENDERS
if not any(s in sender for s in valid_senders):
logger.debug(f"邮件发件人非 OpenAI: {sender}")
return False
# 2. 主题或正文包含验证关键词
subject = mail.get("subject", "").lower()
body = mail.get("body", "").lower()
verification_keywords = OPENAI_VERIFICATION_KEYWORDS
combined = f"{subject} {body}"
if not any(kw in combined for kw in verification_keywords):
logger.debug(f"邮件未包含验证关键词: {subject[:50]}")
return False
# 3. 验证收件人(可选)
if target_email:
recipients = f"{mail.get('to', '')} {mail.get('delivered_to', '')} {mail.get('x_original_to', '')}".lower()
if target_email.lower() not in recipients:
logger.debug(f"邮件收件人不匹配: {recipients[:50]}")
return False
logger.debug(f"识别为 OpenAI 验证邮件: {subject[:50]}")
return True
def _extract_code_from_mail(
self,
mail: Dict[str, Any],
fallback_pattern: str = OTP_CODE_PATTERN
) -> Optional[str]:
"""
从邮件中提取验证码
优先级:
1. 从主题提取6位数字
2. 从正文用语义正则提取(如 "code is 123456"
3. 兜底:任意 6 位数字
Args:
mail: 邮件信息字典
fallback_pattern: 兜底正则表达式
Returns:
验证码字符串,如果未找到返回 None
"""
# 编译正则
re_simple = re.compile(OTP_CODE_SIMPLE_PATTERN)
re_semantic = re.compile(OTP_CODE_SEMANTIC_PATTERN, re.IGNORECASE)
# 1. 主题优先
subject = mail.get("subject", "")
match = re_simple.search(subject)
if match:
code = match.group(1)
logger.debug(f"从主题提取验证码: {code}")
return code
# 2. 正文语义匹配
body = mail.get("body", "")
match = re_semantic.search(body)
if match:
code = match.group(1)
logger.debug(f"从正文语义提取验证码: {code}")
return code
# 3. 兜底:任意 6 位数字
match = re_simple.search(body)
if match:
code = match.group(1)
logger.debug(f"从正文兜底提取验证码: {code}")
return code
return None
def get_account_stats(self) -> Dict[str, Any]:
"""获取账户统计信息"""
total = len(self.accounts)

View File

@@ -123,7 +123,8 @@ class TempmailService(BaseEmailService):
email: str,
email_id: str = None,
timeout: int = 120,
pattern: str = OTP_CODE_PATTERN
pattern: str = OTP_CODE_PATTERN,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""
从 Tempmail.lol 获取验证码
@@ -133,6 +134,7 @@ class TempmailService(BaseEmailService):
email_id: 邮箱 token如果不提供从缓存中查找
timeout: 超时时间(秒)
pattern: 验证码正则表达式
otp_sent_at: OTP 发送时间戳Tempmail 服务暂不使用此参数)
Returns:
验证码字符串,如果超时或未找到返回 None