fix: 修复重复取码命中旧验证码的问题

This commit is contained in:
zhoukailian
2026-03-24 18:41:57 +08:00
parent ae089ee707
commit ffab473a72
7 changed files with 565 additions and 10 deletions

View File

@@ -8,6 +8,7 @@ import logging
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
@@ -146,6 +147,8 @@ class BaseEmailService(abc.ABC):
self._status = EmailServiceStatus.HEALTHY
self._last_error = None
self._provider_backoff = reset_adaptive_backoff()
self._used_verification_codes: Dict[str, set] = {}
self._seen_verification_messages: Dict[str, set] = {}
_EMAIL_ADDRESS_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
@@ -299,6 +302,140 @@ class BaseEmailService(abc.ABC):
return None
def _get_used_verification_codes(self, email: str) -> set:
"""获取邮箱对应的已使用验证码集合。"""
key = str(email or "").strip().lower()
if key not in self._used_verification_codes:
self._used_verification_codes[key] = set()
return self._used_verification_codes[key]
def _get_seen_verification_messages(self, email: str) -> set:
"""获取邮箱对应的已处理消息标识集合。"""
key = str(email or "").strip().lower()
if key not in self._seen_verification_messages:
self._seen_verification_messages[key] = set()
return self._seen_verification_messages[key]
def load_verification_state(
self,
email: str,
used_codes: Optional[List[str]] = None,
seen_messages: Optional[List[str]] = None,
) -> None:
"""将持久化的验证码状态恢复到当前服务实例。"""
if used_codes:
self._get_used_verification_codes(email).update(
str(code) for code in used_codes if code
)
if seen_messages:
self._get_seen_verification_messages(email).update(
str(marker) for marker in seen_messages if marker
)
def export_verification_state(self, email: str) -> Dict[str, List[str]]:
"""导出当前邮箱的验证码状态,用于跨请求复用。"""
return {
"used_codes": sorted(self._get_used_verification_codes(email)),
"seen_messages": sorted(self._get_seen_verification_messages(email)),
}
def _remember_verification_code(self, email: str, code: str) -> bool:
"""记录验证码;若已用过则返回 False。"""
used_codes = self._get_used_verification_codes(email)
if code in used_codes:
return False
used_codes.add(code)
return True
def _remember_verification_message(self, email: str, message_marker: Optional[str]) -> bool:
"""记录消息标识;若已处理过则返回 False。"""
if not message_marker:
return True
seen_messages = self._get_seen_verification_messages(email)
if message_marker in seen_messages:
return False
seen_messages.add(message_marker)
return True
def _accept_verification_code(
self,
email: str,
code: str,
message_marker: Optional[str] = None,
) -> bool:
"""
决定是否接受验证码。
若有可靠的新邮件标识,优先按消息去重,这样新邮件即便验证码重复也能被接受;
否则退回到按验证码去重,避免旧码被重复消费。
"""
if message_marker:
if not self._remember_verification_message(email, message_marker):
return False
self._get_used_verification_codes(email).add(code)
return True
return self._remember_verification_code(email, code)
def _parse_message_timestamp(self, value: Any) -> Optional[float]:
"""将常见邮件时间字段解析为 Unix 时间戳。"""
if value is None or value == "":
return None
if isinstance(value, datetime):
return value.timestamp()
if isinstance(value, (int, float)):
return self._normalize_unix_timestamp(float(value))
text = str(value).strip()
if not text:
return None
try:
return self._normalize_unix_timestamp(float(text))
except ValueError:
pass
normalized = text.replace("Z", "+00:00") if text.endswith("Z") else text
try:
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return None
def _normalize_unix_timestamp(self, value: float) -> float:
"""将秒/毫秒/微秒级 Unix 时间统一归一到秒。"""
absolute = abs(value)
if absolute >= 1e14:
return value / 1_000_000
if absolute >= 1e11:
return value / 1_000
return value
def _is_message_before_otp(self, message_time: Any, otp_sent_at: Optional[float], tolerance_seconds: int = 1) -> bool:
"""
判断邮件是否早于当前 OTP 发送窗口。
允许少量时钟误差,避免接口时间与本地时间有轻微偏移时误伤新邮件。
"""
if not otp_sent_at:
return False
message_ts = self._parse_message_timestamp(message_time)
if message_ts is None:
return False
return message_ts + tolerance_seconds < otp_sent_at
def _sort_items_by_message_time(self, items: List[Any], value_getter) -> List[Any]:
"""按邮件时间倒序排列,优先处理最新邮件。"""
return sorted(
items,
key=lambda item: self._parse_message_timestamp(value_getter(item)) or float("-inf"),
reverse=True,
)
def wait_for_email(
self,
email: str,

View File

@@ -281,6 +281,7 @@ class DuckMailService(BaseEmailService):
continue
seen_message_ids.add(message_id)
message_marker = f"id:{message_id}"
detail = self._make_request(
"GET",
f"/messages/{message_id}",
@@ -293,8 +294,11 @@ class DuckMailService(BaseEmailService):
match = re.search(pattern, content)
if match:
code = match.group(1)
if not self._accept_verification_code(email, code, message_marker):
continue
self.update_status(True)
return match.group(1)
return code
except Exception as e:
logger.debug(f"DuckMail 轮询验证码失败: {e}")

View File

@@ -221,12 +221,29 @@ class FreemailService(BaseEmailService):
time.sleep(3)
continue
for mail in mails:
ordered_mails = self._sort_items_by_message_time(
mails,
lambda item: (
item.get("created_at")
or item.get("createdAt")
or item.get("received_at")
or item.get("receivedAt")
) if isinstance(item, dict) else None,
)
for mail in ordered_mails:
mail_id = mail.get("id")
if not mail_id or mail_id in seen_mail_ids:
continue
seen_mail_ids.add(mail_id)
message_marker = f"id:{mail_id}"
if self._is_message_before_otp(
mail.get("created_at") or mail.get("createdAt") or mail.get("received_at") or mail.get("receivedAt"),
otp_sent_at,
):
continue
sender = str(mail.get("sender", "")).lower()
subject = str(mail.get("subject", ""))
@@ -237,31 +254,36 @@ class FreemailService(BaseEmailService):
if "openai" not in content.lower():
continue
v_code = str(mail.get("verification_code") or "").strip()
if re.fullmatch(r"\d{6}", v_code):
if not self._accept_verification_code(email, v_code, message_marker):
continue
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
self.update_status(True)
return v_code
code = self._extract_otp_from_text(content, pattern)
if code:
if not self._accept_verification_code(email, code, message_marker):
continue
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code
v_code = str(mail.get("verification_code") or "").strip()
# 如果依然未找到,获取邮件详情进行匹配
try:
detail = self._make_request("GET", f"/api/email/{mail_id}")
full_content = str(detail.get("content", "")) + "\n" + str(detail.get("html_content", ""))
code = self._extract_otp_from_text(full_content, pattern)
if code:
if not self._accept_verification_code(email, code, message_marker):
continue
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code
except Exception as e:
logger.debug(f"获取 Freemail 邮件详情失败: {e}")
if re.fullmatch(r"\d{6}", v_code):
logger.info(f"从 Freemail 邮箱 {email} 找到验证码: {v_code}")
self.update_status(True)
return v_code
except Exception as e:
logger.debug(f"检查 Freemail 邮件时出错: {e}")

View File

@@ -322,6 +322,13 @@ class MeoMailEmailService(BaseEmailService):
continue
seen_message_ids.add(message_id)
message_marker = f"id:{message_id}"
if self._is_message_before_otp(
message.get("created_at") or message.get("createdAt") or message.get("received_at") or message.get("receivedAt"),
otp_sent_at,
):
continue
# 检查是否是目标邮件
sender = str(message.get("from_address", "")).lower()
@@ -343,6 +350,8 @@ class MeoMailEmailService(BaseEmailService):
match = re.search(pattern, re.sub(email_pattern, "", content))
if match:
code = match.group(1)
if not self._accept_verification_code(email, code, message_marker):
continue
logger.info(f"从自定义域名邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code

View File

@@ -335,12 +335,29 @@ class TempMailService(BaseEmailService):
time.sleep(3)
continue
for mail in mails:
ordered_mails = self._sort_items_by_message_time(
mails,
lambda item: (
item.get("createdAt")
or item.get("created_at")
or item.get("receivedAt")
or item.get("received_at")
) if isinstance(item, dict) else None,
)
for mail in ordered_mails:
mail_id = mail.get("id")
if not mail_id or mail_id in seen_mail_ids:
continue
seen_mail_ids.add(mail_id)
message_marker = f"id:{mail_id}"
if self._is_message_before_otp(
mail.get("createdAt") or mail.get("created_at") or mail.get("receivedAt") or mail.get("received_at"),
otp_sent_at,
):
continue
parsed = self._extract_mail_fields(mail)
sender = parsed["sender"].lower()
@@ -355,6 +372,8 @@ class TempMailService(BaseEmailService):
code = self._extract_otp_from_text(content, pattern)
if code:
if not self._accept_verification_code(email, code, message_marker):
continue
logger.info(f"从 TempMail 邮箱 {email} 找到验证码: {code}")
self.update_status(True)
return code

View File

@@ -260,6 +260,7 @@ class TempmailService(BaseEmailService):
if not message_id or message_id in seen_ids:
continue
seen_ids.add(message_id)
message_marker = f"id:{message_id}"
sender = str(msg.get("from", "")).lower()
subject = str(msg.get("subject", ""))
@@ -276,6 +277,8 @@ class TempmailService(BaseEmailService):
match = re.search(pattern, content)
if match:
code = match.group(1)
if not self._accept_verification_code(email, code, message_marker):
continue
logger.info(f"找到验证码: {code}")
self.update_status(True)
return code

View File

@@ -0,0 +1,361 @@
from src.services.duck_mail import DuckMailService
from src.services.freemail import FreemailService
from src.services.temp_mail import TempMailService
from src.services.tempmail import TempmailService
class FakeResponse:
def __init__(self, status_code=200, payload=None, text=""):
self.status_code = status_code
self._payload = payload
self.text = text
self.headers = {}
def json(self):
if self._payload is None:
raise ValueError("no json payload")
return self._payload
class FakeRequestHTTPClient:
def __init__(self, responses):
self.responses = list(responses)
self.calls = []
def request(self, method, url, **kwargs):
self.calls.append({
"method": method,
"url": url,
"kwargs": kwargs,
})
if not self.responses:
raise AssertionError(f"未准备响应: {method} {url}")
return self.responses.pop(0)
class FakeGetHTTPClient:
def __init__(self, responses):
self.responses = list(responses)
self.calls = []
def get(self, url, **kwargs):
self.calls.append({
"method": "GET",
"url": url,
"kwargs": kwargs,
})
if not self.responses:
raise AssertionError(f"未准备响应: GET {url}")
return self.responses.pop(0)
def test_tempmail_service_skips_code_returned_by_previous_fetch():
service = TempmailService({"base_url": "https://api.tempmail.test"})
service.http_client = FakeGetHTTPClient([
FakeResponse(
payload={
"emails": [
{
"date": 1000,
"from": "noreply@openai.com",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
}
]
}
),
FakeResponse(
payload={
"emails": [
{
"date": 1000,
"from": "noreply@openai.com",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
},
{
"date": 1003,
"from": "noreply@openai.com",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 654321",
},
]
}
),
])
first_code = service.get_verification_code(
email="tester@example.com",
email_id="token-1",
timeout=1,
otp_sent_at=1000,
)
second_code = service.get_verification_code(
email="tester@example.com",
email_id="token-1",
timeout=1,
otp_sent_at=1002,
)
assert first_code == "111111"
assert second_code == "654321"
def test_temp_mail_service_skips_code_returned_by_previous_fetch():
service = TempMailService({
"base_url": "https://mail.example.com",
"admin_password": "admin-secret",
"domain": "example.com",
})
service.http_client = FakeRequestHTTPClient([
FakeResponse(
payload={
"results": [
{
"id": "msg-1",
"source": "OpenAI <noreply@openai.com>",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
"createdAt": "2026-03-19T10:00:00Z",
}
]
}
),
FakeResponse(
payload={
"results": [
{
"id": "msg-1",
"source": "OpenAI <noreply@openai.com>",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
"createdAt": "2026-03-19T10:00:00Z",
},
{
"id": "msg-2",
"source": "OpenAI <noreply@openai.com>",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 654321",
"createdAt": "2026-03-19T10:00:03Z",
},
]
}
),
])
first_code = service.get_verification_code(
email="tester@example.com",
timeout=1,
otp_sent_at=1742378400,
)
second_code = service.get_verification_code(
email="tester@example.com",
timeout=1,
otp_sent_at=1742378402,
)
assert first_code == "111111"
assert second_code == "654321"
def test_temp_mail_service_accepts_same_code_from_newer_message():
service = TempMailService({
"base_url": "https://mail.example.com",
"admin_password": "admin-secret",
"domain": "example.com",
})
service.http_client = FakeRequestHTTPClient([
FakeResponse(
payload={
"results": [
{
"id": "msg-1",
"source": "OpenAI <noreply@openai.com>",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
"createdAt": "2026-03-19T10:00:00Z",
}
]
}
),
FakeResponse(
payload={
"results": [
{
"id": "msg-1",
"source": "OpenAI <noreply@openai.com>",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
"createdAt": "2026-03-19T10:00:00Z",
},
{
"id": "msg-2",
"source": "OpenAI <noreply@openai.com>",
"subject": "Your verification code",
"body": "Your OpenAI verification code is 111111",
"createdAt": "2026-03-19T10:00:03Z",
},
]
}
),
])
first_code = service.get_verification_code(
email="tester@example.com",
timeout=1,
otp_sent_at=1742378400,
)
second_code = service.get_verification_code(
email="tester@example.com",
timeout=1,
otp_sent_at=1742378402,
)
assert first_code == "111111"
assert second_code == "111111"
def test_freemail_service_skips_code_returned_by_previous_fetch():
service = FreemailService({
"base_url": "https://mail.example.com",
"admin_token": "jwt-token",
})
service.http_client = FakeRequestHTTPClient([
FakeResponse(
payload=[
{
"id": "msg-1",
"sender": "noreply@openai.com",
"subject": "Your verification code",
"preview": "Your OpenAI verification code is 111111",
"verification_code": "111111",
"created_at": "2026-03-19T10:00:00Z",
}
]
),
FakeResponse(
payload=[
{
"id": "msg-1",
"sender": "noreply@openai.com",
"subject": "Your verification code",
"preview": "Your OpenAI verification code is 111111",
"verification_code": "111111",
"created_at": "2026-03-19T10:00:00Z",
},
{
"id": "msg-2",
"sender": "noreply@openai.com",
"subject": "Your verification code",
"preview": "Your OpenAI verification code is 654321",
"verification_code": "654321",
"created_at": "2026-03-19T10:00:03Z",
},
]
),
])
first_code = service.get_verification_code(
email="tester@example.com",
timeout=1,
otp_sent_at=1742378400,
)
second_code = service.get_verification_code(
email="tester@example.com",
timeout=1,
otp_sent_at=1742378402,
)
assert first_code == "111111"
assert second_code == "654321"
def test_duck_mail_service_skips_previously_used_code_even_with_small_timestamp_gap():
service = DuckMailService({
"base_url": "https://api.duckmail.test",
"default_domain": "duckmail.sbs",
})
service.http_client = FakeRequestHTTPClient([
FakeResponse(
payload={
"hydra:member": [
{
"id": "msg-1",
"from": {
"name": "OpenAI",
"address": "noreply@openai.com",
},
"subject": "Your verification code",
"createdAt": "2026-03-19T10:00:01Z",
}
]
}
),
FakeResponse(
payload={
"id": "msg-1",
"text": "Your OpenAI verification code is 111111",
"html": [],
}
),
FakeResponse(
payload={
"hydra:member": [
{
"id": "msg-1",
"from": {
"name": "OpenAI",
"address": "noreply@openai.com",
},
"subject": "Your verification code",
"createdAt": "2026-03-19T10:00:01Z",
},
{
"id": "msg-2",
"from": {
"name": "OpenAI",
"address": "noreply@openai.com",
},
"subject": "Your verification code",
"createdAt": "2026-03-19T10:00:03Z",
},
]
}
),
FakeResponse(
payload={
"id": "msg-1",
"text": "Your OpenAI verification code is 111111",
"html": [],
}
),
FakeResponse(
payload={
"id": "msg-2",
"text": "Your OpenAI verification code is 654321",
"html": [],
}
),
])
service._accounts_by_email["tester@duckmail.sbs"] = {
"email": "tester@duckmail.sbs",
"service_id": "account-1",
"account_id": "account-1",
"token": "token-123",
}
first_code = service.get_verification_code(
email="tester@duckmail.sbs",
email_id="account-1",
timeout=1,
otp_sent_at=1742378401,
)
second_code = service.get_verification_code(
email="tester@duckmail.sbs",
email_id="account-1",
timeout=1,
otp_sent_at=1742378402,
)
assert first_code == "111111"
assert second_code == "654321"