fix(outlook): 多项修复与优化

- imap_new: 连接池并发安全(锁外建连、占位防重复)、IDLE tag 改用独立计数器避免私有API、get_recent_emails 新增 since_minutes 参数
- service.py: 同步更新
- accounts.py: Outlook 收件箱配置按 email 不区分大小写匹配、不受 enabled 限制
- settings.js: Outlook 批量导入前端校验要求四字段且 client_id/refresh_token 非空
This commit is contained in:
cnlimiter
2026-03-21 02:41:30 +08:00
committed by Mison
parent 344cf0088c
commit 16f76076c5
4 changed files with 169 additions and 59 deletions

View File

@@ -10,6 +10,7 @@ import logging
import select
import time
import threading
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Dict, List, Optional
@@ -40,6 +41,7 @@ class IMAPConnectionPool:
timeout: int = 30,
) -> imaplib.IMAP4_SSL:
"""获取或新建 IMAP 连接"""
# 先在锁内检查现有连接
with self._lock:
conn = self._connections.get(email_addr)
if conn:
@@ -48,16 +50,28 @@ class IMAPConnectionPool:
return conn
except Exception:
self._close_one(email_addr)
# 标记为「建连中」,防止重复建连
self._connections[email_addr] = None
conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout)
# 锁外建立新连接(耗时操作不持锁)
try:
new_conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout)
auth_str = f"user={email_addr}\x01auth=Bearer {token}\x01\x01"
conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8"))
self._connections[email_addr] = conn
new_conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8"))
except Exception:
with self._lock:
# 建连失败,清除占位
if self._connections.get(email_addr) is None:
del self._connections[email_addr]
raise
with self._lock:
self._connections[email_addr] = new_conn
logger.debug(f"[{email_addr}] IMAP 新连接已建立")
return conn
return new_conn
def invalidate(self, email_addr: str):
"""废弃连接(认证失败时调用)"""
"""废弃连接(认证失败或连接异常时调用)"""
with self._lock:
self._close_one(email_addr)
@@ -91,6 +105,8 @@ class IMAPNewProvider(OutlookProvider):
super().__init__(account, config)
self._conn: Optional[imaplib.IMAP4_SSL] = None
self._token_manager: Optional[TokenManager] = None
self._idle_tag_counter = 0
self._idle_tag_lock = threading.Lock()
if not account.has_oauth():
logger.warning(
@@ -107,6 +123,12 @@ class IMAPNewProvider(OutlookProvider):
)
return self._token_manager
def _next_idle_tag(self) -> str:
"""生成唯一 IDLE tag避免使用私有 _new_tag"""
with self._idle_tag_lock:
self._idle_tag_counter += 1
return f"IDLE{self._idle_tag_counter:04d}"
def connect(self) -> bool:
"""从连接池获取连接"""
if not self.account.has_oauth():
@@ -170,22 +192,38 @@ class IMAPNewProvider(OutlookProvider):
self,
count: int = 20,
only_unseen: bool = True,
since_minutes: Optional[int] = None,
) -> List[EmailMessage]:
"""获取最近的邮件"""
"""
获取最近的邮件。
搜索策略:
- since_minutes 指定时:用 SINCE 日期 + ALL 搜索最近N分钟内的邮件不受已读/未读限制)
- only_unseen=True 且未指定 since_minutes搜索 UNSEEN
- only_unseen=False 且未指定 since_minutes搜索全部取最近 count 封)
"""
if not self._connected:
if not self.connect():
return []
try:
self._conn.select("INBOX", readonly=True)
flag = "UNSEEN" if only_unseen else "ALL"
status, data = self._conn.search(None, flag)
if since_minutes is not None:
# 按时间范围搜索SINCE 某天IMAP 只支持按天,精度为天)
since_dt = datetime.now(timezone.utc) - timedelta(minutes=since_minutes)
since_str = since_dt.strftime("%d-%b-%Y")
status, data = self._conn.search(None, f"SINCE {since_str}")
elif only_unseen:
status, data = self._conn.search(None, "UNSEEN")
else:
status, data = self._conn.search(None, "ALL")
if status != "OK" or not data or not data[0]:
return []
ids = data[0].split()
recent_ids = ids[-count:][::-1]
recent_ids = ids[-count:][::-1] # 取最新的 count 封,倒序(最新在前)
emails = []
for msg_id in recent_ids:
@@ -239,18 +277,18 @@ class IMAPNewProvider(OutlookProvider):
logger.warning(f"[{self.account.email}] IDLE 前 SELECT 失败: {e}")
return False
logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}s")
tag = self._next_idle_tag()
sock = self._conn.socket()
tag = self._conn._new_tag().decode() if isinstance(self._conn._new_tag(), bytes) else self._conn._new_tag()
logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}stag={tag}")
try:
# 发送 IDLE 命令
self._conn.send(f"{tag} IDLE\r\n".encode())
# 等待 "+" 延续响应
deadline = time.time() + timeout
# 等待 "+" 延续响应(服务端确认进入 IDLE
deadline = time.time() + min(10.0, timeout)
buf = b""
got_continuation = False
while time.time() < deadline:
ready = select.select([sock], [], [], min(2.0, deadline - time.time()))
if ready[0]:
@@ -259,13 +297,21 @@ class IMAPNewProvider(OutlookProvider):
break
buf += chunk
if b"+ " in buf or b"+\r\n" in buf:
got_continuation = True
break
if not got_continuation:
logger.warning(f"[{self.account.email}] 未收到 IDLE 延续响应,放弃")
return False
# 等待 EXISTS / RECENT 推送
got_new = False
buf = b""
deadline = time.time() + timeout
while time.time() < deadline:
remaining = deadline - time.time()
if remaining <= 0:
break
ready = select.select([sock], [], [], min(2.0, remaining))
if ready[0]:
chunk = sock.recv(4096)
@@ -273,6 +319,7 @@ class IMAPNewProvider(OutlookProvider):
break
buf += chunk
if b"EXISTS" in buf or b"RECENT" in buf:
logger.debug(f"[{self.account.email}] IDLE 收到新邮件推送")
got_new = True
break
@@ -283,23 +330,25 @@ class IMAPNewProvider(OutlookProvider):
return False
finally:
# 发送 DONE 结束 IDLE
# 发送 DONE 结束 IDLE,并排空服务端响应
try:
self._conn.send(b"DONE\r\n")
# 读取 IDLE 结束响应(避免缓冲区污染后续命令)
deadline2 = time.time() + 5
resp_buf = b""
while time.time() < deadline2:
drain_deadline = time.time() + 5
drain_buf = b""
tag_end = f"{tag} OK".encode()
tag_no = f"{tag} NO".encode()
tag_bad = f"{tag} BAD".encode()
while time.time() < drain_deadline:
ready = select.select([sock], [], [], 1.0)
if ready[0]:
chunk = sock.recv(4096)
if not chunk:
break
resp_buf += chunk
if tag.encode() in resp_buf:
break
if not ready[0]:
break
chunk = sock.recv(4096)
if not chunk:
break
drain_buf += chunk
if any(t in drain_buf for t in (tag_end, tag_no, tag_bad)):
break
except Exception:
# DONE 发送失败则废弃连接
_imap_pool.invalidate(self.account.email)
self._connected = False
self._conn = None
@@ -316,7 +365,7 @@ class IMAPNewProvider(OutlookProvider):
def _parse_email(raw: bytes) -> EmailMessage:
"""解析原始邮件为 EmailMessage"""
"""解析原始邮件为 EmailMessage(优先 text/plain次选 text/html"""
msg = email.message_from_bytes(raw)
def _decode(val):
@@ -348,30 +397,42 @@ def _parse_email(raw: bytes) -> EmailMessage:
except Exception:
pass
body = ""
body_preview = ""
# 提取正文:优先 text/plain次选 text/html
plain_body = ""
html_body = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
cd = str(part.get("Content-Disposition", ""))
if "attachment" not in cd.lower() and ct in ("text/plain", "text/html"):
try:
charset = part.get_content_charset() or "utf-8"
payload = part.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors="replace")
break
except Exception:
pass
if "attachment" in cd.lower():
continue
try:
charset = part.get_content_charset() or "utf-8"
payload = part.get_payload(decode=True)
if not payload:
continue
decoded = payload.decode(charset, errors="replace")
if ct == "text/plain" and not plain_body:
plain_body = decoded
elif ct == "text/html" and not html_body:
html_body = decoded
except Exception:
pass
else:
try:
charset = msg.get_content_charset() or "utf-8"
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors="replace")
ct = msg.get_content_type()
decoded = payload.decode(charset, errors="replace")
if ct == "text/plain":
plain_body = decoded
else:
html_body = decoded
except Exception:
pass
body = plain_body or html_body
body_preview = body[:200].strip()
msg_id = msg.get("Message-ID", "").strip("<>")

