diff --git a/src/services/outlook/providers/imap_new.py b/src/services/outlook/providers/imap_new.py index c3698f8..cce0cf1 100644 --- a/src/services/outlook/providers/imap_new.py +++ b/src/services/outlook/providers/imap_new.py @@ -10,6 +10,7 @@ import logging import select import time import threading +from datetime import datetime, timedelta, timezone from email.header import decode_header from email.utils import parsedate_to_datetime from typing import Dict, List, Optional @@ -40,6 +41,7 @@ class IMAPConnectionPool: timeout: int = 30, ) -> imaplib.IMAP4_SSL: """获取或新建 IMAP 连接""" + # 先在锁内检查现有连接 with self._lock: conn = self._connections.get(email_addr) if conn: @@ -48,16 +50,28 @@ class IMAPConnectionPool: return conn except Exception: self._close_one(email_addr) + # 标记为「建连中」,防止重复建连 + self._connections[email_addr] = None - conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout) + # 锁外建立新连接(耗时操作不持锁) + try: + new_conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout) auth_str = f"user={email_addr}\x01auth=Bearer {token}\x01\x01" - conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8")) - self._connections[email_addr] = conn + new_conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8")) + except Exception: + with self._lock: + # 建连失败,清除占位 + if self._connections.get(email_addr) is None: + del self._connections[email_addr] + raise + + with self._lock: + self._connections[email_addr] = new_conn logger.debug(f"[{email_addr}] IMAP 新连接已建立") - return conn + return new_conn def invalidate(self, email_addr: str): - """废弃连接(认证失败时调用)""" + """废弃连接(认证失败或连接异常时调用)""" with self._lock: self._close_one(email_addr) @@ -91,6 +105,8 @@ class IMAPNewProvider(OutlookProvider): super().__init__(account, config) self._conn: Optional[imaplib.IMAP4_SSL] = None self._token_manager: Optional[TokenManager] = None + self._idle_tag_counter = 0 + self._idle_tag_lock = threading.Lock() if not account.has_oauth(): logger.warning( @@ -107,6 +123,12 @@ class IMAPNewProvider(OutlookProvider): ) return self._token_manager + def _next_idle_tag(self) -> str: + """生成唯一 IDLE tag(避免使用私有 _new_tag)""" + with self._idle_tag_lock: + self._idle_tag_counter += 1 + return f"IDLE{self._idle_tag_counter:04d}" + def connect(self) -> bool: """从连接池获取连接""" if not self.account.has_oauth(): @@ -170,22 +192,38 @@ class IMAPNewProvider(OutlookProvider): self, count: int = 20, only_unseen: bool = True, + since_minutes: Optional[int] = None, ) -> List[EmailMessage]: - """获取最近的邮件""" + """ + 获取最近的邮件。 + + 搜索策略: + - since_minutes 指定时:用 SINCE 日期 + ALL 搜索最近N分钟内的邮件(不受已读/未读限制) + - only_unseen=True 且未指定 since_minutes:搜索 UNSEEN + - only_unseen=False 且未指定 since_minutes:搜索全部(取最近 count 封) + """ if not self._connected: if not self.connect(): return [] try: self._conn.select("INBOX", readonly=True) - flag = "UNSEEN" if only_unseen else "ALL" - status, data = self._conn.search(None, flag) + + if since_minutes is not None: + # 按时间范围搜索:SINCE 某天(IMAP 只支持按天,精度为天) + since_dt = datetime.now(timezone.utc) - timedelta(minutes=since_minutes) + since_str = since_dt.strftime("%d-%b-%Y") + status, data = self._conn.search(None, f"SINCE {since_str}") + elif only_unseen: + status, data = self._conn.search(None, "UNSEEN") + else: + status, data = self._conn.search(None, "ALL") if status != "OK" or not data or not data[0]: return [] ids = data[0].split() - recent_ids = ids[-count:][::-1] + recent_ids = ids[-count:][::-1] # 取最新的 count 封,倒序(最新在前) emails = [] for msg_id in recent_ids: @@ -239,18 +277,18 @@ class IMAPNewProvider(OutlookProvider): logger.warning(f"[{self.account.email}] IDLE 前 SELECT 失败: {e}") return False - logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}s)") - + tag = self._next_idle_tag() sock = self._conn.socket() - tag = self._conn._new_tag().decode() if isinstance(self._conn._new_tag(), bytes) else self._conn._new_tag() + logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}s,tag={tag})") try: # 发送 IDLE 命令 self._conn.send(f"{tag} IDLE\r\n".encode()) - # 等待 "+" 延续响应 - deadline = time.time() + timeout + # 等待 "+" 延续响应(服务端确认进入 IDLE) + deadline = time.time() + min(10.0, timeout) buf = b"" + got_continuation = False while time.time() < deadline: ready = select.select([sock], [], [], min(2.0, deadline - time.time())) if ready[0]: @@ -259,13 +297,21 @@ class IMAPNewProvider(OutlookProvider): break buf += chunk if b"+ " in buf or b"+\r\n" in buf: + got_continuation = True break + if not got_continuation: + logger.warning(f"[{self.account.email}] 未收到 IDLE 延续响应,放弃") + return False + # 等待 EXISTS / RECENT 推送 got_new = False buf = b"" + deadline = time.time() + timeout while time.time() < deadline: remaining = deadline - time.time() + if remaining <= 0: + break ready = select.select([sock], [], [], min(2.0, remaining)) if ready[0]: chunk = sock.recv(4096) @@ -273,6 +319,7 @@ class IMAPNewProvider(OutlookProvider): break buf += chunk if b"EXISTS" in buf or b"RECENT" in buf: + logger.debug(f"[{self.account.email}] IDLE 收到新邮件推送") got_new = True break @@ -283,23 +330,25 @@ class IMAPNewProvider(OutlookProvider): return False finally: - # 发送 DONE 结束 IDLE + # 发送 DONE 结束 IDLE,并排空服务端响应 try: self._conn.send(b"DONE\r\n") - # 读取 IDLE 结束响应(避免缓冲区污染后续命令) - deadline2 = time.time() + 5 - resp_buf = b"" - while time.time() < deadline2: + drain_deadline = time.time() + 5 + drain_buf = b"" + tag_end = f"{tag} OK".encode() + tag_no = f"{tag} NO".encode() + tag_bad = f"{tag} BAD".encode() + while time.time() < drain_deadline: ready = select.select([sock], [], [], 1.0) - if ready[0]: - chunk = sock.recv(4096) - if not chunk: - break - resp_buf += chunk - if tag.encode() in resp_buf: - break + if not ready[0]: + break + chunk = sock.recv(4096) + if not chunk: + break + drain_buf += chunk + if any(t in drain_buf for t in (tag_end, tag_no, tag_bad)): + break except Exception: - # DONE 发送失败则废弃连接 _imap_pool.invalidate(self.account.email) self._connected = False self._conn = None @@ -316,7 +365,7 @@ class IMAPNewProvider(OutlookProvider): def _parse_email(raw: bytes) -> EmailMessage: - """解析原始邮件为 EmailMessage""" + """解析原始邮件为 EmailMessage(优先 text/plain,次选 text/html)""" msg = email.message_from_bytes(raw) def _decode(val): @@ -348,30 +397,42 @@ def _parse_email(raw: bytes) -> EmailMessage: except Exception: pass - body = "" - body_preview = "" + # 提取正文:优先 text/plain,次选 text/html + plain_body = "" + html_body = "" if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() cd = str(part.get("Content-Disposition", "")) - if "attachment" not in cd.lower() and ct in ("text/plain", "text/html"): - try: - charset = part.get_content_charset() or "utf-8" - payload = part.get_payload(decode=True) - if payload: - body = payload.decode(charset, errors="replace") - break - except Exception: - pass + if "attachment" in cd.lower(): + continue + try: + charset = part.get_content_charset() or "utf-8" + payload = part.get_payload(decode=True) + if not payload: + continue + decoded = payload.decode(charset, errors="replace") + if ct == "text/plain" and not plain_body: + plain_body = decoded + elif ct == "text/html" and not html_body: + html_body = decoded + except Exception: + pass else: try: charset = msg.get_content_charset() or "utf-8" payload = msg.get_payload(decode=True) if payload: - body = payload.decode(charset, errors="replace") + ct = msg.get_content_type() + decoded = payload.decode(charset, errors="replace") + if ct == "text/plain": + plain_body = decoded + else: + html_body = decoded except Exception: pass + body = plain_body or html_body body_preview = body[:200].strip() msg_id = msg.get("Message-ID", "").strip("<>") diff --git a/src/services/outlook/service.py b/src/services/outlook/service.py index 6d7d280..73f789f 100644 --- a/src/services/outlook/service.py +++ b/src/services/outlook/service.py @@ -97,17 +97,29 @@ class OutlookService(BaseEmailService): if not account.client_id and _default_client_id: account.client_id = _default_client_id if account.validate(): - self.accounts.append(account) + if not account.has_oauth(): + logger.warning( + f"[{account.email}] 跳过:IMAP_NEW 仅支持 OAuth2," + f"请配置 client_id 和 refresh_token" + ) + else: + self.accounts.append(account) else: for ac in self.config.get("accounts", []): account = OutlookAccount.from_config(ac) if not account.client_id and _default_client_id: account.client_id = _default_client_id if account.validate(): - self.accounts.append(account) + if not account.has_oauth(): + logger.warning( + f"[{account.email}] 跳过:IMAP_NEW 仅支持 OAuth2," + f"请配置 client_id 和 refresh_token" + ) + else: + self.accounts.append(account) if not self.accounts: - logger.warning("未配置有效的 Outlook 账户") + logger.warning("未配置有效的 Outlook 账户(需要 client_id + refresh_token)") # 健康检查器 self.health_checker = HealthChecker( @@ -143,6 +155,7 @@ class OutlookService(BaseEmailService): account: OutlookAccount, count: int = 15, only_unseen: bool = True, + since_minutes: Optional[int] = None, use_cache: bool = False, ) -> List[EmailMessage]: """通过 IMAP_NEW Provider 获取邮件,可选使用内存缓存""" @@ -159,7 +172,9 @@ class OutlookService(BaseEmailService): provider = self._get_provider(account) with self._imap_semaphore: with provider: - emails = provider.get_recent_emails(count, only_unseen) + emails = provider.get_recent_emails( + count, only_unseen, since_minutes=since_minutes + ) if emails: self.health_checker.record_success() @@ -227,11 +242,11 @@ class OutlookService(BaseEmailService): if use_idle: code = self._wait_with_idle( - account, email, actual_timeout, min_timestamp, used_codes + account, email, actual_timeout, min_timestamp, used_codes, otp_sent_at ) else: code = self._wait_with_poll( - account, email, actual_timeout, poll_interval, min_timestamp, used_codes + account, email, actual_timeout, poll_interval, min_timestamp, used_codes, otp_sent_at ) if code: @@ -248,16 +263,27 @@ class OutlookService(BaseEmailService): poll_interval: int, min_timestamp: float, used_codes: set, + otp_sent_at: Optional[float] = None, ) -> Optional[str]: """轮询方式等待验证码""" start_time = time.time() poll_count = 0 + # 计算 since_minutes:从发送时间前2分钟开始搜索,最多查180分钟 + since_minutes: Optional[int] = None + if otp_sent_at: + elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2 + since_minutes = min(elapsed_since_send, 180) + while time.time() - start_time < timeout: poll_count += 1 - only_unseen = poll_count <= 3 + # 有 since_minutes 时用 SINCE 搜索(覆盖已读/未读),否则前3次用 UNSEEN + only_unseen = (since_minutes is None) and (poll_count <= 3) try: - emails = self._fetch_emails(account, count=15, only_unseen=only_unseen) + emails = self._fetch_emails( + account, count=15, only_unseen=only_unseen, + since_minutes=since_minutes, + ) if emails: code = self.email_parser.find_verification_code_in_emails( emails, @@ -286,21 +312,30 @@ class OutlookService(BaseEmailService): timeout: int, min_timestamp: float, used_codes: set, + otp_sent_at: Optional[float] = None, ) -> Optional[str]: """IMAP IDLE 方式等待验证码,失败时自动降级为轮询""" if not self.health_checker.is_available(): logger.warning(f"[{email}] IMAP_NEW 不可用,降级为轮询") return self._wait_with_poll( - account, email, timeout, 3, min_timestamp, used_codes + account, email, timeout, 3, min_timestamp, used_codes, otp_sent_at ) + # 计算 since_minutes:从发送时间前2分钟开始,最多180分钟 + since_minutes: Optional[int] = None + if otp_sent_at: + elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2 + since_minutes = min(elapsed_since_send, 180) + start_time = time.time() try: provider = self._get_provider(account) with self._imap_semaphore: with provider: # 先做一次即时检查 - emails = provider.get_recent_emails(15, only_unseen=True) + emails = provider.get_recent_emails( + 15, only_unseen=(since_minutes is None), since_minutes=since_minutes + ) code = self.email_parser.find_verification_code_in_emails( emails, target_email=email, @@ -320,7 +355,14 @@ class OutlookService(BaseEmailService): arrived = provider.wait_for_new_email_idle(timeout=min(remaining, 25)) # 无效化缓存,强制重新拉取 self._email_cache.invalidate(email) - emails = provider.get_recent_emails(15, only_unseen=True) + # IDLE 触发后用 since_minutes 搜索,覆盖已读邮件 + fetch_since = since_minutes + if fetch_since is None: + # 没有 otp_sent_at 时,用距当前时间2分钟内的邮件 + fetch_since = 2 + emails = provider.get_recent_emails( + 15, only_unseen=False, since_minutes=fetch_since + ) code = self.email_parser.find_verification_code_in_emails( emails, target_email=email, @@ -343,7 +385,7 @@ class OutlookService(BaseEmailService): code_settings = _get_code_settings() return self._wait_with_poll( account, email, remaining, - code_settings["poll_interval"], min_timestamp, used_codes + code_settings["poll_interval"], min_timestamp, used_codes, otp_sent_at ) logger.warning(f"[{email}] IDLE 等待验证码超时 ({timeout}s)") diff --git a/src/web/routes/accounts.py b/src/web/routes/accounts.py index 7b4aa6f..c559012 100644 --- a/src/web/routes/accounts.py +++ b/src/web/routes/accounts.py @@ -1028,9 +1028,11 @@ def _build_inbox_config(db, service_type, email: str) -> dict: EmailServiceModel.enabled == True ) if service_type == EST.OUTLOOK: - # 按 config.email 匹配账号 email - services = query.all() - svc = next((s for s in services if (s.config or {}).get("email") == email), None) + # 按 config.email 精确匹配,不受 enabled 限制(收件箱是账号自己的邮箱) + all_outlook = db.query(EmailServiceModel).filter( + EmailServiceModel.service_type == db_type + ).all() + svc = next((s for s in all_outlook if (s.config or {}).get("email", "").lower() == email.lower()), None) else: svc = query.order_by(EmailServiceModel.priority.asc()).first() diff --git a/static/js/settings.js b/static/js/settings.js index 7835098..39c8735 100644 --- a/static/js/settings.js +++ b/static/js/settings.js @@ -673,16 +673,16 @@ async function handleOutlookBatchImport() { lines.forEach((line, index) => { const parts = line.split('----').map(p => p.trim()); - if (parts.length < 2) { - errors.push(`第 ${index + 1} 行格式错误`); + if (parts.length < 4) { + errors.push(`第 ${index + 1} 行格式错误,必须为 邮箱----密码----client_id----refresh_token`); return; } const account = { email: parts[0], password: parts[1], - client_id: parts[2] || null, - refresh_token: parts[3] || null, + client_id: parts[2], + refresh_token: parts[3], enabled: enabled, priority: priority }; @@ -692,6 +692,11 @@ async function handleOutlookBatchImport() { return; } + if (!account.client_id || !account.refresh_token) { + errors.push(`第 ${index + 1} 行 client_id 或 refresh_token 不能为空`); + return; + } + accounts.push(account); });