refactor(outlook): 简化为单 IMAP_NEW Provider

This commit is contained in:
cnlimiter
2026-03-21 01:34:49 +08:00
committed by Mison
parent 188636356a
commit 668500028a
18 changed files with 732 additions and 1607 deletions

View File

@@ -68,12 +68,14 @@ OPENAI_API_ENDPOINTS = {
"validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate",
"create_account": "https://auth.openai.com/api/accounts/create_account",
"select_workspace": "https://auth.openai.com/api/accounts/workspace/select",
"password_verify" : "https://auth.openai.com/api/accounts/password/verify"
}
# OpenAI 页面类型(用于判断账号状态)
OPENAI_PAGE_TYPES = {
"LOGIN_PASSWORD": "login_password",
"EMAIL_OTP_VERIFICATION": "email_otp_verification", # 已注册账号,需要 OTP 验证
"PASSWORD_REGISTRATION": "password", # 新账号,需要设置密码
"PASSWORD_REGISTRATION": "create_account_password", # 新账号,需要设置密码
}
# ============================================================================
@@ -378,20 +380,8 @@ MICROSOFT_TOKEN_ENDPOINTS = {
}
# IMAP 服务器配置
OUTLOOK_IMAP_SERVERS = {
"OLD": "outlook.office365.com", # 旧版 IMAP
"NEW": "outlook.live.com", # 新版 IMAP
}
OUTLOOK_IMAP_SERVER = "outlook.live.com"
OUTLOOK_IMAP_PORT = 993
# Microsoft OAuth2 Scopes
MICROSOFT_SCOPES = {
# 旧版 IMAP 不需要特定 scope
"IMAP_OLD": "",
# 新版 IMAP 需要的 scope
"IMAP_NEW": "https://outlook.office.com/IMAP.AccessAsUser.All offline_access",
# Graph API 需要的 scope
"GRAPH_API": "https://graph.microsoft.com/.default",
}
# Outlook 提供者默认优先级
OUTLOOK_PROVIDER_PRIORITY = ["imap_new", "imap_old", "graph_api"]
# Microsoft OAuth2 ScopeIMAP_NEW
OUTLOOK_IMAP_SCOPE = "https://outlook.office.com/IMAP.AccessAsUser.All offline_access"

View File

@@ -358,12 +358,6 @@ SETTING_DEFINITIONS: Dict[str, SettingDefinition] = {
),
# Outlook 配置
"outlook_provider_priority": SettingDefinition(
db_key="outlook.provider_priority",
default_value=["imap_old", "imap_new", "graph_api"],
category=SettingCategory.EMAIL,
description="Outlook 提供者优先级"
),
"outlook_health_failure_threshold": SettingDefinition(
db_key="outlook.health_failure_threshold",
default_value=5,
@@ -382,6 +376,12 @@ SETTING_DEFINITIONS: Dict[str, SettingDefinition] = {
category=SettingCategory.EMAIL,
description="Outlook OAuth 默认 Client ID"
),
"outlook_use_idle": SettingDefinition(
db_key="outlook.use_idle",
default_value=True,
category=SettingCategory.EMAIL,
description="使用 IMAP IDLE 替代轮询获取验证码(降低延迟,默认开启)"
),
}
# 属性名到数据库键名的映射(用于向后兼容)
@@ -407,9 +407,9 @@ SETTING_TYPES: Dict[str, Type] = {
"cpa_enabled": bool,
"email_code_timeout": int,
"email_code_poll_interval": int,
"outlook_provider_priority": list,
"outlook_health_failure_threshold": int,
"outlook_health_disable_duration": int,
"outlook_use_idle": bool,
}
# 需要作为 SecretStr 处理的字段
@@ -694,10 +694,10 @@ class Settings(BaseModel):
email_code_poll_interval: int = 3
# Outlook 配置
outlook_provider_priority: List[str] = ["imap_old", "imap_new", "graph_api"]
outlook_health_failure_threshold: int = 5
outlook_health_disable_duration: int = 60
outlook_default_client_id: str = "24d9a0ed-8787-4584-883c-2fd79308940a"
outlook_use_idle: bool = True
# 全局配置实例

View File