View File

@@ -97,17 +97,29 @@ class OutlookService(BaseEmailService):
if not account.client_id and _default_client_id:
account.client_id = _default_client_id
if account.validate():
self.accounts.append(account)
if not account.has_oauth():
logger.warning(
f"[{account.email}] 跳过IMAP_NEW 仅支持 OAuth2"
f"请配置 client_id 和 refresh_token"
)
else:
self.accounts.append(account)
else:
for ac in self.config.get("accounts", []):
account = OutlookAccount.from_config(ac)
if not account.client_id and _default_client_id:
account.client_id = _default_client_id
if account.validate():
self.accounts.append(account)
if not account.has_oauth():
logger.warning(
f"[{account.email}] 跳过IMAP_NEW 仅支持 OAuth2"
f"请配置 client_id 和 refresh_token"
)
else:
self.accounts.append(account)
if not self.accounts:
logger.warning("未配置有效的 Outlook 账户")
logger.warning("未配置有效的 Outlook 账户(需要 client_id + refresh_token")
# 健康检查器
self.health_checker = HealthChecker(
@@ -143,6 +155,7 @@ class OutlookService(BaseEmailService):
account: OutlookAccount,
count: int = 15,
only_unseen: bool = True,
since_minutes: Optional[int] = None,
use_cache: bool = False,
) -> List[EmailMessage]:
"""通过 IMAP_NEW Provider 获取邮件,可选使用内存缓存"""
@@ -159,7 +172,9 @@ class OutlookService(BaseEmailService):
provider = self._get_provider(account)
with self._imap_semaphore:
with provider:
emails = provider.get_recent_emails(count, only_unseen)
emails = provider.get_recent_emails(
count, only_unseen, since_minutes=since_minutes
)
if emails:
self.health_checker.record_success()
@@ -227,11 +242,11 @@ class OutlookService(BaseEmailService):
if use_idle:
code = self._wait_with_idle(
account, email, actual_timeout, min_timestamp, used_codes
account, email, actual_timeout, min_timestamp, used_codes, otp_sent_at
)
else:
code = self._wait_with_poll(
account, email, actual_timeout, poll_interval, min_timestamp, used_codes
account, email, actual_timeout, poll_interval, min_timestamp, used_codes, otp_sent_at
)
if code:
@@ -248,16 +263,27 @@ class OutlookService(BaseEmailService):
poll_interval: int,
min_timestamp: float,
used_codes: set,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""轮询方式等待验证码"""
start_time = time.time()
poll_count = 0
# 计算 since_minutes从发送时间前2分钟开始搜索最多查180分钟
since_minutes: Optional[int] = None
if otp_sent_at:
elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2
since_minutes = min(elapsed_since_send, 180)
while time.time() - start_time < timeout:
poll_count += 1
only_unseen = poll_count <= 3
# 有 since_minutes 时用 SINCE 搜索(覆盖已读/未读否则前3次用 UNSEEN
only_unseen = (since_minutes is None) and (poll_count <= 3)
try:
emails = self._fetch_emails(account, count=15, only_unseen=only_unseen)
emails = self._fetch_emails(
account, count=15, only_unseen=only_unseen,
since_minutes=since_minutes,
)
if emails:
code = self.email_parser.find_verification_code_in_emails(
emails,
@@ -286,21 +312,30 @@ class OutlookService(BaseEmailService):
timeout: int,
min_timestamp: float,
used_codes: set,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""IMAP IDLE 方式等待验证码,失败时自动降级为轮询"""
if not self.health_checker.is_available():
logger.warning(f"[{email}] IMAP_NEW 不可用,降级为轮询")
return self._wait_with_poll(
account, email, timeout, 3, min_timestamp, used_codes
account, email, timeout, 3, min_timestamp, used_codes, otp_sent_at
)
# 计算 since_minutes从发送时间前2分钟开始最多180分钟
since_minutes: Optional[int] = None
if otp_sent_at:
elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2
since_minutes = min(elapsed_since_send, 180)
start_time = time.time()
try:
provider = self._get_provider(account)
with self._imap_semaphore:
with provider:
# 先做一次即时检查
emails = provider.get_recent_emails(15, only_unseen=True)
emails = provider.get_recent_emails(
15, only_unseen=(since_minutes is None), since_minutes=since_minutes
)
code = self.email_parser.find_verification_code_in_emails(
emails,
target_email=email,
@@ -320,7 +355,14 @@ class OutlookService(BaseEmailService):
arrived = provider.wait_for_new_email_idle(timeout=min(remaining, 25))
# 无效化缓存,强制重新拉取
self._email_cache.invalidate(email)
emails = provider.get_recent_emails(15, only_unseen=True)
# IDLE 触发后用 since_minutes 搜索,覆盖已读邮件
fetch_since = since_minutes
if fetch_since is None:
# 没有 otp_sent_at 时用距当前时间2分钟内的邮件
fetch_since = 2
emails = provider.get_recent_emails(
15, only_unseen=False, since_minutes=fetch_since
)
code = self.email_parser.find_verification_code_in_emails(
emails,
target_email=email,
@@ -343,7 +385,7 @@ class OutlookService(BaseEmailService):
code_settings = _get_code_settings()
return self._wait_with_poll(
account, email, remaining,
code_settings["poll_interval"], min_timestamp, used_codes
code_settings["poll_interval"], min_timestamp, used_codes, otp_sent_at
)
logger.warning(f"[{email}] IDLE 等待验证码超时 ({timeout}s)")

View File

@@ -1028,9 +1028,11 @@ def _build_inbox_config(db, service_type, email: str) -> dict:
EmailServiceModel.enabled == True
)
if service_type == EST.OUTLOOK:
# 按 config.email 匹配账号 email
services = query.all()
svc = next((s for s in services if (s.config or {}).get("email") == email), None)
# 按 config.email 精确匹配,不受 enabled 限制(收件箱是账号自己的邮箱)
all_outlook = db.query(EmailServiceModel).filter(
EmailServiceModel.service_type == db_type
).all()
svc = next((s for s in all_outlook if (s.config or {}).get("email", "").lower() == email.lower()), None)
else:
svc = query.order_by(EmailServiceModel.priority.asc()).first()

View File

@@ -673,16 +673,16 @@ async function handleOutlookBatchImport() {
lines.forEach((line, index) => {
const parts = line.split('----').map(p => p.trim());
if (parts.length < 2) {
errors.push(`${index + 1} 行格式错误`);
if (parts.length < 4) {
errors.push(`${index + 1} 行格式错误,必须为 邮箱----密码----client_id----refresh_token`);
return;
}
const account = {
email: parts[0],
password: parts[1],
client_id: parts[2] || null,
refresh_token: parts[3] || null,
client_id: parts[2],
refresh_token: parts[3],
enabled: enabled,
priority: priority
};
@@ -692,6 +692,11 @@ async function handleOutlookBatchImport() {
return;
}
if (!account.client_id || !account.refresh_token) {
errors.push(`${index + 1} 行 client_id 或 refresh_token 不能为空`);
return;
}
accounts.push(account);
});