fix: avoid reading six-digit email domains as OTPs

This commit is contained in:
zhoukailian
2026-03-24 17:43:34 +08:00
parent e794371bd9
commit 107be96166
4 changed files with 141 additions and 19 deletions

View File

@@ -5,10 +5,11 @@
import abc
import logging
import re
from typing import Optional, Dict, Any, List
from enum import Enum
from ..config.constants import EmailServiceType
from ..config.constants import EmailServiceType, OTP_CODE_PATTERN, OTP_CODE_SEMANTIC_PATTERN
logger = logging.getLogger(__name__)
@@ -46,6 +47,8 @@ class BaseEmailService(abc.ABC):
self._status = EmailServiceStatus.HEALTHY
self._last_error = None
_EMAIL_ADDRESS_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
@property
def status(self) -> EmailServiceStatus:
"""获取服务状态"""
@@ -163,6 +166,30 @@ class BaseEmailService(abc.ABC):
return email_info
return None
def _strip_email_addresses(self, text: str) -> str:
"""移除文本中的邮箱地址,避免域名数字被误识别为验证码。"""
return self._EMAIL_ADDRESS_PATTERN.sub(" ", text or "")
def _extract_otp_from_text(self, text: str, pattern: Optional[str] = None) -> Optional[str]:
    """Extract a verification code from *text*.

    Two passes are made, in order:

    1. A case-insensitive search of the raw text with
       ``OTP_CODE_SEMANTIC_PATTERN``.
    2. If that finds nothing, a search with *pattern* (or
       ``OTP_CODE_PATTERN`` when *pattern* is falsy) over the text with
       e-mail addresses removed, so digits inside an address are not
       mistaken for a code.

    Args:
        text: Message text to scan.
        pattern: Optional regex for the fallback pass; its first
            capture group is returned.

    Returns:
        The first capture group of whichever pass matched, or ``None``
        when *text* is empty or neither pass matches.
    """
    if not text:
        return None

    # Pass 1: semantic pattern on the untouched text.
    hit = re.search(OTP_CODE_SEMANTIC_PATTERN, text, re.IGNORECASE)
    if hit is None:
        # Pass 2: plain pattern, but only after addresses are blanked out.
        chosen_pattern = pattern or OTP_CODE_PATTERN
        hit = re.search(chosen_pattern, self._strip_email_addresses(text))

    if hit is None:
        return None
    return hit.group(1)
def wait_for_email(
self,
email: str,
@@ -383,4 +410,4 @@ def create_email_service(
Returns:
邮箱服务实例
"""
return EmailServiceFactory.create(service_type, config, name)
return EmailServiceFactory.create(service_type, config, name)

View File

@@ -226,34 +226,31 @@ class FreemailService(BaseEmailService):
if "openai" not in content.lower():
continue
# 尝试直接使用 Freemail 提取的验证码
v_code = mail.get("verification_code")
if v_code:
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
self.update_status(True)
return v_code
# 如果没有直接提供,通过正则匹配 preview
match = re.search(pattern, content)
if match:
code = match.group(1)
code = self._extract_otp_from_text(content, pattern)
if code:
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code
v_code = str(mail.get("verification_code") or "").strip()
# 如果依然未找到,获取邮件详情进行匹配
try:
detail = self._make_request("GET", f"/api/email/{mail_id}")
full_content = str(detail.get("content", "")) + "\n" + str(detail.get("html_content", ""))
match = re.search(pattern, full_content)
if match:
code = match.group(1)
code = self._extract_otp_from_text(full_content, pattern)
if code:
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code
except Exception as e:
logger.debug(f"获取 Freemail 邮件详情失败: {e}")
if re.fullmatch(r"\d{6}", v_code):
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
self.update_status(True)
return v_code
except Exception as e:
logger.debug(f"检查 Freemail 邮件时出错: {e}")

View File

@@ -342,9 +342,8 @@ class TempMailService(BaseEmailService):
if "openai" not in sender and "openai" not in content.lower():
continue
match = re.search(pattern, content)
if match:
code = match.group(1)
code = self._extract_otp_from_text(content, pattern)
if code:
logger.info(f"从 TempMail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code

View File

@@ -0,0 +1,99 @@
from src.services.freemail import FreemailService
from src.services.temp_mail import TempMailService
class FakeResponse:
    """Minimal stand-in for an HTTP response object used by the tests."""

    def __init__(self, status_code=200, payload=None, text=""):
        # Plain attributes only; nothing is fetched or parsed.
        self.headers = {}
        self.text = text
        self._payload = payload
        self.status_code = status_code

    def json(self):
        """Return the stored payload, or raise ValueError when none was given."""
        if self._payload is not None:
            return self._payload
        raise ValueError("no json payload")
class FakeHTTPClient:
    """Scripted HTTP client: replays queued responses and records each call."""

    def __init__(self, responses):
        self.calls = []
        # Copy so the caller's sequence is not consumed as responses are popped.
        self.responses = list(responses)

    def request(self, method, url, **kwargs):
        """Record the call, then hand back the next queued response.

        Raises AssertionError when no response is left in the queue; the
        call is still recorded in that case.
        """
        record = {"method": method, "url": url, "kwargs": kwargs}
        self.calls.append(record)

        if self.responses:
            return self.responses.pop(0)
        raise AssertionError(f"未准备响应: {method} {url}")
def test_temp_mail_ignores_six_digit_domain_when_extracting_code():
    """The code in the body wins over the six digits in the recipient's domain."""
    recipient = "tester@123456.com"

    # The body mentions the recipient address (digits 123456) before the real code.
    message = {
        "id": "msg-1",
        "source": "OpenAI <noreply@openai.com>",
        "subject": "Your OpenAI verification code",
        "body": (
            "Email sent to tester@123456.com.\n"
            "Your OpenAI verification code is 654321"
        ),
    }
    listing = FakeResponse(payload={"results": [message]})

    service = TempMailService({
        "base_url": "https://mail.example.com",
        "admin_password": "admin-secret",
        "domain": "123456.com",
    })
    service.http_client = FakeHTTPClient([listing])

    code = service.get_verification_code(email=recipient, timeout=1)

    assert code == "654321"
def test_freemail_prefers_real_code_over_worker_extracted_domain_digits():
    """The code in the message content wins over a ``verification_code`` field
    that merely repeats the digits of the recipient's domain."""
    recipient = "tester@123456.com"

    # First queued response: a list entry whose preview has no real code and
    # whose pre-extracted "verification_code" equals the domain digits.
    listing = FakeResponse(
        payload=[
            {
                "id": "msg-1",
                "sender": "noreply@openai.com",
                "subject": "Your OpenAI verification code",
                "preview": "Verification email sent to tester@123456.com",
                "verification_code": "123456",
            }
        ]
    )
    # Second queued response: content carrying the real code
    # (presumably served for the per-message detail request).
    detail = FakeResponse(
        payload={
            "content": (
                "To: tester@123456.com\n"
                "Your OpenAI verification code is 654321"
            ),
            "html_content": "",
        }
    )

    service = FreemailService({
        "base_url": "https://mail.example.com",
        "admin_token": "jwt-token",
    })
    service.http_client = FakeHTTPClient([listing, detail])

    code = service.get_verification_code(email=recipient, timeout=1)

    assert code == "654321"