@@ -286,6 +286,7 @@ class RegistrationEngine:
"""
try:
signup_body = f'{{"username":{{"value":"{self.email}","kind":"email"}},"screen_hint":"signup"}}'
#signup_body = f'{{"username":{{"value":"{self.email}","kind":"email"}},"screen_hint":"login_or_signup"}}'
headers = {
"referer": "https://auth.openai.com/create-account",
@@ -318,7 +319,7 @@ class RegistrationEngine:
self._log(f"响应页面类型: {page_type}")
# 判断是否为已注册账号
is_existing = page_type == OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"]
is_existing = page_type != OPENAI_PAGE_TYPES["PASSWORD_REGISTRATION"]
if is_existing:
self._log(f"检测到已注册账号,将自动切换到登录流程")
@@ -367,8 +368,7 @@ class RegistrationEngine:
self._log(f"提交密码状态: {response.status_code}")
if response.status_code != 200:
error_text = response.text[:500]
self._log(f"密码注册失败: {error_text}", "warning")
self._log(f"密码注册失败: {response.json().get("error", {}).get("message", "")}", "warning")
# 解析错误信息,判断是否是邮箱已注册
try:
@@ -377,7 +377,7 @@ class RegistrationEngine:
error_code = error_json.get("error", {}).get("code", "")
# 检测邮箱已注册的情况
if "already" in error_msg.lower() or "exists" in error_msg.lower() or error_code == "user_exists":
if "already" in error_msg.lower() or "exists" in error_msg.lower() or "again" in error_msg.lower() or error_code == "user_exists" or error_code == "invalid_request":
self._log(f"邮箱 {self.email} 可能已在 OpenAI 注册过", "error")
# 标记此邮箱为已注册状态
self._mark_email_as_registered()
@@ -392,24 +392,74 @@ class RegistrationEngine:
self._log(f"密码注册失败: {e}", "error")
return False, None
def _mark_email_as_registered(self):
"""标记邮箱为已注册状态(用于防止重复尝试)"""
def _login_password(self, did: str, sen_token: Optional[str]) -> bool:
"""注册密码"""
try:
password = ""
with get_db() as db:
# 检查是否已存在该邮箱的记录
existing = crud.get_account_by_email(db, self.email)
if not existing:
# 创建一个失败记录,标记该邮箱已注册过
# 生成密码
password = existing.password
# 提交密码登录
register_body = json.dumps({
"password": password
})
if sen_token:
sentinel = f'{{"p": "", "id": "{did}", "flow": "password_verify"}}'
response = self.session.post(
OPENAI_API_ENDPOINTS["password_verify"],
headers={
"referer": "https://auth.openai.com/log-in/password",
"accept": "application/json",
"content-type": "application/json",
"openai-sentinel-token": sentinel
},
data=register_body,
)
self._log(f"提交密码状态: {response.status_code}")
if response.status_code != 200:
error_text = response.text[:500]
self._log(f"密码登录失败: {error_text}", "warning")
return False
return True
except Exception as e:
self._log(f"密码登录失败: {e}", "error")
return False, None
def _mark_email_as_registered(self):
"""标记邮箱为已注册状态OpenAI 侧已存在该账号)"""
try:
with get_db() as db:
existing = crud.get_account_by_email(db, self.email)
if existing:
# 数据库中已有该账号,更新 extra_data 标记
extra = existing.extra_data or {}
extra["openai_already_registered"] = True
crud.update_account(
db,
existing.id,
extra_data=extra
)
self._log(f"已更新数据库中 {self.email} 的已注册标记")
else:
# 数据库中不存在,创建失败记录并标记
crud.create_account(
db,
email=self.email,
password="", # 空密码表示未成功注册
password="",
email_service=self.email_service.service_type.value,
email_service_id=self.email_info.get("service_id") if self.email_info else None,
status="failed",
extra_data={"register_failed_reason": "email_already_registered_on_openai"}
extra_data={"openai_already_registered": True}
)
self._log(f"已在数据库中标记邮箱 {self.email} 为已注册状态")
self._log(f"已在数据库中标记 {self.email} 为已注册状态")
except Exception as e:
logger.warning(f"标记邮箱状态失败: {e}")
@@ -722,9 +772,11 @@ class RegistrationEngine:
result.error_message = f"提交注册表单失败: {signup_result.error_message}"
return result
# 8. [已注册账号跳过] 注册密码
# 8. 检测到已注册账号 → 直接终止任务
if self._is_existing_account:
self._log("8. [已注册账号] 跳过密码设置OTP 已自动发送")
self._log(f"8. 邮箱 {self.email} 在 OpenAI 已注册,跳过注册流程", "warning")
result.error_message = f"邮箱 {self.email} 已在 OpenAI 注册"
return result
else:
self._log("8. 注册密码...")
password_ok, password = self._register_password()
@@ -732,16 +784,11 @@ class RegistrationEngine:
result.error_message = "注册密码失败"
return result
# 9. [已注册账号跳过] 发送验证码
if self._is_existing_account:
self._log("9. [已注册账号] 跳过发送验证码,使用自动发送的 OTP")
# 已注册账号的 OTP 在提交表单时已自动发送,记录时间戳
self._otp_sent_at = time.time()
else:
self._log("9. 发送验证码...")
if not self._send_verification_code():
result.error_message = "发送验证码失败"
return result
# 9. 发送验证码
self._log("9. 发送验证码...")
if not self._send_verification_code():
result.error_message = "发送验证码失败"
return result
# 10. 获取验证码
self._log("10. 等待验证码...")
@@ -756,14 +803,11 @@ class RegistrationEngine:
result.error_message = "验证验证码失败"
return result
# 12. [已注册账号跳过] 创建用户账户
if self._is_existing_account:
self._log("12. [已注册账号] 跳过创建用户账户")
else:
self._log("12. 创建用户账户...")
if not self._create_user_account():
result.error_message = "创建用户账户失败"
return result
# 12. 创建用户账户
self._log("12. 创建用户账户...")
if not self._create_user_account():
result.error_message = "创建用户账户失败"
return result
# 13. 获取 Workspace ID
self._log("13. 获取 Workspace ID...")
@@ -803,7 +847,7 @@ class RegistrationEngine:
result.password = self.password or "" # 保存密码(已注册账号为空)
# 设置来源标记
result.source = "login" if self._is_existing_account else "register"
result.source = "register"
# 尝试获取 session_token 从 cookie
session_cookie = self.session.cookies.get("__Secure-next-auth.session-token")
@@ -814,10 +858,7 @@ class RegistrationEngine:
# 17. 完成
self._log("=" * 60)
if self._is_existing_account:
self._log("登录成功! (已注册账号)")
else:
self._log("注册成功!")
self._log("注册成功!")
self._log(f"邮箱: {result.email}")
self._log(f"Account ID: {result.account_id}")
self._log(f"Workspace ID: {result.workspace_id}")
@@ -828,7 +869,6 @@ class RegistrationEngine:
"email_service": self.email_service.service_type.value,
"proxy_used": self.proxy_url,
"registered_at": datetime.now().isoformat(),
"is_existing_account": self._is_existing_account,
}
return result

View File

@@ -713,4 +713,21 @@ def delete_tm_service(db: Session, service_id: int) -> bool:
return False
db.delete(svc)
db.commit()
return True
return True
def update_outlook_refresh_token(db: Session, service_id: int, email: str, new_refresh_token: str):
"""更新 EmailService.config 中指定邮箱的 refresh_token"""
service = db.query(EmailService).filter(EmailService.id == service_id).first()
if not service or not service.config:
return
config = dict(service.config)
# 单账户格式
if config.get("email", "").lower() == email.lower():
config["refresh_token"] = new_refresh_token
# 多账户列表格式
for acc in config.get("accounts", []):
if acc.get("email", "").lower() == email.lower():
acc["refresh_token"] = new_refresh_token
service.config = config
db.commit()

View File

@@ -38,9 +38,7 @@ from .outlook.base import (
from .outlook.account import OutlookAccount
from .outlook.providers import (
OutlookProvider,
IMAPOldProvider,
IMAPNewProvider,
GraphAPIProvider,
)
__all__ = [
@@ -67,7 +65,5 @@ __all__ = [
'ProviderStatus',
'OutlookAccount',
'OutlookProvider',
'IMAPOldProvider',
'IMAPNewProvider',
'GraphAPIProvider',
]

View File

@@ -3,7 +3,7 @@ Outlook 账户数据类
"""
from dataclasses import dataclass
from typing import Dict, Any, Optional
from typing import Dict, Any
@dataclass
@@ -16,7 +16,6 @@ class OutlookAccount:
@classmethod
def from_config(cls, config: Dict[str, Any]) -> "OutlookAccount":
"""从配置创建账户"""
return cls(
email=config.get("email", ""),
password=config.get("password", ""),
@@ -25,15 +24,12 @@ class OutlookAccount:
)
def has_oauth(self) -> bool:
"""是否支持 OAuth2"""
return bool(self.client_id and self.refresh_token)
def validate(self) -> bool:
"""验证账户信息是否有效"""
return bool(self.email and self.password) or self.has_oauth()
def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]:
"""转换为字典"""
result = {
"email": self.email,
"has_oauth": self.has_oauth(),
@@ -47,5 +43,4 @@ class OutlookAccount:
return result
def __str__(self) -> str:
"""字符串表示"""
return f"OutlookAccount({self.email})"

View File

@@ -1,6 +1,5 @@
"""
Outlook 服务基础定义
包含枚举类型和数据类
Outlook 邮箱服务基础定义
"""
from dataclasses import dataclass, field
@@ -10,49 +9,38 @@ from typing import Optional, Dict, Any, List
class ProviderType(str, Enum):
"""Outlook 提供者类型"""
IMAP_OLD = "imap_old" # 旧版 IMAP (outlook.office365.com)
IMAP_NEW = "imap_new" # 新版 IMAP (outlook.live.com)
GRAPH_API = "graph_api" # Microsoft Graph API
"""Outlook 提供者类型(仅 IMAP_NEW"""
IMAP_NEW = "imap_new"
class TokenEndpoint(str, Enum):
"""Token 端点"""
LIVE = "https://login.live.com/oauth20_token.srf"
CONSUMERS = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
COMMON = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
class IMAPServer(str, Enum):
"""IMAP 服务器"""
OLD = "outlook.office365.com"
NEW = "outlook.live.com"
class ProviderStatus(str, Enum):
"""提供者状态"""
HEALTHY = "healthy" # 健康
DEGRADED = "degraded" # 降级
DISABLED = "disabled" # 禁用
HEALTHY = "healthy"
DEGRADED = "degraded"
DISABLED = "disabled"
@dataclass
class EmailMessage:
"""邮件消息数据类"""
id: str # 消息 ID
subject: str # 主题
sender: str # 发件人
recipients: List[str] = field(default_factory=list) # 收件人列表
body: str = "" # 正文内容
body_preview: str = "" # 正文预览
received_at: Optional[datetime] = None # 接收时间
received_timestamp: int = 0 # 接收时间戳
is_read: bool = False # 是否已读
has_attachments: bool = False # 是否有附件
raw_data: Optional[bytes] = None # 原始数据(用于调试)
id: str
subject: str
sender: str
recipients: List[str] = field(default_factory=list)
body: str = ""
body_preview: str = ""
received_at: Optional[datetime] = None
received_timestamp: int = 0
is_read: bool = False
has_attachments: bool = False
raw_data: Optional[bytes] = None
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"id": self.id,
"subject": self.subject,
@@ -71,19 +59,17 @@ class EmailMessage:
class TokenInfo:
"""Token 信息数据类"""
access_token: str
expires_at: float # 过期时间戳
expires_at: float
token_type: str = "Bearer"
scope: str = ""
refresh_token: Optional[str] = None
def is_expired(self, buffer_seconds: int = 120) -> bool:
"""检查 Token 是否已过期"""
import time
return time.time() >= (self.expires_at - buffer_seconds)
@classmethod
def from_response(cls, data: Dict[str, Any], scope: str = "") -> "TokenInfo":
"""从 API 响应创建"""
import time
return cls(
access_token=data.get("access_token", ""),
@@ -99,49 +85,42 @@ class ProviderHealth:
"""提供者健康状态"""
provider_type: ProviderType
status: ProviderStatus = ProviderStatus.HEALTHY
failure_count: int = 0 # 连续失败次数
last_success: Optional[datetime] = None # 最后成功时间
last_failure: Optional[datetime] = None # 最后失败时间
last_error: str = "" # 最后错误信息
disabled_until: Optional[datetime] = None # 禁用截止时间
failure_count: int = 0
last_success: Optional[datetime] = None
last_failure: Optional[datetime] = None
last_error: str = ""
disabled_until: Optional[datetime] = None
def record_success(self):
"""记录成功"""
self.status = ProviderStatus.HEALTHY
self.failure_count = 0
self.last_success = datetime.now()
self.disabled_until = None
def record_failure(self, error: str):
"""记录失败"""
self.failure_count += 1
self.last_failure = datetime.now()
self.last_error = error
def should_disable(self, threshold: int = 3) -> bool:
"""判断是否应该禁用"""
return self.failure_count >= threshold
def is_disabled(self) -> bool:
"""检查是否被禁用"""
if self.disabled_until and datetime.now() < self.disabled_until:
return True
return False
def disable(self, duration_seconds: int = 300):
"""禁用提供者"""
from datetime import timedelta
self.status = ProviderStatus.DISABLED
self.disabled_until = datetime.now() + timedelta(seconds=duration_seconds)
def enable(self):
"""启用提供者"""
self.status = ProviderStatus.HEALTHY
self.disabled_until = None
self.failure_count = 0
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"provider_type": self.provider_type.value,
"status": self.status.value,

View File

@@ -1,15 +1,13 @@
"""
健康检查和故障切换管理
健康检查管理(简化版,单 Provider
"""
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from typing import Any, Dict, Optional
from .base import ProviderType, ProviderHealth, ProviderStatus
from .providers.base import OutlookProvider
from .base import ProviderHealth, ProviderStatus, ProviderType
logger = logging.getLogger(__name__)
@@ -17,296 +15,49 @@ logger = logging.getLogger(__name__)
class HealthChecker:
"""
健康检查管理
跟踪各提供者的健康状态,管理故障切换
单 Provider 健康检查器
跟踪 IMAP_NEW 的健康状态
"""
def __init__(
self,
failure_threshold: int = 3,
disable_duration: int = 300,
recovery_check_interval: int = 60,
):
"""
初始化健康检查器
Args:
failure_threshold: 连续失败次数阈值,超过后禁用
disable_duration: 禁用时长(秒)
recovery_check_interval: 恢复检查间隔(秒)
"""
self.failure_threshold = failure_threshold
self.disable_duration = disable_duration
self.recovery_check_interval = recovery_check_interval
# 提供者健康状态: ProviderType -> ProviderHealth
self._health_status: Dict[ProviderType, ProviderHealth] = {}
self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW)
self._lock = threading.Lock()
# 初始化所有提供者的健康状态
for provider_type in ProviderType:
self._health_status[provider_type] = ProviderHealth(
provider_type=provider_type
)
def get_health(self, provider_type: ProviderType) -> ProviderHealth:
"""获取提供者的健康状态"""
def record_success(self):
with self._lock:
return self._health_status.get(provider_type, ProviderHealth(provider_type=provider_type))
self._health.record_success()
def record_success(self, provider_type: ProviderType):
"""记录成功操作"""
def record_failure(self, error: str):
with self._lock:
health = self._health_status.get(provider_type)
if health:
health.record_success()
logger.debug(f"{provider_type.value} 记录成功")
def record_failure(self, provider_type: ProviderType, error: str):
"""记录失败操作"""
with self._lock:
health = self._health_status.get(provider_type)
if health:
health.record_failure(error)
# 检查是否需要禁用
if health.should_disable(self.failure_threshold):
health.disable(self.disable_duration)
logger.warning(
f"{provider_type.value} 已禁用 {self.disable_duration} 秒,"
f"原因: {error}"
)
def is_available(self, provider_type: ProviderType) -> bool:
"""
检查提供者是否可用
Args:
provider_type: 提供者类型
Returns:
是否可用
"""
health = self.get_health(provider_type)
# 检查是否被禁用
if health.is_disabled():
remaining = (health.disabled_until - datetime.now()).total_seconds()
logger.debug(
f"{provider_type.value} 已被禁用,剩余 {int(remaining)}"
)
return False
return health.status != ProviderStatus.DISABLED
def get_available_providers(
self,
priority_order: Optional[List[ProviderType]] = None,
) -> List[ProviderType]:
"""
获取可用的提供者列表
Args:
priority_order: 优先级顺序,默认为 [IMAP_NEW, IMAP_OLD, GRAPH_API]
Returns:
可用的提供者列表
"""
if priority_order is None:
priority_order = [
ProviderType.IMAP_NEW,
ProviderType.IMAP_OLD,
ProviderType.GRAPH_API,
]
available = []
for provider_type in priority_order:
if self.is_available(provider_type):
available.append(provider_type)
return available
def get_next_available_provider(
self,
priority_order: Optional[List[ProviderType]] = None,
) -> Optional[ProviderType]:
"""
获取下一个可用的提供者
Args:
priority_order: 优先级顺序
Returns:
可用的提供者类型,如果没有返回 None
"""
available = self.get_available_providers(priority_order)
return available[0] if available else None
def force_disable(self, provider_type: ProviderType, duration: Optional[int] = None):
"""
强制禁用提供者
Args:
provider_type: 提供者类型
duration: 禁用时长(秒),默认使用配置值
"""
with self._lock:
health = self._health_status.get(provider_type)
if health:
health.disable(duration or self.disable_duration)
logger.warning(f"{provider_type.value} 已强制禁用")
def force_enable(self, provider_type: ProviderType):
"""
强制启用提供者
Args:
provider_type: 提供者类型
"""
with self._lock:
health = self._health_status.get(provider_type)
if health:
health.enable()
logger.info(f"{provider_type.value} 已启用")
def get_all_health_status(self) -> Dict[str, Any]:
"""
获取所有提供者的健康状态
Returns:
健康状态字典
"""
with self._lock:
return {
provider_type.value: health.to_dict()
for provider_type, health in self._health_status.items()
}
def check_and_recover(self):
"""
检查并恢复被禁用的提供者
如果禁用时间已过,自动恢复提供者
"""
with self._lock:
for provider_type, health in self._health_status.items():
if health.is_disabled():
# 检查是否可以恢复
if health.disabled_until and datetime.now() >= health.disabled_until:
health.enable()
logger.info(f"{provider_type.value} 已自动恢复")
def reset_all(self):
"""重置所有提供者的健康状态"""
with self._lock:
for provider_type in ProviderType:
self._health_status[provider_type] = ProviderHealth(
provider_type=provider_type
self._health.record_failure(error)
if self._health.should_disable(self.failure_threshold):
self._health.disable(self.disable_duration)
logger.warning(
f"IMAP_NEW 已禁用 {self.disable_duration}s原因: {error}"
)
logger.info("已重置所有提供者的健康状态")
class FailoverManager:
"""
故障切换管理器
管理提供者之间的自动切换
"""
def __init__(
self,
health_checker: HealthChecker,
priority_order: Optional[List[ProviderType]] = None,
):
"""
初始化故障切换管理器
Args:
health_checker: 健康检查器
priority_order: 提供者优先级顺序
"""
self.health_checker = health_checker
self.priority_order = priority_order or [
ProviderType.IMAP_NEW,
ProviderType.IMAP_OLD,
ProviderType.GRAPH_API,
]
# 当前使用的提供者索引
self._current_index = 0
self._lock = threading.Lock()
def get_current_provider(self) -> Optional[ProviderType]:
"""
获取当前提供者
Returns:
当前提供者类型,如果没有可用的返回 None
"""
available = self.health_checker.get_available_providers(self.priority_order)
if not available:
return None
def is_available(self) -> bool:
with self._lock:
# 尝试使用当前索引
if self._current_index < len(available):
return available[self._current_index]
return available[0]
def switch_to_next(self) -> Optional[ProviderType]:
"""
切换到下一个提供者
Returns:
下一个提供者类型,如果没有可用的返回 None
"""
available = self.health_checker.get_available_providers(self.priority_order)
if not available:
return None
if self._health.is_disabled():
remaining = (
(self._health.disabled_until - datetime.now()).total_seconds()
if self._health.disabled_until
else 0
)
logger.debug(f"IMAP_NEW 已被禁用,剩余 {int(remaining)}s")
return False
return self._health.status != ProviderStatus.DISABLED
def reset(self):
with self._lock:
self._current_index = (self._current_index + 1) % len(available)
next_provider = available[self._current_index]
logger.info(f"切换到提供者: {next_provider.value}")
return next_provider
def on_provider_success(self, provider_type: ProviderType):
"""
提供者成功时调用
Args:
provider_type: 提供者类型
"""
self.health_checker.record_success(provider_type)
# 重置索引到成功的提供者
with self._lock:
available = self.health_checker.get_available_providers(self.priority_order)
if provider_type in available:
self._current_index = available.index(provider_type)
def on_provider_failure(self, provider_type: ProviderType, error: str):
"""
提供者失败时调用
Args:
provider_type: 提供者类型
error: 错误信息
"""
self.health_checker.record_failure(provider_type, error)
self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW)
def get_status(self) -> Dict[str, Any]:
"""
获取故障切换状态
Returns:
状态字典
"""
current = self.get_current_provider()
return {
"current_provider": current.value if current else None,
"priority_order": [p.value for p in self.priority_order],
"available_providers": [
p.value for p in self.health_checker.get_available_providers(self.priority_order)
],
"health_status": self.health_checker.get_all_health_status(),
}
with self._lock:
return self._health.to_dict()

View File

@@ -3,27 +3,18 @@ Outlook 提供者模块
"""
from .base import OutlookProvider, ProviderConfig
from .imap_old import IMAPOldProvider
from .imap_new import IMAPNewProvider
from .graph_api import GraphAPIProvider
__all__ = [
'OutlookProvider',
'ProviderConfig',
'IMAPOldProvider',
'IMAPNewProvider',
'GraphAPIProvider',
]
# 提供者注册表
PROVIDER_REGISTRY = {
'imap_old': IMAPOldProvider,
'imap_new': IMAPNewProvider,
'graph_api': GraphAPIProvider,
}
def get_provider_class(provider_type: str):
"""获取提供者类"""
return PROVIDER_REGISTRY.get(provider_type)

View File

@@ -5,7 +5,7 @@ Outlook 提供者抽象基类
import abc
import logging
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from typing import List, Optional
from ..base import ProviderType, EmailMessage, ProviderHealth, ProviderStatus
from ..account import OutlookAccount
@@ -18,56 +18,36 @@ logger = logging.getLogger(__name__)
class ProviderConfig:
"""提供者配置"""
timeout: int = 30
max_retries: int = 3
proxy_url: Optional[str] = None
# 健康检查配置
service_id: Optional[int] = None
health_failure_threshold: int = 3
health_disable_duration: int = 300 # 秒
health_disable_duration: int = 300
class OutlookProvider(abc.ABC):
"""
Outlook 提供者抽象基类
定义所有提供者必须实现的接口
"""
"""Outlook 提供者抽象基类"""
def __init__(
self,
account: OutlookAccount,
config: Optional[ProviderConfig] = None,
):
"""
初始化提供者
Args:
account: Outlook 账户
config: 提供者配置
"""
self.account = account
self.config = config or ProviderConfig()
# 健康状态
self._health = ProviderHealth(provider_type=self.provider_type)
# 连接状态
self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW)
self._connected = False
self._last_error: Optional[str] = None
@property
@abc.abstractmethod
def provider_type(self) -> ProviderType:
"""获取提供者类型"""
pass
return ProviderType.IMAP_NEW
@property
def health(self) -> ProviderHealth:
"""获取健康状态"""
return self._health
@property
def is_healthy(self) -> bool:
"""检查是否健康"""
return (
self._health.status == ProviderStatus.HEALTHY
and not self._health.is_disabled()
@@ -75,22 +55,14 @@ class OutlookProvider(abc.ABC):
@property
def is_connected(self) -> bool:
"""检查是否已连接"""
return self._connected
@abc.abstractmethod
def connect(self) -> bool:
"""
连接到服务
Returns:
是否连接成功
"""
pass
@abc.abstractmethod
def disconnect(self):
"""断开连接"""
pass
@abc.abstractmethod
@@ -99,81 +71,44 @@ class OutlookProvider(abc.ABC):
count: int = 20,
only_unseen: bool = True,
) -> List[EmailMessage]:
"""
获取最近的邮件
Args:
count: 获取数量
only_unseen: 是否只获取未读
Returns:
邮件列表
"""
pass
@abc.abstractmethod
def test_connection(self) -> bool:
"""
测试连接是否正常
Returns:
连接是否正常
"""
pass
def wait_for_new_email_idle(self, timeout: int = 25) -> bool:
"""IMAP IDLE默认不支持子类可覆盖"""
return False
def record_success(self):
"""记录成功操作"""
self._health.record_success()
self._last_error = None
logger.debug(f"[{self.account.email}] {self.provider_type.value} 操作成功")
def record_failure(self, error: str):
"""记录失败操作"""
self._health.record_failure(error)
self._last_error = error
# 检查是否需要禁用
if self._health.should_disable(self.config.health_failure_threshold):
self._health.disable(self.config.health_disable_duration)
logger.warning(
f"[{self.account.email}] {self.provider_type.value} 已禁用 "
f"{self.config.health_disable_duration},原因: {error}"
)
else:
logger.warning(
f"[{self.account.email}] {self.provider_type.value} 操作失败 "
f"({self._health.failure_count}/{self.config.health_failure_threshold}): {error}"
f"[{self.account.email}] IMAP_NEW 已禁用 "
f"{self.config.health_disable_duration}s,原因: {error}"
)
def check_health(self) -> bool:
"""
检查健康状态
Returns:
是否健康可用
"""
# 检查是否被禁用
if self._health.is_disabled():
logger.debug(
f"[{self.account.email}] {self.provider_type.value} 已被禁用,"
f"将在 {self._health.disabled_until} 后恢复"
)
return False
return self._health.status in (ProviderStatus.HEALTHY, ProviderStatus.DEGRADED)
def __enter__(self):
"""上下文管理器入口"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.disconnect()
return False
def __str__(self) -> str:
"""字符串表示"""
return f"{self.__class__.__name__}({self.account.email})"
def __repr__(self) -> str:

View File

@@ -1,250 +0,0 @@
"""
Graph API 提供者
使用 Microsoft Graph REST API
"""
import json
import logging
from typing import List, Optional
from datetime import datetime
from curl_cffi import requests as _requests
from ..base import ProviderType, EmailMessage
from ..account import OutlookAccount
from ..token_manager import TokenManager
from .base import OutlookProvider, ProviderConfig
logger = logging.getLogger(__name__)
class GraphAPIProvider(OutlookProvider):
"""
Graph API 提供者
使用 Microsoft Graph REST API 获取邮件
需要 graph.microsoft.com/.default scope
"""
# Graph API 端点
GRAPH_API_BASE = "https://graph.microsoft.com/v1.0"
MESSAGES_ENDPOINT = "/me/mailFolders/inbox/messages"
@property
def provider_type(self) -> ProviderType:
return ProviderType.GRAPH_API
def __init__(
self,
account: OutlookAccount,
config: Optional[ProviderConfig] = None,
):
super().__init__(account, config)
# Token 管理器
self._token_manager: Optional[TokenManager] = None
# 注意Graph API 必须使用 OAuth2
if not account.has_oauth():
logger.warning(
f"[{self.account.email}] Graph API 提供者需要 OAuth2 配置 "
f"(client_id + refresh_token)"
)
def connect(self) -> bool:
"""
验证连接(获取 Token
Returns:
是否连接成功
"""
if not self.account.has_oauth():
error = "Graph API 需要 OAuth2 配置"
self.record_failure(error)
logger.error(f"[{self.account.email}] {error}")
return False
if not self._token_manager:
self._token_manager = TokenManager(
self.account,
ProviderType.GRAPH_API,
self.config.proxy_url,
self.config.timeout,
)
# 尝试获取 Token
token = self._token_manager.get_access_token()
if token:
self._connected = True
self.record_success()
logger.info(f"[{self.account.email}] Graph API 连接成功")
return True
return False
def disconnect(self):
"""断开连接(清除状态)"""
self._connected = False
def get_recent_emails(
self,
count: int = 20,
only_unseen: bool = True,
) -> List[EmailMessage]:
"""
获取最近的邮件
Args:
count: 获取数量
only_unseen: 是否只获取未读
Returns:
邮件列表
"""
if not self._connected:
if not self.connect():
return []
try:
# 获取 Access Token
token = self._token_manager.get_access_token()
if not token:
self.record_failure("无法获取 Access Token")
return []
# 构建 API 请求
url = f"{self.GRAPH_API_BASE}{self.MESSAGES_ENDPOINT}"
params = {
"$top": count,
"$select": "id,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview,body",
"$orderby": "receivedDateTime desc",
}
# 只获取未读邮件
if only_unseen:
params["$filter"] = "isRead eq false"
# 构建代理配置
proxies = None
if self.config.proxy_url:
proxies = {"http": self.config.proxy_url, "https": self.config.proxy_url}
# 发送请求curl_cffi 自动对 params 进行 URL 编码)
resp = _requests.get(
url,
params=params,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Prefer": "outlook.body-content-type='text'",
},
proxies=proxies,
timeout=self.config.timeout,
impersonate="chrome110",
)
if resp.status_code == 401:
# Token 无 Graph 权限client_id 未授权),清除缓存但不记录健康失败
# 避免因权限不足导致健康检查器禁用该提供者,影响其他账户
if self._token_manager:
self._token_manager.clear_cache()
self._connected = False
logger.warning(f"[{self.account.email}] Graph API 返回 401client_id 可能无 Graph 权限,跳过")
return []
if resp.status_code != 200:
error_body = resp.text[:200]
self.record_failure(f"HTTP {resp.status_code}: {error_body}")
logger.error(f"[{self.account.email}] Graph API 请求失败: HTTP {resp.status_code}")
return []
data = resp.json()
# 解析邮件
messages = data.get("value", [])
emails = []
for msg in messages:
try:
email_msg = self._parse_graph_message(msg)
if email_msg:
emails.append(email_msg)
except Exception as e:
logger.warning(f"[{self.account.email}] 解析 Graph API 邮件失败: {e}")
self.record_success()
return emails
except Exception as e:
self.record_failure(str(e))
logger.error(f"[{self.account.email}] Graph API 获取邮件失败: {e}")
return []
def _parse_graph_message(self, msg: dict) -> Optional[EmailMessage]:
"""
解析 Graph API 消息
Args:
msg: Graph API 消息对象
Returns:
EmailMessage 对象
"""
# 解析发件人
from_info = msg.get("from", {})
sender_info = from_info.get("emailAddress", {})
sender = sender_info.get("address", "")
# 解析收件人
recipients = []
for recipient in msg.get("toRecipients", []):
addr_info = recipient.get("emailAddress", {})
addr = addr_info.get("address", "")
if addr:
recipients.append(addr)
# 解析日期
received_at = None
received_timestamp = 0
try:
date_str = msg.get("receivedDateTime", "")
if date_str:
# ISO 8601 格式
received_at = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
received_timestamp = int(received_at.timestamp())
except Exception:
pass
# 获取正文
body_info = msg.get("body", {})
body = body_info.get("content", "")
body_preview = msg.get("bodyPreview", "")
return EmailMessage(
id=msg.get("id", ""),
subject=msg.get("subject", ""),
sender=sender,
recipients=recipients,
body=body,
body_preview=body_preview,
received_at=received_at,
received_timestamp=received_timestamp,
is_read=msg.get("isRead", False),
has_attachments=msg.get("hasAttachments", False),
)
def test_connection(self) -> bool:
"""
测试 Graph API 连接
Returns:
连接是否正常
"""
try:
# 尝试获取一封邮件来测试连接
emails = self.get_recent_emails(count=1, only_unseen=False)
return True
except Exception as e:
logger.warning(f"[{self.account.email}] Graph API 连接测试失败: {e}")
return False

