diff --git a/src/config/constants.py b/src/config/constants.py index b65bff6..fc68436 100644 --- a/src/config/constants.py +++ b/src/config/constants.py @@ -68,12 +68,14 @@ OPENAI_API_ENDPOINTS = { "validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate", "create_account": "https://auth.openai.com/api/accounts/create_account", "select_workspace": "https://auth.openai.com/api/accounts/workspace/select", + "password_verify" : "https://auth.openai.com/api/accounts/password/verify" } # OpenAI 页面类型(用于判断账号状态) OPENAI_PAGE_TYPES = { + "LOGIN_PASSWORD": "login_password", "EMAIL_OTP_VERIFICATION": "email_otp_verification", # 已注册账号,需要 OTP 验证 - "PASSWORD_REGISTRATION": "password", # 新账号,需要设置密码 + "PASSWORD_REGISTRATION": "create_account_password", # 新账号,需要设置密码 } # ============================================================================ @@ -378,20 +380,8 @@ MICROSOFT_TOKEN_ENDPOINTS = { } # IMAP 服务器配置 -OUTLOOK_IMAP_SERVERS = { - "OLD": "outlook.office365.com", # 旧版 IMAP - "NEW": "outlook.live.com", # 新版 IMAP -} +OUTLOOK_IMAP_SERVER = "outlook.live.com" +OUTLOOK_IMAP_PORT = 993 -# Microsoft OAuth2 Scopes -MICROSOFT_SCOPES = { - # 旧版 IMAP 不需要特定 scope - "IMAP_OLD": "", - # 新版 IMAP 需要的 scope - "IMAP_NEW": "https://outlook.office.com/IMAP.AccessAsUser.All offline_access", - # Graph API 需要的 scope - "GRAPH_API": "https://graph.microsoft.com/.default", -} - -# Outlook 提供者默认优先级 -OUTLOOK_PROVIDER_PRIORITY = ["imap_new", "imap_old", "graph_api"] +# Microsoft OAuth2 Scope(IMAP_NEW) +OUTLOOK_IMAP_SCOPE = "https://outlook.office.com/IMAP.AccessAsUser.All offline_access" diff --git a/src/config/settings.py b/src/config/settings.py index 2803afd..aac2b97 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -358,12 +358,6 @@ SETTING_DEFINITIONS: Dict[str, SettingDefinition] = { ), # Outlook 配置 - "outlook_provider_priority": SettingDefinition( - db_key="outlook.provider_priority", - default_value=["imap_old", "imap_new", "graph_api"], - category=SettingCategory.EMAIL, - description="Outlook 提供者优先级" - ), "outlook_health_failure_threshold": SettingDefinition( db_key="outlook.health_failure_threshold", default_value=5, @@ -382,6 +376,12 @@ SETTING_DEFINITIONS: Dict[str, SettingDefinition] = { category=SettingCategory.EMAIL, description="Outlook OAuth 默认 Client ID" ), + "outlook_use_idle": SettingDefinition( + db_key="outlook.use_idle", + default_value=True, + category=SettingCategory.EMAIL, + description="使用 IMAP IDLE 替代轮询获取验证码(降低延迟,默认开启)" + ), } # 属性名到数据库键名的映射(用于向后兼容) @@ -407,9 +407,9 @@ SETTING_TYPES: Dict[str, Type] = { "cpa_enabled": bool, "email_code_timeout": int, "email_code_poll_interval": int, - "outlook_provider_priority": list, "outlook_health_failure_threshold": int, "outlook_health_disable_duration": int, + "outlook_use_idle": bool, } # 需要作为 SecretStr 处理的字段 @@ -694,10 +694,10 @@ class Settings(BaseModel): email_code_poll_interval: int = 3 # Outlook 配置 - outlook_provider_priority: List[str] = ["imap_old", "imap_new", "graph_api"] outlook_health_failure_threshold: int = 5 outlook_health_disable_duration: int = 60 outlook_default_client_id: str = "24d9a0ed-8787-4584-883c-2fd79308940a" + outlook_use_idle: bool = True # 全局配置实例 diff --git a/src/core/register.py b/src/core/register.py index 41549a0..a8d6493 100644 --- a/src/core/register.py +++ b/src/core/register.py @@ -286,6 +286,7 @@ class RegistrationEngine: """ try: signup_body = f'{{"username":{{"value":"{self.email}","kind":"email"}},"screen_hint":"signup"}}' + #signup_body = f'{{"username":{{"value":"{self.email}","kind":"email"}},"screen_hint":"login_or_signup"}}' headers = { "referer": "https://auth.openai.com/create-account", @@ -318,7 +319,7 @@ class RegistrationEngine: self._log(f"响应页面类型: {page_type}") # 判断是否为已注册账号 - is_existing = page_type == OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"] + is_existing = page_type != OPENAI_PAGE_TYPES["PASSWORD_REGISTRATION"] if is_existing: self._log(f"检测到已注册账号,将自动切换到登录流程") @@ -367,8 +368,7 @@ class RegistrationEngine: self._log(f"提交密码状态: {response.status_code}") if response.status_code != 200: - error_text = response.text[:500] - self._log(f"密码注册失败: {error_text}", "warning") + self._log(f"密码注册失败: {response.json().get("error", {}).get("message", "")}", "warning") # 解析错误信息,判断是否是邮箱已注册 try: @@ -377,7 +377,7 @@ class RegistrationEngine: error_code = error_json.get("error", {}).get("code", "") # 检测邮箱已注册的情况 - if "already" in error_msg.lower() or "exists" in error_msg.lower() or error_code == "user_exists": + if "already" in error_msg.lower() or "exists" in error_msg.lower() or "again" in error_msg.lower() or error_code == "user_exists" or error_code == "invalid_request": self._log(f"邮箱 {self.email} 可能已在 OpenAI 注册过", "error") # 标记此邮箱为已注册状态 self._mark_email_as_registered() @@ -392,24 +392,74 @@ class RegistrationEngine: self._log(f"密码注册失败: {e}", "error") return False, None - def _mark_email_as_registered(self): - """标记邮箱为已注册状态(用于防止重复尝试)""" + def _login_password(self, did: str, sen_token: Optional[str]) -> bool: + """注册密码""" try: + password = "" with get_db() as db: # 检查是否已存在该邮箱的记录 existing = crud.get_account_by_email(db, self.email) - if not existing: - # 创建一个失败记录,标记该邮箱已注册过 + # 生成密码 + password = existing.password + + # 提交密码登录 + register_body = json.dumps({ + "password": password + }) + + if sen_token: + sentinel = f'{{"p": "", "id": "{did}", "flow": "password_verify"}}' + + response = self.session.post( + OPENAI_API_ENDPOINTS["password_verify"], + headers={ + "referer": "https://auth.openai.com/log-in/password", + "accept": "application/json", + "content-type": "application/json", + "openai-sentinel-token": sentinel + }, + data=register_body, + ) + + self._log(f"提交密码状态: {response.status_code}") + + if response.status_code != 200: + error_text = response.text[:500] + self._log(f"密码登录失败: {error_text}", "warning") + return False + return True + + except Exception as e: + self._log(f"密码登录失败: {e}", "error") + return False, None + + def _mark_email_as_registered(self): + """标记邮箱为已注册状态(OpenAI 侧已存在该账号)""" + try: + with get_db() as db: + existing = crud.get_account_by_email(db, self.email) + if existing: + # 数据库中已有该账号,更新 extra_data 标记 + extra = existing.extra_data or {} + extra["openai_already_registered"] = True + crud.update_account( + db, + existing.id, + extra_data=extra + ) + self._log(f"已更新数据库中 {self.email} 的已注册标记") + else: + # 数据库中不存在,创建失败记录并标记 crud.create_account( db, email=self.email, - password="", # 空密码表示未成功注册 + password="", email_service=self.email_service.service_type.value, email_service_id=self.email_info.get("service_id") if self.email_info else None, status="failed", - extra_data={"register_failed_reason": "email_already_registered_on_openai"} + extra_data={"openai_already_registered": True} ) - self._log(f"已在数据库中标记邮箱 {self.email} 为已注册状态") + self._log(f"已在数据库中标记 {self.email} 为已注册状态") except Exception as e: logger.warning(f"标记邮箱状态失败: {e}") @@ -722,9 +772,11 @@ class RegistrationEngine: result.error_message = f"提交注册表单失败: {signup_result.error_message}" return result - # 8. [已注册账号跳过] 注册密码 + # 8. 检测到已注册账号 → 直接终止任务 if self._is_existing_account: - self._log("8. [已注册账号] 跳过密码设置,OTP 已自动发送") + self._log(f"8. 邮箱 {self.email} 在 OpenAI 已注册,跳过注册流程", "warning") + result.error_message = f"邮箱 {self.email} 已在 OpenAI 注册" + return result else: self._log("8. 注册密码...") password_ok, password = self._register_password() @@ -732,16 +784,11 @@ class RegistrationEngine: result.error_message = "注册密码失败" return result - # 9. [已注册账号跳过] 发送验证码 - if self._is_existing_account: - self._log("9. [已注册账号] 跳过发送验证码,使用自动发送的 OTP") - # 已注册账号的 OTP 在提交表单时已自动发送,记录时间戳 - self._otp_sent_at = time.time() - else: - self._log("9. 发送验证码...") - if not self._send_verification_code(): - result.error_message = "发送验证码失败" - return result + # 9. 发送验证码 + self._log("9. 发送验证码...") + if not self._send_verification_code(): + result.error_message = "发送验证码失败" + return result # 10. 获取验证码 self._log("10. 等待验证码...") @@ -756,14 +803,11 @@ class RegistrationEngine: result.error_message = "验证验证码失败" return result - # 12. [已注册账号跳过] 创建用户账户 - if self._is_existing_account: - self._log("12. [已注册账号] 跳过创建用户账户") - else: - self._log("12. 创建用户账户...") - if not self._create_user_account(): - result.error_message = "创建用户账户失败" - return result + # 12. 创建用户账户 + self._log("12. 创建用户账户...") + if not self._create_user_account(): + result.error_message = "创建用户账户失败" + return result # 13. 获取 Workspace ID self._log("13. 获取 Workspace ID...") @@ -803,7 +847,7 @@ class RegistrationEngine: result.password = self.password or "" # 保存密码(已注册账号为空) # 设置来源标记 - result.source = "login" if self._is_existing_account else "register" + result.source = "register" # 尝试获取 session_token 从 cookie session_cookie = self.session.cookies.get("__Secure-next-auth.session-token") @@ -814,10 +858,7 @@ class RegistrationEngine: # 17. 完成 self._log("=" * 60) - if self._is_existing_account: - self._log("登录成功! (已注册账号)") - else: - self._log("注册成功!") + self._log("注册成功!") self._log(f"邮箱: {result.email}") self._log(f"Account ID: {result.account_id}") self._log(f"Workspace ID: {result.workspace_id}") @@ -828,7 +869,6 @@ class RegistrationEngine: "email_service": self.email_service.service_type.value, "proxy_used": self.proxy_url, "registered_at": datetime.now().isoformat(), - "is_existing_account": self._is_existing_account, } return result diff --git a/src/database/crud.py b/src/database/crud.py index 67d827e..6eaeff2 100644 --- a/src/database/crud.py +++ b/src/database/crud.py @@ -713,4 +713,21 @@ def delete_tm_service(db: Session, service_id: int) -> bool: return False db.delete(svc) db.commit() - return True \ No newline at end of file + return True + + +def update_outlook_refresh_token(db: Session, service_id: int, email: str, new_refresh_token: str): + """更新 EmailService.config 中指定邮箱的 refresh_token""" + service = db.query(EmailService).filter(EmailService.id == service_id).first() + if not service or not service.config: + return + config = dict(service.config) + # 单账户格式 + if config.get("email", "").lower() == email.lower(): + config["refresh_token"] = new_refresh_token + # 多账户列表格式 + for acc in config.get("accounts", []): + if acc.get("email", "").lower() == email.lower(): + acc["refresh_token"] = new_refresh_token + service.config = config + db.commit() \ No newline at end of file diff --git a/src/services/__init__.py b/src/services/__init__.py index ad29d3e..d2312da 100644 --- a/src/services/__init__.py +++ b/src/services/__init__.py @@ -38,9 +38,7 @@ from .outlook.base import ( from .outlook.account import OutlookAccount from .outlook.providers import ( OutlookProvider, - IMAPOldProvider, IMAPNewProvider, - GraphAPIProvider, ) __all__ = [ @@ -67,7 +65,5 @@ __all__ = [ 'ProviderStatus', 'OutlookAccount', 'OutlookProvider', - 'IMAPOldProvider', 'IMAPNewProvider', - 'GraphAPIProvider', ] diff --git a/src/services/outlook/account.py b/src/services/outlook/account.py index 6f427d5..0ecff31 100644 --- a/src/services/outlook/account.py +++ b/src/services/outlook/account.py @@ -3,7 +3,7 @@ Outlook 账户数据类 """ from dataclasses import dataclass -from typing import Dict, Any, Optional +from typing import Dict, Any @dataclass @@ -16,7 +16,6 @@ class OutlookAccount: @classmethod def from_config(cls, config: Dict[str, Any]) -> "OutlookAccount": - """从配置创建账户""" return cls( email=config.get("email", ""), password=config.get("password", ""), @@ -25,15 +24,12 @@ class OutlookAccount: ) def has_oauth(self) -> bool: - """是否支持 OAuth2""" return bool(self.client_id and self.refresh_token) def validate(self) -> bool: - """验证账户信息是否有效""" return bool(self.email and self.password) or self.has_oauth() def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]: - """转换为字典""" result = { "email": self.email, "has_oauth": self.has_oauth(), @@ -47,5 +43,4 @@ class OutlookAccount: return result def __str__(self) -> str: - """字符串表示""" return f"OutlookAccount({self.email})" diff --git a/src/services/outlook/base.py b/src/services/outlook/base.py index 335b11e..7bc294d 100644 --- a/src/services/outlook/base.py +++ b/src/services/outlook/base.py @@ -1,6 +1,5 @@ """ -Outlook 服务基础定义 -包含枚举类型和数据类 +Outlook 邮箱服务基础定义 """ from dataclasses import dataclass, field @@ -10,49 +9,38 @@ from typing import Optional, Dict, Any, List class ProviderType(str, Enum): - """Outlook 提供者类型""" - IMAP_OLD = "imap_old" # 旧版 IMAP (outlook.office365.com) - IMAP_NEW = "imap_new" # 新版 IMAP (outlook.live.com) - GRAPH_API = "graph_api" # Microsoft Graph API + """Outlook 提供者类型(仅 IMAP_NEW)""" + IMAP_NEW = "imap_new" class TokenEndpoint(str, Enum): """Token 端点""" - LIVE = "https://login.live.com/oauth20_token.srf" CONSUMERS = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" - COMMON = "https://login.microsoftonline.com/common/oauth2/v2.0/token" - - -class IMAPServer(str, Enum): - """IMAP 服务器""" - OLD = "outlook.office365.com" - NEW = "outlook.live.com" class ProviderStatus(str, Enum): """提供者状态""" - HEALTHY = "healthy" # 健康 - DEGRADED = "degraded" # 降级 - DISABLED = "disabled" # 禁用 + HEALTHY = "healthy" + DEGRADED = "degraded" + DISABLED = "disabled" @dataclass class EmailMessage: """邮件消息数据类""" - id: str # 消息 ID - subject: str # 主题 - sender: str # 发件人 - recipients: List[str] = field(default_factory=list) # 收件人列表 - body: str = "" # 正文内容 - body_preview: str = "" # 正文预览 - received_at: Optional[datetime] = None # 接收时间 - received_timestamp: int = 0 # 接收时间戳 - is_read: bool = False # 是否已读 - has_attachments: bool = False # 是否有附件 - raw_data: Optional[bytes] = None # 原始数据(用于调试) + id: str + subject: str + sender: str + recipients: List[str] = field(default_factory=list) + body: str = "" + body_preview: str = "" + received_at: Optional[datetime] = None + received_timestamp: int = 0 + is_read: bool = False + has_attachments: bool = False + raw_data: Optional[bytes] = None def to_dict(self) -> Dict[str, Any]: - """转换为字典""" return { "id": self.id, "subject": self.subject, @@ -71,19 +59,17 @@ class EmailMessage: class TokenInfo: """Token 信息数据类""" access_token: str - expires_at: float # 过期时间戳 + expires_at: float token_type: str = "Bearer" scope: str = "" refresh_token: Optional[str] = None def is_expired(self, buffer_seconds: int = 120) -> bool: - """检查 Token 是否已过期""" import time return time.time() >= (self.expires_at - buffer_seconds) @classmethod def from_response(cls, data: Dict[str, Any], scope: str = "") -> "TokenInfo": - """从 API 响应创建""" import time return cls( access_token=data.get("access_token", ""), @@ -99,49 +85,42 @@ class ProviderHealth: """提供者健康状态""" provider_type: ProviderType status: ProviderStatus = ProviderStatus.HEALTHY - failure_count: int = 0 # 连续失败次数 - last_success: Optional[datetime] = None # 最后成功时间 - last_failure: Optional[datetime] = None # 最后失败时间 - last_error: str = "" # 最后错误信息 - disabled_until: Optional[datetime] = None # 禁用截止时间 + failure_count: int = 0 + last_success: Optional[datetime] = None + last_failure: Optional[datetime] = None + last_error: str = "" + disabled_until: Optional[datetime] = None def record_success(self): - """记录成功""" self.status = ProviderStatus.HEALTHY self.failure_count = 0 self.last_success = datetime.now() self.disabled_until = None def record_failure(self, error: str): - """记录失败""" self.failure_count += 1 self.last_failure = datetime.now() self.last_error = error def should_disable(self, threshold: int = 3) -> bool: - """判断是否应该禁用""" return self.failure_count >= threshold def is_disabled(self) -> bool: - """检查是否被禁用""" if self.disabled_until and datetime.now() < self.disabled_until: return True return False def disable(self, duration_seconds: int = 300): - """禁用提供者""" from datetime import timedelta self.status = ProviderStatus.DISABLED self.disabled_until = datetime.now() + timedelta(seconds=duration_seconds) def enable(self): - """启用提供者""" self.status = ProviderStatus.HEALTHY self.disabled_until = None self.failure_count = 0 def to_dict(self) -> Dict[str, Any]: - """转换为字典""" return { "provider_type": self.provider_type.value, "status": self.status.value, diff --git a/src/services/outlook/health_checker.py b/src/services/outlook/health_checker.py index c68ed4e..2b7060b 100644 --- a/src/services/outlook/health_checker.py +++ b/src/services/outlook/health_checker.py @@ -1,15 +1,13 @@ """ -健康检查和故障切换管理 +健康检查管理(简化版,单 Provider) """ import logging import threading -import time from datetime import datetime, timedelta -from typing import Dict, List, Optional, Any +from typing import Any, Dict, Optional -from .base import ProviderType, ProviderHealth, ProviderStatus -from .providers.base import OutlookProvider +from .base import ProviderHealth, ProviderStatus, ProviderType logger = logging.getLogger(__name__) @@ -17,296 +15,49 @@ logger = logging.getLogger(__name__) class HealthChecker: """ - 健康检查管理器 - 跟踪各提供者的健康状态,管理故障切换 + 单 Provider 健康检查器 + 跟踪 IMAP_NEW 的健康状态 """ def __init__( self, failure_threshold: int = 3, disable_duration: int = 300, - recovery_check_interval: int = 60, ): - """ - 初始化健康检查器 - - Args: - failure_threshold: 连续失败次数阈值,超过后禁用 - disable_duration: 禁用时长(秒) - recovery_check_interval: 恢复检查间隔(秒) - """ self.failure_threshold = failure_threshold self.disable_duration = disable_duration - self.recovery_check_interval = recovery_check_interval - - # 提供者健康状态: ProviderType -> ProviderHealth - self._health_status: Dict[ProviderType, ProviderHealth] = {} + self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW) self._lock = threading.Lock() - # 初始化所有提供者的健康状态 - for provider_type in ProviderType: - self._health_status[provider_type] = ProviderHealth( - provider_type=provider_type - ) - - def get_health(self, provider_type: ProviderType) -> ProviderHealth: - """获取提供者的健康状态""" + def record_success(self): with self._lock: - return self._health_status.get(provider_type, ProviderHealth(provider_type=provider_type)) + self._health.record_success() - def record_success(self, provider_type: ProviderType): - """记录成功操作""" + def record_failure(self, error: str): with self._lock: - health = self._health_status.get(provider_type) - if health: - health.record_success() - logger.debug(f"{provider_type.value} 记录成功") - - def record_failure(self, provider_type: ProviderType, error: str): - """记录失败操作""" - with self._lock: - health = self._health_status.get(provider_type) - if health: - health.record_failure(error) - - # 检查是否需要禁用 - if health.should_disable(self.failure_threshold): - health.disable(self.disable_duration) - logger.warning( - f"{provider_type.value} 已禁用 {self.disable_duration} 秒," - f"原因: {error}" - ) - - def is_available(self, provider_type: ProviderType) -> bool: - """ - 检查提供者是否可用 - - Args: - provider_type: 提供者类型 - - Returns: - 是否可用 - """ - health = self.get_health(provider_type) - - # 检查是否被禁用 - if health.is_disabled(): - remaining = (health.disabled_until - datetime.now()).total_seconds() - logger.debug( - f"{provider_type.value} 已被禁用,剩余 {int(remaining)} 秒" - ) - return False - - return health.status != ProviderStatus.DISABLED - - def get_available_providers( - self, - priority_order: Optional[List[ProviderType]] = None, - ) -> List[ProviderType]: - """ - 获取可用的提供者列表 - - Args: - priority_order: 优先级顺序,默认为 [IMAP_NEW, IMAP_OLD, GRAPH_API] - - Returns: - 可用的提供者列表 - """ - if priority_order is None: - priority_order = [ - ProviderType.IMAP_NEW, - ProviderType.IMAP_OLD, - ProviderType.GRAPH_API, - ] - - available = [] - for provider_type in priority_order: - if self.is_available(provider_type): - available.append(provider_type) - - return available - - def get_next_available_provider( - self, - priority_order: Optional[List[ProviderType]] = None, - ) -> Optional[ProviderType]: - """ - 获取下一个可用的提供者 - - Args: - priority_order: 优先级顺序 - - Returns: - 可用的提供者类型,如果没有返回 None - """ - available = self.get_available_providers(priority_order) - return available[0] if available else None - - def force_disable(self, provider_type: ProviderType, duration: Optional[int] = None): - """ - 强制禁用提供者 - - Args: - provider_type: 提供者类型 - duration: 禁用时长(秒),默认使用配置值 - """ - with self._lock: - health = self._health_status.get(provider_type) - if health: - health.disable(duration or self.disable_duration) - logger.warning(f"{provider_type.value} 已强制禁用") - - def force_enable(self, provider_type: ProviderType): - """ - 强制启用提供者 - - Args: - provider_type: 提供者类型 - """ - with self._lock: - health = self._health_status.get(provider_type) - if health: - health.enable() - logger.info(f"{provider_type.value} 已启用") - - def get_all_health_status(self) -> Dict[str, Any]: - """ - 获取所有提供者的健康状态 - - Returns: - 健康状态字典 - """ - with self._lock: - return { - provider_type.value: health.to_dict() - for provider_type, health in self._health_status.items() - } - - def check_and_recover(self): - """ - 检查并恢复被禁用的提供者 - - 如果禁用时间已过,自动恢复提供者 - """ - with self._lock: - for provider_type, health in self._health_status.items(): - if health.is_disabled(): - # 检查是否可以恢复 - if health.disabled_until and datetime.now() >= health.disabled_until: - health.enable() - logger.info(f"{provider_type.value} 已自动恢复") - - def reset_all(self): - """重置所有提供者的健康状态""" - with self._lock: - for provider_type in ProviderType: - self._health_status[provider_type] = ProviderHealth( - provider_type=provider_type + self._health.record_failure(error) + if self._health.should_disable(self.failure_threshold): + self._health.disable(self.disable_duration) + logger.warning( + f"IMAP_NEW 已禁用 {self.disable_duration}s,原因: {error}" ) - logger.info("已重置所有提供者的健康状态") - - -class FailoverManager: - """ - 故障切换管理器 - 管理提供者之间的自动切换 - """ - - def __init__( - self, - health_checker: HealthChecker, - priority_order: Optional[List[ProviderType]] = None, - ): - """ - 初始化故障切换管理器 - - Args: - health_checker: 健康检查器 - priority_order: 提供者优先级顺序 - """ - self.health_checker = health_checker - self.priority_order = priority_order or [ - ProviderType.IMAP_NEW, - ProviderType.IMAP_OLD, - ProviderType.GRAPH_API, - ] - - # 当前使用的提供者索引 - self._current_index = 0 - self._lock = threading.Lock() - - def get_current_provider(self) -> Optional[ProviderType]: - """ - 获取当前提供者 - - Returns: - 当前提供者类型,如果没有可用的返回 None - """ - available = self.health_checker.get_available_providers(self.priority_order) - if not available: - return None + def is_available(self) -> bool: with self._lock: - # 尝试使用当前索引 - if self._current_index < len(available): - return available[self._current_index] - return available[0] - - def switch_to_next(self) -> Optional[ProviderType]: - """ - 切换到下一个提供者 - - Returns: - 下一个提供者类型,如果没有可用的返回 None - """ - available = self.health_checker.get_available_providers(self.priority_order) - if not available: - return None + if self._health.is_disabled(): + remaining = ( + (self._health.disabled_until - datetime.now()).total_seconds() + if self._health.disabled_until + else 0 + ) + logger.debug(f"IMAP_NEW 已被禁用,剩余 {int(remaining)}s") + return False + return self._health.status != ProviderStatus.DISABLED + def reset(self): with self._lock: - self._current_index = (self._current_index + 1) % len(available) - next_provider = available[self._current_index] - logger.info(f"切换到提供者: {next_provider.value}") - return next_provider - - def on_provider_success(self, provider_type: ProviderType): - """ - 提供者成功时调用 - - Args: - provider_type: 提供者类型 - """ - self.health_checker.record_success(provider_type) - - # 重置索引到成功的提供者 - with self._lock: - available = self.health_checker.get_available_providers(self.priority_order) - if provider_type in available: - self._current_index = available.index(provider_type) - - def on_provider_failure(self, provider_type: ProviderType, error: str): - """ - 提供者失败时调用 - - Args: - provider_type: 提供者类型 - error: 错误信息 - """ - self.health_checker.record_failure(provider_type, error) + self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW) def get_status(self) -> Dict[str, Any]: - """ - 获取故障切换状态 - - Returns: - 状态字典 - """ - current = self.get_current_provider() - return { - "current_provider": current.value if current else None, - "priority_order": [p.value for p in self.priority_order], - "available_providers": [ - p.value for p in self.health_checker.get_available_providers(self.priority_order) - ], - "health_status": self.health_checker.get_all_health_status(), - } + with self._lock: + return self._health.to_dict() diff --git a/src/services/outlook/providers/__init__.py b/src/services/outlook/providers/__init__.py index d6fe6a1..95e88a4 100644 --- a/src/services/outlook/providers/__init__.py +++ b/src/services/outlook/providers/__init__.py @@ -3,27 +3,18 @@ Outlook 提供者模块 """ from .base import OutlookProvider, ProviderConfig -from .imap_old import IMAPOldProvider from .imap_new import IMAPNewProvider -from .graph_api import GraphAPIProvider __all__ = [ 'OutlookProvider', 'ProviderConfig', - 'IMAPOldProvider', 'IMAPNewProvider', - 'GraphAPIProvider', ] - -# 提供者注册表 PROVIDER_REGISTRY = { - 'imap_old': IMAPOldProvider, 'imap_new': IMAPNewProvider, - 'graph_api': GraphAPIProvider, } def get_provider_class(provider_type: str): - """获取提供者类""" return PROVIDER_REGISTRY.get(provider_type) diff --git a/src/services/outlook/providers/base.py b/src/services/outlook/providers/base.py index 0d6c072..bb27f57 100644 --- a/src/services/outlook/providers/base.py +++ b/src/services/outlook/providers/base.py @@ -5,7 +5,7 @@ Outlook 提供者抽象基类 import abc import logging from dataclasses import dataclass -from typing import Dict, Any, List, Optional +from typing import List, Optional from ..base import ProviderType, EmailMessage, ProviderHealth, ProviderStatus from ..account import OutlookAccount @@ -18,56 +18,36 @@ logger = logging.getLogger(__name__) class ProviderConfig: """提供者配置""" timeout: int = 30 - max_retries: int = 3 proxy_url: Optional[str] = None - - # 健康检查配置 + service_id: Optional[int] = None health_failure_threshold: int = 3 - health_disable_duration: int = 300 # 秒 + health_disable_duration: int = 300 class OutlookProvider(abc.ABC): - """ - Outlook 提供者抽象基类 - 定义所有提供者必须实现的接口 - """ + """Outlook 提供者抽象基类""" def __init__( self, account: OutlookAccount, config: Optional[ProviderConfig] = None, ): - """ - 初始化提供者 - - Args: - account: Outlook 账户 - config: 提供者配置 - """ self.account = account self.config = config or ProviderConfig() - - # 健康状态 - self._health = ProviderHealth(provider_type=self.provider_type) - - # 连接状态 + self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW) self._connected = False self._last_error: Optional[str] = None @property - @abc.abstractmethod def provider_type(self) -> ProviderType: - """获取提供者类型""" - pass + return ProviderType.IMAP_NEW @property def health(self) -> ProviderHealth: - """获取健康状态""" return self._health @property def is_healthy(self) -> bool: - """检查是否健康""" return ( self._health.status == ProviderStatus.HEALTHY and not self._health.is_disabled() @@ -75,22 +55,14 @@ class OutlookProvider(abc.ABC): @property def is_connected(self) -> bool: - """检查是否已连接""" return self._connected @abc.abstractmethod def connect(self) -> bool: - """ - 连接到服务 - - Returns: - 是否连接成功 - """ pass @abc.abstractmethod def disconnect(self): - """断开连接""" pass @abc.abstractmethod @@ -99,81 +71,44 @@ class OutlookProvider(abc.ABC): count: int = 20, only_unseen: bool = True, ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ pass @abc.abstractmethod def test_connection(self) -> bool: - """ - 测试连接是否正常 - - Returns: - 连接是否正常 - """ pass + def wait_for_new_email_idle(self, timeout: int = 25) -> bool: + """IMAP IDLE(默认不支持,子类可覆盖)""" + return False + def record_success(self): - """记录成功操作""" self._health.record_success() self._last_error = None - logger.debug(f"[{self.account.email}] {self.provider_type.value} 操作成功") def record_failure(self, error: str): - """记录失败操作""" self._health.record_failure(error) self._last_error = error - - # 检查是否需要禁用 if self._health.should_disable(self.config.health_failure_threshold): self._health.disable(self.config.health_disable_duration) logger.warning( - f"[{self.account.email}] {self.provider_type.value} 已禁用 " - f"{self.config.health_disable_duration} 秒,原因: {error}" - ) - else: - logger.warning( - f"[{self.account.email}] {self.provider_type.value} 操作失败 " - f"({self._health.failure_count}/{self.config.health_failure_threshold}): {error}" + f"[{self.account.email}] IMAP_NEW 已禁用 " + f"{self.config.health_disable_duration}s,原因: {error}" ) def check_health(self) -> bool: - """ - 检查健康状态 - - Returns: - 是否健康可用 - """ - # 检查是否被禁用 if self._health.is_disabled(): - logger.debug( - f"[{self.account.email}] {self.provider_type.value} 已被禁用," - f"将在 {self._health.disabled_until} 后恢复" - ) return False - return self._health.status in (ProviderStatus.HEALTHY, ProviderStatus.DEGRADED) def __enter__(self): - """上下文管理器入口""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - """上下文管理器出口""" self.disconnect() return False def __str__(self) -> str: - """字符串表示""" return f"{self.__class__.__name__}({self.account.email})" def __repr__(self) -> str: diff --git a/src/services/outlook/providers/graph_api.py b/src/services/outlook/providers/graph_api.py deleted file mode 100644 index 00cec80..0000000 --- a/src/services/outlook/providers/graph_api.py +++ /dev/null @@ -1,250 +0,0 @@ -""" -Graph API 提供者 -使用 Microsoft Graph REST API -""" - -import json -import logging -from typing import List, Optional -from datetime import datetime - -from curl_cffi import requests as _requests - -from ..base import ProviderType, EmailMessage -from ..account import OutlookAccount -from ..token_manager import TokenManager -from .base import OutlookProvider, ProviderConfig - - -logger = logging.getLogger(__name__) - - -class GraphAPIProvider(OutlookProvider): - """ - Graph API 提供者 - 使用 Microsoft Graph REST API 获取邮件 - 需要 graph.microsoft.com/.default scope - """ - - # Graph API 端点 - GRAPH_API_BASE = "https://graph.microsoft.com/v1.0" - MESSAGES_ENDPOINT = "/me/mailFolders/inbox/messages" - - @property - def provider_type(self) -> ProviderType: - return ProviderType.GRAPH_API - - def __init__( - self, - account: OutlookAccount, - config: Optional[ProviderConfig] = None, - ): - super().__init__(account, config) - - # Token 管理器 - self._token_manager: Optional[TokenManager] = None - - # 注意:Graph API 必须使用 OAuth2 - if not account.has_oauth(): - logger.warning( - f"[{self.account.email}] Graph API 提供者需要 OAuth2 配置 " - f"(client_id + refresh_token)" - ) - - def connect(self) -> bool: - """ - 验证连接(获取 Token) - - Returns: - 是否连接成功 - """ - if not self.account.has_oauth(): - error = "Graph API 需要 OAuth2 配置" - self.record_failure(error) - logger.error(f"[{self.account.email}] {error}") - return False - - if not self._token_manager: - self._token_manager = TokenManager( - self.account, - ProviderType.GRAPH_API, - self.config.proxy_url, - self.config.timeout, - ) - - # 尝试获取 Token - token = self._token_manager.get_access_token() - if token: - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] Graph API 连接成功") - return True - - return False - - def disconnect(self): - """断开连接(清除状态)""" - self._connected = False - - def get_recent_emails( - self, - count: int = 20, - only_unseen: bool = True, - ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ - if not self._connected: - if not self.connect(): - return [] - - try: - # 获取 Access Token - token = self._token_manager.get_access_token() - if not token: - self.record_failure("无法获取 Access Token") - return [] - - # 构建 API 请求 - url = f"{self.GRAPH_API_BASE}{self.MESSAGES_ENDPOINT}" - - params = { - "$top": count, - "$select": "id,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview,body", - "$orderby": "receivedDateTime desc", - } - - # 只获取未读邮件 - if only_unseen: - params["$filter"] = "isRead eq false" - - # 构建代理配置 - proxies = None - if self.config.proxy_url: - proxies = {"http": self.config.proxy_url, "https": self.config.proxy_url} - - # 发送请求(curl_cffi 自动对 params 进行 URL 编码) - resp = _requests.get( - url, - params=params, - headers={ - "Authorization": f"Bearer {token}", - "Accept": "application/json", - "Prefer": "outlook.body-content-type='text'", - }, - proxies=proxies, - timeout=self.config.timeout, - impersonate="chrome110", - ) - - if resp.status_code == 401: - # Token 无 Graph 权限(client_id 未授权),清除缓存但不记录健康失败 - # 避免因权限不足导致健康检查器禁用该提供者,影响其他账户 - if self._token_manager: - self._token_manager.clear_cache() - self._connected = False - logger.warning(f"[{self.account.email}] Graph API 返回 401,client_id 可能无 Graph 权限,跳过") - return [] - - if resp.status_code != 200: - error_body = resp.text[:200] - self.record_failure(f"HTTP {resp.status_code}: {error_body}") - logger.error(f"[{self.account.email}] Graph API 请求失败: HTTP {resp.status_code}") - return [] - - data = resp.json() - - # 解析邮件 - messages = data.get("value", []) - emails = [] - - for msg in messages: - try: - email_msg = self._parse_graph_message(msg) - if email_msg: - emails.append(email_msg) - except Exception as e: - logger.warning(f"[{self.account.email}] 解析 Graph API 邮件失败: {e}") - - self.record_success() - return emails - - except Exception as e: - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] Graph API 获取邮件失败: {e}") - return [] - - def _parse_graph_message(self, msg: dict) -> Optional[EmailMessage]: - """ - 解析 Graph API 消息 - - Args: - msg: Graph API 消息对象 - - Returns: - EmailMessage 对象 - """ - # 解析发件人 - from_info = msg.get("from", {}) - sender_info = from_info.get("emailAddress", {}) - sender = sender_info.get("address", "") - - # 解析收件人 - recipients = [] - for recipient in msg.get("toRecipients", []): - addr_info = recipient.get("emailAddress", {}) - addr = addr_info.get("address", "") - if addr: - recipients.append(addr) - - # 解析日期 - received_at = None - received_timestamp = 0 - try: - date_str = msg.get("receivedDateTime", "") - if date_str: - # ISO 8601 格式 - received_at = datetime.fromisoformat(date_str.replace("Z", "+00:00")) - received_timestamp = int(received_at.timestamp()) - except Exception: - pass - - # 获取正文 - body_info = msg.get("body", {}) - body = body_info.get("content", "") - body_preview = msg.get("bodyPreview", "") - - return EmailMessage( - id=msg.get("id", ""), - subject=msg.get("subject", ""), - sender=sender, - recipients=recipients, - body=body, - body_preview=body_preview, - received_at=received_at, - received_timestamp=received_timestamp, - is_read=msg.get("isRead", False), - has_attachments=msg.get("hasAttachments", False), - ) - - def test_connection(self) -> bool: - """ - 测试 Graph API 连接 - - Returns: - 连接是否正常 - """ - try: - # 尝试获取一封邮件来测试连接 - emails = self.get_recent_emails(count=1, only_unseen=False) - return True - except Exception as e: - logger.warning(f"[{self.account.email}] Graph API 连接测试失败: {e}") - return False diff --git a/src/services/outlook/providers/imap_new.py b/src/services/outlook/providers/imap_new.py index 5daa2f3..c3698f8 100644 --- a/src/services/outlook/providers/imap_new.py +++ b/src/services/outlook/providers/imap_new.py @@ -1,39 +1,87 @@ """ 新版 IMAP 提供者 -使用 outlook.live.com 服务器和 login.microsoftonline.com/consumers Token 端点 +使用 outlook.live.com:993 + consumers Token 端点 +引入进程级 IMAPConnectionPool 连接复用和 IMAP IDLE """ import email import imaplib import logging +import select +import time +import threading from email.header import decode_header from email.utils import parsedate_to_datetime -from typing import List, Optional +from typing import Dict, List, Optional -from ..base import ProviderType, EmailMessage +from ..base import EmailMessage from ..account import OutlookAccount from ..token_manager import TokenManager from .base import OutlookProvider, ProviderConfig -from .imap_old import IMAPOldProvider logger = logging.getLogger(__name__) -class IMAPNewProvider(OutlookProvider): - """ - 新版 IMAP 提供者 - 使用 outlook.live.com:993 和 login.microsoftonline.com/consumers Token 端点 - 需要 IMAP.AccessAsUser.All scope - """ +class IMAPConnectionPool: + """进程级 IMAP 连接池,按 email 复用 IMAP4_SSL 连接""" - # IMAP 服务器配置 IMAP_HOST = "outlook.live.com" IMAP_PORT = 993 - @property - def provider_type(self) -> ProviderType: - return ProviderType.IMAP_NEW + def __init__(self): + self._connections: Dict[str, imaplib.IMAP4_SSL] = {} + self._lock = threading.Lock() + + def get_connection( + self, + email_addr: str, + token: str, + timeout: int = 30, + ) -> imaplib.IMAP4_SSL: + """获取或新建 IMAP 连接""" + with self._lock: + conn = self._connections.get(email_addr) + if conn: + try: + conn.noop() + return conn + except Exception: + self._close_one(email_addr) + + conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout) + auth_str = f"user={email_addr}\x01auth=Bearer {token}\x01\x01" + conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8")) + self._connections[email_addr] = conn + logger.debug(f"[{email_addr}] IMAP 新连接已建立") + return conn + + def invalidate(self, email_addr: str): + """废弃连接(认证失败时调用)""" + with self._lock: + self._close_one(email_addr) + + def _close_one(self, email_addr: str): + conn = self._connections.pop(email_addr, None) + if conn: + try: + conn.logout() + except Exception: + pass + + +# 模块级单例连接池 +_imap_pool = IMAPConnectionPool() + + +class IMAPNewProvider(OutlookProvider): + """ + 新版 IMAP 提供者 + 通过连接池复用连接,支持 IMAP IDLE + """ + + IMAP_HOST = "outlook.live.com" + IMAP_PORT = 993 def __init__( self, @@ -41,151 +89,110 @@ class IMAPNewProvider(OutlookProvider): config: Optional[ProviderConfig] = None, ): super().__init__(account, config) - - # IMAP 连接 self._conn: Optional[imaplib.IMAP4_SSL] = None - - # Token 管理器 self._token_manager: Optional[TokenManager] = None - # 注意:新版 IMAP 必须使用 OAuth2 if not account.has_oauth(): logger.warning( - f"[{self.account.email}] 新版 IMAP 提供者需要 OAuth2 配置 " - f"(client_id + refresh_token)" + f"[{self.account.email}] IMAP_NEW 需要 OAuth2 配置 (client_id + refresh_token)" ) + def _get_token_manager(self) -> TokenManager: + if not self._token_manager: + self._token_manager = TokenManager( + self.account, + proxy_url=self.config.proxy_url, + timeout=self.config.timeout, + service_id=self.config.service_id, + ) + return self._token_manager + def connect(self) -> bool: - """ - 连接到 IMAP 服务器 - - Returns: - 是否连接成功 - """ - if self._connected and self._conn: - try: - self._conn.noop() - return True - except Exception: - self.disconnect() - - # 新版 IMAP 必须使用 OAuth2,无 OAuth 时静默跳过,不记录健康失败 + """从连接池获取连接""" if not self.account.has_oauth(): logger.debug(f"[{self.account.email}] 跳过 IMAP_NEW(无 OAuth)") return False try: - logger.debug(f"[{self.account.email}] 正在连接 IMAP ({self.IMAP_HOST})...") + tm = self._get_token_manager() + token = tm.get_access_token() + if not token: + logger.error(f"[{self.account.email}] 获取 IMAP Token 失败") + return False - # 创建连接 - self._conn = imaplib.IMAP4_SSL( - self.IMAP_HOST, - self.IMAP_PORT, - timeout=self.config.timeout, + self._conn = _imap_pool.get_connection( + self.account.email, token, self.config.timeout ) - - # XOAUTH2 认证 - if self._authenticate_xoauth2(): - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] 新版 IMAP 连接成功 (XOAUTH2)") - return True - - return False - - except Exception as e: - self.disconnect() - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] 新版 IMAP 连接失败: {e}") - return False - - def _authenticate_xoauth2(self) -> bool: - """ - 使用 XOAUTH2 认证 - - Returns: - 是否认证成功 - """ - if not self._token_manager: - self._token_manager = TokenManager( - self.account, - ProviderType.IMAP_NEW, - self.config.proxy_url, - self.config.timeout, - ) - - # 获取 Access Token - token = self._token_manager.get_access_token() - if not token: - logger.error(f"[{self.account.email}] 获取 IMAP Token 失败") - return False - - try: - # 构建 XOAUTH2 认证字符串 - auth_string = f"user={self.account.email}\x01auth=Bearer {token}\x01\x01" - self._conn.authenticate("XOAUTH2", lambda _: auth_string.encode("utf-8")) + self._connected = True + self.record_success() + logger.debug(f"[{self.account.email}] IMAP 连接就绪(连接池)") return True + + except imaplib.IMAP4.error as e: + err = str(e) + # Token 失效时强制刷新并重试一次 + if "AUTHENTICATE" in err or "invalid" in err.lower(): + logger.warning(f"[{self.account.email}] XOAUTH2 认证失败,尝试刷新 Token") + _imap_pool.invalidate(self.account.email) + try: + tm = self._get_token_manager() + token = tm.get_access_token(force_refresh=True) + if token: + self._conn = _imap_pool.get_connection( + self.account.email, token, self.config.timeout + ) + self._connected = True + self.record_success() + return True + except Exception as retry_e: + self.record_failure(str(retry_e)) + logger.error(f"[{self.account.email}] Token 刷新后重连失败: {retry_e}") + else: + self.record_failure(err) + logger.error(f"[{self.account.email}] IMAP 连接失败: {e}") + self._connected = False + self._conn = None + return False + except Exception as e: - logger.error(f"[{self.account.email}] XOAUTH2 认证异常: {e}") - # 清除缓存的 Token - self._token_manager.clear_cache() + self.record_failure(str(e)) + logger.error(f"[{self.account.email}] IMAP 连接失败: {e}") + self._connected = False + self._conn = None return False def disconnect(self): - """断开 IMAP 连接""" - if self._conn: - try: - self._conn.close() - except Exception: - pass - try: - self._conn.logout() - except Exception: - pass - self._conn = None - + """归还连接池(不 logout,保持复用)""" self._connected = False + self._conn = None def get_recent_emails( self, count: int = 20, only_unseen: bool = True, ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ + """获取最近的邮件""" if not self._connected: if not self.connect(): return [] try: - # 选择收件箱 self._conn.select("INBOX", readonly=True) - - # 搜索邮件 flag = "UNSEEN" if only_unseen else "ALL" status, data = self._conn.search(None, flag) if status != "OK" or not data or not data[0]: return [] - # 获取最新的邮件 ID ids = data[0].split() recent_ids = ids[-count:][::-1] emails = [] for msg_id in recent_ids: try: - email_msg = self._fetch_email(msg_id) - if email_msg: - emails.append(email_msg) + msg = self._fetch_email(msg_id) + if msg: + emails.append(msg) except Exception as e: logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}") @@ -194,6 +201,9 @@ class IMAPNewProvider(OutlookProvider): except Exception as e: self.record_failure(str(e)) logger.error(f"[{self.account.email}] 获取邮件失败: {e}") + _imap_pool.invalidate(self.account.email) + self._connected = False + self._conn = None return [] def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]: @@ -211,21 +221,170 @@ class IMAPNewProvider(OutlookProvider): if not raw: return None - return self._parse_email(raw) + return _parse_email(raw) - @staticmethod - def _parse_email(raw: bytes) -> EmailMessage: - """解析原始邮件""" - # 使用旧版提供者的解析方法 - return IMAPOldProvider._parse_email(raw) + def wait_for_new_email_idle(self, timeout: int = 25) -> bool: + """ + RFC 2177 IMAP IDLE 实现。 + 发送 IDLE 命令,等待服务器推送 EXISTS/RECENT,然后发送 DONE。 + Returns True 表示有新邮件推送,False 表示超时或异常(调用方降级轮询)。 + """ + if not self._connected: + if not self.connect(): + return False + + try: + self._conn.select("INBOX", readonly=True) + except Exception as e: + logger.warning(f"[{self.account.email}] IDLE 前 SELECT 失败: {e}") + return False + + logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}s)") + + sock = self._conn.socket() + tag = self._conn._new_tag().decode() if isinstance(self._conn._new_tag(), bytes) else self._conn._new_tag() + + try: + # 发送 IDLE 命令 + self._conn.send(f"{tag} IDLE\r\n".encode()) + + # 等待 "+" 延续响应 + deadline = time.time() + timeout + buf = b"" + while time.time() < deadline: + ready = select.select([sock], [], [], min(2.0, deadline - time.time())) + if ready[0]: + chunk = sock.recv(4096) + if not chunk: + break + buf += chunk + if b"+ " in buf or b"+\r\n" in buf: + break + + # 等待 EXISTS / RECENT 推送 + got_new = False + buf = b"" + while time.time() < deadline: + remaining = deadline - time.time() + ready = select.select([sock], [], [], min(2.0, remaining)) + if ready[0]: + chunk = sock.recv(4096) + if not chunk: + break + buf += chunk + if b"EXISTS" in buf or b"RECENT" in buf: + got_new = True + break + + return got_new + + except Exception as e: + logger.warning(f"[{self.account.email}] IMAP IDLE 异常: {e}") + return False + + finally: + # 发送 DONE 结束 IDLE + try: + self._conn.send(b"DONE\r\n") + # 读取 IDLE 结束响应(避免缓冲区污染后续命令) + deadline2 = time.time() + 5 + resp_buf = b"" + while time.time() < deadline2: + ready = select.select([sock], [], [], 1.0) + if ready[0]: + chunk = sock.recv(4096) + if not chunk: + break + resp_buf += chunk + if tag.encode() in resp_buf: + break + except Exception: + # DONE 发送失败则废弃连接 + _imap_pool.invalidate(self.account.email) + self._connected = False + self._conn = None def test_connection(self) -> bool: """测试 IMAP 连接""" try: with self: self._conn.select("INBOX", readonly=True) - self._conn.search(None, "ALL") return True except Exception as e: - logger.warning(f"[{self.account.email}] 新版 IMAP 连接测试失败: {e}") + logger.warning(f"[{self.account.email}] IMAP 连接测试失败: {e}") return False + + +def _parse_email(raw: bytes) -> EmailMessage: + """解析原始邮件为 EmailMessage""" + msg = email.message_from_bytes(raw) + + def _decode(val): + if not val: + return "" + parts = decode_header(str(val)) + result = "" + for part, charset in parts: + if isinstance(part, bytes): + try: + result += part.decode(charset or "utf-8", errors="replace") + except (LookupError, UnicodeDecodeError): + result += part.decode("utf-8", errors="replace") + else: + result += str(part) + return result + + subject = _decode(msg.get("Subject", "")) + sender = _decode(msg.get("From", "")) + recipients = [_decode(msg.get("To", ""))] + + received_at = None + received_ts = 0 + date_str = msg.get("Date", "") + if date_str: + try: + received_at = parsedate_to_datetime(date_str) + received_ts = int(received_at.timestamp()) + except Exception: + pass + + body = "" + body_preview = "" + if msg.is_multipart(): + for part in msg.walk(): + ct = part.get_content_type() + cd = str(part.get("Content-Disposition", "")) + if "attachment" not in cd.lower() and ct in ("text/plain", "text/html"): + try: + charset = part.get_content_charset() or "utf-8" + payload = part.get_payload(decode=True) + if payload: + body = payload.decode(charset, errors="replace") + break + except Exception: + pass + else: + try: + charset = msg.get_content_charset() or "utf-8" + payload = msg.get_payload(decode=True) + if payload: + body = payload.decode(charset, errors="replace") + except Exception: + pass + + body_preview = body[:200].strip() + + msg_id = msg.get("Message-ID", "").strip("<>") + if not msg_id: + msg_id = f"{sender}_{received_ts}" + + return EmailMessage( + id=msg_id, + subject=subject, + sender=sender, + recipients=recipients, + body=body, + body_preview=body_preview, + received_at=received_at, + received_timestamp=received_ts, + ) diff --git a/src/services/outlook/providers/imap_old.py b/src/services/outlook/providers/imap_old.py deleted file mode 100644 index e46f3ed..0000000 --- a/src/services/outlook/providers/imap_old.py +++ /dev/null @@ -1,345 +0,0 @@ -""" -旧版 IMAP 提供者 -使用 outlook.office365.com 服务器和 login.live.com Token 端点 -""" - -import email -import imaplib -import logging -from email.header import decode_header -from email.utils import parsedate_to_datetime -from typing import List, Optional - -from ..base import ProviderType, EmailMessage -from ..account import OutlookAccount -from ..token_manager import TokenManager -from .base import OutlookProvider, ProviderConfig - - -logger = logging.getLogger(__name__) - - -class IMAPOldProvider(OutlookProvider): - """ - 旧版 IMAP 提供者 - 使用 outlook.office365.com:993 和 login.live.com Token 端点 - """ - - # IMAP 服务器配置 - IMAP_HOST = "outlook.office365.com" - IMAP_PORT = 993 - - @property - def provider_type(self) -> ProviderType: - return ProviderType.IMAP_OLD - - def __init__( - self, - account: OutlookAccount, - config: Optional[ProviderConfig] = None, - ): - super().__init__(account, config) - - # IMAP 连接 - self._conn: Optional[imaplib.IMAP4_SSL] = None - - # Token 管理器 - self._token_manager: Optional[TokenManager] = None - - def connect(self) -> bool: - """ - 连接到 IMAP 服务器 - - Returns: - 是否连接成功 - """ - if self._connected and self._conn: - # 检查现有连接 - try: - self._conn.noop() - return True - except Exception: - self.disconnect() - - try: - logger.debug(f"[{self.account.email}] 正在连接 IMAP ({self.IMAP_HOST})...") - - # 创建连接 - self._conn = imaplib.IMAP4_SSL( - self.IMAP_HOST, - self.IMAP_PORT, - timeout=self.config.timeout, - ) - - # 尝试 XOAUTH2 认证 - if self.account.has_oauth(): - if self._authenticate_xoauth2(): - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] IMAP 连接成功 (XOAUTH2)") - return True - else: - logger.warning(f"[{self.account.email}] XOAUTH2 认证失败,尝试密码认证") - - # 密码认证 - if self.account.password: - self._conn.login(self.account.email, self.account.password) - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] IMAP 连接成功 (密码认证)") - return True - - raise ValueError("没有可用的认证方式") - - except Exception as e: - self.disconnect() - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] IMAP 连接失败: {e}") - return False - - def _authenticate_xoauth2(self) -> bool: - """ - 使用 XOAUTH2 认证 - - Returns: - 是否认证成功 - """ - if not self._token_manager: - self._token_manager = TokenManager( - self.account, - ProviderType.IMAP_OLD, - self.config.proxy_url, - self.config.timeout, - ) - - # 获取 Access Token - token = self._token_manager.get_access_token() - if not token: - return False - - try: - # 构建 XOAUTH2 认证字符串 - auth_string = f"user={self.account.email}\x01auth=Bearer {token}\x01\x01" - self._conn.authenticate("XOAUTH2", lambda _: auth_string.encode("utf-8")) - return True - except Exception as e: - logger.debug(f"[{self.account.email}] XOAUTH2 认证异常: {e}") - # 清除缓存的 Token - self._token_manager.clear_cache() - return False - - def disconnect(self): - """断开 IMAP 连接""" - if self._conn: - try: - self._conn.close() - except Exception: - pass - try: - self._conn.logout() - except Exception: - pass - self._conn = None - - self._connected = False - - def get_recent_emails( - self, - count: int = 20, - only_unseen: bool = True, - ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ - if not self._connected: - if not self.connect(): - return [] - - try: - # 选择收件箱 - self._conn.select("INBOX", readonly=True) - - # 搜索邮件 - flag = "UNSEEN" if only_unseen else "ALL" - status, data = self._conn.search(None, flag) - - if status != "OK" or not data or not data[0]: - return [] - - # 获取最新的邮件 ID - ids = data[0].split() - recent_ids = ids[-count:][::-1] # 倒序,最新的在前 - - emails = [] - for msg_id in recent_ids: - try: - email_msg = self._fetch_email(msg_id) - if email_msg: - emails.append(email_msg) - except Exception as e: - logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}") - - return emails - - except Exception as e: - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] 获取邮件失败: {e}") - return [] - - def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]: - """ - 获取并解析单封邮件 - - Args: - msg_id: 邮件 ID - - Returns: - EmailMessage 对象,失败返回 None - """ - status, data = self._conn.fetch(msg_id, "(RFC822)") - if status != "OK" or not data or not data[0]: - return None - - # 获取原始邮件内容 - raw = b"" - for part in data: - if isinstance(part, tuple) and len(part) > 1: - raw = part[1] - break - - if not raw: - return None - - return self._parse_email(raw) - - @staticmethod - def _parse_email(raw: bytes) -> EmailMessage: - """ - 解析原始邮件 - - Args: - raw: 原始邮件数据 - - Returns: - EmailMessage 对象 - """ - # 移除 BOM - if raw.startswith(b"\xef\xbb\xbf"): - raw = raw[3:] - - msg = email.message_from_bytes(raw) - - # 解析邮件头 - subject = IMAPOldProvider._decode_header(msg.get("Subject", "")) - sender = IMAPOldProvider._decode_header(msg.get("From", "")) - to = IMAPOldProvider._decode_header(msg.get("To", "")) - delivered_to = IMAPOldProvider._decode_header(msg.get("Delivered-To", "")) - x_original_to = IMAPOldProvider._decode_header(msg.get("X-Original-To", "")) - date_str = IMAPOldProvider._decode_header(msg.get("Date", "")) - - # 提取正文 - body = IMAPOldProvider._extract_body(msg) - - # 解析日期 - received_timestamp = 0 - received_at = None - try: - if date_str: - received_at = parsedate_to_datetime(date_str) - received_timestamp = int(received_at.timestamp()) - except Exception: - pass - - # 构建收件人列表 - recipients = [r for r in [to, delivered_to, x_original_to] if r] - - return EmailMessage( - id=msg.get("Message-ID", ""), - subject=subject, - sender=sender, - recipients=recipients, - body=body, - received_at=received_at, - received_timestamp=received_timestamp, - is_read=False, # 搜索的是未读邮件 - raw_data=raw[:500] if len(raw) > 500 else raw, - ) - - @staticmethod - def _decode_header(header: str) -> str: - """解码邮件头""" - if not header: - return "" - - parts = [] - for chunk, encoding in decode_header(header): - if isinstance(chunk, bytes): - try: - decoded = chunk.decode(encoding or "utf-8", errors="replace") - parts.append(decoded) - except Exception: - parts.append(chunk.decode("utf-8", errors="replace")) - else: - parts.append(str(chunk)) - - return "".join(parts).strip() - - @staticmethod - def _extract_body(msg) -> str: - """提取邮件正文""" - import html as html_module - import re - - texts = [] - parts = msg.walk() if msg.is_multipart() else [msg] - - for part in parts: - content_type = part.get_content_type() - if content_type not in ("text/plain", "text/html"): - continue - - payload = part.get_payload(decode=True) - if not payload: - continue - - charset = part.get_content_charset() or "utf-8" - try: - text = payload.decode(charset, errors="replace") - except LookupError: - text = payload.decode("utf-8", errors="replace") - - # 如果是 HTML,移除标签 - if "]+>", " ", text) - - texts.append(text) - - # 合并并清理文本 - combined = " ".join(texts) - combined = html_module.unescape(combined) - combined = re.sub(r"\s+", " ", combined).strip() - - return combined - - def test_connection(self) -> bool: - """ - 测试 IMAP 连接 - - Returns: - 连接是否正常 - """ - try: - with self: - self._conn.select("INBOX", readonly=True) - self._conn.search(None, "ALL") - return True - except Exception as e: - logger.warning(f"[{self.account.email}] IMAP 连接测试失败: {e}") - return False diff --git a/src/services/outlook/service.py b/src/services/outlook/service.py index 321d8b3..6d7d280 100644 --- a/src/services/outlook/service.py +++ b/src/services/outlook/service.py @@ -1,6 +1,6 @@ """ -Outlook 邮箱服务主类 -支持多种 IMAP/API 连接方式,自动故障切换 +Outlook 邮箱服务主类(简化版) +单一 IMAP_NEW Provider + 邮件缓存 + IMAP IDLE 支持 """ import logging @@ -8,34 +8,21 @@ import threading import time from typing import Optional, Dict, Any, List -from ..base import BaseEmailService, EmailServiceError, EmailServiceStatus, EmailServiceType +from ..base import BaseEmailService, EmailServiceError, EmailServiceType from ...config.constants import EmailServiceType as ServiceType from ...config.settings import get_settings from .account import OutlookAccount -from .base import ProviderType, EmailMessage -from .email_parser import EmailParser, get_email_parser -from .health_checker import HealthChecker, FailoverManager -from .providers.base import OutlookProvider, ProviderConfig -from .providers.imap_old import IMAPOldProvider +from .base import EmailMessage +from .email_parser import get_email_parser +from .health_checker import HealthChecker +from .providers.base import ProviderConfig from .providers.imap_new import IMAPNewProvider -from .providers.graph_api import GraphAPIProvider logger = logging.getLogger(__name__) -# 默认提供者优先级 -# IMAP_OLD 最兼容(只需 login.live.com token),IMAP_NEW 次之,Graph API 最后 -# 原因:部分 client_id 没有 Graph API 权限,但有 IMAP 权限 -DEFAULT_PROVIDER_PRIORITY = [ - ProviderType.IMAP_OLD, - ProviderType.IMAP_NEW, - ProviderType.GRAPH_API, -] - - -def get_email_code_settings() -> dict: - """获取验证码等待配置""" +def _get_code_settings() -> dict: settings = get_settings() return { "timeout": settings.email_code_timeout, @@ -43,56 +30,58 @@ def get_email_code_settings() -> dict: } +class _EmailCache: + """轻量级邮件内存缓存(TTL=60s,减少重复 IMAP 请求)""" + + TTL = 60 + + def __init__(self): + self._cache: Dict[str, tuple] = {} # email -> (timestamp, List[EmailMessage]) + self._lock = threading.Lock() + + def get(self, email: str) -> Optional[List[EmailMessage]]: + with self._lock: + entry = self._cache.get(email) + if entry and time.time() - entry[0] < self.TTL: + return entry[1] + return None + + def set(self, email: str, messages: List[EmailMessage]): + with self._lock: + self._cache[email] = (time.time(), messages) + + def invalidate(self, email: str): + with self._lock: + self._cache.pop(email, None) + + class OutlookService(BaseEmailService): """ Outlook 邮箱服务 - 支持多种 IMAP/API 连接方式,自动故障切换 + 使用单一 IMAP_NEW Provider,支持连接池复用和 IMAP IDLE """ def __init__(self, config: Dict[str, Any] = None, name: str = None): - """ - 初始化 Outlook 服务 - - Args: - config: 配置字典,支持以下键: - - accounts: Outlook 账户列表 - - provider_priority: 提供者优先级列表 - - health_failure_threshold: 连续失败次数阈值 - - health_disable_duration: 禁用时长(秒) - - timeout: 请求超时时间 - - proxy_url: 代理 URL - name: 服务名称 - """ super().__init__(ServiceType.OUTLOOK, name) - # 默认配置 default_config = { "accounts": [], - "provider_priority": [p.value for p in DEFAULT_PROVIDER_PRIORITY], "health_failure_threshold": 5, "health_disable_duration": 60, "timeout": 30, "proxy_url": None, } - self.config = {**default_config, **(config or {})} - # 解析提供者优先级 - self.provider_priority = [ - ProviderType(p) for p in self.config.get("provider_priority", []) - ] - if not self.provider_priority: - self.provider_priority = DEFAULT_PROVIDER_PRIORITY - - # 提供者配置 self.provider_config = ProviderConfig( timeout=self.config.get("timeout", 30), proxy_url=self.config.get("proxy_url"), + service_id=self.config.get("service_id"), health_failure_threshold=self.config.get("health_failure_threshold", 3), health_disable_duration=self.config.get("health_disable_duration", 300), ) - # 获取默认 client_id(供无 client_id 的账户使用) + # 获取默认 client_id try: _default_client_id = get_settings().outlook_default_client_id except Exception: @@ -103,7 +92,6 @@ class OutlookService(BaseEmailService): self._current_account_index = 0 self._account_lock = threading.Lock() - # 支持两种配置格式 if "email" in self.config and "password" in self.config: account = OutlookAccount.from_config(self.config) if not account.client_id and _default_client_id: @@ -111,8 +99,8 @@ class OutlookService(BaseEmailService): if account.validate(): self.accounts.append(account) else: - for account_config in self.config.get("accounts", []): - account = OutlookAccount.from_config(account_config) + for ac in self.config.get("accounts", []): + account = OutlookAccount.from_config(ac) if not account.client_id and _default_client_id: account.client_id = _default_client_id if account.validate(): @@ -121,175 +109,87 @@ class OutlookService(BaseEmailService): if not self.accounts: logger.warning("未配置有效的 Outlook 账户") - # 健康检查器和故障切换管理器 + # 健康检查器 self.health_checker = HealthChecker( failure_threshold=self.provider_config.health_failure_threshold, disable_duration=self.provider_config.health_disable_duration, ) - self.failover_manager = FailoverManager( - health_checker=self.health_checker, - priority_order=self.provider_priority, - ) # 邮件解析器 self.email_parser = get_email_parser() - # 提供者实例缓存: (email, provider_type) -> OutlookProvider - self._providers: Dict[tuple, OutlookProvider] = {} + # Provider 实例缓存: email -> IMAPNewProvider + self._providers: Dict[str, IMAPNewProvider] = {} self._provider_lock = threading.Lock() - # IMAP 连接限制(防止限流) + # IMAP 并发限制(最多 5 个并发) self._imap_semaphore = threading.Semaphore(5) - # 验证码去重机制 + # 邮件缓存 + self._email_cache = _EmailCache() + + # 验证码去重 self._used_codes: Dict[str, set] = {} - def _get_provider( - self, - account: OutlookAccount, - provider_type: ProviderType, - ) -> OutlookProvider: - """ - 获取或创建提供者实例 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - - Returns: - 提供者实例 - """ - cache_key = (account.email.lower(), provider_type) - + def _get_provider(self, account: OutlookAccount) -> IMAPNewProvider: + key = account.email.lower() with self._provider_lock: - if cache_key not in self._providers: - provider = self._create_provider(account, provider_type) - self._providers[cache_key] = provider + if key not in self._providers: + self._providers[key] = IMAPNewProvider(account, self.provider_config) + return self._providers[key] - return self._providers[cache_key] - - def _create_provider( + def _fetch_emails( self, account: OutlookAccount, - provider_type: ProviderType, - ) -> OutlookProvider: - """ - 创建提供者实例 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - - Returns: - 提供者实例 - """ - if provider_type == ProviderType.IMAP_OLD: - return IMAPOldProvider(account, self.provider_config) - elif provider_type == ProviderType.IMAP_NEW: - return IMAPNewProvider(account, self.provider_config) - elif provider_type == ProviderType.GRAPH_API: - return GraphAPIProvider(account, self.provider_config) - else: - raise ValueError(f"未知的提供者类型: {provider_type}") - - def _get_provider_priority_for_account(self, account: OutlookAccount) -> List[ProviderType]: - """根据账户是否有 OAuth,返回适合的提供者优先级列表""" - if account.has_oauth(): - return self.provider_priority - else: - # 无 OAuth,直接走旧版 IMAP(密码认证),跳过需要 OAuth 的提供者 - return [ProviderType.IMAP_OLD] - - def _try_providers_for_emails( - self, - account: OutlookAccount, - count: int = 20, + count: int = 15, only_unseen: bool = True, + use_cache: bool = False, ) -> List[EmailMessage]: - """ - 尝试多个提供者获取邮件 + """通过 IMAP_NEW Provider 获取邮件,可选使用内存缓存""" + if use_cache: + cached = self._email_cache.get(account.email) + if cached is not None: + return cached - Args: - account: Outlook 账户 - count: 获取数量 - only_unseen: 是否只获取未读 + if not self.health_checker.is_available(): + logger.debug(f"[{account.email}] IMAP_NEW 不可用,跳过") + return [] - Returns: - 邮件列表 - """ - errors = [] + try: + provider = self._get_provider(account) + with self._imap_semaphore: + with provider: + emails = provider.get_recent_emails(count, only_unseen) - # 根据账户类型选择合适的提供者优先级 - priority = self._get_provider_priority_for_account(account) + if emails: + self.health_checker.record_success() + if use_cache: + self._email_cache.set(account.email, emails) + return emails - # 按优先级尝试各提供者 - for provider_type in priority: - # 检查提供者是否可用 - if not self.health_checker.is_available(provider_type): - logger.debug( - f"[{account.email}] {provider_type.value} 不可用,跳过" - ) - continue - - try: - provider = self._get_provider(account, provider_type) - - with self._imap_semaphore: - with provider: - emails = provider.get_recent_emails(count, only_unseen) - - if emails: - # 成功获取邮件 - self.health_checker.record_success(provider_type) - logger.debug( - f"[{account.email}] {provider_type.value} 获取到 {len(emails)} 封邮件" - ) - return emails - - except Exception as e: - error_msg = str(e) - errors.append(f"{provider_type.value}: {error_msg}") - self.health_checker.record_failure(provider_type, error_msg) - logger.warning( - f"[{account.email}] {provider_type.value} 获取邮件失败: {e}" - ) - - logger.error( - f"[{account.email}] 所有提供者都失败: {'; '.join(errors)}" - ) - return [] + except Exception as e: + err = str(e) + self.health_checker.record_failure(err) + logger.warning(f"[{account.email}] 获取邮件失败: {e}") + return [] def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]: - """ - 选择可用的 Outlook 账户 - - Args: - config: 配置参数(未使用) - - Returns: - 包含邮箱信息的字典 - """ + """轮询选择可用的 Outlook 账户""" if not self.accounts: self.update_status(False, EmailServiceError("没有可用的 Outlook 账户")) raise EmailServiceError("没有可用的 Outlook 账户") - # 轮询选择账户 with self._account_lock: account = self.accounts[self._current_account_index] self._current_account_index = (self._current_account_index + 1) % len(self.accounts) - email_info = { - "email": account.email, - "service_id": account.email, - "account": { - "email": account.email, - "has_oauth": account.has_oauth() - } - } - logger.info(f"选择 Outlook 账户: {account.email}") self.update_status(True) - return email_info + return { + "email": account.email, + "service_id": account.email, + "account": {"email": account.email, "has_oauth": account.has_oauth()}, + } def get_verification_code( self, @@ -299,114 +199,155 @@ class OutlookService(BaseEmailService): pattern: str = None, otp_sent_at: Optional[float] = None, ) -> Optional[str]: - """ - 从 Outlook 邮箱获取验证码 - - Args: - email: 邮箱地址 - email_id: 未使用 - timeout: 超时时间(秒) - pattern: 验证码正则表达式(未使用) - otp_sent_at: OTP 发送时间戳 - - Returns: - 验证码字符串 - """ - # 查找对应的账户 - account = None - for acc in self.accounts: - if acc.email.lower() == email.lower(): - account = acc - break - + """从 Outlook 邮箱获取验证码""" + account = next( + (a for a in self.accounts if a.email.lower() == email.lower()), None + ) if not account: - self.update_status(False, EmailServiceError(f"未找到邮箱对应的账户: {email}")) + self.update_status(False, EmailServiceError(f"未找到邮箱账户: {email}")) return None - # 获取验证码等待配置 - code_settings = get_email_code_settings() + code_settings = _get_code_settings() actual_timeout = timeout or code_settings["timeout"] poll_interval = code_settings["poll_interval"] - logger.info( - f"[{email}] 开始获取验证码,超时 {actual_timeout}s," - f"提供者优先级: {[p.value for p in self.provider_priority]}" - ) + logger.info(f"[{email}] 开始获取验证码,超时 {actual_timeout}s") - # 初始化验证码去重集合 if email not in self._used_codes: self._used_codes[email] = set() used_codes = self._used_codes[email] - # 计算最小时间戳(留出 60 秒时钟偏差) min_timestamp = (otp_sent_at - 60) if otp_sent_at else 0 + use_idle = True + try: + use_idle = get_settings().outlook_use_idle + except Exception: + pass + + if use_idle: + code = self._wait_with_idle( + account, email, actual_timeout, min_timestamp, used_codes + ) + else: + code = self._wait_with_poll( + account, email, actual_timeout, poll_interval, min_timestamp, used_codes + ) + + if code: + used_codes.add(code) + self.update_status(True) + return code + return None + + def _wait_with_poll( + self, + account: OutlookAccount, + email: str, + timeout: int, + poll_interval: int, + min_timestamp: float, + used_codes: set, + ) -> Optional[str]: + """轮询方式等待验证码""" start_time = time.time() poll_count = 0 - while time.time() - start_time < actual_timeout: + while time.time() - start_time < timeout: poll_count += 1 - - # 渐进式邮件检查:前 3 次只检查未读 only_unseen = poll_count <= 3 - try: - # 尝试多个提供者获取邮件 - emails = self._try_providers_for_emails( - account, - count=15, - only_unseen=only_unseen, - ) - + emails = self._fetch_emails(account, count=15, only_unseen=only_unseen) if emails: - logger.debug( - f"[{email}] 第 {poll_count} 次轮询获取到 {len(emails)} 封邮件" - ) - - # 从邮件中查找验证码 code = self.email_parser.find_verification_code_in_emails( emails, target_email=email, min_timestamp=min_timestamp, used_codes=used_codes, ) - if code: - used_codes.add(code) elapsed = int(time.time() - start_time) logger.info( - f"[{email}] 找到验证码: {code}," - f"总耗时 {elapsed}s,轮询 {poll_count} 次" + f"[{email}] 找到验证码: {code},耗时 {elapsed}s,轮询 {poll_count} 次" ) - self.update_status(True) return code - except Exception as e: - logger.warning(f"[{email}] 检查出错: {e}") + logger.warning(f"[{email}] 轮询出错: {e}") - # 等待下次轮询 time.sleep(poll_interval) - elapsed = int(time.time() - start_time) - logger.warning(f"[{email}] 验证码超时 ({actual_timeout}s),共轮询 {poll_count} 次") + logger.warning(f"[{email}] 验证码超时 ({timeout}s),共轮询 {poll_count} 次") return None - def list_emails(self, **kwargs) -> List[Dict[str, Any]]: - """列出所有可用的 Outlook 账户""" - return [ - { - "email": account.email, - "id": account.email, - "has_oauth": account.has_oauth(), - "type": "outlook" - } - for account in self.accounts - ] + def _wait_with_idle( + self, + account: OutlookAccount, + email: str, + timeout: int, + min_timestamp: float, + used_codes: set, + ) -> Optional[str]: + """IMAP IDLE 方式等待验证码,失败时自动降级为轮询""" + if not self.health_checker.is_available(): + logger.warning(f"[{email}] IMAP_NEW 不可用,降级为轮询") + return self._wait_with_poll( + account, email, timeout, 3, min_timestamp, used_codes + ) - def delete_email(self, email_id: str) -> bool: - """删除邮箱(Outlook 不支持删除账户)""" - logger.warning(f"Outlook 服务不支持删除账户: {email_id}") - return False + start_time = time.time() + try: + provider = self._get_provider(account) + with self._imap_semaphore: + with provider: + # 先做一次即时检查 + emails = provider.get_recent_emails(15, only_unseen=True) + code = self.email_parser.find_verification_code_in_emails( + emails, + target_email=email, + min_timestamp=min_timestamp, + used_codes=used_codes, + ) + if code: + elapsed = int(time.time() - start_time) + logger.info(f"[{email}] 找到验证码: {code},耗时 {elapsed}s(即时检查)") + return code + + # IDLE 等待循环 + while time.time() - start_time < timeout: + remaining = int(timeout - (time.time() - start_time)) + if remaining <= 0: + break + arrived = provider.wait_for_new_email_idle(timeout=min(remaining, 25)) + # 无效化缓存,强制重新拉取 + self._email_cache.invalidate(email) + emails = provider.get_recent_emails(15, only_unseen=True) + code = self.email_parser.find_verification_code_in_emails( + emails, + target_email=email, + min_timestamp=min_timestamp, + used_codes=used_codes, + ) + if code: + elapsed = int(time.time() - start_time) + logger.info( + f"[{email}] 找到验证码: {code},耗时 {elapsed}s" + f"(IDLE {'推送' if arrived else '超时检查'})" + ) + return code + + except Exception as e: + logger.warning(f"[{email}] IDLE 失败,降级为轮询: {e}") + elapsed = int(time.time() - start_time) + remaining = max(0, timeout - elapsed) + if remaining > 0: + code_settings = _get_code_settings() + return self._wait_with_poll( + account, email, remaining, + code_settings["poll_interval"], min_timestamp, used_codes + ) + + logger.warning(f"[{email}] IDLE 等待验证码超时 ({timeout}s)") + return None def check_health(self) -> bool: """检查 Outlook 服务是否可用""" @@ -414,48 +355,48 @@ class OutlookService(BaseEmailService): self.update_status(False, EmailServiceError("没有配置的账户")) return False - # 测试第一个账户的连接 - test_account = self.accounts[0] - - # 尝试任一提供者连接 - for provider_type in self.provider_priority: - try: - provider = self._get_provider(test_account, provider_type) - if provider.test_connection(): - self.update_status(True) - return True - except Exception as e: - logger.warning( - f"Outlook 健康检查失败 ({test_account.email}, {provider_type.value}): {e}" - ) + try: + provider = self._get_provider(self.accounts[0]) + if provider.test_connection(): + self.update_status(True) + return True + except Exception as e: + logger.warning(f"Outlook 健康检查失败: {e}") self.update_status(False, EmailServiceError("健康检查失败")) return False - def get_provider_status(self) -> Dict[str, Any]: - """获取提供者状态""" - return self.failover_manager.get_status() + def list_emails(self, **kwargs) -> List[Dict[str, Any]]: + return [ + { + "email": a.email, + "id": a.email, + "has_oauth": a.has_oauth(), + "type": "outlook", + } + for a in self.accounts + ] + + def delete_email(self, email_id: str) -> bool: + logger.warning(f"Outlook 服务不支持删除账户: {email_id}") + return False def get_account_stats(self) -> Dict[str, Any]: - """获取账户统计信息""" total = len(self.accounts) - oauth_count = sum(1 for acc in self.accounts if acc.has_oauth()) - + oauth_count = sum(1 for a in self.accounts if a.has_oauth()) return { "total_accounts": total, "oauth_accounts": oauth_count, "password_accounts": total - oauth_count, - "accounts": [acc.to_dict() for acc in self.accounts], - "provider_status": self.get_provider_status(), + "accounts": [a.to_dict() for a in self.accounts], + "health_status": self.health_checker.get_status(), } def add_account(self, account_config: Dict[str, Any]) -> bool: - """添加新的 Outlook 账户""" try: account = OutlookAccount.from_config(account_config) if not account.validate(): return False - self.accounts.append(account) logger.info(f"添加 Outlook 账户: {account.email}") return True @@ -464,24 +405,13 @@ class OutlookService(BaseEmailService): return False def remove_account(self, email: str) -> bool: - """移除 Outlook 账户""" - for i, acc in enumerate(self.accounts): - if acc.email.lower() == email.lower(): + for i, a in enumerate(self.accounts): + if a.email.lower() == email.lower(): self.accounts.pop(i) logger.info(f"移除 Outlook 账户: {email}") return True return False - def reset_provider_health(self): - """重置所有提供者的健康状态""" - self.health_checker.reset_all() - logger.info("已重置所有提供者的健康状态") - - def force_provider(self, provider_type: ProviderType): - """强制使用指定的提供者""" - self.health_checker.force_enable(provider_type) - # 禁用其他提供者 - for pt in ProviderType: - if pt != provider_type: - self.health_checker.force_disable(pt, 60) - logger.info(f"已强制使用提供者: {provider_type.value}") + def reset_health(self): + self.health_checker.reset() + logger.info("已重置 IMAP_NEW 健康状态") diff --git a/src/services/outlook/token_manager.py b/src/services/outlook/token_manager.py index 77e54f2..0c23536 100644 --- a/src/services/outlook/token_manager.py +++ b/src/services/outlook/token_manager.py @@ -1,6 +1,6 @@ """ -Token 管理器 -支持多个 Microsoft Token 端点,自动选择合适的端点 +Token 管理器(简化版) +固定使用 consumers 端点 + IMAP scope """ import json @@ -11,153 +11,98 @@ from typing import Dict, Optional, Any from curl_cffi import requests as _requests -from .base import ProviderType, TokenEndpoint, TokenInfo +from .base import TokenInfo from .account import OutlookAccount logger = logging.getLogger(__name__) - -# 各提供者的 Scope 配置 -PROVIDER_SCOPES = { - ProviderType.IMAP_OLD: "", # 旧版 IMAP 不需要特定 scope - ProviderType.IMAP_NEW: "https://outlook.office.com/IMAP.AccessAsUser.All offline_access", - ProviderType.GRAPH_API: "https://graph.microsoft.com/.default", -} - -# 各提供者的 Token 端点 -PROVIDER_TOKEN_URLS = { - ProviderType.IMAP_OLD: TokenEndpoint.LIVE.value, - ProviderType.IMAP_NEW: TokenEndpoint.CONSUMERS.value, - ProviderType.GRAPH_API: TokenEndpoint.COMMON.value, -} +TOKEN_URL = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" +IMAP_SCOPE = "https://outlook.office.com/IMAP.AccessAsUser.All offline_access" class TokenManager: """ Token 管理器 - 支持多端点 Token 获取和缓存 + 固定 consumers 端点,缓存 key = email """ - # Token 缓存: key = (email, provider_type) -> TokenInfo - _token_cache: Dict[tuple, TokenInfo] = {} + _token_cache: Dict[str, TokenInfo] = {} _cache_lock = threading.Lock() - # 默认超时时间 DEFAULT_TIMEOUT = 30 - # Token 刷新提前时间(秒) REFRESH_BUFFER = 120 def __init__( self, account: OutlookAccount, - provider_type: ProviderType, proxy_url: Optional[str] = None, timeout: int = DEFAULT_TIMEOUT, + service_id: Optional[int] = None, ): - """ - 初始化 Token 管理器 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - proxy_url: 代理 URL(可选) - timeout: 请求超时时间 - """ self.account = account - self.provider_type = provider_type self.proxy_url = proxy_url self.timeout = timeout + self.service_id = service_id - # 获取端点和 Scope - self.token_url = PROVIDER_TOKEN_URLS.get(provider_type, TokenEndpoint.LIVE.value) - self.scope = PROVIDER_SCOPES.get(provider_type, "") + def _cache_key(self) -> str: + return self.account.email.lower() def get_cached_token(self) -> Optional[TokenInfo]: - """获取缓存的 Token""" - cache_key = (self.account.email.lower(), self.provider_type) with self._cache_lock: - token = self._token_cache.get(cache_key) + token = self._token_cache.get(self._cache_key()) if token and not token.is_expired(self.REFRESH_BUFFER): return token return None def set_cached_token(self, token: TokenInfo): - """缓存 Token""" - cache_key = (self.account.email.lower(), self.provider_type) with self._cache_lock: - self._token_cache[cache_key] = token + self._token_cache[self._cache_key()] = token def clear_cache(self): - """清除缓存""" - cache_key = (self.account.email.lower(), self.provider_type) with self._cache_lock: - self._token_cache.pop(cache_key, None) + self._token_cache.pop(self._cache_key(), None) def get_access_token(self, force_refresh: bool = False) -> Optional[str]: - """ - 获取 Access Token - - Args: - force_refresh: 是否强制刷新 - - Returns: - Access Token 字符串,失败返回 None - """ - # 检查缓存 if not force_refresh: cached = self.get_cached_token() if cached: - logger.debug(f"[{self.account.email}] 使用缓存的 Token ({self.provider_type.value})") + logger.debug(f"[{self.account.email}] 使用缓存 Token") return cached.access_token - # 刷新 Token try: token = self._refresh_token() if token: self.set_cached_token(token) return token.access_token except Exception as e: - logger.error(f"[{self.account.email}] 获取 Token 失败 ({self.provider_type.value}): {e}") + logger.error(f"[{self.account.email}] 获取 Token 失败: {e}") return None def _refresh_token(self) -> Optional[TokenInfo]: - """ - 刷新 Token - - Returns: - TokenInfo 对象,失败返回 None - """ if not self.account.client_id or not self.account.refresh_token: raise ValueError("缺少 client_id 或 refresh_token") - logger.debug(f"[{self.account.email}] 正在刷新 Token ({self.provider_type.value})...") - logger.debug(f"[{self.account.email}] Token URL: {self.token_url}") + logger.debug(f"[{self.account.email}] 正在刷新 Token...") - # 构建请求体 data = { "client_id": self.account.client_id, "refresh_token": self.account.refresh_token, "grant_type": "refresh_token", + "scope": IMAP_SCOPE, } - - # 添加 Scope(如果需要) - if self.scope: - data["scope"] = self.scope - headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", } - proxies = None if self.proxy_url: proxies = {"http": self.proxy_url, "https": self.proxy_url} try: resp = _requests.post( - self.token_url, + TOKEN_URL, data=data, headers=headers, proxies=proxies, @@ -166,74 +111,56 @@ class TokenManager: ) if resp.status_code != 200: - error_body = resp.text + body = resp.text logger.error(f"[{self.account.email}] Token 刷新失败: HTTP {resp.status_code}") - logger.debug(f"[{self.account.email}] 错误响应: {error_body[:500]}") - - if "service abuse" in error_body.lower(): + if "service abuse" in body.lower(): logger.warning(f"[{self.account.email}] 账号可能被封禁") - elif "invalid_grant" in error_body.lower(): + elif "invalid_grant" in body.lower(): logger.warning(f"[{self.account.email}] Refresh Token 已失效") - return None response_data = resp.json() - - # 解析响应 - token = TokenInfo.from_response(response_data, self.scope) + token = TokenInfo.from_response(response_data, IMAP_SCOPE) logger.info( - f"[{self.account.email}] Token 刷新成功 ({self.provider_type.value}), " + f"[{self.account.email}] Token 刷新成功," f"有效期 {int(token.expires_at - time.time())} 秒" ) + + # 若响应含新 refresh_token → 写回内存 + 持久化数据库 + new_rt = response_data.get("refresh_token", "") + if new_rt and new_rt != self.account.refresh_token: + self.account.refresh_token = new_rt + if self.service_id: + try: + from ...database.session import get_session_manager + from ...database.crud import update_outlook_refresh_token + with get_session_manager().session_scope() as db: + update_outlook_refresh_token( + db, self.service_id, self.account.email, new_rt + ) + logger.info(f"[{self.account.email}] refresh_token 已写回数据库") + except Exception as e: + logger.warning(f"[{self.account.email}] 写回 refresh_token 失败: {e}") + return token except json.JSONDecodeError as e: logger.error(f"[{self.account.email}] JSON 解析错误: {e}") return None - except Exception as e: logger.error(f"[{self.account.email}] 未知错误: {e}") return None @classmethod def clear_all_cache(cls): - """清除所有 Token 缓存""" with cls._cache_lock: cls._token_cache.clear() logger.info("已清除所有 Token 缓存") @classmethod def get_cache_stats(cls) -> Dict[str, Any]: - """获取缓存统计""" with cls._cache_lock: return { "cache_size": len(cls._token_cache), - "entries": [ - { - "email": key[0], - "provider": key[1].value, - } - for key in cls._token_cache.keys() - ], + "entries": list(cls._token_cache.keys()), } - - -def create_token_manager( - account: OutlookAccount, - provider_type: ProviderType, - proxy_url: Optional[str] = None, - timeout: int = TokenManager.DEFAULT_TIMEOUT, -) -> TokenManager: - """ - 创建 Token 管理器的工厂函数 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - proxy_url: 代理 URL - timeout: 超时时间 - - Returns: - TokenManager 实例 - """ - return TokenManager(account, provider_type, proxy_url, timeout) diff --git a/src/web/routes/registration.py b/src/web/routes/registration.py index 6f5896f..c12ddf7 100644 --- a/src/web/routes/registration.py +++ b/src/web/routes/registration.py @@ -340,6 +340,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: if selected_service and selected_service.config: config = selected_service.config.copy() + config['service_id'] = selected_service.id crud.update_registration_task(db, task_uuid, email_service_id=selected_service.id) logger.info(f"使用数据库 Outlook 账户: {selected_service.name}") else: diff --git a/src/web/routes/settings.py b/src/web/routes/settings.py index d096fa4..f244914 100644 --- a/src/web/routes/settings.py +++ b/src/web/routes/settings.py @@ -705,7 +705,6 @@ async def get_outlook_settings(): return { "default_client_id": settings.outlook_default_client_id, - "provider_priority": settings.outlook_provider_priority, "health_failure_threshold": settings.outlook_health_failure_threshold, "health_disable_duration": settings.outlook_health_disable_duration, } diff --git a/src/web/task_manager.py b/src/web/task_manager.py index 31c620b..ed722d4 100644 --- a/src/web/task_manager.py +++ b/src/web/task_manager.py @@ -191,13 +191,23 @@ class TaskManager: return _log_queues.get(task_uuid, []).copy() def update_status(self, task_uuid: str, status: str, **kwargs): - """更新任务状态""" + """更新任务状态并推送到 WebSocket""" if task_uuid not in _task_status: _task_status[task_uuid] = {} _task_status[task_uuid]["status"] = status _task_status[task_uuid].update(kwargs) + # 推送状态变更到 WebSocket(线程安全,兼容同步线程调用) + if self._loop and self._loop.is_running(): + try: + asyncio.run_coroutine_threadsafe( + self.broadcast_status(task_uuid, status, **kwargs), + self._loop + ) + except Exception as e: + logger.warning(f"推送状态到 WebSocket 失败: {e}") + def get_status(self, task_uuid: str) -> Optional[dict]: """获取任务状态""" return _task_status.get(task_uuid)