From 107be96166c976f1d4b397f9e478f4524d43ef13 Mon Sep 17 00:00:00 2001 From: zhoukailian Date: Tue, 24 Mar 2026 17:43:34 +0800 Subject: [PATCH] fix: avoid reading six-digit email domains as OTPs --- src/services/base.py | 31 +++++- src/services/freemail.py | 25 +++-- src/services/temp_mail.py | 5 +- tests/test_cloudflare_worker_mail_services.py | 99 +++++++++++++++++++ 4 files changed, 141 insertions(+), 19 deletions(-) create mode 100644 tests/test_cloudflare_worker_mail_services.py diff --git a/src/services/base.py b/src/services/base.py index aba923f..2c2293c 100644 --- a/src/services/base.py +++ b/src/services/base.py @@ -5,10 +5,11 @@ import abc import logging +import re from typing import Optional, Dict, Any, List from enum import Enum -from ..config.constants import EmailServiceType +from ..config.constants import EmailServiceType, OTP_CODE_PATTERN, OTP_CODE_SEMANTIC_PATTERN logger = logging.getLogger(__name__) @@ -46,6 +47,8 @@ class BaseEmailService(abc.ABC): self._status = EmailServiceStatus.HEALTHY self._last_error = None + _EMAIL_ADDRESS_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") + @property def status(self) -> EmailServiceStatus: """获取服务状态""" @@ -163,6 +166,30 @@ class BaseEmailService(abc.ABC): return email_info return None + def _strip_email_addresses(self, text: str) -> str: + """移除文本中的邮箱地址,避免域名数字被误识别为验证码。""" + return self._EMAIL_ADDRESS_PATTERN.sub(" ", text or "") + + def _extract_otp_from_text(self, text: str, pattern: Optional[str] = None) -> Optional[str]: + """ + 从文本中提取验证码。 + + 优先语义匹配,再在移除邮箱地址后的文本上做 6 位数字兜底。 + """ + if not text: + return None + + semantic_match = re.search(OTP_CODE_SEMANTIC_PATTERN, text, re.IGNORECASE) + if semantic_match: + return semantic_match.group(1) + + fallback_pattern = pattern or OTP_CODE_PATTERN + simple_match = re.search(fallback_pattern, self._strip_email_addresses(text)) + if simple_match: + return simple_match.group(1) + + return None + def wait_for_email( self, email: str, @@ -383,4 +410,4 @@ def create_email_service( Returns: 邮箱服务实例 """ - return EmailServiceFactory.create(service_type, config, name) \ No newline at end of file + return EmailServiceFactory.create(service_type, config, name) diff --git a/src/services/freemail.py b/src/services/freemail.py index e1e0587..bb092a2 100644 --- a/src/services/freemail.py +++ b/src/services/freemail.py @@ -226,34 +226,31 @@ class FreemailService(BaseEmailService): if "openai" not in content.lower(): continue - # 尝试直接使用 Freemail 提取的验证码 - v_code = mail.get("verification_code") - if v_code: - logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}") - self.update_status(True) - return v_code - - # 如果没有直接提供,通过正则匹配 preview - match = re.search(pattern, content) - if match: - code = match.group(1) + code = self._extract_otp_from_text(content, pattern) + if code: logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}") self.update_status(True) return code + v_code = str(mail.get("verification_code") or "").strip() + # 如果依然未找到,获取邮件详情进行匹配 try: detail = self._make_request("GET", f"/api/email/{mail_id}") full_content = str(detail.get("content", "")) + "\n" + str(detail.get("html_content", "")) - match = re.search(pattern, full_content) - if match: - code = match.group(1) + code = self._extract_otp_from_text(full_content, pattern) + if code: logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}") self.update_status(True) return code except Exception as e: logger.debug(f"获取 Freemail 邮件详情失败: {e}") + if re.fullmatch(r"\d{6}", v_code): + logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}") + self.update_status(True) + return v_code + except Exception as e: logger.debug(f"检查 Freemail 邮件时出错: {e}") diff --git a/src/services/temp_mail.py b/src/services/temp_mail.py index 6804409..d518273 100644 --- a/src/services/temp_mail.py +++ b/src/services/temp_mail.py @@ -342,9 +342,8 @@ class TempMailService(BaseEmailService): if "openai" not in sender and "openai" not in content.lower(): continue - match = re.search(pattern, content) - if match: - code = match.group(1) + code = self._extract_otp_from_text(content, pattern) + if code: logger.info(f"从 TempMail 邮箱 {email} 找到验证码: {code}") self.update_status(True) return code diff --git a/tests/test_cloudflare_worker_mail_services.py b/tests/test_cloudflare_worker_mail_services.py new file mode 100644 index 0000000..cefe574 --- /dev/null +++ b/tests/test_cloudflare_worker_mail_services.py @@ -0,0 +1,99 @@ +from src.services.freemail import FreemailService +from src.services.temp_mail import TempMailService + + +class FakeResponse: + def __init__(self, status_code=200, payload=None, text=""): + self.status_code = status_code + self._payload = payload + self.text = text + self.headers = {} + + def json(self): + if self._payload is None: + raise ValueError("no json payload") + return self._payload + + +class FakeHTTPClient: + def __init__(self, responses): + self.responses = list(responses) + self.calls = [] + + def request(self, method, url, **kwargs): + self.calls.append({ + "method": method, + "url": url, + "kwargs": kwargs, + }) + if not self.responses: + raise AssertionError(f"未准备响应: {method} {url}") + return self.responses.pop(0) + + +def test_temp_mail_ignores_six_digit_domain_when_extracting_code(): + service = TempMailService({ + "base_url": "https://mail.example.com", + "admin_password": "admin-secret", + "domain": "123456.com", + }) + service.http_client = FakeHTTPClient([ + FakeResponse( + payload={ + "results": [ + { + "id": "msg-1", + "source": "OpenAI ", + "subject": "Your OpenAI verification code", + "body": ( + "Email sent to tester@123456.com.\n" + "Your OpenAI verification code is 654321" + ), + } + ] + } + ) + ]) + + code = service.get_verification_code( + email="tester@123456.com", + timeout=1, + ) + + assert code == "654321" + + +def test_freemail_prefers_real_code_over_worker_extracted_domain_digits(): + service = FreemailService({ + "base_url": "https://mail.example.com", + "admin_token": "jwt-token", + }) + service.http_client = FakeHTTPClient([ + FakeResponse( + payload=[ + { + "id": "msg-1", + "sender": "noreply@openai.com", + "subject": "Your OpenAI verification code", + "preview": "Verification email sent to tester@123456.com", + "verification_code": "123456", + } + ] + ), + FakeResponse( + payload={ + "content": ( + "To: tester@123456.com\n" + "Your OpenAI verification code is 654321" + ), + "html_content": "", + } + ), + ]) + + code = service.get_verification_code( + email="tester@123456.com", + timeout=1, + ) + + assert code == "654321"