View File

@@ -1,39 +1,87 @@
"""
新版 IMAP 提供者
使用 outlook.live.com 服务器和 login.microsoftonline.com/consumers Token 端点
使用 outlook.live.com:993 + consumers Token 端点
引入进程级 IMAPConnectionPool 连接复用和 IMAP IDLE
"""
import email
import imaplib
import logging
import select
import time
import threading
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import List, Optional
from typing import Dict, List, Optional
from ..base import ProviderType, EmailMessage
from ..base import EmailMessage
from ..account import OutlookAccount
from ..token_manager import TokenManager
from .base import OutlookProvider, ProviderConfig
from .imap_old import IMAPOldProvider
logger = logging.getLogger(__name__)
class IMAPNewProvider(OutlookProvider):
"""
新版 IMAP 提供者
使用 outlook.live.com:993 和 login.microsoftonline.com/consumers Token 端点
需要 IMAP.AccessAsUser.All scope
"""
class IMAPConnectionPool:
"""进程级 IMAP 连接池,按 email 复用 IMAP4_SSL 连接"""
# IMAP 服务器配置
IMAP_HOST = "outlook.live.com"
IMAP_PORT = 993
@property
def provider_type(self) -> ProviderType:
return ProviderType.IMAP_NEW
def __init__(self):
self._connections: Dict[str, imaplib.IMAP4_SSL] = {}
self._lock = threading.Lock()
def get_connection(
self,
email_addr: str,
token: str,
timeout: int = 30,
) -> imaplib.IMAP4_SSL:
"""获取或新建 IMAP 连接"""
with self._lock:
conn = self._connections.get(email_addr)
if conn:
try:
conn.noop()
return conn
except Exception:
self._close_one(email_addr)
conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout)
auth_str = f"user={email_addr}\x01auth=Bearer {token}\x01\x01"
conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8"))
self._connections[email_addr] = conn
logger.debug(f"[{email_addr}] IMAP 新连接已建立")
return conn
def invalidate(self, email_addr: str):
"""废弃连接(认证失败时调用)"""
with self._lock:
self._close_one(email_addr)
def _close_one(self, email_addr: str):
conn = self._connections.pop(email_addr, None)
if conn:
try:
conn.logout()
except Exception:
pass
# 模块级单例连接池
_imap_pool = IMAPConnectionPool()
class IMAPNewProvider(OutlookProvider):
"""
新版 IMAP 提供者
通过连接池复用连接,支持 IMAP IDLE
"""
IMAP_HOST = "outlook.live.com"
IMAP_PORT = 993
def __init__(
self,
@@ -41,151 +89,110 @@ class IMAPNewProvider(OutlookProvider):
config: Optional[ProviderConfig] = None,
):
super().__init__(account, config)
# IMAP 连接
self._conn: Optional[imaplib.IMAP4_SSL] = None
# Token 管理器
self._token_manager: Optional[TokenManager] = None
# 注意:新版 IMAP 必须使用 OAuth2
if not account.has_oauth():
logger.warning(
f"[{self.account.email}] 新版 IMAP 提供者需要 OAuth2 配置 "
f"(client_id + refresh_token)"
f"[{self.account.email}] IMAP_NEW 需要 OAuth2 配置 (client_id + refresh_token)"
)
def _get_token_manager(self) -> TokenManager:
if not self._token_manager:
self._token_manager = TokenManager(
self.account,
proxy_url=self.config.proxy_url,
timeout=self.config.timeout,
service_id=self.config.service_id,
)
return self._token_manager
def connect(self) -> bool:
"""
连接到 IMAP 服务器
Returns:
是否连接成功
"""
if self._connected and self._conn:
try:
self._conn.noop()
return True
except Exception:
self.disconnect()
# 新版 IMAP 必须使用 OAuth2无 OAuth 时静默跳过,不记录健康失败
"""从连接池获取连接"""
if not self.account.has_oauth():
logger.debug(f"[{self.account.email}] 跳过 IMAP_NEW无 OAuth")
return False
try:
logger.debug(f"[{self.account.email}] 正在连接 IMAP ({self.IMAP_HOST})...")
tm = self._get_token_manager()
token = tm.get_access_token()
if not token:
logger.error(f"[{self.account.email}] 获取 IMAP Token 失败")
return False
# 创建连接
self._conn = imaplib.IMAP4_SSL(
self.IMAP_HOST,
self.IMAP_PORT,
timeout=self.config.timeout,
self._conn = _imap_pool.get_connection(
self.account.email, token, self.config.timeout
)
# XOAUTH2 认证
if self._authenticate_xoauth2():
self._connected = True
self.record_success()
logger.info(f"[{self.account.email}] 新版 IMAP 连接成功 (XOAUTH2)")
return True
return False
except Exception as e:
self.disconnect()
self.record_failure(str(e))
logger.error(f"[{self.account.email}] 新版 IMAP 连接失败: {e}")
return False
def _authenticate_xoauth2(self) -> bool:
"""
使用 XOAUTH2 认证
Returns:
是否认证成功
"""
if not self._token_manager:
self._token_manager = TokenManager(
self.account,
ProviderType.IMAP_NEW,
self.config.proxy_url,
self.config.timeout,
)
# 获取 Access Token
token = self._token_manager.get_access_token()
if not token:
logger.error(f"[{self.account.email}] 获取 IMAP Token 失败")
return False
try:
# 构建 XOAUTH2 认证字符串
auth_string = f"user={self.account.email}\x01auth=Bearer {token}\x01\x01"
self._conn.authenticate("XOAUTH2", lambda _: auth_string.encode("utf-8"))
self._connected = True
self.record_success()
logger.debug(f"[{self.account.email}] IMAP 连接就绪(连接池)")
return True
except imaplib.IMAP4.error as e:
err = str(e)
# Token 失效时强制刷新并重试一次
if "AUTHENTICATE" in err or "invalid" in err.lower():
logger.warning(f"[{self.account.email}] XOAUTH2 认证失败,尝试刷新 Token")
_imap_pool.invalidate(self.account.email)
try:
tm = self._get_token_manager()
token = tm.get_access_token(force_refresh=True)
if token:
self._conn = _imap_pool.get_connection(
self.account.email, token, self.config.timeout
)
self._connected = True
self.record_success()
return True
except Exception as retry_e:
self.record_failure(str(retry_e))
logger.error(f"[{self.account.email}] Token 刷新后重连失败: {retry_e}")
else:
self.record_failure(err)
logger.error(f"[{self.account.email}] IMAP 连接失败: {e}")
self._connected = False
self._conn = None
return False
except Exception as e:
logger.error(f"[{self.account.email}] XOAUTH2 认证异常: {e}")
# 清除缓存的 Token
self._token_manager.clear_cache()
self.record_failure(str(e))
logger.error(f"[{self.account.email}] IMAP 连接失败: {e}")
self._connected = False
self._conn = None
return False
def disconnect(self):
"""断开 IMAP 连接"""
if self._conn:
try:
self._conn.close()
except Exception:
pass
try:
self._conn.logout()
except Exception:
pass
self._conn = None
"""归还连接池(不 logout保持复用"""
self._connected = False
self._conn = None
def get_recent_emails(
self,
count: int = 20,
only_unseen: bool = True,
) -> List[EmailMessage]:
"""
获取最近的邮件
Args:
count: 获取数量
only_unseen: 是否只获取未读
Returns:
邮件列表
"""
"""获取最近的邮件"""
if not self._connected:
if not self.connect():
return []
try:
# 选择收件箱
self._conn.select("INBOX", readonly=True)
# 搜索邮件
flag = "UNSEEN" if only_unseen else "ALL"
status, data = self._conn.search(None, flag)
if status != "OK" or not data or not data[0]:
return []
# 获取最新的邮件 ID
ids = data[0].split()
recent_ids = ids[-count:][::-1]
emails = []
for msg_id in recent_ids:
try:
email_msg = self._fetch_email(msg_id)
if email_msg:
emails.append(email_msg)
msg = self._fetch_email(msg_id)
if msg:
emails.append(msg)
except Exception as e:
logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}")
@@ -194,6 +201,9 @@ class IMAPNewProvider(OutlookProvider):
except Exception as e:
self.record_failure(str(e))
logger.error(f"[{self.account.email}] 获取邮件失败: {e}")
_imap_pool.invalidate(self.account.email)
self._connected = False
self._conn = None
return []
def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]:
@@ -211,21 +221,170 @@ class IMAPNewProvider(OutlookProvider):
if not raw:
return None
return self._parse_email(raw)
return _parse_email(raw)
@staticmethod
def _parse_email(raw: bytes) -> EmailMessage:
"""解析原始邮件"""
# 使用旧版提供者的解析方法
return IMAPOldProvider._parse_email(raw)
def wait_for_new_email_idle(self, timeout: int = 25) -> bool:
"""
RFC 2177 IMAP IDLE 实现。
发送 IDLE 命令,等待服务器推送 EXISTS/RECENT然后发送 DONE。
Returns True 表示有新邮件推送False 表示超时或异常(调用方降级轮询)。
"""
if not self._connected:
if not self.connect():
return False
try:
self._conn.select("INBOX", readonly=True)
except Exception as e:
logger.warning(f"[{self.account.email}] IDLE 前 SELECT 失败: {e}")
return False
logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}s")
sock = self._conn.socket()
tag = self._conn._new_tag().decode() if isinstance(self._conn._new_tag(), bytes) else self._conn._new_tag()
try:
# 发送 IDLE 命令
self._conn.send(f"{tag} IDLE\r\n".encode())
# 等待 "+" 延续响应
deadline = time.time() + timeout
buf = b""
while time.time() < deadline:
ready = select.select([sock], [], [], min(2.0, deadline - time.time()))
if ready[0]:
chunk = sock.recv(4096)
if not chunk:
break
buf += chunk
if b"+ " in buf or b"+\r\n" in buf:
break
# 等待 EXISTS / RECENT 推送
got_new = False
buf = b""
while time.time() < deadline:
remaining = deadline - time.time()
ready = select.select([sock], [], [], min(2.0, remaining))
if ready[0]:
chunk = sock.recv(4096)
if not chunk:
break
buf += chunk
if b"EXISTS" in buf or b"RECENT" in buf:
got_new = True
break
return got_new
except Exception as e:
logger.warning(f"[{self.account.email}] IMAP IDLE 异常: {e}")
return False
finally:
# 发送 DONE 结束 IDLE
try:
self._conn.send(b"DONE\r\n")
# 读取 IDLE 结束响应(避免缓冲区污染后续命令)
deadline2 = time.time() + 5
resp_buf = b""
while time.time() < deadline2:
ready = select.select([sock], [], [], 1.0)
if ready[0]:
chunk = sock.recv(4096)
if not chunk:
break
resp_buf += chunk
if tag.encode() in resp_buf:
break
except Exception:
# DONE 发送失败则废弃连接
_imap_pool.invalidate(self.account.email)
self._connected = False
self._conn = None
def test_connection(self) -> bool:
"""测试 IMAP 连接"""
try:
with self:
self._conn.select("INBOX", readonly=True)
self._conn.search(None, "ALL")
return True
except Exception as e:
logger.warning(f"[{self.account.email}] 新版 IMAP 连接测试失败: {e}")
logger.warning(f"[{self.account.email}] IMAP 连接测试失败: {e}")
return False
def _parse_email(raw: bytes) -> EmailMessage:
"""解析原始邮件为 EmailMessage"""
msg = email.message_from_bytes(raw)
def _decode(val):
if not val:
return ""
parts = decode_header(str(val))
result = ""
for part, charset in parts:
if isinstance(part, bytes):
try:
result += part.decode(charset or "utf-8", errors="replace")
except (LookupError, UnicodeDecodeError):
result += part.decode("utf-8", errors="replace")
else:
result += str(part)
return result
subject = _decode(msg.get("Subject", ""))
sender = _decode(msg.get("From", ""))
recipients = [_decode(msg.get("To", ""))]
received_at = None
received_ts = 0
date_str = msg.get("Date", "")
if date_str:
try:
received_at = parsedate_to_datetime(date_str)
received_ts = int(received_at.timestamp())
except Exception:
pass
body = ""
body_preview = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
cd = str(part.get("Content-Disposition", ""))
if "attachment" not in cd.lower() and ct in ("text/plain", "text/html"):
try:
charset = part.get_content_charset() or "utf-8"
payload = part.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors="replace")
break
except Exception:
pass
else:
try:
charset = msg.get_content_charset() or "utf-8"
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors="replace")
except Exception:
pass
body_preview = body[:200].strip()
msg_id = msg.get("Message-ID", "").strip("<>")
if not msg_id:
msg_id = f"{sender}_{received_ts}"
return EmailMessage(
id=msg_id,
subject=subject,
sender=sender,
recipients=recipients,
body=body,
body_preview=body_preview,
received_at=received_at,
received_timestamp=received_ts,
)

