Files
codex-register/src/services/imap_mail.py
cnlimiter 0059cf97bd feat(services): 新增标准 IMAP 邮箱服务支持(Gmail/QQ/163等)
- 新增 EmailServiceType.IMAP_MAIL 枚举值和默认配置
- 新建 ImapMailService(imaplib 标准库,强制直连)
- 注册路由新增 imap_mail 分支和 available-services 键
- 邮箱服务路由新增 imap_mail stats 统计和类型描述
- accounts 路由 _build_inbox_config 支持 imap_mail
- 前端表单/列表/编辑完整支持 IMAP 子类型
- 无新增依赖
2026-03-20 15:46:29 +08:00

218 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
IMAP 邮箱服务
支持 Gmail / QQ / 163 / Yahoo / Outlook 等标准 IMAP 协议邮件服务商。
仅用于接收验证码,强制直连(imaplib 不支持代理)。
"""
import imaplib
import email
import re
import time
import logging
from email.header import decode_header
from typing import Any, Dict, Optional
from .base import BaseEmailService, EmailServiceError
from ..config.constants import (
EmailServiceType,
OPENAI_EMAIL_SENDERS,
OTP_CODE_SEMANTIC_PATTERN,
OTP_CODE_PATTERN,
)
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
class ImapMailService(BaseEmailService):
    """Standard IMAP mailbox service used only to receive verification codes.

    The service logs in to one fixed, pre-existing mailbox with ``imaplib``
    (direct connection, no proxy support) and polls its INBOX for unread
    messages from OpenAI senders, extracting the verification code from them.
    """

    # Seconds to wait between two inbox polls.
    _POLL_INTERVAL = 3
    # Tolerated clock skew (seconds) when comparing a message's Date header
    # with ``otp_sent_at``.
    _OTP_CLOCK_SKEW = 60

    def __init__(self, config: Optional[Dict[str, Any]] = None, name: Optional[str] = None):
        """Validate and store the IMAP connection settings.

        Args:
            config: Mapping with the required keys ``host``, ``email`` and
                ``password`` and the optional keys ``port`` (default 993),
                ``use_ssl`` (default True), ``timeout`` (default 30) and
                ``max_retries`` (default 3).
            name: Service name forwarded to the base class.

        Raises:
            ValueError: If a required key is missing or empty.
        """
        super().__init__(EmailServiceType.IMAP_MAIL, name)
        cfg = config or {}
        missing_keys = [k for k in ["host", "email", "password"] if not cfg.get(k)]
        if missing_keys:
            raise ValueError(f"缺少必需配置: {missing_keys}")
        self.host: str = str(cfg["host"]).strip()
        self.port: int = self._as_int(cfg.get("port"), 993)
        self.use_ssl: bool = self._as_bool(cfg.get("use_ssl", True))
        self.email_addr: str = str(cfg["email"]).strip()
        self.password: str = str(cfg["password"])
        # Used as the socket timeout of the IMAP connection (see _connect).
        self.timeout: int = self._as_int(cfg.get("timeout"), 30)
        # Upper bound on consecutive poll failures before polling gives up.
        self.max_retries: int = self._as_int(cfg.get("max_retries"), 3)

    @staticmethod
    def _as_int(value: Any, default: int) -> int:
        """Return ``int(value)``, or ``default`` when the value is None or ""."""
        if value is None or value == "":
            return default
        return int(value)

    @staticmethod
    def _as_bool(value: Any) -> bool:
        """Interpret a config flag; textual "false"/"0"/"no"/"off" mean False."""
        if isinstance(value, str):
            # bool("false") is True, so string flags need explicit parsing.
            return value.strip().lower() not in ("", "0", "false", "no", "off")
        return bool(value)

    @staticmethod
    def _close_quietly(mail) -> None:
        """Best-effort logout; never raises. ``mail`` may be None."""
        if mail is None:
            return
        try:
            mail.logout()
        except Exception as exc:
            logger.debug("IMAP logout failed: %s", exc)
            # logout() skips its own shutdown() when the LOGOUT command
            # fails, so release the socket explicitly.
            try:
                mail.shutdown()
            except Exception:
                pass

    @staticmethod
    def _decode_bytes(data: bytes, charset: Optional[str]) -> str:
        """Decode ``data`` with ``charset``, falling back to UTF-8."""
        try:
            return data.decode(charset or "utf-8", errors="replace")
        except LookupError:
            # Unknown or bogus charset label declared by the sender.
            return data.decode("utf-8", errors="replace")

    @staticmethod
    def _strip_html(markup: str) -> str:
        """Remove style/script blocks and tags so only visible text remains."""
        without_blocks = re.sub(r"(?is)<(style|script)\b.*?</\1\s*>", " ", markup)
        return re.sub(r"(?s)<[^>]+>", " ", without_blocks)

    def _connect(self) -> imaplib.IMAP4:
        """Open an IMAP connection, log in and return the connection object.

        With ``use_ssl`` an implicit-TLS connection is used; otherwise a
        plain connection is upgraded with STARTTLS before logging in.
        The connection is closed again if STARTTLS or login fails.
        """
        # imaplib rejects timeout=0, so non-positive values mean "no timeout".
        sock_timeout = self.timeout if self.timeout > 0 else None
        if self.use_ssl:
            mail = imaplib.IMAP4_SSL(self.host, self.port, timeout=sock_timeout)
        else:
            mail = imaplib.IMAP4(self.host, self.port, timeout=sock_timeout)
        try:
            if not self.use_ssl:
                mail.starttls()
            mail.login(self.email_addr, self.password)
        except Exception:
            self._close_quietly(mail)
            raise
        return mail

    def _open_inbox(self) -> imaplib.IMAP4:
        """Connect, log in and select INBOX; raises if INBOX cannot be selected."""
        mail = self._connect()
        try:
            status, detail = mail.select("INBOX")
            if status != "OK":
                raise imaplib.IMAP4.error(f"SELECT INBOX failed: {detail}")
        except Exception:
            self._close_quietly(mail)
            raise
        return mail

    def _decode_str(self, value) -> str:
        """Decode a (possibly RFC 2047 encoded) header value to text."""
        if value is None:
            return ""
        try:
            parts = decode_header(value)
        except email.errors.HeaderParseError:
            # Malformed encoded-word: fall back to the raw header text.
            return str(value)
        decoded = []
        for part, charset in parts:
            if isinstance(part, bytes):
                decoded.append(self._decode_bytes(part, charset))
            else:
                decoded.append(str(part))
        return " ".join(decoded)

    def _get_text_body(self, msg) -> str:
        """Return the textual body of ``msg``.

        All ``text/plain`` parts are concatenated. When a multipart message
        has no plain-text content, its ``text/html`` parts are used with the
        markup stripped. Returns "" when nothing can be decoded.
        """
        if not msg.is_multipart():
            payload = msg.get_payload(decode=True)
            if not payload:
                return ""
            text = self._decode_bytes(payload, msg.get_content_charset())
            if msg.get_content_type() == "text/html":
                return self._strip_html(text)
            return text

        plain_parts = []
        html_parts = []
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                continue
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            text = self._decode_bytes(payload, part.get_content_charset())
            if content_type == "text/plain":
                plain_parts.append(text)
            else:
                html_parts.append(text)
        if plain_parts:
            return "".join(plain_parts)
        return self._strip_html("".join(html_parts))

    def _is_openai_sender(self, from_addr: str) -> bool:
        """Return True if ``from_addr`` contains any OPENAI_EMAIL_SENDERS entry."""
        from_lower = from_addr.lower()
        return any(sender.lower() in from_lower for sender in OPENAI_EMAIL_SENDERS)

    def _extract_otp(self, text: str) -> Optional[str]:
        """Extract the code from ``text``: semantic pattern first, then the simple one."""
        attempts = (
            (OTP_CODE_SEMANTIC_PATTERN, re.IGNORECASE),
            (OTP_CODE_PATTERN, 0),
        )
        for otp_pattern, flags in attempts:
            match = re.search(otp_pattern, text, flags)
            if match:
                return match.group(1)
        return None

    def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
        """Return the fixed configured address; IMAP mode creates no mailbox."""
        self.update_status(True)
        return {
            "email": self.email_addr,
            "service_id": self.email_addr,
            "id": self.email_addr,
        }

    def _fetch_message(self, mail, uid: str):
        """Fetch one message by UID without setting its ``\\Seen`` flag.

        Returns the parsed message, or None when the server response holds
        no message literal (e.g. the message vanished in the meantime).
        """
        status, msg_data = mail.uid("FETCH", uid, "(BODY.PEEK[])")
        if status != "OK" or not msg_data:
            return None
        for item in msg_data:
            # The literal arrives as a (envelope, raw_bytes) tuple; other
            # response items are plain bytes and carry no message.
            if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], (bytes, bytearray)):
                return email.message_from_bytes(bytes(item[1]))
        return None

    def _is_stale(self, msg, otp_sent_at: Optional[float]) -> bool:
        """Return True if ``msg`` is dated clearly before ``otp_sent_at``.

        ``otp_sent_at`` is presumably a Unix timestamp in seconds (confirm
        against the caller). Messages without a usable Date header are
        never treated as stale.
        """
        if not otp_sent_at:
            return False
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        date_header = msg.get("Date")
        if not date_header:
            return False
        try:
            sent = parsedate_to_datetime(str(date_header))
            if sent.tzinfo is None:
                # A "-0000" zone yields a naive datetime that represents UTC.
                sent = sent.replace(tzinfo=timezone.utc)
            return sent.timestamp() < otp_sent_at - self._OTP_CLOCK_SKEW
        except (TypeError, ValueError, OverflowError):
            return False

    def _scan_unseen(self, mail, seen_ids: set, otp_sent_at: Optional[float]) -> Optional[str]:
        """Inspect unread messages once, newest first, and return a code if found.

        UIDs already inspected are recorded in ``seen_ids`` and skipped.
        Only the message whose code is returned gets flagged ``\\Seen``.
        """
        # NOOP lets the server report messages that arrived since the last poll.
        mail.noop()
        status, data = mail.uid("SEARCH", None, "UNSEEN")
        if status != "OK" or not data or not data[0]:
            return None
        for raw_uid in reversed(data[0].split()):
            uid = raw_uid.decode()
            if uid in seen_ids:
                continue
            seen_ids.add(uid)
            msg = self._fetch_message(mail, uid)
            if msg is None:
                continue
            from_addr = self._decode_str(msg.get("From", ""))
            if not self._is_openai_sender(from_addr):
                continue
            if self._is_stale(msg, otp_sent_at):
                continue
            code = self._extract_otp(self._get_text_body(msg))
            if code:
                mail.uid("STORE", uid, "+FLAGS", "\\Seen")
                return code
        return None

    def get_verification_code(
        self,
        email: str,
        email_id: str = None,
        timeout: int = 60,
        pattern: str = None,
        otp_sent_at: Optional[float] = None,
    ) -> Optional[str]:
        """Poll the IMAP inbox for an OpenAI verification code.

        Args:
            email: Accepted for interface compatibility; the configured
                mailbox is always used.
            email_id: Accepted for interface compatibility; not used.
            timeout: Maximum number of seconds to keep polling.
            pattern: Accepted for interface compatibility; not used.
            otp_sent_at: When given, messages dated noticeably earlier than
                this time are skipped so an old code is not returned.

        Returns:
            The verification code, or None when none arrived before the
            timeout or the connection could not be (re-)established.
        """
        # NOTE: the ``email`` parameter shadows the ``email`` package inside
        # this method, so message parsing lives in helper methods.
        deadline = time.time() + timeout
        seen_ids: set = set()
        failures = 0
        mail = None
        try:
            mail = self._open_inbox()
            while time.time() < deadline:
                try:
                    if mail is None:
                        mail = self._open_inbox()
                    code = self._scan_unseen(mail, seen_ids, otp_sent_at)
                    failures = 0
                    if code:
                        self.update_status(True)
                        logger.info(f"IMAP 获取验证码成功: {code}")
                        return code
                except (imaplib.IMAP4.error, OSError) as e:
                    logger.debug(f"IMAP 搜索邮件失败: {e}")
                    failures += 1
                    if failures > self.max_retries:
                        raise
                    # The connection may be dead; drop it and reconnect on
                    # the next iteration.
                    self._close_quietly(mail)
                    mail = None
                remaining = deadline - time.time()
                time.sleep(max(0.0, min(self._POLL_INTERVAL, remaining)))
        except Exception as e:
            logger.warning(f"IMAP 连接/轮询失败: {e}")
            self.update_status(False, str(e))
        finally:
            self._close_quietly(mail)
        return None

    def check_health(self) -> bool:
        """Return True if logging in and selecting INBOX both succeed."""
        mail = None
        try:
            mail = self._connect()
            status, _ = mail.select("INBOX")
            return status == "OK"
        except Exception as e:
            logger.warning(f"IMAP 健康检查失败: {e}")
            return False
        finally:
            self._close_quietly(mail)

    def list_emails(self, **kwargs) -> list:
        """Return the single configured address (IMAP is single-account)."""
        return [{"email": self.email_addr, "id": self.email_addr}]

    def delete_email(self, email_id: str) -> bool:
        """No-op: IMAP mode has nothing to delete; always returns True."""
        return True