Merge pull request #83 from ZHOUKAILIAN/fix/worker-mail-otp-extraction

fix: avoid reading six-digit email domains as OTPs
This commit is contained in:
演变
2026-03-25 00:17:34 +08:00
committed by GitHub
4 changed files with 140 additions and 18 deletions

View File

@@ -5,12 +5,13 @@
import abc
import logging
import re
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from enum import Enum
from ..config.constants import EmailServiceType
from ..config.constants import EmailServiceType, OTP_CODE_PATTERN, OTP_CODE_SEMANTIC_PATTERN
logger = logging.getLogger(__name__)
@@ -146,6 +147,8 @@ class BaseEmailService(abc.ABC):
self._last_error = None
self._provider_backoff = reset_adaptive_backoff()
_EMAIL_ADDRESS_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
@property
def status(self) -> EmailServiceStatus:
"""获取服务状态"""
@@ -272,6 +275,30 @@ class BaseEmailService(abc.ABC):
return email_info
return None
def _strip_email_addresses(self, text: str) -> str:
"""移除文本中的邮箱地址,避免域名数字被误识别为验证码。"""
return self._EMAIL_ADDRESS_PATTERN.sub(" ", text or "")
def _extract_otp_from_text(self, text: str, pattern: Optional[str] = None) -> Optional[str]:
"""
从文本中提取验证码。
优先语义匹配,再在移除邮箱地址后的文本上做 6 位数字兜底。
"""
if not text:
return None
semantic_match = re.search(OTP_CODE_SEMANTIC_PATTERN, text, re.IGNORECASE)
if semantic_match:
return semantic_match.group(1)
fallback_pattern = pattern or OTP_CODE_PATTERN
simple_match = re.search(fallback_pattern, self._strip_email_addresses(text))
if simple_match:
return simple_match.group(1)
return None
def wait_for_email(
self,
email: str,

View File

@@ -237,34 +237,31 @@ class FreemailService(BaseEmailService):
if "openai" not in content.lower():
continue
# 尝试直接使用 Freemail 提取的验证码
v_code = mail.get("verification_code")
if v_code:
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
self.update_status(True)
return v_code
# 如果没有直接提供,通过正则匹配 preview
match = re.search(pattern, content)
if match:
code = match.group(1)
code = self._extract_otp_from_text(content, pattern)
if code:
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code
v_code = str(mail.get("verification_code") or "").strip()
# 如果依然未找到,获取邮件详情进行匹配
try:
detail = self._make_request("GET", f"/api/email/{mail_id}")
full_content = str(detail.get("content", "")) + "\n" + str(detail.get("html_content", ""))
match = re.search(pattern, full_content)
if match:
code = match.group(1)
code = self._extract_otp_from_text(full_content, pattern)
if code:
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code
except Exception as e:
logger.debug(f"获取 Freemail 邮件详情失败: {e}")
if re.fullmatch(r"\d{6}", v_code):
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
self.update_status(True)
return v_code
except Exception as e:
logger.debug(f"检查 Freemail 邮件时出错: {e}")

View File

@@ -353,9 +353,8 @@ class TempMailService(BaseEmailService):
if "openai" not in sender and "openai" not in content.lower():
continue
match = re.search(pattern, content)
if match:
code = match.group(1)
code = self._extract_otp_from_text(content, pattern)
if code:
logger.info(f"从 TempMail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code

View File

@@ -0,0 +1,99 @@
from src.services.freemail import FreemailService
from src.services.temp_mail import TempMailService
class FakeResponse:
    """Minimal canned HTTP response used by the fake client in these tests."""

    def __init__(self, status_code=200, payload=None, text=""):
        # Plain attributes only; nothing is computed or validated here.
        self.headers = {}
        self.text = text
        self._payload = payload
        self.status_code = status_code

    def json(self):
        """Return the canned payload, or raise ValueError if none was given."""
        payload = self._payload
        if payload is not None:
            return payload
        raise ValueError("no json payload")
class FakeHTTPClient:
    """Test double that replays queued responses and records each request."""

    def __init__(self, responses):
        self.calls = []
        # Copy so the caller's sequence is not consumed by request().
        self.responses = list(responses)

    def request(self, method, url, **kwargs):
        """Record the call, then return the next queued response.

        Raises:
            AssertionError: when no queued response is left. The call is
                still recorded in ``calls`` before the error is raised.
        """
        record = {"method": method, "url": url, "kwargs": kwargs}
        self.calls.append(record)
        if self.responses:
            return self.responses.pop(0)
        raise AssertionError(f"未准备响应: {method} {url}")
def test_temp_mail_ignores_six_digit_domain_when_extracting_code():
    """TempMail returns the real code, not the six digits of the mailbox domain."""
    # The body names the recipient (six-digit domain) before the real code.
    message = {
        "id": "msg-1",
        "source": "OpenAI <noreply@openai.com>",
        "subject": "Your OpenAI verification code",
        "body": (
            "Email sent to tester@123456.com.\n"
            "Your OpenAI verification code is 654321"
        ),
    }
    config = {
        "base_url": "https://mail.example.com",
        "admin_password": "admin-secret",
        "domain": "123456.com",
    }

    service = TempMailService(config)
    listing = FakeResponse(payload={"results": [message]})
    service.http_client = FakeHTTPClient([listing])

    code = service.get_verification_code(email="tester@123456.com", timeout=1)

    assert code == "654321"
def test_freemail_prefers_real_code_over_worker_extracted_domain_digits():
    """Freemail returns the code from the mail content, not the domain digits.

    The first canned response carries a ``verification_code`` equal to the
    six digits of the recipient's domain. The second carries content with
    the real code, which is the value the service must return.
    """
    # Listing entry: preview holds only the address; the supplied code is wrong.
    listed_mail = {
        "id": "msg-1",
        "sender": "noreply@openai.com",
        "subject": "Your OpenAI verification code",
        "preview": "Verification email sent to tester@123456.com",
        "verification_code": "123456",
    }
    # Second response: content that includes the real code.
    mail_detail = {
        "content": (
            "To: tester@123456.com\n"
            "Your OpenAI verification code is 654321"
        ),
        "html_content": "",
    }
    config = {
        "base_url": "https://mail.example.com",
        "admin_token": "jwt-token",
    }

    service = FreemailService(config)
    queued = [
        FakeResponse(payload=[listed_mail]),
        FakeResponse(payload=mail_detail),
    ]
    service.http_client = FakeHTTPClient(queued)

    code = service.get_verification_code(email="tester@123456.com", timeout=1)

    assert code == "654321"