View File

@@ -1,345 +0,0 @@
"""
旧版 IMAP 提供者
使用 outlook.office365.com 服务器和 login.live.com Token 端点
"""
import email
import imaplib
import logging
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import List, Optional
from ..base import ProviderType, EmailMessage
from ..account import OutlookAccount
from ..token_manager import TokenManager
from .base import OutlookProvider, ProviderConfig
logger = logging.getLogger(__name__)
class IMAPOldProvider(OutlookProvider):
"""
旧版 IMAP 提供者
使用 outlook.office365.com:993 和 login.live.com Token 端点
"""
# IMAP 服务器配置
IMAP_HOST = "outlook.office365.com"
IMAP_PORT = 993
@property
def provider_type(self) -> ProviderType:
return ProviderType.IMAP_OLD
def __init__(
self,
account: OutlookAccount,
config: Optional[ProviderConfig] = None,
):
super().__init__(account, config)
# IMAP 连接
self._conn: Optional[imaplib.IMAP4_SSL] = None
# Token 管理器
self._token_manager: Optional[TokenManager] = None
def connect(self) -> bool:
"""
连接到 IMAP 服务器
Returns:
是否连接成功
"""
if self._connected and self._conn:
# 检查现有连接
try:
self._conn.noop()
return True
except Exception:
self.disconnect()
try:
logger.debug(f"[{self.account.email}] 正在连接 IMAP ({self.IMAP_HOST})...")
# 创建连接
self._conn = imaplib.IMAP4_SSL(
self.IMAP_HOST,
self.IMAP_PORT,
timeout=self.config.timeout,
)
# 尝试 XOAUTH2 认证
if self.account.has_oauth():
if self._authenticate_xoauth2():
self._connected = True
self.record_success()
logger.info(f"[{self.account.email}] IMAP 连接成功 (XOAUTH2)")
return True
else:
logger.warning(f"[{self.account.email}] XOAUTH2 认证失败,尝试密码认证")
# 密码认证
if self.account.password:
self._conn.login(self.account.email, self.account.password)
self._connected = True
self.record_success()
logger.info(f"[{self.account.email}] IMAP 连接成功 (密码认证)")
return True
raise ValueError("没有可用的认证方式")
except Exception as e:
self.disconnect()
self.record_failure(str(e))
logger.error(f"[{self.account.email}] IMAP 连接失败: {e}")
return False
def _authenticate_xoauth2(self) -> bool:
"""
使用 XOAUTH2 认证
Returns:
是否认证成功
"""
if not self._token_manager:
self._token_manager = TokenManager(
self.account,
ProviderType.IMAP_OLD,
self.config.proxy_url,
self.config.timeout,
)
# 获取 Access Token
token = self._token_manager.get_access_token()
if not token:
return False
try:
# 构建 XOAUTH2 认证字符串
auth_string = f"user={self.account.email}\x01auth=Bearer {token}\x01\x01"
self._conn.authenticate("XOAUTH2", lambda _: auth_string.encode("utf-8"))
return True
except Exception as e:
logger.debug(f"[{self.account.email}] XOAUTH2 认证异常: {e}")
# 清除缓存的 Token
self._token_manager.clear_cache()
return False
def disconnect(self):
"""断开 IMAP 连接"""
if self._conn:
try:
self._conn.close()
except Exception:
pass
try:
self._conn.logout()
except Exception:
pass
self._conn = None
self._connected = False
def get_recent_emails(
self,
count: int = 20,
only_unseen: bool = True,
) -> List[EmailMessage]:
"""
获取最近的邮件
Args:
count: 获取数量
only_unseen: 是否只获取未读
Returns:
邮件列表
"""
if not self._connected:
if not self.connect():
return []
try:
# 选择收件箱
self._conn.select("INBOX", readonly=True)
# 搜索邮件
flag = "UNSEEN" if only_unseen else "ALL"
status, data = self._conn.search(None, flag)
if status != "OK" or not data or not data[0]:
return []
# 获取最新的邮件 ID
ids = data[0].split()
recent_ids = ids[-count:][::-1] # 倒序,最新的在前
emails = []
for msg_id in recent_ids:
try:
email_msg = self._fetch_email(msg_id)
if email_msg:
emails.append(email_msg)
except Exception as e:
logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}")
return emails
except Exception as e:
self.record_failure(str(e))
logger.error(f"[{self.account.email}] 获取邮件失败: {e}")
return []
def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]:
"""
获取并解析单封邮件
Args:
msg_id: 邮件 ID
Returns:
EmailMessage 对象,失败返回 None
"""
status, data = self._conn.fetch(msg_id, "(RFC822)")
if status != "OK" or not data or not data[0]:
return None
# 获取原始邮件内容
raw = b""
for part in data:
if isinstance(part, tuple) and len(part) > 1:
raw = part[1]
break
if not raw:
return None
return self._parse_email(raw)
@staticmethod
def _parse_email(raw: bytes) -> EmailMessage:
"""
解析原始邮件
Args:
raw: 原始邮件数据
Returns:
EmailMessage 对象
"""
# 移除 BOM
if raw.startswith(b"\xef\xbb\xbf"):
raw = raw[3:]
msg = email.message_from_bytes(raw)
# 解析邮件头
subject = IMAPOldProvider._decode_header(msg.get("Subject", ""))
sender = IMAPOldProvider._decode_header(msg.get("From", ""))
to = IMAPOldProvider._decode_header(msg.get("To", ""))
delivered_to = IMAPOldProvider._decode_header(msg.get("Delivered-To", ""))
x_original_to = IMAPOldProvider._decode_header(msg.get("X-Original-To", ""))
date_str = IMAPOldProvider._decode_header(msg.get("Date", ""))
# 提取正文
body = IMAPOldProvider._extract_body(msg)
# 解析日期
received_timestamp = 0
received_at = None
try:
if date_str:
received_at = parsedate_to_datetime(date_str)
received_timestamp = int(received_at.timestamp())
except Exception:
pass
# 构建收件人列表
recipients = [r for r in [to, delivered_to, x_original_to] if r]
return EmailMessage(
id=msg.get("Message-ID", ""),
subject=subject,
sender=sender,
recipients=recipients,
body=body,
received_at=received_at,
received_timestamp=received_timestamp,
is_read=False, # 搜索的是未读邮件
raw_data=raw[:500] if len(raw) > 500 else raw,
)
@staticmethod
def _decode_header(header: str) -> str:
"""解码邮件头"""
if not header:
return ""
parts = []
for chunk, encoding in decode_header(header):
if isinstance(chunk, bytes):
try:
decoded = chunk.decode(encoding or "utf-8", errors="replace")
parts.append(decoded)
except Exception:
parts.append(chunk.decode("utf-8", errors="replace"))
else:
parts.append(str(chunk))
return "".join(parts).strip()
@staticmethod
def _extract_body(msg) -> str:
"""提取邮件正文"""
import html as html_module
import re
texts = []
parts = msg.walk() if msg.is_multipart() else [msg]
for part in parts:
content_type = part.get_content_type()
if content_type not in ("text/plain", "text/html"):
continue
payload = part.get_payload(decode=True)
if not payload:
continue
charset = part.get_content_charset() or "utf-8"
try:
text = payload.decode(charset, errors="replace")
except LookupError:
text = payload.decode("utf-8", errors="replace")
# 如果是 HTML移除标签
if "<html" in text.lower():
text = re.sub(r"<[^>]+>", " ", text)
texts.append(text)
# 合并并清理文本
combined = " ".join(texts)
combined = html_module.unescape(combined)
combined = re.sub(r"\s+", " ", combined).strip()
return combined
def test_connection(self) -> bool:
"""
测试 IMAP 连接
Returns:
连接是否正常
"""
try:
with self:
self._conn.select("INBOX", readonly=True)
self._conn.search(None, "ALL")
return True
except Exception as e:
logger.warning(f"[{self.account.email}] IMAP 连接测试失败: {e}")
return False

View File

@@ -1,6 +1,6 @@
"""
Outlook 邮箱服务主类
支持多种 IMAP/API 连接方式,自动故障切换
Outlook 邮箱服务主类(简化版)
单一 IMAP_NEW Provider + 邮件缓存 + IMAP IDLE 支持
"""
import logging
@@ -8,34 +8,21 @@ import threading
import time
from typing import Optional, Dict, Any, List
from ..base import BaseEmailService, EmailServiceError, EmailServiceStatus, EmailServiceType
from ..base import BaseEmailService, EmailServiceError, EmailServiceType
from ...config.constants import EmailServiceType as ServiceType
from ...config.settings import get_settings
from .account import OutlookAccount
from .base import ProviderType, EmailMessage
from .email_parser import EmailParser, get_email_parser
from .health_checker import HealthChecker, FailoverManager
from .providers.base import OutlookProvider, ProviderConfig
from .providers.imap_old import IMAPOldProvider
from .base import EmailMessage
from .email_parser import get_email_parser
from .health_checker import HealthChecker
from .providers.base import ProviderConfig
from .providers.imap_new import IMAPNewProvider
from .providers.graph_api import GraphAPIProvider
logger = logging.getLogger(__name__)
# 默认提供者优先级
# IMAP_OLD 最兼容(只需 login.live.com tokenIMAP_NEW 次之Graph API 最后
# 原因:部分 client_id 没有 Graph API 权限,但有 IMAP 权限
DEFAULT_PROVIDER_PRIORITY = [
ProviderType.IMAP_OLD,
ProviderType.IMAP_NEW,
ProviderType.GRAPH_API,
]
def get_email_code_settings() -> dict:
"""获取验证码等待配置"""
def _get_code_settings() -> dict:
settings = get_settings()
return {
"timeout": settings.email_code_timeout,
@@ -43,56 +30,58 @@ def get_email_code_settings() -> dict:
}
class _EmailCache:
"""轻量级邮件内存缓存TTL=60s减少重复 IMAP 请求)"""
TTL = 60
def __init__(self):
self._cache: Dict[str, tuple] = {} # email -> (timestamp, List[EmailMessage])
self._lock = threading.Lock()
def get(self, email: str) -> Optional[List[EmailMessage]]:
with self._lock:
entry = self._cache.get(email)
if entry and time.time() - entry[0] < self.TTL:
return entry[1]
return None
def set(self, email: str, messages: List[EmailMessage]):
with self._lock:
self._cache[email] = (time.time(), messages)
def invalidate(self, email: str):
with self._lock:
self._cache.pop(email, None)
class OutlookService(BaseEmailService):
"""
Outlook 邮箱服务
支持多种 IMAP/API 连接方式,自动故障切换
使用单一 IMAP_NEW Provider支持连接池复用和 IMAP IDLE
"""
def __init__(self, config: Dict[str, Any] = None, name: str = None):
"""
初始化 Outlook 服务
Args:
config: 配置字典,支持以下键:
- accounts: Outlook 账户列表
- provider_priority: 提供者优先级列表
- health_failure_threshold: 连续失败次数阈值
- health_disable_duration: 禁用时长(秒)
- timeout: 请求超时时间
- proxy_url: 代理 URL
name: 服务名称
"""
super().__init__(ServiceType.OUTLOOK, name)
# 默认配置
default_config = {
"accounts": [],
"provider_priority": [p.value for p in DEFAULT_PROVIDER_PRIORITY],
"health_failure_threshold": 5,
"health_disable_duration": 60,
"timeout": 30,
"proxy_url": None,
}
self.config = {**default_config, **(config or {})}
# 解析提供者优先级
self.provider_priority = [
ProviderType(p) for p in self.config.get("provider_priority", [])
]
if not self.provider_priority:
self.provider_priority = DEFAULT_PROVIDER_PRIORITY
# 提供者配置
self.provider_config = ProviderConfig(
timeout=self.config.get("timeout", 30),
proxy_url=self.config.get("proxy_url"),
service_id=self.config.get("service_id"),
health_failure_threshold=self.config.get("health_failure_threshold", 3),
health_disable_duration=self.config.get("health_disable_duration", 300),
)
# 获取默认 client_id(供无 client_id 的账户使用)
# 获取默认 client_id
try:
_default_client_id = get_settings().outlook_default_client_id
except Exception:
@@ -103,7 +92,6 @@ class OutlookService(BaseEmailService):
self._current_account_index = 0
self._account_lock = threading.Lock()
# 支持两种配置格式
if "email" in self.config and "password" in self.config:
account = OutlookAccount.from_config(self.config)
if not account.client_id and _default_client_id:
@@ -111,8 +99,8 @@ class OutlookService(BaseEmailService):
if account.validate():
self.accounts.append(account)
else:
for account_config in self.config.get("accounts", []):
account = OutlookAccount.from_config(account_config)
for ac in self.config.get("accounts", []):
account = OutlookAccount.from_config(ac)
if not account.client_id and _default_client_id:
account.client_id = _default_client_id
if account.validate():
@@ -121,175 +109,87 @@ class OutlookService(BaseEmailService):
if not self.accounts:
logger.warning("未配置有效的 Outlook 账户")
# 健康检查器和故障切换管理器
# 健康检查器
self.health_checker = HealthChecker(
failure_threshold=self.provider_config.health_failure_threshold,
disable_duration=self.provider_config.health_disable_duration,
)
self.failover_manager = FailoverManager(
health_checker=self.health_checker,
priority_order=self.provider_priority,
)
# 邮件解析器
self.email_parser = get_email_parser()
# 提供者实例缓存: (email, provider_type) -> OutlookProvider
self._providers: Dict[tuple, OutlookProvider] = {}
# Provider 实例缓存: email -> IMAPNewProvider
self._providers: Dict[str, IMAPNewProvider] = {}
self._provider_lock = threading.Lock()
# IMAP 连接限制(防止限流
# IMAP 并发限制(最多 5 个并发
self._imap_semaphore = threading.Semaphore(5)
# 验证码去重机制
# 邮件缓存
self._email_cache = _EmailCache()
# 验证码去重
self._used_codes: Dict[str, set] = {}
def _get_provider(
self,
account: OutlookAccount,
provider_type: ProviderType,
) -> OutlookProvider:
"""
获取或创建提供者实例
Args:
account: Outlook 账户
provider_type: 提供者类型
Returns:
提供者实例
"""
cache_key = (account.email.lower(), provider_type)
def _get_provider(self, account: OutlookAccount) -> IMAPNewProvider:
key = account.email.lower()
with self._provider_lock:
if cache_key not in self._providers:
provider = self._create_provider(account, provider_type)
self._providers[cache_key] = provider
if key not in self._providers:
self._providers[key] = IMAPNewProvider(account, self.provider_config)
return self._providers[key]
return self._providers[cache_key]
def _create_provider(
def _fetch_emails(
self,
account: OutlookAccount,
provider_type: ProviderType,
) -> OutlookProvider:
"""
创建提供者实例
Args:
account: Outlook 账户
provider_type: 提供者类型
Returns:
提供者实例
"""
if provider_type == ProviderType.IMAP_OLD:
return IMAPOldProvider(account, self.provider_config)
elif provider_type == ProviderType.IMAP_NEW:
return IMAPNewProvider(account, self.provider_config)
elif provider_type == ProviderType.GRAPH_API:
return GraphAPIProvider(account, self.provider_config)
else:
raise ValueError(f"未知的提供者类型: {provider_type}")
def _get_provider_priority_for_account(self, account: OutlookAccount) -> List[ProviderType]:
"""根据账户是否有 OAuth返回适合的提供者优先级列表"""
if account.has_oauth():
return self.provider_priority
else:
# 无 OAuth直接走旧版 IMAP密码认证跳过需要 OAuth 的提供者
return [ProviderType.IMAP_OLD]
def _try_providers_for_emails(
self,
account: OutlookAccount,
count: int = 20,
count: int = 15,
only_unseen: bool = True,
use_cache: bool = False,
) -> List[EmailMessage]:
"""
尝试多个提供者获取邮件
"""通过 IMAP_NEW Provider 获取邮件,可选使用内存缓存"""
if use_cache:
cached = self._email_cache.get(account.email)
if cached is not None:
return cached
Args:
account: Outlook 账户
count: 获取数量
only_unseen: 是否只获取未读
if not self.health_checker.is_available():
logger.debug(f"[{account.email}] IMAP_NEW 不可用,跳过")
return []
Returns:
邮件列表
"""
errors = []
try:
provider = self._get_provider(account)
with self._imap_semaphore:
with provider:
emails = provider.get_recent_emails(count, only_unseen)
# 根据账户类型选择合适的提供者优先级
priority = self._get_provider_priority_for_account(account)
if emails:
self.health_checker.record_success()
if use_cache:
self._email_cache.set(account.email, emails)
return emails
# 按优先级尝试各提供者
for provider_type in priority:
# 检查提供者是否可用
if not self.health_checker.is_available(provider_type):
logger.debug(
f"[{account.email}] {provider_type.value} 不可用,跳过"
)
continue
try:
provider = self._get_provider(account, provider_type)
with self._imap_semaphore:
with provider:
emails = provider.get_recent_emails(count, only_unseen)
if emails:
# 成功获取邮件
self.health_checker.record_success(provider_type)
logger.debug(
f"[{account.email}] {provider_type.value} 获取到 {len(emails)} 封邮件"
)
return emails
except Exception as e:
error_msg = str(e)
errors.append(f"{provider_type.value}: {error_msg}")
self.health_checker.record_failure(provider_type, error_msg)
logger.warning(
f"[{account.email}] {provider_type.value} 获取邮件失败: {e}"
)
logger.error(
f"[{account.email}] 所有提供者都失败: {'; '.join(errors)}"
)
return []
except Exception as e:
err = str(e)
self.health_checker.record_failure(err)
logger.warning(f"[{account.email}] 获取邮件失败: {e}")
return []
def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
"""
选择可用的 Outlook 账户
Args:
config: 配置参数(未使用)
Returns:
包含邮箱信息的字典
"""
"""轮询选择可用的 Outlook 账户"""
if not self.accounts:
self.update_status(False, EmailServiceError("没有可用的 Outlook 账户"))
raise EmailServiceError("没有可用的 Outlook 账户")
# 轮询选择账户
with self._account_lock:
account = self.accounts[self._current_account_index]
self._current_account_index = (self._current_account_index + 1) % len(self.accounts)
email_info = {
"email": account.email,
"service_id": account.email,
"account": {
"email": account.email,
"has_oauth": account.has_oauth()
}
}
logger.info(f"选择 Outlook 账户: {account.email}")
self.update_status(True)
return email_info
return {
"email": account.email,
"service_id": account.email,
"account": {"email": account.email, "has_oauth": account.has_oauth()},
}
def get_verification_code(
self,
@@ -299,114 +199,155 @@ class OutlookService(BaseEmailService):
pattern: str = None,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""
从 Outlook 邮箱获取验证码
Args:
email: 邮箱地址
email_id: 未使用
timeout: 超时时间(秒)
pattern: 验证码正则表达式(未使用)
otp_sent_at: OTP 发送时间戳
Returns:
验证码字符串
"""
# 查找对应的账户
account = None
for acc in self.accounts:
if acc.email.lower() == email.lower():
account = acc
break
"""从 Outlook 邮箱获取验证码"""
account = next(
(a for a in self.accounts if a.email.lower() == email.lower()), None
)
if not account:
self.update_status(False, EmailServiceError(f"未找到邮箱对应的账户: {email}"))
self.update_status(False, EmailServiceError(f"未找到邮箱账户: {email}"))
return None
# 获取验证码等待配置
code_settings = get_email_code_settings()
code_settings = _get_code_settings()
actual_timeout = timeout or code_settings["timeout"]
poll_interval = code_settings["poll_interval"]
logger.info(
f"[{email}] 开始获取验证码,超时 {actual_timeout}s"
f"提供者优先级: {[p.value for p in self.provider_priority]}"
)
logger.info(f"[{email}] 开始获取验证码,超时 {actual_timeout}s")
# 初始化验证码去重集合
if email not in self._used_codes:
self._used_codes[email] = set()
used_codes = self._used_codes[email]
# 计算最小时间戳(留出 60 秒时钟偏差)
min_timestamp = (otp_sent_at - 60) if otp_sent_at else 0
use_idle = True
try:
use_idle = get_settings().outlook_use_idle
except Exception:
pass
if use_idle:
code = self._wait_with_idle(
account, email, actual_timeout, min_timestamp, used_codes
)
else:
code = self._wait_with_poll(
account, email, actual_timeout, poll_interval, min_timestamp, used_codes
)
if code:
used_codes.add(code)
self.update_status(True)
return code
return None
def _wait_with_poll(
self,
account: OutlookAccount,
email: str,
timeout: int,
poll_interval: int,
min_timestamp: float,
used_codes: set,
) -> Optional[str]:
"""轮询方式等待验证码"""
start_time = time.time()
poll_count = 0
while time.time() - start_time < actual_timeout:
while time.time() - start_time < timeout:
poll_count += 1
# 渐进式邮件检查:前 3 次只检查未读
only_unseen = poll_count <= 3
try:
# 尝试多个提供者获取邮件
emails = self._try_providers_for_emails(
account,
count=15,
only_unseen=only_unseen,
)
emails = self._fetch_emails(account, count=15, only_unseen=only_unseen)
if emails:
logger.debug(
f"[{email}] 第 {poll_count} 次轮询获取到 {len(emails)} 封邮件"
)
# 从邮件中查找验证码
code = self.email_parser.find_verification_code_in_emails(
emails,
target_email=email,
min_timestamp=min_timestamp,
used_codes=used_codes,
)
if code:
used_codes.add(code)
elapsed = int(time.time() - start_time)
logger.info(
f"[{email}] 找到验证码: {code}"
f"总耗时 {elapsed}s轮询 {poll_count}"
f"[{email}] 找到验证码: {code}耗时 {elapsed}s轮询 {poll_count}"
)
self.update_status(True)
return code
except Exception as e:
logger.warning(f"[{email}] 检查出错: {e}")
logger.warning(f"[{email}] 轮询出错: {e}")
# 等待下次轮询
time.sleep(poll_interval)
elapsed = int(time.time() - start_time)
logger.warning(f"[{email}] 验证码超时 ({actual_timeout}s),共轮询 {poll_count}")
logger.warning(f"[{email}] 验证码超时 ({timeout}s),共轮询 {poll_count}")
return None
def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
"""列出所有可用的 Outlook 账户"""
return [
{
"email": account.email,
"id": account.email,
"has_oauth": account.has_oauth(),
"type": "outlook"
}
for account in self.accounts
]
def _wait_with_idle(
self,
account: OutlookAccount,
email: str,
timeout: int,
min_timestamp: float,
used_codes: set,
) -> Optional[str]:
"""IMAP IDLE 方式等待验证码,失败时自动降级为轮询"""
if not self.health_checker.is_available():
logger.warning(f"[{email}] IMAP_NEW 不可用,降级为轮询")
return self._wait_with_poll(
account, email, timeout, 3, min_timestamp, used_codes
)
def delete_email(self, email_id: str) -> bool:
"""删除邮箱Outlook 不支持删除账户)"""
logger.warning(f"Outlook 服务不支持删除账户: {email_id}")
return False
start_time = time.time()
try:
provider = self._get_provider(account)
with self._imap_semaphore:
with provider:
# 先做一次即时检查
emails = provider.get_recent_emails(15, only_unseen=True)
code = self.email_parser.find_verification_code_in_emails(
emails,
target_email=email,
min_timestamp=min_timestamp,
used_codes=used_codes,
)
if code:
elapsed = int(time.time() - start_time)
logger.info(f"[{email}] 找到验证码: {code},耗时 {elapsed}s即时检查")
return code
# IDLE 等待循环
while time.time() - start_time < timeout:
remaining = int(timeout - (time.time() - start_time))
if remaining <= 0:
break
arrived = provider.wait_for_new_email_idle(timeout=min(remaining, 25))
# 无效化缓存,强制重新拉取
self._email_cache.invalidate(email)
emails = provider.get_recent_emails(15, only_unseen=True)
code = self.email_parser.find_verification_code_in_emails(
emails,
target_email=email,
min_timestamp=min_timestamp,
used_codes=used_codes,
)
if code:
elapsed = int(time.time() - start_time)
logger.info(
f"[{email}] 找到验证码: {code},耗时 {elapsed}s"
f"IDLE {'推送' if arrived else '超时检查'}"
)
return code
except Exception as e:
logger.warning(f"[{email}] IDLE 失败,降级为轮询: {e}")
elapsed = int(time.time() - start_time)
remaining = max(0, timeout - elapsed)
if remaining > 0:
code_settings = _get_code_settings()
return self._wait_with_poll(
account, email, remaining,
code_settings["poll_interval"], min_timestamp, used_codes
)
logger.warning(f"[{email}] IDLE 等待验证码超时 ({timeout}s)")
return None
def check_health(self) -> bool:
"""检查 Outlook 服务是否可用"""
@@ -414,48 +355,48 @@ class OutlookService(BaseEmailService):
self.update_status(False, EmailServiceError("没有配置的账户"))
return False
# 测试第一个账户的连接
test_account = self.accounts[0]
# 尝试任一提供者连接
for provider_type in self.provider_priority:
try:
provider = self._get_provider(test_account, provider_type)
if provider.test_connection():
self.update_status(True)
return True
except Exception as e:
logger.warning(
f"Outlook 健康检查失败 ({test_account.email}, {provider_type.value}): {e}"
)
try:
provider = self._get_provider(self.accounts[0])
if provider.test_connection():
self.update_status(True)
return True
except Exception as e:
logger.warning(f"Outlook 健康检查失败: {e}")
self.update_status(False, EmailServiceError("健康检查失败"))
return False
def get_provider_status(self) -> Dict[str, Any]:
"""获取提供者状态"""
return self.failover_manager.get_status()
def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
return [
{
"email": a.email,
"id": a.email,
"has_oauth": a.has_oauth(),
"type": "outlook",
}
for a in self.accounts
]
def delete_email(self, email_id: str) -> bool:
logger.warning(f"Outlook 服务不支持删除账户: {email_id}")
return False
def get_account_stats(self) -> Dict[str, Any]:
"""获取账户统计信息"""
total = len(self.accounts)
oauth_count = sum(1 for acc in self.accounts if acc.has_oauth())
oauth_count = sum(1 for a in self.accounts if a.has_oauth())
return {
"total_accounts": total,
"oauth_accounts": oauth_count,
"password_accounts": total - oauth_count,
"accounts": [acc.to_dict() for acc in self.accounts],
"provider_status": self.get_provider_status(),
"accounts": [a.to_dict() for a in self.accounts],
"health_status": self.health_checker.get_status(),
}
def add_account(self, account_config: Dict[str, Any]) -> bool:
"""添加新的 Outlook 账户"""
try:
account = OutlookAccount.from_config(account_config)
if not account.validate():
return False
self.accounts.append(account)
logger.info(f"添加 Outlook 账户: {account.email}")
return True
@@ -464,24 +405,13 @@ class OutlookService(BaseEmailService):
return False
def remove_account(self, email: str) -> bool:
"""移除 Outlook 账户"""
for i, acc in enumerate(self.accounts):
if acc.email.lower() == email.lower():
for i, a in enumerate(self.accounts):
if a.email.lower() == email.lower():
self.accounts.pop(i)
logger.info(f"移除 Outlook 账户: {email}")
return True
return False
def reset_provider_health(self):
"""重置所有提供者的健康状态"""
self.health_checker.reset_all()
logger.info("已重置所有提供者的健康状态")
def force_provider(self, provider_type: ProviderType):
"""强制使用指定的提供者"""
self.health_checker.force_enable(provider_type)
# 禁用其他提供者
for pt in ProviderType:
if pt != provider_type:
self.health_checker.force_disable(pt, 60)
logger.info(f"已强制使用提供者: {provider_type.value}")
def reset_health(self):
self.health_checker.reset()
logger.info("已重置 IMAP_NEW 健康状态")

View File

@@ -1,6 +1,6 @@
"""
Token 管理器
支持多个 Microsoft Token 端点,自动选择合适的端点
Token 管理器(简化版)
固定使用 consumers 端点 + IMAP scope
"""
import json
@@ -11,153 +11,98 @@ from typing import Dict, Optional, Any
from curl_cffi import requests as _requests
from .base import ProviderType, TokenEndpoint, TokenInfo
from .base import TokenInfo
from .account import OutlookAccount
logger = logging.getLogger(__name__)
# 各提供者的 Scope 配置
PROVIDER_SCOPES = {
ProviderType.IMAP_OLD: "", # 旧版 IMAP 不需要特定 scope
ProviderType.IMAP_NEW: "https://outlook.office.com/IMAP.AccessAsUser.All offline_access",
ProviderType.GRAPH_API: "https://graph.microsoft.com/.default",
}
# 各提供者的 Token 端点
PROVIDER_TOKEN_URLS = {
ProviderType.IMAP_OLD: TokenEndpoint.LIVE.value,
ProviderType.IMAP_NEW: TokenEndpoint.CONSUMERS.value,
ProviderType.GRAPH_API: TokenEndpoint.COMMON.value,
}
TOKEN_URL = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
IMAP_SCOPE = "https://outlook.office.com/IMAP.AccessAsUser.All offline_access"
class TokenManager:
"""
Token 管理器
支持多端点 Token 获取和缓存
固定 consumers 端点,缓存 key = email
"""
# Token 缓存: key = (email, provider_type) -> TokenInfo
_token_cache: Dict[tuple, TokenInfo] = {}
_token_cache: Dict[str, TokenInfo] = {}
_cache_lock = threading.Lock()
# 默认超时时间
DEFAULT_TIMEOUT = 30
# Token 刷新提前时间(秒)
REFRESH_BUFFER = 120
def __init__(
self,
account: OutlookAccount,
provider_type: ProviderType,
proxy_url: Optional[str] = None,
timeout: int = DEFAULT_TIMEOUT,
service_id: Optional[int] = None,
):
"""
初始化 Token 管理器
Args:
account: Outlook 账户
provider_type: 提供者类型
proxy_url: 代理 URL可选
timeout: 请求超时时间
"""
self.account = account
self.provider_type = provider_type
self.proxy_url = proxy_url
self.timeout = timeout
self.service_id = service_id
# 获取端点和 Scope
self.token_url = PROVIDER_TOKEN_URLS.get(provider_type, TokenEndpoint.LIVE.value)
self.scope = PROVIDER_SCOPES.get(provider_type, "")
def _cache_key(self) -> str:
return self.account.email.lower()
def get_cached_token(self) -> Optional[TokenInfo]:
"""获取缓存的 Token"""
cache_key = (self.account.email.lower(), self.provider_type)
with self._cache_lock:
token = self._token_cache.get(cache_key)
token = self._token_cache.get(self._cache_key())
if token and not token.is_expired(self.REFRESH_BUFFER):
return token
return None
def set_cached_token(self, token: TokenInfo):
"""缓存 Token"""
cache_key = (self.account.email.lower(), self.provider_type)
with self._cache_lock:
self._token_cache[cache_key] = token
self._token_cache[self._cache_key()] = token
def clear_cache(self):
"""清除缓存"""
cache_key = (self.account.email.lower(), self.provider_type)
with self._cache_lock:
self._token_cache.pop(cache_key, None)
self._token_cache.pop(self._cache_key(), None)
def get_access_token(self, force_refresh: bool = False) -> Optional[str]:
"""
获取 Access Token
Args:
force_refresh: 是否强制刷新
Returns:
Access Token 字符串,失败返回 None
"""
# 检查缓存
if not force_refresh:
cached = self.get_cached_token()
if cached:
logger.debug(f"[{self.account.email}] 使用缓存 Token ({self.provider_type.value})")
logger.debug(f"[{self.account.email}] 使用缓存 Token")
return cached.access_token
# 刷新 Token
try:
token = self._refresh_token()
if token:
self.set_cached_token(token)
return token.access_token
except Exception as e:
logger.error(f"[{self.account.email}] 获取 Token 失败 ({self.provider_type.value}): {e}")
logger.error(f"[{self.account.email}] 获取 Token 失败: {e}")
return None
def _refresh_token(self) -> Optional[TokenInfo]:
"""
刷新 Token
Returns:
TokenInfo 对象,失败返回 None
"""
if not self.account.client_id or not self.account.refresh_token:
raise ValueError("缺少 client_id 或 refresh_token")
logger.debug(f"[{self.account.email}] 正在刷新 Token ({self.provider_type.value})...")
logger.debug(f"[{self.account.email}] Token URL: {self.token_url}")
logger.debug(f"[{self.account.email}] 正在刷新 Token...")
# 构建请求体
data = {
"client_id": self.account.client_id,
"refresh_token": self.account.refresh_token,
"grant_type": "refresh_token",
"scope": IMAP_SCOPE,
}
# 添加 Scope如果需要
if self.scope:
data["scope"] = self.scope
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
}
proxies = None
if self.proxy_url:
proxies = {"http": self.proxy_url, "https": self.proxy_url}
try:
resp = _requests.post(
self.token_url,
TOKEN_URL,
data=data,
headers=headers,
proxies=proxies,
@@ -166,74 +111,56 @@ class TokenManager:
)
if resp.status_code != 200:
error_body = resp.text
body = resp.text
logger.error(f"[{self.account.email}] Token 刷新失败: HTTP {resp.status_code}")
logger.debug(f"[{self.account.email}] 错误响应: {error_body[:500]}")
if "service abuse" in error_body.lower():
if "service abuse" in body.lower():
logger.warning(f"[{self.account.email}] 账号可能被封禁")
elif "invalid_grant" in error_body.lower():
elif "invalid_grant" in body.lower():
logger.warning(f"[{self.account.email}] Refresh Token 已失效")
return None
response_data = resp.json()
# 解析响应
token = TokenInfo.from_response(response_data, self.scope)
token = TokenInfo.from_response(response_data, IMAP_SCOPE)
logger.info(
f"[{self.account.email}] Token 刷新成功 ({self.provider_type.value}), "
f"[{self.account.email}] Token 刷新成功"
f"有效期 {int(token.expires_at - time.time())}"
)
# 若响应含新 refresh_token → 写回内存 + 持久化数据库
new_rt = response_data.get("refresh_token", "")
if new_rt and new_rt != self.account.refresh_token:
self.account.refresh_token = new_rt
if self.service_id:
try:
from ...database.session import get_session_manager
from ...database.crud import update_outlook_refresh_token
with get_session_manager().session_scope() as db:
update_outlook_refresh_token(
db, self.service_id, self.account.email, new_rt
)
logger.info(f"[{self.account.email}] refresh_token 已写回数据库")
except Exception as e:
logger.warning(f"[{self.account.email}] 写回 refresh_token 失败: {e}")
return token
except json.JSONDecodeError as e:
logger.error(f"[{self.account.email}] JSON 解析错误: {e}")
return None
except Exception as e:
logger.error(f"[{self.account.email}] 未知错误: {e}")
return None
@classmethod
def clear_all_cache(cls):
"""清除所有 Token 缓存"""
with cls._cache_lock:
cls._token_cache.clear()
logger.info("已清除所有 Token 缓存")
@classmethod
def get_cache_stats(cls) -> Dict[str, Any]:
"""获取缓存统计"""
with cls._cache_lock:
return {
"cache_size": len(cls._token_cache),
"entries": [
{
"email": key[0],
"provider": key[1].value,
}
for key in cls._token_cache.keys()
],
"entries": list(cls._token_cache.keys()),
}
def create_token_manager(
account: OutlookAccount,
provider_type: ProviderType,
proxy_url: Optional[str] = None,
timeout: int = TokenManager.DEFAULT_TIMEOUT,
) -> TokenManager:
"""
创建 Token 管理器的工厂函数
Args:
account: Outlook 账户
provider_type: 提供者类型
proxy_url: 代理 URL
timeout: 超时时间
Returns:
TokenManager 实例
"""
return TokenManager(account, provider_type, proxy_url, timeout)

View File

@@ -340,6 +340,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
if selected_service and selected_service.config:
config = selected_service.config.copy()
config['service_id'] = selected_service.id
crud.update_registration_task(db, task_uuid, email_service_id=selected_service.id)
logger.info(f"使用数据库 Outlook 账户: {selected_service.name}")
else:

View File

@@ -705,7 +705,6 @@ async def get_outlook_settings():
return {
"default_client_id": settings.outlook_default_client_id,
"provider_priority": settings.outlook_provider_priority,
"health_failure_threshold": settings.outlook_health_failure_threshold,
"health_disable_duration": settings.outlook_health_disable_duration,
}

View File

@@ -191,13 +191,23 @@ class TaskManager:
return _log_queues.get(task_uuid, []).copy()
def update_status(self, task_uuid: str, status: str, **kwargs):
"""更新任务状态"""
"""更新任务状态并推送到 WebSocket"""
if task_uuid not in _task_status:
_task_status[task_uuid] = {}
_task_status[task_uuid]["status"] = status
_task_status[task_uuid].update(kwargs)
# 推送状态变更到 WebSocket线程安全兼容同步线程调用
if self._loop and self._loop.is_running():
try:
asyncio.run_coroutine_threadsafe(
self.broadcast_status(task_uuid, status, **kwargs),
self._loop
)
except Exception as e:
logger.warning(f"推送状态到 WebSocket 失败: {e}")
def get_status(self, task_uuid: str) -> Optional[dict]:
"""获取任务状态"""
return _task_status.get(task_uuid)