diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index adc71d9..7c23038 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -21,13 +21,13 @@ jobs: include: - os: windows-latest artifact_name: codex-register.exe - asset_name: codex-register-windows-x64.exe + asset_name: codex-register-v2-windows-x64.exe - os: ubuntu-latest artifact_name: codex-register - asset_name: codex-register-linux-x64 + asset_name: codex-register-v2-linux-x64 - os: macos-latest artifact_name: codex-register - asset_name: codex-register-macos-arm64 + asset_name: codex-register-v2-macos-arm64 steps: - name: 检出代码 @@ -74,19 +74,19 @@ jobs: - name: 整理文件并打包 zip run: | mkdir -p release - # 为每个平台二进制文件打包成 zip - find dist/ -type f | while read f; do - name=$(basename "$f") - # 根据文件名确定平台标识 - case "$name" in - *windows*) platform=$(echo "$name" | sed 's/\.[^.]*$//') ;; - *linux*) platform="$name" ;; - *macos*) platform="$name" ;; - *) platform="$name" ;; - esac + # download-artifact@v4 将每个 artifact 放在 dist// 子目录下 + # 遍历子目录,用目录名作为平台标识(即 matrix.asset_name) + for artifact_dir in dist/*/; do + platform=$(basename "$artifact_dir") + # 找到该目录下的二进制文件(只有一个) + binary=$(find "$artifact_dir" -maxdepth 1 -type f | head -n1) + if [ -z "$binary" ]; then + echo "警告:$artifact_dir 下没有找到文件,跳过" + continue + fi tmpdir="tmp_${platform}" mkdir -p "$tmpdir" - cp "$f" "$tmpdir/" + cp "$binary" "$tmpdir/" cp README.md "$tmpdir/README.md" cp .env.example "$tmpdir/.env.example" [ -f LICENSE ] && cp LICENSE "$tmpdir/LICENSE" || true @@ -103,14 +103,14 @@ jobs: files: release/* generate_release_notes: true body: | - ## OpenAI 自动注册系统 v2 + ## OpenAI 账号管理系统 v2 ### 下载说明 | 平台 | 文件 | |------|------| - | Windows x64 | `codex-register-windows-x64.exe` | - | Linux x64 | `codex-register-linux-x64` | - | macOS ARM64 | `codex-register-macos-arm64` | + | Windows x64 | `codex-register-v2-windows-x64.exe` | + | Linux x64 | `codex-register-v2-linux-x64` | + | macOS ARM64 | `codex-register-v2-macos-arm64` | ### 使用方法 ```bash diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index 874d40b..4c4b290 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -64,5 +64,6 @@ jobs: push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + platforms: linux/amd64,linux/arm64 cache-from: type=gha cache-to: type=gha,mode=max \ No newline at end of file diff --git a/src/config/constants.py b/src/config/constants.py index b65bff6..1469c18 100644 --- a/src/config/constants.py +++ b/src/config/constants.py @@ -65,15 +65,19 @@ OPENAI_API_ENDPOINTS = { "signup": "https://auth.openai.com/api/accounts/authorize/continue", "register": "https://auth.openai.com/api/accounts/user/register", "send_otp": "https://auth.openai.com/api/accounts/email-otp/send", + "passwordless_send_otp": "https://auth.openai.com/api/accounts/passwordless/send-otp", "validate_otp": "https://auth.openai.com/api/accounts/email-otp/validate", "create_account": "https://auth.openai.com/api/accounts/create_account", + "add_phone" : "https://auth.openai.com/add-phone", "select_workspace": "https://auth.openai.com/api/accounts/workspace/select", + "password_verify" : "https://auth.openai.com/api/accounts/password/verify" } # OpenAI 页面类型(用于判断账号状态) OPENAI_PAGE_TYPES = { + "LOGIN_PASSWORD": "login_password", "EMAIL_OTP_VERIFICATION": "email_otp_verification", # 已注册账号,需要 OTP 验证 - "PASSWORD_REGISTRATION": "password", # 新账号,需要设置密码 + "PASSWORD_REGISTRATION": "create_account_password", # 新账号,需要设置密码 } # ============================================================================ @@ -378,20 +382,8 @@ MICROSOFT_TOKEN_ENDPOINTS = { } # IMAP 服务器配置 -OUTLOOK_IMAP_SERVERS = { - "OLD": "outlook.office365.com", # 旧版 IMAP - "NEW": "outlook.live.com", # 新版 IMAP -} +OUTLOOK_IMAP_SERVER = "outlook.live.com" +OUTLOOK_IMAP_PORT = 993 -# Microsoft OAuth2 Scopes -MICROSOFT_SCOPES = { - # 旧版 IMAP 不需要特定 scope - "IMAP_OLD": "", - # 新版 IMAP 需要的 scope - "IMAP_NEW": "https://outlook.office.com/IMAP.AccessAsUser.All offline_access", - # Graph API 需要的 scope - "GRAPH_API": "https://graph.microsoft.com/.default", -} - -# Outlook 提供者默认优先级 -OUTLOOK_PROVIDER_PRIORITY = ["imap_new", "imap_old", "graph_api"] +# Microsoft OAuth2 Scope(IMAP_NEW) +OUTLOOK_IMAP_SCOPE = "https://outlook.office.com/IMAP.AccessAsUser.All offline_access" diff --git a/src/config/settings.py b/src/config/settings.py index 2803afd..aac2b97 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -358,12 +358,6 @@ SETTING_DEFINITIONS: Dict[str, SettingDefinition] = { ), # Outlook 配置 - "outlook_provider_priority": SettingDefinition( - db_key="outlook.provider_priority", - default_value=["imap_old", "imap_new", "graph_api"], - category=SettingCategory.EMAIL, - description="Outlook 提供者优先级" - ), "outlook_health_failure_threshold": SettingDefinition( db_key="outlook.health_failure_threshold", default_value=5, @@ -382,6 +376,12 @@ SETTING_DEFINITIONS: Dict[str, SettingDefinition] = { category=SettingCategory.EMAIL, description="Outlook OAuth 默认 Client ID" ), + "outlook_use_idle": SettingDefinition( + db_key="outlook.use_idle", + default_value=True, + category=SettingCategory.EMAIL, + description="使用 IMAP IDLE 替代轮询获取验证码(降低延迟,默认开启)" + ), } # 属性名到数据库键名的映射(用于向后兼容) @@ -407,9 +407,9 @@ SETTING_TYPES: Dict[str, Type] = { "cpa_enabled": bool, "email_code_timeout": int, "email_code_poll_interval": int, - "outlook_provider_priority": list, "outlook_health_failure_threshold": int, "outlook_health_disable_duration": int, + "outlook_use_idle": bool, } # 需要作为 SecretStr 处理的字段 @@ -694,10 +694,10 @@ class Settings(BaseModel): email_code_poll_interval: int = 3 # Outlook 配置 - outlook_provider_priority: List[str] = ["imap_old", "imap_new", "graph_api"] outlook_health_failure_threshold: int = 5 outlook_health_disable_duration: int = 60 outlook_default_client_id: str = "24d9a0ed-8787-4584-883c-2fd79308940a" + outlook_use_idle: bool = True # 全局配置实例 diff --git a/src/core/__init__.py b/src/core/__init__.py index 7ec7c6f..a828d10 100644 --- a/src/core/__init__.py +++ b/src/core/__init__.py @@ -12,6 +12,7 @@ from .http_client import ( create_openai_client, ) from .register import RegistrationEngine, RegistrationResult +from .login import LoginEngine from .utils import setup_logging, get_data_dir __all__ = [ @@ -27,6 +28,7 @@ __all__ = [ 'create_openai_client', 'RegistrationEngine', 'RegistrationResult', + 'LoginEngine', 'setup_logging', 'get_data_dir', ] diff --git a/src/core/http_client.py b/src/core/http_client.py index 517dfc4..f3dd876 100644 --- a/src/core/http_client.py +++ b/src/core/http_client.py @@ -282,7 +282,7 @@ class OpenAIHTTPClient(HTTPClient): loc = loc_match.group(1) if loc_match else None # 检查是否支持 - if loc in ["CN", "HK", "MO", "TW"]: + if loc in ["CN", "HK", "MO"]: return False, loc return True, loc diff --git a/src/core/login.py b/src/core/login.py new file mode 100644 index 0000000..1520f77 --- /dev/null +++ b/src/core/login.py @@ -0,0 +1,466 @@ +""" +登录流程引擎 +从 register.py 中拆分的登录专属方法 +""" + +import urllib.parse +import base64 +import json as json_module +from datetime import datetime +from typing import Optional, Dict, Any + +from .register import RegistrationEngine, RegistrationResult +from ..config.constants import OPENAI_API_ENDPOINTS + + +class LoginEngine(RegistrationEngine): + """ + 登录引擎 + 继承 RegistrationEngine,包含登录流程专属方法: + - _follow_login_redirects + - _submit_login_form + - _send_verification_code_passwordless + - _get_workspace_id + - _select_workspace + - _follow_redirects + - _handle_oauth_callback + """ + + def _follow_login_redirects(self, start_url: str) -> bool: + """跟随重定向链,寻找回调 URL""" + try: + current_url = start_url + max_redirects = 6 + + for i in range(max_redirects): + self._log(f"重定向 {i+1}/{max_redirects}: {current_url[:100]}...") + + response = self.session.get( + current_url, + allow_redirects=False, + timeout=15 + ) + + location = response.headers.get("Location") or "" + + # 如果不是重定向状态码,停止 + if response.status_code == 200: + self._log(f"非重定向状态码: {response.status_code}") + return True + + if not location: + self._log("重定向响应缺少 Location 头") + break + + # 构建下一个 URL + next_url = urllib.parse.urljoin(current_url, location) + + # 检查是否包含回调参数 + if "code=" in next_url and "state=" in next_url: + self._log(f"找到回调 URL: {next_url[:100]}...") + + current_url = next_url + + self._log("未能在重定向链中找到最终 URL") + return False + + except Exception as e: + self._log(f"跟随重定向失败: {e}", "error") + return False + + def _submit_login_form(self, did: str, sen_token) -> bool: + """处理 免密登录""" + try: + self._log("处理免密登录...") + login_body = f'{{"username":{{"value":"{self.email}","kind":"email"}}}}' + headers = { + "referer": "https://auth.openai.com/log-in", + "accept": "application/json", + "content-type": "application/json", + } + + if sen_token: + sentinel = f'{{"p": "", "t": "", "c": "{sen_token}", "id": "{did}", "flow": "authorize_continue"}}' + headers["openai-sentinel-token"] = sentinel + + response = self.session.post( + OPENAI_API_ENDPOINTS["signup"], + headers=headers, + data=login_body, + ) + self._log(f"提交登录表单状态: {response.status_code}") + if response.status_code == 200: + return True + return False + + except Exception as e: + self._log(f"处理登录失败: {e}", "error") + return False + + def _send_verification_code_passwordless(self) -> bool: + """发送验证码""" + try: + import time + # 记录发送时间戳 + self._otp_sent_at = time.time() + response = self.session.post( + OPENAI_API_ENDPOINTS["passwordless_send_otp"], + headers={ + "referer": "https://auth.openai.com/log-in/password", + "accept": "application/json" + } + ) + + self._log(f"验证码发送状态: {response.status_code}") + return response.status_code == 200 + + except Exception as e: + self._log(f"发送验证码失败: {e}", "error") + return False + + def _get_workspace_id(self) -> Optional[str]: + """获取 Workspace ID""" + try: + auth_cookie = self.session.cookies.get("oai-client-auth-session") + if not auth_cookie: + self._log("未能获取到授权 Cookie", "error") + return None + + try: + segments = auth_cookie.split(".") + if len(segments) < 1: + self._log("授权 Cookie 格式错误", "error") + return None + + # 解码第一个 segment + payload = segments[0] + pad = "=" * ((4 - (len(payload) % 4)) % 4) + decoded = base64.urlsafe_b64decode((payload + pad).encode("ascii")) + auth_json = json_module.loads(decoded.decode("utf-8")) + + workspaces = auth_json.get("workspaces") or [] + if not workspaces: + self._log("授权 Cookie 里没有 workspace 信息", "error") + return None + + workspace_id = str((workspaces[0] or {}).get("id") or "").strip() + if not workspace_id: + self._log("无法解析 workspace_id", "error") + return None + + self._log(f"Workspace ID: {workspace_id}") + return workspace_id + + except Exception as e: + self._log(f"解析授权 Cookie 失败: {e}", "error") + return None + + except Exception as e: + self._log(f"获取 Workspace ID 失败: {e}", "error") + return None + + def _select_workspace(self, workspace_id: str) -> Optional[str]: + """选择 Workspace""" + try: + select_body = f'{{"workspace_id":"{workspace_id}"}}' + + response = self.session.post( + OPENAI_API_ENDPOINTS["select_workspace"], + headers={ + "referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", + "content-type": "application/json", + }, + data=select_body, + ) + + if response.status_code != 200: + self._log(f"选择 workspace 失败: {response.status_code}", "error") + self._log(f"响应: {response.text[:200]}", "warning") + return None + + continue_url = str((response.json() or {}).get("continue_url") or "").strip() + if not continue_url: + self._log("workspace/select 响应里缺少 continue_url", "error") + return None + + self._log(f"Continue URL: {continue_url[:100]}...") + return continue_url + + except Exception as e: + self._log(f"选择 Workspace 失败: {e}", "error") + return None + + def _follow_redirects(self, start_url: str) -> Optional[str]: + """跟随重定向链,寻找回调 URL""" + try: + current_url = start_url + max_redirects = 6 + + for i in range(max_redirects): + self._log(f"重定向 {i+1}/{max_redirects}: {current_url[:100]}...") + + response = self.session.get( + current_url, + allow_redirects=False, + timeout=15 + ) + + location = response.headers.get("Location") or "" + + # 如果不是重定向状态码,停止 + if response.status_code not in [301, 302, 303, 307, 308]: + self._log(f"非重定向状态码: {response.status_code}") + break + + if not location: + self._log("重定向响应缺少 Location 头") + break + + # 构建下一个 URL + next_url = urllib.parse.urljoin(current_url, location) + + # 检查是否包含回调参数 + if "code=" in next_url and "state=" in next_url: + self._log(f"找到回调 URL: {next_url[:100]}...") + return next_url + + current_url = next_url + + self._log("未能在重定向链中找到回调 URL", "error") + return None + + except Exception as e: + self._log(f"跟随重定向失败: {e}", "error") + return None + + def _handle_oauth_callback(self, callback_url: str) -> Optional[Dict[str, Any]]: + """处理 OAuth 回调""" + try: + if not self.oauth_start: + self._log("OAuth 流程未初始化", "error") + return None + + self._log("处理 OAuth 回调...") + token_info = self.oauth_manager.handle_callback( + callback_url=callback_url, + expected_state=self.oauth_start.state, + code_verifier=self.oauth_start.code_verifier + ) + + self._log("OAuth 授权成功") + return token_info + + except Exception as e: + self._log(f"处理 OAuth 回调失败: {e}", "error") + return None + + def run(self) -> RegistrationResult: + """ + 执行完整的注册流程 + + 支持已注册账号自动登录: + - 如果检测到邮箱已注册,自动切换到登录流程 + - 已注册账号跳过:设置密码、发送验证码、创建用户账户 + - 共用步骤:获取验证码、验证验证码、Workspace 和 OAuth 回调 + + Returns: + RegistrationResult: 注册结果 + """ + result = RegistrationResult(success=False, logs=self.logs) + + try: + self._log("=" * 60) + self._log("开始注册流程") + self._log("=" * 60) + + # 1. 检查 IP 地理位置 + self._log("1. 检查 IP 地理位置...") + ip_ok, location = self._check_ip_location() + if not ip_ok: + result.error_message = f"IP 地理位置不支持: {location}" + self._log(f"IP 检查失败: {location}", "error") + return result + + self._log(f"IP 位置: {location}") + + # 2. 创建邮箱 + self._log("2. 创建邮箱...") + if not self._create_email(): + result.error_message = "创建邮箱失败" + return result + + result.email = self.email + + # 3. 初始化会话 + self._log("3. 初始化会话...") + if not self._init_session(): + result.error_message = "初始化会话失败" + return result + + # 4. 开始 OAuth 流程 + self._log("4. 开始 OAuth 授权流程...") + if not self._start_oauth(): + result.error_message = "开始 OAuth 流程失败" + return result + + # 5. 获取 Device ID + self._log("5. 获取 Device ID...") + did = self._get_device_id() + if not did: + result.error_message = "获取 Device ID 失败" + return result + + # 6. 检查 Sentinel 拦截 + self._log("6. 检查 Sentinel 拦截...") + sen_token = self._check_sentinel(did) + if sen_token: + self._log("Sentinel 检查通过") + else: + self._log("Sentinel 检查失败或未启用", "warning") + + # 7. 提交注册表单 + 解析响应判断账号状态 + self._log("7. 提交注册表单...") + signup_result = self._submit_signup_form(did, sen_token) + if not signup_result.success: + result.error_message = f"提交注册表单失败: {signup_result.error_message}" + return result + + # 8. 检测到已注册账号 → 直接终止任务 + if self._is_existing_account: + self._log(f"8. 邮箱 {self.email} 在 OpenAI 已注册,跳过注册流程", "warning") + result.error_message = f"邮箱 {self.email} 已在 OpenAI 注册" + return result + else: + self._log("8. 注册密码...") + password_ok, password = self._register_password() + if not password_ok: + result.error_message = "注册密码失败" + return result + + # 9. 发送验证码 + self._log("9. 发送验证码...") + if not self._send_verification_code(): + result.error_message = "发送验证码失败" + return result + + # 10. 获取验证码(超时后重发一次) + self._log("10. 等待验证码...") + code = self._get_verification_code() + if not code: + self._log("10. 验证码超时,重新发送...") + if self._send_verification_code(): + code = self._get_verification_code() + if not code: + result.error_message = "获取验证码失败" + return result + + # 11. 验证验证码 + self._log("11. 验证验证码...") + if not self._validate_verification_code(code): + result.error_message = "验证验证码失败" + return result + + # 12. 创建用户账户 + self._log("12. 创建用户账户...") + if not self._create_user_account(): + result.error_message = "创建用户账户失败" + return result + + self._log("13-1. 结束注册,启用登录流程...") + if not self._follow_login_redirects(self.oauth_start.auth_url): + result.error_message = "跟随重定向链失败" + return result + + self._log("13-2. 提交登陆表单") + if not self._submit_login_form(did, sen_token): + result.error_message = "提交登陆表单失败" + return result + + self._log("14. 发送验证码...") + if not self._send_verification_code_passwordless(): + result.error_message = "发送验证码失败" + return result + + self._log("15. 等待验证码...") + code = self._get_verification_code() + if not code: + self._log("15. 验证码超时,重新发送...") + if self._send_verification_code_passwordless(): + code = self._get_verification_code() + if not code: + result.error_message = "获取验证码失败" + return result + + self._log("16. 验证验证码...") + if not self._validate_verification_code(code): + result.error_message = "验证验证码失败" + return result + + # 13. 获取 Workspace ID + self._log("17. 获取 Workspace ID...") + workspace_id = self._get_workspace_id() + if not workspace_id: + result.error_message = "获取 Workspace ID 失败" + return result + + result.workspace_id = workspace_id + + # 14. 选择 Workspace + self._log("18. 选择 Workspace...") + continue_url = self._select_workspace(workspace_id) + if not continue_url: + result.error_message = "选择 Workspace 失败" + return result + + # 15. 跟随重定向链 + self._log("19. 跟随重定向链...") + callback_url = self._follow_redirects(continue_url) + if not callback_url: + result.error_message = "跟随重定向链失败" + return result + + # 16. 处理 OAuth 回调 + self._log("20. 处理 OAuth 回调...") + token_info = self._handle_oauth_callback(callback_url) + if not token_info: + result.error_message = "处理 OAuth 回调失败" + return result + + # 提取账户信息 + result.account_id = token_info.get("account_id", "") + result.access_token = token_info.get("access_token", "") + result.refresh_token = token_info.get("refresh_token", "") + result.id_token = token_info.get("id_token", "") + result.password = self.password or "" # 保存密码(已注册账号为空) + + # 设置来源标记 + result.source = "register" + + # 尝试获取 session_token 从 cookie + session_cookie = self.session.cookies.get("__Secure-next-auth.session-token") + if session_cookie: + self.session_token = session_cookie + result.session_token = session_cookie + self._log(f"获取到 Session Token") + + # 17. 完成 + self._log("=" * 60) + self._log("注册成功!") + self._log(f"邮箱: {result.email}") + self._log(f"Account ID: {result.account_id}") + self._log(f"Workspace ID: {result.workspace_id}") + self._log("=" * 60) + + result.success = True + result.metadata = { + "email_service": self.email_service.service_type.value, + "proxy_used": self.proxy_url, + "registered_at": datetime.now().isoformat(), + } + + return result + + except Exception as e: + self._log(f"注册过程中发生未预期错误: {e}", "error") + result.error_message = str(e) + return result diff --git a/src/core/register.py b/src/core/register.py index 41549a0..431e093 100644 --- a/src/core/register.py +++ b/src/core/register.py @@ -12,7 +12,6 @@ import string from typing import Optional, Dict, Any, Tuple, Callable from dataclasses import dataclass from datetime import datetime - from curl_cffi import requests as cffi_requests from .openai.oauth import OAuthManager, OAuthStart @@ -155,9 +154,9 @@ class RegistrationEngine: logger.warning(f"记录任务日志失败: {e}") # 根据级别记录到日志系统 - if level == "error": + if level == 'error': logger.error(message) - elif level == "warning": + elif level == 'warning': logger.warning(message) else: logger.info(message) @@ -171,7 +170,7 @@ class RegistrationEngine: try: return self.http_client.check_ip_location() except Exception as e: - self._log(f"检查 IP 地理位置失败: {e}", "error") + self._log(f"检查 IP 地理位置失败: {e}", 'error') return False, None def _create_email(self) -> bool: @@ -181,7 +180,7 @@ class RegistrationEngine: self.email_info = self.email_service.create_email() if not self.email_info or "email" not in self.email_info: - self._log("创建邮箱失败: 返回信息不完整", "error") + self._log("创建邮箱失败: 返回信息不完整", 'error') return False self.email = self.email_info["email"] @@ -189,7 +188,7 @@ class RegistrationEngine: return True except Exception as e: - self._log(f"创建邮箱失败: {e}", "error") + self._log(f"创建邮箱失败: {e}", 'error') return False def _start_oauth(self) -> bool: @@ -200,7 +199,7 @@ class RegistrationEngine: self._log(f"OAuth URL 已生成: {self.oauth_start.auth_url[:80]}...") return True except Exception as e: - self._log(f"生成 OAuth URL 失败: {e}", "error") + self._log(f"生成 OAuth URL 失败: {e}", 'error') return False def _init_session(self) -> bool: @@ -209,7 +208,7 @@ class RegistrationEngine: self.session = self.http_client.session return True except Exception as e: - self._log(f"初始化会话失败: {e}", "error") + self._log(f"初始化会话失败: {e}", 'error') return False def _get_device_id(self) -> Optional[str]: @@ -235,12 +234,12 @@ class RegistrationEngine: self._log( f"获取 Device ID 失败: 未返回 oai-did Cookie (HTTP {response.status_code}, 第 {attempt}/{max_attempts} 次)", - "warning" if attempt < max_attempts else "error" + 'warning' if attempt < max_attempts else 'error' ) except Exception as e: self._log( f"获取 Device ID 失败: {e} (第 {attempt}/{max_attempts} 次)", - "warning" if attempt < max_attempts else "error" + 'warning' if attempt < max_attempts else 'error' ) if attempt < max_attempts: @@ -270,11 +269,11 @@ class RegistrationEngine: self._log(f"Sentinel token 获取成功") return sen_token else: - self._log(f"Sentinel 检查失败: {response.status_code}", "warning") + self._log(f"Sentinel 检查失败: {response.status_code}", 'warning') return None except Exception as e: - self._log(f"Sentinel 检查异常: {e}", "warning") + self._log(f"Sentinel 检查异常: {e}", 'warning') return None def _submit_signup_form(self, did: str, sen_token: Optional[str]) -> SignupFormResult: @@ -286,6 +285,7 @@ class RegistrationEngine: """ try: signup_body = f'{{"username":{{"value":"{self.email}","kind":"email"}},"screen_hint":"signup"}}' + #signup_body = f'{{"username":{{"value":"{self.email}","kind":"email"}},"screen_hint":"login_or_signup"}}' headers = { "referer": "https://auth.openai.com/create-account", @@ -318,7 +318,7 @@ class RegistrationEngine: self._log(f"响应页面类型: {page_type}") # 判断是否为已注册账号 - is_existing = page_type == OPENAI_PAGE_TYPES["EMAIL_OTP_VERIFICATION"] + is_existing = page_type != OPENAI_PAGE_TYPES["PASSWORD_REGISTRATION"] if is_existing: self._log(f"检测到已注册账号,将自动切换到登录流程") @@ -332,12 +332,12 @@ class RegistrationEngine: ) except Exception as parse_error: - self._log(f"解析响应失败: {parse_error}", "warning") + self._log(f"解析响应失败: {parse_error}", 'warning') # 无法解析,默认成功 return SignupFormResult(success=True) except Exception as e: - self._log(f"提交注册表单失败: {e}", "error") + self._log(f"提交注册表单失败: {e}", 'error') return SignupFormResult(success=False, error_message=str(e)) def _register_password(self) -> Tuple[bool, Optional[str]]: @@ -367,18 +367,17 @@ class RegistrationEngine: self._log(f"提交密码状态: {response.status_code}") if response.status_code != 200: - error_text = response.text[:500] - self._log(f"密码注册失败: {error_text}", "warning") + self._log(f"密码注册失败: {response.json().get('error', {}).get('message', "")}", 'warning') # 解析错误信息,判断是否是邮箱已注册 try: error_json = response.json() - error_msg = error_json.get("error", {}).get("message", "") - error_code = error_json.get("error", {}).get("code", "") + error_msg = error_json.get('error', {}).get('message', "") + error_code = error_json.get('error', {}).get('code', "") # 检测邮箱已注册的情况 - if "already" in error_msg.lower() or "exists" in error_msg.lower() or error_code == "user_exists": - self._log(f"邮箱 {self.email} 可能已在 OpenAI 注册过", "error") + if "already" in error_msg.lower() or "exists" in error_msg.lower() or "again" in error_msg.lower() or error_code == "user_exists" or error_code == "invalid_request": + self._log(f"邮箱 {self.email} 可能已在 OpenAI 注册过", 'error') # 标记此邮箱为已注册状态 self._mark_email_as_registered() except Exception: @@ -389,27 +388,38 @@ class RegistrationEngine: return True, password except Exception as e: - self._log(f"密码注册失败: {e}", "error") + self._log(f"密码注册失败: {e}", 'error') return False, None + + def _mark_email_as_registered(self): - """标记邮箱为已注册状态(用于防止重复尝试)""" + """标记邮箱为已注册状态(OpenAI 侧已存在该账号)""" try: with get_db() as db: - # 检查是否已存在该邮箱的记录 existing = crud.get_account_by_email(db, self.email) - if not existing: - # 创建一个失败记录,标记该邮箱已注册过 + if existing: + # 数据库中已有该账号,更新 extra_data 标记 + extra = existing.extra_data or {} + extra["openai_already_registered"] = True + crud.update_account( + db, + existing.id, + extra_data=extra + ) + self._log(f"已更新数据库中 {self.email} 的已注册标记") + else: + # 数据库中不存在,创建失败记录并标记 crud.create_account( db, email=self.email, - password="", # 空密码表示未成功注册 + password="", email_service=self.email_service.service_type.value, email_service_id=self.email_info.get("service_id") if self.email_info else None, status="failed", - extra_data={"register_failed_reason": "email_already_registered_on_openai"} + extra_data={"openai_already_registered": True} ) - self._log(f"已在数据库中标记邮箱 {self.email} 为已注册状态") + self._log(f"已在数据库中标记 {self.email} 为已注册状态") except Exception as e: logger.warning(f"标记邮箱状态失败: {e}") @@ -431,7 +441,7 @@ class RegistrationEngine: return response.status_code == 200 except Exception as e: - self._log(f"发送验证码失败: {e}", "error") + self._log(f"发送验证码失败: {e}", 'error') return False def _get_verification_code(self) -> Optional[str]: @@ -452,11 +462,11 @@ class RegistrationEngine: self._log(f"成功获取验证码: {code}") return code else: - self._log("等待验证码超时", "error") + self._log("等待验证码超时", 'error') return None except Exception as e: - self._log(f"获取验证码失败: {e}", "error") + self._log(f"获取验证码失败: {e}", 'error') return None def _validate_verification_code(self, code: str) -> bool: @@ -478,7 +488,7 @@ class RegistrationEngine: return response.status_code == 200 except Exception as e: - self._log(f"验证验证码失败: {e}", "error") + self._log(f"验证验证码失败: {e}", 'error') return False def _create_user_account(self) -> bool: @@ -501,342 +511,27 @@ class RegistrationEngine: self._log(f"账户创建状态: {response.status_code}") if response.status_code != 200: - self._log(f"账户创建失败: {response.text[:200]}", "warning") + self._log(f"账户创建失败: {response.text[:200]}", 'warning') return False return True except Exception as e: - self._log(f"创建账户失败: {e}", "error") + self._log(f"创建账户失败: {e}", 'error') return False - def _get_workspace_id(self) -> Optional[str]: - """获取 Workspace ID""" - try: - auth_cookie = self.session.cookies.get("oai-client-auth-session") - if not auth_cookie: - self._log("未能获取到授权 Cookie", "error") - return None - - # 解码 JWT - import base64 - import json as json_module - - try: - segments = auth_cookie.split(".") - if len(segments) < 1: - self._log("授权 Cookie 格式错误", "error") - return None - - # 解码第一个 segment - payload = segments[0] - pad = "=" * ((4 - (len(payload) % 4)) % 4) - decoded = base64.urlsafe_b64decode((payload + pad).encode("ascii")) - auth_json = json_module.loads(decoded.decode("utf-8")) - - workspaces = auth_json.get("workspaces") or [] - if not workspaces: - self._log("授权 Cookie 里没有 workspace 信息", "error") - return None - - workspace_id = str((workspaces[0] or {}).get("id") or "").strip() - if not workspace_id: - self._log("无法解析 workspace_id", "error") - return None - - self._log(f"Workspace ID: {workspace_id}") - return workspace_id - - except Exception as e: - self._log(f"解析授权 Cookie 失败: {e}", "error") - return None - - except Exception as e: - self._log(f"获取 Workspace ID 失败: {e}", "error") - return None - - def _select_workspace(self, workspace_id: str) -> Optional[str]: - """选择 Workspace""" - try: - select_body = f'{{"workspace_id":"{workspace_id}"}}' - - response = self.session.post( - OPENAI_API_ENDPOINTS["select_workspace"], - headers={ - "referer": "https://auth.openai.com/sign-in-with-chatgpt/codex/consent", - "content-type": "application/json", - }, - data=select_body, - ) - - if response.status_code != 200: - self._log(f"选择 workspace 失败: {response.status_code}", "error") - self._log(f"响应: {response.text[:200]}", "warning") - return None - - continue_url = str((response.json() or {}).get("continue_url") or "").strip() - if not continue_url: - self._log("workspace/select 响应里缺少 continue_url", "error") - return None - - self._log(f"Continue URL: {continue_url[:100]}...") - return continue_url - - except Exception as e: - self._log(f"选择 Workspace 失败: {e}", "error") - return None - - def _follow_redirects(self, start_url: str) -> Optional[str]: - """跟随重定向链,寻找回调 URL""" - try: - current_url = start_url - max_redirects = 6 - - for i in range(max_redirects): - self._log(f"重定向 {i+1}/{max_redirects}: {current_url[:100]}...") - - response = self.session.get( - current_url, - allow_redirects=False, - timeout=15 - ) - - location = response.headers.get("Location") or "" - - # 如果不是重定向状态码,停止 - if response.status_code not in [301, 302, 303, 307, 308]: - self._log(f"非重定向状态码: {response.status_code}") - break - - if not location: - self._log("重定向响应缺少 Location 头") - break - - # 构建下一个 URL - import urllib.parse - next_url = urllib.parse.urljoin(current_url, location) - - # 检查是否包含回调参数 - if "code=" in next_url and "state=" in next_url: - self._log(f"找到回调 URL: {next_url[:100]}...") - return next_url - - current_url = next_url - - self._log("未能在重定向链中找到回调 URL", "error") - return None - - except Exception as e: - self._log(f"跟随重定向失败: {e}", "error") - return None - - def _handle_oauth_callback(self, callback_url: str) -> Optional[Dict[str, Any]]: - """处理 OAuth 回调""" - try: - if not self.oauth_start: - self._log("OAuth 流程未初始化", "error") - return None - - self._log("处理 OAuth 回调...") - token_info = self.oauth_manager.handle_callback( - callback_url=callback_url, - expected_state=self.oauth_start.state, - code_verifier=self.oauth_start.code_verifier - ) - - self._log("OAuth 授权成功") - return token_info - - except Exception as e: - self._log(f"处理 OAuth 回调失败: {e}", "error") - return None - - def run(self) -> RegistrationResult: - """ - 执行完整的注册流程 - - 支持已注册账号自动登录: - - 如果检测到邮箱已注册,自动切换到登录流程 - - 已注册账号跳过:设置密码、发送验证码、创建用户账户 - - 共用步骤:获取验证码、验证验证码、Workspace 和 OAuth 回调 - - Returns: - RegistrationResult: 注册结果 - """ - result = RegistrationResult(success=False, logs=self.logs) - - try: - self._log("=" * 60) - self._log("开始注册流程") - self._log("=" * 60) - - # 1. 检查 IP 地理位置 - self._log("1. 检查 IP 地理位置...") - ip_ok, location = self._check_ip_location() - if not ip_ok: - result.error_message = f"IP 地理位置不支持: {location}" - self._log(f"IP 检查失败: {location}", "error") - return result - - self._log(f"IP 位置: {location}") - - # 2. 创建邮箱 - self._log("2. 创建邮箱...") - if not self._create_email(): - result.error_message = "创建邮箱失败" - return result - - result.email = self.email - - # 3. 初始化会话 - self._log("3. 初始化会话...") - if not self._init_session(): - result.error_message = "初始化会话失败" - return result - - # 4. 开始 OAuth 流程 - self._log("4. 开始 OAuth 授权流程...") - if not self._start_oauth(): - result.error_message = "开始 OAuth 流程失败" - return result - - # 5. 获取 Device ID - self._log("5. 获取 Device ID...") - did = self._get_device_id() - if not did: - result.error_message = "获取 Device ID 失败" - return result - - # 6. 检查 Sentinel 拦截 - self._log("6. 检查 Sentinel 拦截...") - sen_token = self._check_sentinel(did) - if sen_token: - self._log("Sentinel 检查通过") - else: - self._log("Sentinel 检查失败或未启用", "warning") - - # 7. 提交注册表单 + 解析响应判断账号状态 - self._log("7. 提交注册表单...") - signup_result = self._submit_signup_form(did, sen_token) - if not signup_result.success: - result.error_message = f"提交注册表单失败: {signup_result.error_message}" - return result - - # 8. [已注册账号跳过] 注册密码 - if self._is_existing_account: - self._log("8. [已注册账号] 跳过密码设置,OTP 已自动发送") - else: - self._log("8. 注册密码...") - password_ok, password = self._register_password() - if not password_ok: - result.error_message = "注册密码失败" - return result - - # 9. [已注册账号跳过] 发送验证码 - if self._is_existing_account: - self._log("9. [已注册账号] 跳过发送验证码,使用自动发送的 OTP") - # 已注册账号的 OTP 在提交表单时已自动发送,记录时间戳 - self._otp_sent_at = time.time() - else: - self._log("9. 发送验证码...") - if not self._send_verification_code(): - result.error_message = "发送验证码失败" - return result - - # 10. 获取验证码 - self._log("10. 等待验证码...") - code = self._get_verification_code() - if not code: - result.error_message = "获取验证码失败" - return result - - # 11. 验证验证码 - self._log("11. 验证验证码...") - if not self._validate_verification_code(code): - result.error_message = "验证验证码失败" - return result - - # 12. [已注册账号跳过] 创建用户账户 - if self._is_existing_account: - self._log("12. [已注册账号] 跳过创建用户账户") - else: - self._log("12. 创建用户账户...") - if not self._create_user_account(): - result.error_message = "创建用户账户失败" - return result - - # 13. 获取 Workspace ID - self._log("13. 获取 Workspace ID...") - workspace_id = self._get_workspace_id() - if not workspace_id: - result.error_message = "获取 Workspace ID 失败" - return result - - result.workspace_id = workspace_id - - # 14. 选择 Workspace - self._log("14. 选择 Workspace...") - continue_url = self._select_workspace(workspace_id) - if not continue_url: - result.error_message = "选择 Workspace 失败" - return result - - # 15. 跟随重定向链 - self._log("15. 跟随重定向链...") - callback_url = self._follow_redirects(continue_url) - if not callback_url: - result.error_message = "跟随重定向链失败" - return result - - # 16. 处理 OAuth 回调 - self._log("16. 处理 OAuth 回调...") - token_info = self._handle_oauth_callback(callback_url) - if not token_info: - result.error_message = "处理 OAuth 回调失败" - return result - - # 提取账户信息 - result.account_id = token_info.get("account_id", "") - result.access_token = token_info.get("access_token", "") - result.refresh_token = token_info.get("refresh_token", "") - result.id_token = token_info.get("id_token", "") - result.password = self.password or "" # 保存密码(已注册账号为空) - - # 设置来源标记 - result.source = "login" if self._is_existing_account else "register" - - # 尝试获取 session_token 从 cookie - session_cookie = self.session.cookies.get("__Secure-next-auth.session-token") - if session_cookie: - self.session_token = session_cookie - result.session_token = session_cookie - self._log(f"获取到 Session Token") - - # 17. 完成 - self._log("=" * 60) - if self._is_existing_account: - self._log("登录成功! (已注册账号)") - else: - self._log("注册成功!") - self._log(f"邮箱: {result.email}") - self._log(f"Account ID: {result.account_id}") - self._log(f"Workspace ID: {result.workspace_id}") - self._log("=" * 60) - - result.success = True - result.metadata = { - "email_service": self.email_service.service_type.value, - "proxy_used": self.proxy_url, - "registered_at": datetime.now().isoformat(), - "is_existing_account": self._is_existing_account, - } - - return result - - except Exception as e: - self._log(f"注册过程中发生未预期错误: {e}", "error") - result.error_message = str(e) - return result + def _add_phone(self) -> bool: + """获取 手机验证码""" + phone_body = f'{{"code":"{code}"}}' + response = self.session.post( + OPENAI_API_ENDPOINTS["add_phone"], + headers={ + "referer": "https://auth.openai.com/api/accounts/create_account", + "accept": "application/json", + "content-type": "application/json", + }, + data=create_account_body, + ) def save_to_database(self, result: RegistrationResult) -> bool: """ @@ -879,5 +574,5 @@ class RegistrationEngine: return True except Exception as e: - self._log(f"保存到数据库失败: {e}", "error") + self._log(f"保存到数据库失败: {e}", 'error') return False diff --git a/src/core/utils.py b/src/core/utils.py index 80ab61f..0e1ecc3 100644 --- a/src/core/utils.py +++ b/src/core/utils.py @@ -15,6 +15,7 @@ import base64 import re import uuid from datetime import datetime, timedelta +from html.parser import HTMLParser from typing import Any, Dict, List, Optional, Union, Callable from pathlib import Path @@ -568,3 +569,49 @@ class Timer: if self.start_time is not None: return time.time() - self.start_time return 0.0 + +class BootstrapExtractor(HTMLParser): + """内部解析器,专门提取 id="client-bootstrap" 的 script 内容""" + def __init__(self): + super().__init__() + self._in_target = False + self.json_text = None + + def handle_starttag(self, tag, attrs): + if tag == 'script': + attrs_dict = dict(attrs) + if attrs_dict.get('id') == 'client-bootstrap': + self._in_target = True + + def handle_endtag(self, tag): + if tag == 'script' and self._in_target: + self._in_target = False + + def handle_data(self, data): + if self._in_target and self.json_text is None: + self.json_text = data.strip() + +def extract_client_bootstrap_json(html: str): + """ + 从 HTML 字符串中提取 id="client-bootstrap" 的 script 标签内容并解析为 JSON。 + 返回 dict 或 None(未找到或解析失败)。 + """ + parser = BootstrapExtractor() + parser.feed(html) + if parser.json_text: + try: + return json.loads(parser.json_text) + except json.JSONDecodeError: + return None + return None + +def base64_payload_decode(payload_b64): + import base64 + import json as json_module + padding = 4 - (len(payload_b64) % 4) + if padding != 4: + payload_b64 += '=' * padding + + # 解码(Base64URL 使用 - 和 _ 替代 + 和 /) + payload_bytes = base64.urlsafe_b64decode(payload_b64) + return json_module.loads(payload_bytes) diff --git a/src/database/crud.py b/src/database/crud.py index c1beb91..49aaba1 100644 --- a/src/database/crud.py +++ b/src/database/crud.py @@ -721,3 +721,19 @@ def delete_tm_service(db: Session, service_id: int) -> bool: db.delete(svc) db.commit() return True + +def update_outlook_refresh_token(db: Session, service_id: int, email: str, new_refresh_token: str): + """更新 EmailService.config 中指定邮箱的 refresh_token""" + service = db.query(EmailService).filter(EmailService.id == service_id).first() + if not service or not service.config: + return + config = dict(service.config) + # 单账户格式 + if config.get("email", "").lower() == email.lower(): + config["refresh_token"] = new_refresh_token + # 多账户列表格式 + for acc in config.get("accounts", []): + if acc.get("email", "").lower() == email.lower(): + acc["refresh_token"] = new_refresh_token + service.config = config + db.commit() \ No newline at end of file diff --git a/src/services/__init__.py b/src/services/__init__.py index ad29d3e..d2312da 100644 --- a/src/services/__init__.py +++ b/src/services/__init__.py @@ -38,9 +38,7 @@ from .outlook.base import ( from .outlook.account import OutlookAccount from .outlook.providers import ( OutlookProvider, - IMAPOldProvider, IMAPNewProvider, - GraphAPIProvider, ) __all__ = [ @@ -67,7 +65,5 @@ __all__ = [ 'ProviderStatus', 'OutlookAccount', 'OutlookProvider', - 'IMAPOldProvider', 'IMAPNewProvider', - 'GraphAPIProvider', ] diff --git a/src/services/outlook/account.py b/src/services/outlook/account.py index 6f427d5..0ecff31 100644 --- a/src/services/outlook/account.py +++ b/src/services/outlook/account.py @@ -3,7 +3,7 @@ Outlook 账户数据类 """ from dataclasses import dataclass -from typing import Dict, Any, Optional +from typing import Dict, Any @dataclass @@ -16,7 +16,6 @@ class OutlookAccount: @classmethod def from_config(cls, config: Dict[str, Any]) -> "OutlookAccount": - """从配置创建账户""" return cls( email=config.get("email", ""), password=config.get("password", ""), @@ -25,15 +24,12 @@ class OutlookAccount: ) def has_oauth(self) -> bool: - """是否支持 OAuth2""" return bool(self.client_id and self.refresh_token) def validate(self) -> bool: - """验证账户信息是否有效""" return bool(self.email and self.password) or self.has_oauth() def to_dict(self, include_sensitive: bool = False) -> Dict[str, Any]: - """转换为字典""" result = { "email": self.email, "has_oauth": self.has_oauth(), @@ -47,5 +43,4 @@ class OutlookAccount: return result def __str__(self) -> str: - """字符串表示""" return f"OutlookAccount({self.email})" diff --git a/src/services/outlook/base.py b/src/services/outlook/base.py index 335b11e..7bc294d 100644 --- a/src/services/outlook/base.py +++ b/src/services/outlook/base.py @@ -1,6 +1,5 @@ """ -Outlook 服务基础定义 -包含枚举类型和数据类 +Outlook 邮箱服务基础定义 """ from dataclasses import dataclass, field @@ -10,49 +9,38 @@ from typing import Optional, Dict, Any, List class ProviderType(str, Enum): - """Outlook 提供者类型""" - IMAP_OLD = "imap_old" # 旧版 IMAP (outlook.office365.com) - IMAP_NEW = "imap_new" # 新版 IMAP (outlook.live.com) - GRAPH_API = "graph_api" # Microsoft Graph API + """Outlook 提供者类型(仅 IMAP_NEW)""" + IMAP_NEW = "imap_new" class TokenEndpoint(str, Enum): """Token 端点""" - LIVE = "https://login.live.com/oauth20_token.srf" CONSUMERS = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" - COMMON = "https://login.microsoftonline.com/common/oauth2/v2.0/token" - - -class IMAPServer(str, Enum): - """IMAP 服务器""" - OLD = "outlook.office365.com" - NEW = "outlook.live.com" class ProviderStatus(str, Enum): """提供者状态""" - HEALTHY = "healthy" # 健康 - DEGRADED = "degraded" # 降级 - DISABLED = "disabled" # 禁用 + HEALTHY = "healthy" + DEGRADED = "degraded" + DISABLED = "disabled" @dataclass class EmailMessage: """邮件消息数据类""" - id: str # 消息 ID - subject: str # 主题 - sender: str # 发件人 - recipients: List[str] = field(default_factory=list) # 收件人列表 - body: str = "" # 正文内容 - body_preview: str = "" # 正文预览 - received_at: Optional[datetime] = None # 接收时间 - received_timestamp: int = 0 # 接收时间戳 - is_read: bool = False # 是否已读 - has_attachments: bool = False # 是否有附件 - raw_data: Optional[bytes] = None # 原始数据(用于调试) + id: str + subject: str + sender: str + recipients: List[str] = field(default_factory=list) + body: str = "" + body_preview: str = "" + received_at: Optional[datetime] = None + received_timestamp: int = 0 + is_read: bool = False + has_attachments: bool = False + raw_data: Optional[bytes] = None def to_dict(self) -> Dict[str, Any]: - """转换为字典""" return { "id": self.id, "subject": self.subject, @@ -71,19 +59,17 @@ class EmailMessage: class TokenInfo: """Token 信息数据类""" access_token: str - expires_at: float # 过期时间戳 + expires_at: float token_type: str = "Bearer" scope: str = "" refresh_token: Optional[str] = None def is_expired(self, buffer_seconds: int = 120) -> bool: - """检查 Token 是否已过期""" import time return time.time() >= (self.expires_at - buffer_seconds) @classmethod def from_response(cls, data: Dict[str, Any], scope: str = "") -> "TokenInfo": - """从 API 响应创建""" import time return cls( access_token=data.get("access_token", ""), @@ -99,49 +85,42 @@ class ProviderHealth: """提供者健康状态""" provider_type: ProviderType status: ProviderStatus = ProviderStatus.HEALTHY - failure_count: int = 0 # 连续失败次数 - last_success: Optional[datetime] = None # 最后成功时间 - last_failure: Optional[datetime] = None # 最后失败时间 - last_error: str = "" # 最后错误信息 - disabled_until: Optional[datetime] = None # 禁用截止时间 + failure_count: int = 0 + last_success: Optional[datetime] = None + last_failure: Optional[datetime] = None + last_error: str = "" + disabled_until: Optional[datetime] = None def record_success(self): - """记录成功""" self.status = ProviderStatus.HEALTHY self.failure_count = 0 self.last_success = datetime.now() self.disabled_until = None def record_failure(self, error: str): - """记录失败""" self.failure_count += 1 self.last_failure = datetime.now() self.last_error = error def should_disable(self, threshold: int = 3) -> bool: - """判断是否应该禁用""" return self.failure_count >= threshold def is_disabled(self) -> bool: - """检查是否被禁用""" if self.disabled_until and datetime.now() < self.disabled_until: return True return False def disable(self, duration_seconds: int = 300): - """禁用提供者""" from datetime import timedelta self.status = ProviderStatus.DISABLED self.disabled_until = datetime.now() + timedelta(seconds=duration_seconds) def enable(self): - """启用提供者""" self.status = ProviderStatus.HEALTHY self.disabled_until = None self.failure_count = 0 def to_dict(self) -> Dict[str, Any]: - """转换为字典""" return { "provider_type": self.provider_type.value, "status": self.status.value, diff --git a/src/services/outlook/health_checker.py b/src/services/outlook/health_checker.py index c68ed4e..2b7060b 100644 --- a/src/services/outlook/health_checker.py +++ b/src/services/outlook/health_checker.py @@ -1,15 +1,13 @@ """ -健康检查和故障切换管理 +健康检查管理(简化版,单 Provider) """ import logging import threading -import time from datetime import datetime, timedelta -from typing import Dict, List, Optional, Any +from typing import Any, Dict, Optional -from .base import ProviderType, ProviderHealth, ProviderStatus -from .providers.base import OutlookProvider +from .base import ProviderHealth, ProviderStatus, ProviderType logger = logging.getLogger(__name__) @@ -17,296 +15,49 @@ logger = logging.getLogger(__name__) class HealthChecker: """ - 健康检查管理器 - 跟踪各提供者的健康状态,管理故障切换 + 单 Provider 健康检查器 + 跟踪 IMAP_NEW 的健康状态 """ def __init__( self, failure_threshold: int = 3, disable_duration: int = 300, - recovery_check_interval: int = 60, ): - """ - 初始化健康检查器 - - Args: - failure_threshold: 连续失败次数阈值,超过后禁用 - disable_duration: 禁用时长(秒) - recovery_check_interval: 恢复检查间隔(秒) - """ self.failure_threshold = failure_threshold self.disable_duration = disable_duration - self.recovery_check_interval = recovery_check_interval - - # 提供者健康状态: ProviderType -> ProviderHealth - self._health_status: Dict[ProviderType, ProviderHealth] = {} + self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW) self._lock = threading.Lock() - # 初始化所有提供者的健康状态 - for provider_type in ProviderType: - self._health_status[provider_type] = ProviderHealth( - provider_type=provider_type - ) - - def get_health(self, provider_type: ProviderType) -> ProviderHealth: - """获取提供者的健康状态""" + def record_success(self): with self._lock: - return self._health_status.get(provider_type, ProviderHealth(provider_type=provider_type)) + self._health.record_success() - def record_success(self, provider_type: ProviderType): - """记录成功操作""" + def record_failure(self, error: str): with self._lock: - health = self._health_status.get(provider_type) - if health: - health.record_success() - logger.debug(f"{provider_type.value} 记录成功") - - def record_failure(self, provider_type: ProviderType, error: str): - """记录失败操作""" - with self._lock: - health = self._health_status.get(provider_type) - if health: - health.record_failure(error) - - # 检查是否需要禁用 - if health.should_disable(self.failure_threshold): - health.disable(self.disable_duration) - logger.warning( - f"{provider_type.value} 已禁用 {self.disable_duration} 秒," - f"原因: {error}" - ) - - def is_available(self, provider_type: ProviderType) -> bool: - """ - 检查提供者是否可用 - - Args: - provider_type: 提供者类型 - - Returns: - 是否可用 - """ - health = self.get_health(provider_type) - - # 检查是否被禁用 - if health.is_disabled(): - remaining = (health.disabled_until - datetime.now()).total_seconds() - logger.debug( - f"{provider_type.value} 已被禁用,剩余 {int(remaining)} 秒" - ) - return False - - return health.status != ProviderStatus.DISABLED - - def get_available_providers( - self, - priority_order: Optional[List[ProviderType]] = None, - ) -> List[ProviderType]: - """ - 获取可用的提供者列表 - - Args: - priority_order: 优先级顺序,默认为 [IMAP_NEW, IMAP_OLD, GRAPH_API] - - Returns: - 可用的提供者列表 - """ - if priority_order is None: - priority_order = [ - ProviderType.IMAP_NEW, - ProviderType.IMAP_OLD, - ProviderType.GRAPH_API, - ] - - available = [] - for provider_type in priority_order: - if self.is_available(provider_type): - available.append(provider_type) - - return available - - def get_next_available_provider( - self, - priority_order: Optional[List[ProviderType]] = None, - ) -> Optional[ProviderType]: - """ - 获取下一个可用的提供者 - - Args: - priority_order: 优先级顺序 - - Returns: - 可用的提供者类型,如果没有返回 None - """ - available = self.get_available_providers(priority_order) - return available[0] if available else None - - def force_disable(self, provider_type: ProviderType, duration: Optional[int] = None): - """ - 强制禁用提供者 - - Args: - provider_type: 提供者类型 - duration: 禁用时长(秒),默认使用配置值 - """ - with self._lock: - health = self._health_status.get(provider_type) - if health: - health.disable(duration or self.disable_duration) - logger.warning(f"{provider_type.value} 已强制禁用") - - def force_enable(self, provider_type: ProviderType): - """ - 强制启用提供者 - - Args: - provider_type: 提供者类型 - """ - with self._lock: - health = self._health_status.get(provider_type) - if health: - health.enable() - logger.info(f"{provider_type.value} 已启用") - - def get_all_health_status(self) -> Dict[str, Any]: - """ - 获取所有提供者的健康状态 - - Returns: - 健康状态字典 - """ - with self._lock: - return { - provider_type.value: health.to_dict() - for provider_type, health in self._health_status.items() - } - - def check_and_recover(self): - """ - 检查并恢复被禁用的提供者 - - 如果禁用时间已过,自动恢复提供者 - """ - with self._lock: - for provider_type, health in self._health_status.items(): - if health.is_disabled(): - # 检查是否可以恢复 - if health.disabled_until and datetime.now() >= health.disabled_until: - health.enable() - logger.info(f"{provider_type.value} 已自动恢复") - - def reset_all(self): - """重置所有提供者的健康状态""" - with self._lock: - for provider_type in ProviderType: - self._health_status[provider_type] = ProviderHealth( - provider_type=provider_type + self._health.record_failure(error) + if self._health.should_disable(self.failure_threshold): + self._health.disable(self.disable_duration) + logger.warning( + f"IMAP_NEW 已禁用 {self.disable_duration}s,原因: {error}" ) - logger.info("已重置所有提供者的健康状态") - - -class FailoverManager: - """ - 故障切换管理器 - 管理提供者之间的自动切换 - """ - - def __init__( - self, - health_checker: HealthChecker, - priority_order: Optional[List[ProviderType]] = None, - ): - """ - 初始化故障切换管理器 - - Args: - health_checker: 健康检查器 - priority_order: 提供者优先级顺序 - """ - self.health_checker = health_checker - self.priority_order = priority_order or [ - ProviderType.IMAP_NEW, - ProviderType.IMAP_OLD, - ProviderType.GRAPH_API, - ] - - # 当前使用的提供者索引 - self._current_index = 0 - self._lock = threading.Lock() - - def get_current_provider(self) -> Optional[ProviderType]: - """ - 获取当前提供者 - - Returns: - 当前提供者类型,如果没有可用的返回 None - """ - available = self.health_checker.get_available_providers(self.priority_order) - if not available: - return None + def is_available(self) -> bool: with self._lock: - # 尝试使用当前索引 - if self._current_index < len(available): - return available[self._current_index] - return available[0] - - def switch_to_next(self) -> Optional[ProviderType]: - """ - 切换到下一个提供者 - - Returns: - 下一个提供者类型,如果没有可用的返回 None - """ - available = self.health_checker.get_available_providers(self.priority_order) - if not available: - return None + if self._health.is_disabled(): + remaining = ( + (self._health.disabled_until - datetime.now()).total_seconds() + if self._health.disabled_until + else 0 + ) + logger.debug(f"IMAP_NEW 已被禁用,剩余 {int(remaining)}s") + return False + return self._health.status != ProviderStatus.DISABLED + def reset(self): with self._lock: - self._current_index = (self._current_index + 1) % len(available) - next_provider = available[self._current_index] - logger.info(f"切换到提供者: {next_provider.value}") - return next_provider - - def on_provider_success(self, provider_type: ProviderType): - """ - 提供者成功时调用 - - Args: - provider_type: 提供者类型 - """ - self.health_checker.record_success(provider_type) - - # 重置索引到成功的提供者 - with self._lock: - available = self.health_checker.get_available_providers(self.priority_order) - if provider_type in available: - self._current_index = available.index(provider_type) - - def on_provider_failure(self, provider_type: ProviderType, error: str): - """ - 提供者失败时调用 - - Args: - provider_type: 提供者类型 - error: 错误信息 - """ - self.health_checker.record_failure(provider_type, error) + self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW) def get_status(self) -> Dict[str, Any]: - """ - 获取故障切换状态 - - Returns: - 状态字典 - """ - current = self.get_current_provider() - return { - "current_provider": current.value if current else None, - "priority_order": [p.value for p in self.priority_order], - "available_providers": [ - p.value for p in self.health_checker.get_available_providers(self.priority_order) - ], - "health_status": self.health_checker.get_all_health_status(), - } + with self._lock: + return self._health.to_dict() diff --git a/src/services/outlook/providers/__init__.py b/src/services/outlook/providers/__init__.py index d6fe6a1..95e88a4 100644 --- a/src/services/outlook/providers/__init__.py +++ b/src/services/outlook/providers/__init__.py @@ -3,27 +3,18 @@ Outlook 提供者模块 """ from .base import OutlookProvider, ProviderConfig -from .imap_old import IMAPOldProvider from .imap_new import IMAPNewProvider -from .graph_api import GraphAPIProvider __all__ = [ 'OutlookProvider', 'ProviderConfig', - 'IMAPOldProvider', 'IMAPNewProvider', - 'GraphAPIProvider', ] - -# 提供者注册表 PROVIDER_REGISTRY = { - 'imap_old': IMAPOldProvider, 'imap_new': IMAPNewProvider, - 'graph_api': GraphAPIProvider, } def get_provider_class(provider_type: str): - """获取提供者类""" return PROVIDER_REGISTRY.get(provider_type) diff --git a/src/services/outlook/providers/base.py b/src/services/outlook/providers/base.py index 0d6c072..bb27f57 100644 --- a/src/services/outlook/providers/base.py +++ b/src/services/outlook/providers/base.py @@ -5,7 +5,7 @@ Outlook 提供者抽象基类 import abc import logging from dataclasses import dataclass -from typing import Dict, Any, List, Optional +from typing import List, Optional from ..base import ProviderType, EmailMessage, ProviderHealth, ProviderStatus from ..account import OutlookAccount @@ -18,56 +18,36 @@ logger = logging.getLogger(__name__) class ProviderConfig: """提供者配置""" timeout: int = 30 - max_retries: int = 3 proxy_url: Optional[str] = None - - # 健康检查配置 + service_id: Optional[int] = None health_failure_threshold: int = 3 - health_disable_duration: int = 300 # 秒 + health_disable_duration: int = 300 class OutlookProvider(abc.ABC): - """ - Outlook 提供者抽象基类 - 定义所有提供者必须实现的接口 - """ + """Outlook 提供者抽象基类""" def __init__( self, account: OutlookAccount, config: Optional[ProviderConfig] = None, ): - """ - 初始化提供者 - - Args: - account: Outlook 账户 - config: 提供者配置 - """ self.account = account self.config = config or ProviderConfig() - - # 健康状态 - self._health = ProviderHealth(provider_type=self.provider_type) - - # 连接状态 + self._health = ProviderHealth(provider_type=ProviderType.IMAP_NEW) self._connected = False self._last_error: Optional[str] = None @property - @abc.abstractmethod def provider_type(self) -> ProviderType: - """获取提供者类型""" - pass + return ProviderType.IMAP_NEW @property def health(self) -> ProviderHealth: - """获取健康状态""" return self._health @property def is_healthy(self) -> bool: - """检查是否健康""" return ( self._health.status == ProviderStatus.HEALTHY and not self._health.is_disabled() @@ -75,22 +55,14 @@ class OutlookProvider(abc.ABC): @property def is_connected(self) -> bool: - """检查是否已连接""" return self._connected @abc.abstractmethod def connect(self) -> bool: - """ - 连接到服务 - - Returns: - 是否连接成功 - """ pass @abc.abstractmethod def disconnect(self): - """断开连接""" pass @abc.abstractmethod @@ -99,81 +71,44 @@ class OutlookProvider(abc.ABC): count: int = 20, only_unseen: bool = True, ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ pass @abc.abstractmethod def test_connection(self) -> bool: - """ - 测试连接是否正常 - - Returns: - 连接是否正常 - """ pass + def wait_for_new_email_idle(self, timeout: int = 25) -> bool: + """IMAP IDLE(默认不支持,子类可覆盖)""" + return False + def record_success(self): - """记录成功操作""" self._health.record_success() self._last_error = None - logger.debug(f"[{self.account.email}] {self.provider_type.value} 操作成功") def record_failure(self, error: str): - """记录失败操作""" self._health.record_failure(error) self._last_error = error - - # 检查是否需要禁用 if self._health.should_disable(self.config.health_failure_threshold): self._health.disable(self.config.health_disable_duration) logger.warning( - f"[{self.account.email}] {self.provider_type.value} 已禁用 " - f"{self.config.health_disable_duration} 秒,原因: {error}" - ) - else: - logger.warning( - f"[{self.account.email}] {self.provider_type.value} 操作失败 " - f"({self._health.failure_count}/{self.config.health_failure_threshold}): {error}" + f"[{self.account.email}] IMAP_NEW 已禁用 " + f"{self.config.health_disable_duration}s,原因: {error}" ) def check_health(self) -> bool: - """ - 检查健康状态 - - Returns: - 是否健康可用 - """ - # 检查是否被禁用 if self._health.is_disabled(): - logger.debug( - f"[{self.account.email}] {self.provider_type.value} 已被禁用," - f"将在 {self._health.disabled_until} 后恢复" - ) return False - return self._health.status in (ProviderStatus.HEALTHY, ProviderStatus.DEGRADED) def __enter__(self): - """上下文管理器入口""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - """上下文管理器出口""" self.disconnect() return False def __str__(self) -> str: - """字符串表示""" return f"{self.__class__.__name__}({self.account.email})" def __repr__(self) -> str: diff --git a/src/services/outlook/providers/graph_api.py b/src/services/outlook/providers/graph_api.py deleted file mode 100644 index 00cec80..0000000 --- a/src/services/outlook/providers/graph_api.py +++ /dev/null @@ -1,250 +0,0 @@ -""" -Graph API 提供者 -使用 Microsoft Graph REST API -""" - -import json -import logging -from typing import List, Optional -from datetime import datetime - -from curl_cffi import requests as _requests - -from ..base import ProviderType, EmailMessage -from ..account import OutlookAccount -from ..token_manager import TokenManager -from .base import OutlookProvider, ProviderConfig - - -logger = logging.getLogger(__name__) - - -class GraphAPIProvider(OutlookProvider): - """ - Graph API 提供者 - 使用 Microsoft Graph REST API 获取邮件 - 需要 graph.microsoft.com/.default scope - """ - - # Graph API 端点 - GRAPH_API_BASE = "https://graph.microsoft.com/v1.0" - MESSAGES_ENDPOINT = "/me/mailFolders/inbox/messages" - - @property - def provider_type(self) -> ProviderType: - return ProviderType.GRAPH_API - - def __init__( - self, - account: OutlookAccount, - config: Optional[ProviderConfig] = None, - ): - super().__init__(account, config) - - # Token 管理器 - self._token_manager: Optional[TokenManager] = None - - # 注意:Graph API 必须使用 OAuth2 - if not account.has_oauth(): - logger.warning( - f"[{self.account.email}] Graph API 提供者需要 OAuth2 配置 " - f"(client_id + refresh_token)" - ) - - def connect(self) -> bool: - """ - 验证连接(获取 Token) - - Returns: - 是否连接成功 - """ - if not self.account.has_oauth(): - error = "Graph API 需要 OAuth2 配置" - self.record_failure(error) - logger.error(f"[{self.account.email}] {error}") - return False - - if not self._token_manager: - self._token_manager = TokenManager( - self.account, - ProviderType.GRAPH_API, - self.config.proxy_url, - self.config.timeout, - ) - - # 尝试获取 Token - token = self._token_manager.get_access_token() - if token: - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] Graph API 连接成功") - return True - - return False - - def disconnect(self): - """断开连接(清除状态)""" - self._connected = False - - def get_recent_emails( - self, - count: int = 20, - only_unseen: bool = True, - ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ - if not self._connected: - if not self.connect(): - return [] - - try: - # 获取 Access Token - token = self._token_manager.get_access_token() - if not token: - self.record_failure("无法获取 Access Token") - return [] - - # 构建 API 请求 - url = f"{self.GRAPH_API_BASE}{self.MESSAGES_ENDPOINT}" - - params = { - "$top": count, - "$select": "id,subject,from,toRecipients,receivedDateTime,isRead,hasAttachments,bodyPreview,body", - "$orderby": "receivedDateTime desc", - } - - # 只获取未读邮件 - if only_unseen: - params["$filter"] = "isRead eq false" - - # 构建代理配置 - proxies = None - if self.config.proxy_url: - proxies = {"http": self.config.proxy_url, "https": self.config.proxy_url} - - # 发送请求(curl_cffi 自动对 params 进行 URL 编码) - resp = _requests.get( - url, - params=params, - headers={ - "Authorization": f"Bearer {token}", - "Accept": "application/json", - "Prefer": "outlook.body-content-type='text'", - }, - proxies=proxies, - timeout=self.config.timeout, - impersonate="chrome110", - ) - - if resp.status_code == 401: - # Token 无 Graph 权限(client_id 未授权),清除缓存但不记录健康失败 - # 避免因权限不足导致健康检查器禁用该提供者,影响其他账户 - if self._token_manager: - self._token_manager.clear_cache() - self._connected = False - logger.warning(f"[{self.account.email}] Graph API 返回 401,client_id 可能无 Graph 权限,跳过") - return [] - - if resp.status_code != 200: - error_body = resp.text[:200] - self.record_failure(f"HTTP {resp.status_code}: {error_body}") - logger.error(f"[{self.account.email}] Graph API 请求失败: HTTP {resp.status_code}") - return [] - - data = resp.json() - - # 解析邮件 - messages = data.get("value", []) - emails = [] - - for msg in messages: - try: - email_msg = self._parse_graph_message(msg) - if email_msg: - emails.append(email_msg) - except Exception as e: - logger.warning(f"[{self.account.email}] 解析 Graph API 邮件失败: {e}") - - self.record_success() - return emails - - except Exception as e: - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] Graph API 获取邮件失败: {e}") - return [] - - def _parse_graph_message(self, msg: dict) -> Optional[EmailMessage]: - """ - 解析 Graph API 消息 - - Args: - msg: Graph API 消息对象 - - Returns: - EmailMessage 对象 - """ - # 解析发件人 - from_info = msg.get("from", {}) - sender_info = from_info.get("emailAddress", {}) - sender = sender_info.get("address", "") - - # 解析收件人 - recipients = [] - for recipient in msg.get("toRecipients", []): - addr_info = recipient.get("emailAddress", {}) - addr = addr_info.get("address", "") - if addr: - recipients.append(addr) - - # 解析日期 - received_at = None - received_timestamp = 0 - try: - date_str = msg.get("receivedDateTime", "") - if date_str: - # ISO 8601 格式 - received_at = datetime.fromisoformat(date_str.replace("Z", "+00:00")) - received_timestamp = int(received_at.timestamp()) - except Exception: - pass - - # 获取正文 - body_info = msg.get("body", {}) - body = body_info.get("content", "") - body_preview = msg.get("bodyPreview", "") - - return EmailMessage( - id=msg.get("id", ""), - subject=msg.get("subject", ""), - sender=sender, - recipients=recipients, - body=body, - body_preview=body_preview, - received_at=received_at, - received_timestamp=received_timestamp, - is_read=msg.get("isRead", False), - has_attachments=msg.get("hasAttachments", False), - ) - - def test_connection(self) -> bool: - """ - 测试 Graph API 连接 - - Returns: - 连接是否正常 - """ - try: - # 尝试获取一封邮件来测试连接 - emails = self.get_recent_emails(count=1, only_unseen=False) - return True - except Exception as e: - logger.warning(f"[{self.account.email}] Graph API 连接测试失败: {e}") - return False diff --git a/src/services/outlook/providers/imap_new.py b/src/services/outlook/providers/imap_new.py index 5daa2f3..f42d5c6 100644 --- a/src/services/outlook/providers/imap_new.py +++ b/src/services/outlook/providers/imap_new.py @@ -1,39 +1,101 @@ """ 新版 IMAP 提供者 -使用 outlook.live.com 服务器和 login.microsoftonline.com/consumers Token 端点 +使用 outlook.live.com:993 + consumers Token 端点 +引入进程级 IMAPConnectionPool 连接复用和 IMAP IDLE """ import email import imaplib import logging +import select +import time +import threading +from datetime import datetime, timedelta, timezone from email.header import decode_header from email.utils import parsedate_to_datetime -from typing import List, Optional +from typing import Dict, List, Optional -from ..base import ProviderType, EmailMessage +from ..base import EmailMessage from ..account import OutlookAccount from ..token_manager import TokenManager from .base import OutlookProvider, ProviderConfig -from .imap_old import IMAPOldProvider logger = logging.getLogger(__name__) -class IMAPNewProvider(OutlookProvider): - """ - 新版 IMAP 提供者 - 使用 outlook.live.com:993 和 login.microsoftonline.com/consumers Token 端点 - 需要 IMAP.AccessAsUser.All scope - """ +class IMAPConnectionPool: + """进程级 IMAP 连接池,按 email 复用 IMAP4_SSL 连接""" - # IMAP 服务器配置 IMAP_HOST = "outlook.live.com" IMAP_PORT = 993 - @property - def provider_type(self) -> ProviderType: - return ProviderType.IMAP_NEW + def __init__(self): + self._connections: Dict[str, imaplib.IMAP4_SSL] = {} + self._lock = threading.Lock() + + def get_connection( + self, + email_addr: str, + token: str, + timeout: int = 30, + ) -> imaplib.IMAP4_SSL: + """获取或新建 IMAP 连接""" + # 先在锁内检查现有连接 + with self._lock: + conn = self._connections.get(email_addr) + if conn: + try: + conn.noop() + return conn + except Exception: + self._close_one(email_addr) + # 标记为「建连中」,防止重复建连 + self._connections[email_addr] = None + + # 锁外建立新连接(耗时操作不持锁) + try: + new_conn = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT, timeout=timeout) + auth_str = f"user={email_addr}\x01auth=Bearer {token}\x01\x01" + new_conn.authenticate("XOAUTH2", lambda _: auth_str.encode("utf-8")) + except Exception: + with self._lock: + # 建连失败,清除占位 + if self._connections.get(email_addr) is None: + del self._connections[email_addr] + raise + + with self._lock: + self._connections[email_addr] = new_conn + logger.debug(f"[{email_addr}] IMAP 新连接已建立") + return new_conn + + def invalidate(self, email_addr: str): + """废弃连接(认证失败或连接异常时调用)""" + with self._lock: + self._close_one(email_addr) + + def _close_one(self, email_addr: str): + conn = self._connections.pop(email_addr, None) + if conn: + try: + conn.logout() + except Exception: + pass + + +# 模块级单例连接池 +_imap_pool = IMAPConnectionPool() + + +class IMAPNewProvider(OutlookProvider): + """ + 新版 IMAP 提供者 + 通过连接池复用连接,支持 IMAP IDLE + """ + + IMAP_HOST = "outlook.live.com" + IMAP_PORT = 993 def __init__( self, @@ -41,160 +103,159 @@ class IMAPNewProvider(OutlookProvider): config: Optional[ProviderConfig] = None, ): super().__init__(account, config) - - # IMAP 连接 self._conn: Optional[imaplib.IMAP4_SSL] = None - - # Token 管理器 self._token_manager: Optional[TokenManager] = None + self._idle_tag_counter = 0 + self._idle_tag_lock = threading.Lock() - # 注意:新版 IMAP 必须使用 OAuth2 if not account.has_oauth(): logger.warning( - f"[{self.account.email}] 新版 IMAP 提供者需要 OAuth2 配置 " - f"(client_id + refresh_token)" + f"[{self.account.email}] IMAP_NEW 需要 OAuth2 配置 (client_id + refresh_token)" ) + def _get_token_manager(self) -> TokenManager: + if not self._token_manager: + self._token_manager = TokenManager( + self.account, + proxy_url=self.config.proxy_url, + timeout=self.config.timeout, + service_id=self.config.service_id, + ) + return self._token_manager + + def _next_idle_tag(self) -> str: + """生成唯一 IDLE tag(避免使用私有 _new_tag)""" + with self._idle_tag_lock: + self._idle_tag_counter += 1 + return f"IDLE{self._idle_tag_counter:04d}" + def connect(self) -> bool: - """ - 连接到 IMAP 服务器 - - Returns: - 是否连接成功 - """ - if self._connected and self._conn: - try: - self._conn.noop() - return True - except Exception: - self.disconnect() - - # 新版 IMAP 必须使用 OAuth2,无 OAuth 时静默跳过,不记录健康失败 + """从连接池获取连接""" if not self.account.has_oauth(): logger.debug(f"[{self.account.email}] 跳过 IMAP_NEW(无 OAuth)") return False try: - logger.debug(f"[{self.account.email}] 正在连接 IMAP ({self.IMAP_HOST})...") + tm = self._get_token_manager() + token = tm.get_access_token() + if not token: + logger.error(f"[{self.account.email}] 获取 IMAP Token 失败") + return False - # 创建连接 - self._conn = imaplib.IMAP4_SSL( - self.IMAP_HOST, - self.IMAP_PORT, - timeout=self.config.timeout, + self._conn = _imap_pool.get_connection( + self.account.email, token, self.config.timeout ) - - # XOAUTH2 认证 - if self._authenticate_xoauth2(): - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] 新版 IMAP 连接成功 (XOAUTH2)") - return True - - return False - - except Exception as e: - self.disconnect() - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] 新版 IMAP 连接失败: {e}") - return False - - def _authenticate_xoauth2(self) -> bool: - """ - 使用 XOAUTH2 认证 - - Returns: - 是否认证成功 - """ - if not self._token_manager: - self._token_manager = TokenManager( - self.account, - ProviderType.IMAP_NEW, - self.config.proxy_url, - self.config.timeout, - ) - - # 获取 Access Token - token = self._token_manager.get_access_token() - if not token: - logger.error(f"[{self.account.email}] 获取 IMAP Token 失败") - return False - - try: - # 构建 XOAUTH2 认证字符串 - auth_string = f"user={self.account.email}\x01auth=Bearer {token}\x01\x01" - self._conn.authenticate("XOAUTH2", lambda _: auth_string.encode("utf-8")) + self._connected = True + self.record_success() + logger.debug(f"[{self.account.email}] IMAP 连接就绪(连接池)") return True + + except imaplib.IMAP4.error as e: + err = str(e) + # Token 失效时强制刷新并重试一次 + if "AUTHENTICATE" in err or "invalid" in err.lower(): + logger.warning(f"[{self.account.email}] XOAUTH2 认证失败,尝试刷新 Token") + _imap_pool.invalidate(self.account.email) + try: + tm = self._get_token_manager() + token = tm.get_access_token(force_refresh=True) + if token: + self._conn = _imap_pool.get_connection( + self.account.email, token, self.config.timeout + ) + self._connected = True + self.record_success() + return True + except Exception as retry_e: + self.record_failure(str(retry_e)) + logger.error(f"[{self.account.email}] Token 刷新后重连失败: {retry_e}") + else: + self.record_failure(err) + logger.error(f"[{self.account.email}] IMAP 连接失败: {e}") + self._connected = False + self._conn = None + return False + except Exception as e: - logger.error(f"[{self.account.email}] XOAUTH2 认证异常: {e}") - # 清除缓存的 Token - self._token_manager.clear_cache() + self.record_failure(str(e)) + logger.error(f"[{self.account.email}] IMAP 连接失败: {e}") + self._connected = False + self._conn = None return False def disconnect(self): - """断开 IMAP 连接""" - if self._conn: - try: - self._conn.close() - except Exception: - pass - try: - self._conn.logout() - except Exception: - pass - self._conn = None - + """归还连接池(不 logout,保持复用)""" self._connected = False + self._conn = None def get_recent_emails( self, count: int = 20, only_unseen: bool = True, + since_minutes: Optional[int] = None, + folders: Optional[List[str]] = None, ) -> List[EmailMessage]: """ - 获取最近的邮件 + 获取最近的邮件,支持多文件夹搜索(合并去重)。 - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 + 搜索策略: + - since_minutes 指定时:用 SINCE 日期 + ALL 搜索最近N分钟内的邮件(不受已读/未读限制) + - only_unseen=True 且未指定 since_minutes:搜索 UNSEEN + - only_unseen=False 且未指定 since_minutes:搜索全部(取最近 count 封) + - folders 默认为 ["INBOX"],可传入多个文件夹(如 ["INBOX", "Junk Email"]) """ if not self._connected: if not self.connect(): return [] - try: - # 选择收件箱 - self._conn.select("INBOX", readonly=True) + if folders is None: + folders = ["INBOX"] - # 搜索邮件 - flag = "UNSEEN" if only_unseen else "ALL" - status, data = self._conn.search(None, flag) + all_emails: List[EmailMessage] = [] + seen_ids: set = set() - if status != "OK" or not data or not data[0]: - return [] + for folder in folders: + try: + status, _ = self._conn.select(folder, readonly=True) + if status != "OK": + logger.debug(f"[{self.account.email}] 文件夹 {folder} 不存在或无法访问,跳过") + continue - # 获取最新的邮件 ID - ids = data[0].split() - recent_ids = ids[-count:][::-1] + if since_minutes is not None: + since_dt = datetime.now(timezone.utc) - timedelta(minutes=since_minutes) + since_str = since_dt.strftime("%d-%b-%Y") + status, data = self._conn.search(None, f"SINCE {since_str}") + elif only_unseen: + status, data = self._conn.search(None, "UNSEEN") + else: + status, data = self._conn.search(None, "ALL") - emails = [] - for msg_id in recent_ids: - try: - email_msg = self._fetch_email(msg_id) - if email_msg: - emails.append(email_msg) - except Exception as e: - logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}") + if status != "OK" or not data or not data[0]: + continue - return emails + ids = data[0].split() + recent_ids = ids[-count:][::-1] # 取最新的 count 封,倒序(最新在前) - except Exception as e: - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] 获取邮件失败: {e}") - return [] + for msg_id in recent_ids: + try: + msg = self._fetch_email(msg_id) + if msg and msg.id not in seen_ids: + seen_ids.add(msg.id) + all_emails.append(msg) + except Exception as e: + logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}, folder: {folder}): {e}") + + except Exception as e: + self.record_failure(str(e)) + logger.warning(f"[{self.account.email}] 搜索文件夹 {folder} 失败: {e}") + _imap_pool.invalidate(self.account.email) + self._connected = False + self._conn = None + break + + # 按收信时间降序排列,截取 count 封 + all_emails.sort(key=lambda m: m.received_timestamp, reverse=True) + return all_emails[:count] def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]: """获取并解析单封邮件""" @@ -211,21 +272,193 @@ class IMAPNewProvider(OutlookProvider): if not raw: return None - return self._parse_email(raw) + return _parse_email(raw) - @staticmethod - def _parse_email(raw: bytes) -> EmailMessage: - """解析原始邮件""" - # 使用旧版提供者的解析方法 - return IMAPOldProvider._parse_email(raw) + def wait_for_new_email_idle(self, timeout: int = 25) -> bool: + """ + RFC 2177 IMAP IDLE 实现。 + 发送 IDLE 命令,等待服务器推送 EXISTS/RECENT,然后发送 DONE。 + Returns True 表示有新邮件推送,False 表示超时或异常(调用方降级轮询)。 + """ + if not self._connected: + if not self.connect(): + return False + + try: + self._conn.select("INBOX", readonly=True) + except Exception as e: + logger.warning(f"[{self.account.email}] IDLE 前 SELECT 失败: {e}") + return False + + tag = self._next_idle_tag() + sock = self._conn.socket() + logger.info(f"[{self.account.email}] 进入 IMAP IDLE 等待模式(超时 {timeout}s,tag={tag})") + + try: + # 发送 IDLE 命令 + self._conn.send(f"{tag} IDLE\r\n".encode()) + + # 等待 "+" 延续响应(服务端确认进入 IDLE) + deadline = time.time() + min(10.0, timeout) + buf = b"" + got_continuation = False + while time.time() < deadline: + ready = select.select([sock], [], [], min(2.0, deadline - time.time())) + if ready[0]: + chunk = sock.recv(4096) + if not chunk: + break + buf += chunk + if b"+ " in buf or b"+\r\n" in buf: + got_continuation = True + break + + if not got_continuation: + logger.warning(f"[{self.account.email}] 未收到 IDLE 延续响应,放弃") + return False + + # 等待 EXISTS / RECENT 推送 + got_new = False + buf = b"" + deadline = time.time() + timeout + while time.time() < deadline: + remaining = deadline - time.time() + if remaining <= 0: + break + ready = select.select([sock], [], [], min(2.0, remaining)) + if ready[0]: + chunk = sock.recv(4096) + if not chunk: + break + buf += chunk + if b"EXISTS" in buf or b"RECENT" in buf: + logger.debug(f"[{self.account.email}] IDLE 收到新邮件推送") + got_new = True + break + + return got_new + + except Exception as e: + logger.warning(f"[{self.account.email}] IMAP IDLE 异常: {e}") + return False + + finally: + # 发送 DONE 结束 IDLE,并排空服务端响应 + try: + self._conn.send(b"DONE\r\n") + drain_deadline = time.time() + 5 + drain_buf = b"" + tag_end = f"{tag} OK".encode() + tag_no = f"{tag} NO".encode() + tag_bad = f"{tag} BAD".encode() + while time.time() < drain_deadline: + ready = select.select([sock], [], [], 1.0) + if not ready[0]: + break + chunk = sock.recv(4096) + if not chunk: + break + drain_buf += chunk + if any(t in drain_buf for t in (tag_end, tag_no, tag_bad)): + break + except Exception: + _imap_pool.invalidate(self.account.email) + self._connected = False + self._conn = None def test_connection(self) -> bool: """测试 IMAP 连接""" try: with self: self._conn.select("INBOX", readonly=True) - self._conn.search(None, "ALL") return True except Exception as e: - logger.warning(f"[{self.account.email}] 新版 IMAP 连接测试失败: {e}") + logger.warning(f"[{self.account.email}] IMAP 连接测试失败: {e}") return False + + +def _parse_email(raw: bytes) -> EmailMessage: + """解析原始邮件为 EmailMessage(优先 text/plain,次选 text/html)""" + msg = email.message_from_bytes(raw) + + def _decode(val): + if not val: + return "" + parts = decode_header(str(val)) + result = "" + for part, charset in parts: + if isinstance(part, bytes): + try: + result += part.decode(charset or "utf-8", errors="replace") + except (LookupError, UnicodeDecodeError): + result += part.decode("utf-8", errors="replace") + else: + result += str(part) + return result + + subject = _decode(msg.get("Subject", "")) + sender = _decode(msg.get("From", "")) + recipients = [_decode(msg.get("To", ""))] + + received_at = None + received_ts = 0 + date_str = msg.get("Date", "") + if date_str: + try: + received_at = parsedate_to_datetime(date_str) + received_ts = int(received_at.timestamp()) + except Exception: + pass + + # 提取正文:优先 text/plain,次选 text/html + plain_body = "" + html_body = "" + if msg.is_multipart(): + for part in msg.walk(): + ct = part.get_content_type() + cd = str(part.get("Content-Disposition", "")) + if "attachment" in cd.lower(): + continue + try: + charset = part.get_content_charset() or "utf-8" + payload = part.get_payload(decode=True) + if not payload: + continue + decoded = payload.decode(charset, errors="replace") + if ct == "text/plain" and not plain_body: + plain_body = decoded + elif ct == "text/html" and not html_body: + html_body = decoded + except Exception: + pass + else: + try: + charset = msg.get_content_charset() or "utf-8" + payload = msg.get_payload(decode=True) + if payload: + ct = msg.get_content_type() + decoded = payload.decode(charset, errors="replace") + if ct == "text/plain": + plain_body = decoded + else: + html_body = decoded + except Exception: + pass + + body = plain_body or html_body + body_preview = body[:200].strip() + + msg_id = msg.get("Message-ID", "").strip("<>") + if not msg_id: + msg_id = f"{sender}_{received_ts}" + + return EmailMessage( + id=msg_id, + subject=subject, + sender=sender, + recipients=recipients, + body=body, + body_preview=body_preview, + received_at=received_at, + received_timestamp=received_ts, + ) diff --git a/src/services/outlook/providers/imap_old.py b/src/services/outlook/providers/imap_old.py deleted file mode 100644 index e46f3ed..0000000 --- a/src/services/outlook/providers/imap_old.py +++ /dev/null @@ -1,345 +0,0 @@ -""" -旧版 IMAP 提供者 -使用 outlook.office365.com 服务器和 login.live.com Token 端点 -""" - -import email -import imaplib -import logging -from email.header import decode_header -from email.utils import parsedate_to_datetime -from typing import List, Optional - -from ..base import ProviderType, EmailMessage -from ..account import OutlookAccount -from ..token_manager import TokenManager -from .base import OutlookProvider, ProviderConfig - - -logger = logging.getLogger(__name__) - - -class IMAPOldProvider(OutlookProvider): - """ - 旧版 IMAP 提供者 - 使用 outlook.office365.com:993 和 login.live.com Token 端点 - """ - - # IMAP 服务器配置 - IMAP_HOST = "outlook.office365.com" - IMAP_PORT = 993 - - @property - def provider_type(self) -> ProviderType: - return ProviderType.IMAP_OLD - - def __init__( - self, - account: OutlookAccount, - config: Optional[ProviderConfig] = None, - ): - super().__init__(account, config) - - # IMAP 连接 - self._conn: Optional[imaplib.IMAP4_SSL] = None - - # Token 管理器 - self._token_manager: Optional[TokenManager] = None - - def connect(self) -> bool: - """ - 连接到 IMAP 服务器 - - Returns: - 是否连接成功 - """ - if self._connected and self._conn: - # 检查现有连接 - try: - self._conn.noop() - return True - except Exception: - self.disconnect() - - try: - logger.debug(f"[{self.account.email}] 正在连接 IMAP ({self.IMAP_HOST})...") - - # 创建连接 - self._conn = imaplib.IMAP4_SSL( - self.IMAP_HOST, - self.IMAP_PORT, - timeout=self.config.timeout, - ) - - # 尝试 XOAUTH2 认证 - if self.account.has_oauth(): - if self._authenticate_xoauth2(): - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] IMAP 连接成功 (XOAUTH2)") - return True - else: - logger.warning(f"[{self.account.email}] XOAUTH2 认证失败,尝试密码认证") - - # 密码认证 - if self.account.password: - self._conn.login(self.account.email, self.account.password) - self._connected = True - self.record_success() - logger.info(f"[{self.account.email}] IMAP 连接成功 (密码认证)") - return True - - raise ValueError("没有可用的认证方式") - - except Exception as e: - self.disconnect() - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] IMAP 连接失败: {e}") - return False - - def _authenticate_xoauth2(self) -> bool: - """ - 使用 XOAUTH2 认证 - - Returns: - 是否认证成功 - """ - if not self._token_manager: - self._token_manager = TokenManager( - self.account, - ProviderType.IMAP_OLD, - self.config.proxy_url, - self.config.timeout, - ) - - # 获取 Access Token - token = self._token_manager.get_access_token() - if not token: - return False - - try: - # 构建 XOAUTH2 认证字符串 - auth_string = f"user={self.account.email}\x01auth=Bearer {token}\x01\x01" - self._conn.authenticate("XOAUTH2", lambda _: auth_string.encode("utf-8")) - return True - except Exception as e: - logger.debug(f"[{self.account.email}] XOAUTH2 认证异常: {e}") - # 清除缓存的 Token - self._token_manager.clear_cache() - return False - - def disconnect(self): - """断开 IMAP 连接""" - if self._conn: - try: - self._conn.close() - except Exception: - pass - try: - self._conn.logout() - except Exception: - pass - self._conn = None - - self._connected = False - - def get_recent_emails( - self, - count: int = 20, - only_unseen: bool = True, - ) -> List[EmailMessage]: - """ - 获取最近的邮件 - - Args: - count: 获取数量 - only_unseen: 是否只获取未读 - - Returns: - 邮件列表 - """ - if not self._connected: - if not self.connect(): - return [] - - try: - # 选择收件箱 - self._conn.select("INBOX", readonly=True) - - # 搜索邮件 - flag = "UNSEEN" if only_unseen else "ALL" - status, data = self._conn.search(None, flag) - - if status != "OK" or not data or not data[0]: - return [] - - # 获取最新的邮件 ID - ids = data[0].split() - recent_ids = ids[-count:][::-1] # 倒序,最新的在前 - - emails = [] - for msg_id in recent_ids: - try: - email_msg = self._fetch_email(msg_id) - if email_msg: - emails.append(email_msg) - except Exception as e: - logger.warning(f"[{self.account.email}] 解析邮件失败 (ID: {msg_id}): {e}") - - return emails - - except Exception as e: - self.record_failure(str(e)) - logger.error(f"[{self.account.email}] 获取邮件失败: {e}") - return [] - - def _fetch_email(self, msg_id: bytes) -> Optional[EmailMessage]: - """ - 获取并解析单封邮件 - - Args: - msg_id: 邮件 ID - - Returns: - EmailMessage 对象,失败返回 None - """ - status, data = self._conn.fetch(msg_id, "(RFC822)") - if status != "OK" or not data or not data[0]: - return None - - # 获取原始邮件内容 - raw = b"" - for part in data: - if isinstance(part, tuple) and len(part) > 1: - raw = part[1] - break - - if not raw: - return None - - return self._parse_email(raw) - - @staticmethod - def _parse_email(raw: bytes) -> EmailMessage: - """ - 解析原始邮件 - - Args: - raw: 原始邮件数据 - - Returns: - EmailMessage 对象 - """ - # 移除 BOM - if raw.startswith(b"\xef\xbb\xbf"): - raw = raw[3:] - - msg = email.message_from_bytes(raw) - - # 解析邮件头 - subject = IMAPOldProvider._decode_header(msg.get("Subject", "")) - sender = IMAPOldProvider._decode_header(msg.get("From", "")) - to = IMAPOldProvider._decode_header(msg.get("To", "")) - delivered_to = IMAPOldProvider._decode_header(msg.get("Delivered-To", "")) - x_original_to = IMAPOldProvider._decode_header(msg.get("X-Original-To", "")) - date_str = IMAPOldProvider._decode_header(msg.get("Date", "")) - - # 提取正文 - body = IMAPOldProvider._extract_body(msg) - - # 解析日期 - received_timestamp = 0 - received_at = None - try: - if date_str: - received_at = parsedate_to_datetime(date_str) - received_timestamp = int(received_at.timestamp()) - except Exception: - pass - - # 构建收件人列表 - recipients = [r for r in [to, delivered_to, x_original_to] if r] - - return EmailMessage( - id=msg.get("Message-ID", ""), - subject=subject, - sender=sender, - recipients=recipients, - body=body, - received_at=received_at, - received_timestamp=received_timestamp, - is_read=False, # 搜索的是未读邮件 - raw_data=raw[:500] if len(raw) > 500 else raw, - ) - - @staticmethod - def _decode_header(header: str) -> str: - """解码邮件头""" - if not header: - return "" - - parts = [] - for chunk, encoding in decode_header(header): - if isinstance(chunk, bytes): - try: - decoded = chunk.decode(encoding or "utf-8", errors="replace") - parts.append(decoded) - except Exception: - parts.append(chunk.decode("utf-8", errors="replace")) - else: - parts.append(str(chunk)) - - return "".join(parts).strip() - - @staticmethod - def _extract_body(msg) -> str: - """提取邮件正文""" - import html as html_module - import re - - texts = [] - parts = msg.walk() if msg.is_multipart() else [msg] - - for part in parts: - content_type = part.get_content_type() - if content_type not in ("text/plain", "text/html"): - continue - - payload = part.get_payload(decode=True) - if not payload: - continue - - charset = part.get_content_charset() or "utf-8" - try: - text = payload.decode(charset, errors="replace") - except LookupError: - text = payload.decode("utf-8", errors="replace") - - # 如果是 HTML,移除标签 - if "]+>", " ", text) - - texts.append(text) - - # 合并并清理文本 - combined = " ".join(texts) - combined = html_module.unescape(combined) - combined = re.sub(r"\s+", " ", combined).strip() - - return combined - - def test_connection(self) -> bool: - """ - 测试 IMAP 连接 - - Returns: - 连接是否正常 - """ - try: - with self: - self._conn.select("INBOX", readonly=True) - self._conn.search(None, "ALL") - return True - except Exception as e: - logger.warning(f"[{self.account.email}] IMAP 连接测试失败: {e}") - return False diff --git a/src/services/outlook/service.py b/src/services/outlook/service.py index 321d8b3..a38c858 100644 --- a/src/services/outlook/service.py +++ b/src/services/outlook/service.py @@ -1,6 +1,6 @@ """ -Outlook 邮箱服务主类 -支持多种 IMAP/API 连接方式,自动故障切换 +Outlook 邮箱服务主类(简化版) +单一 IMAP_NEW Provider + 邮件缓存 + IMAP IDLE 支持 """ import logging @@ -8,34 +8,24 @@ import threading import time from typing import Optional, Dict, Any, List -from ..base import BaseEmailService, EmailServiceError, EmailServiceStatus, EmailServiceType +from ..base import BaseEmailService, EmailServiceError, EmailServiceType from ...config.constants import EmailServiceType as ServiceType from ...config.settings import get_settings from .account import OutlookAccount -from .base import ProviderType, EmailMessage -from .email_parser import EmailParser, get_email_parser -from .health_checker import HealthChecker, FailoverManager -from .providers.base import OutlookProvider, ProviderConfig -from .providers.imap_old import IMAPOldProvider +from .base import EmailMessage +from .email_parser import get_email_parser +from .health_checker import HealthChecker +from .providers.base import ProviderConfig from .providers.imap_new import IMAPNewProvider -from .providers.graph_api import GraphAPIProvider logger = logging.getLogger(__name__) - -# 默认提供者优先级 -# IMAP_OLD 最兼容(只需 login.live.com token),IMAP_NEW 次之,Graph API 最后 -# 原因:部分 client_id 没有 Graph API 权限,但有 IMAP 权限 -DEFAULT_PROVIDER_PRIORITY = [ - ProviderType.IMAP_OLD, - ProviderType.IMAP_NEW, - ProviderType.GRAPH_API, -] +# 验证码搜索的文件夹列表(同时搜索收件箱和垃圾箱) +_OUTLOOK_SEARCH_FOLDERS = ["INBOX", "Junk Email"] -def get_email_code_settings() -> dict: - """获取验证码等待配置""" +def _get_code_settings() -> dict: settings = get_settings() return { "timeout": settings.email_code_timeout, @@ -43,56 +33,58 @@ def get_email_code_settings() -> dict: } +class _EmailCache: + """轻量级邮件内存缓存(TTL=60s,减少重复 IMAP 请求)""" + + TTL = 60 + + def __init__(self): + self._cache: Dict[str, tuple] = {} # email -> (timestamp, List[EmailMessage]) + self._lock = threading.Lock() + + def get(self, email: str) -> Optional[List[EmailMessage]]: + with self._lock: + entry = self._cache.get(email) + if entry and time.time() - entry[0] < self.TTL: + return entry[1] + return None + + def set(self, email: str, messages: List[EmailMessage]): + with self._lock: + self._cache[email] = (time.time(), messages) + + def invalidate(self, email: str): + with self._lock: + self._cache.pop(email, None) + + class OutlookService(BaseEmailService): """ Outlook 邮箱服务 - 支持多种 IMAP/API 连接方式,自动故障切换 + 使用单一 IMAP_NEW Provider,支持连接池复用和 IMAP IDLE """ def __init__(self, config: Dict[str, Any] = None, name: str = None): - """ - 初始化 Outlook 服务 - - Args: - config: 配置字典,支持以下键: - - accounts: Outlook 账户列表 - - provider_priority: 提供者优先级列表 - - health_failure_threshold: 连续失败次数阈值 - - health_disable_duration: 禁用时长(秒) - - timeout: 请求超时时间 - - proxy_url: 代理 URL - name: 服务名称 - """ super().__init__(ServiceType.OUTLOOK, name) - # 默认配置 default_config = { "accounts": [], - "provider_priority": [p.value for p in DEFAULT_PROVIDER_PRIORITY], "health_failure_threshold": 5, "health_disable_duration": 60, "timeout": 30, "proxy_url": None, } - self.config = {**default_config, **(config or {})} - # 解析提供者优先级 - self.provider_priority = [ - ProviderType(p) for p in self.config.get("provider_priority", []) - ] - if not self.provider_priority: - self.provider_priority = DEFAULT_PROVIDER_PRIORITY - - # 提供者配置 self.provider_config = ProviderConfig( timeout=self.config.get("timeout", 30), proxy_url=self.config.get("proxy_url"), + service_id=self.config.get("service_id"), health_failure_threshold=self.config.get("health_failure_threshold", 3), health_disable_duration=self.config.get("health_disable_duration", 300), ) - # 获取默认 client_id(供无 client_id 的账户使用) + # 获取默认 client_id try: _default_client_id = get_settings().outlook_default_client_id except Exception: @@ -103,193 +95,120 @@ class OutlookService(BaseEmailService): self._current_account_index = 0 self._account_lock = threading.Lock() - # 支持两种配置格式 if "email" in self.config and "password" in self.config: account = OutlookAccount.from_config(self.config) if not account.client_id and _default_client_id: account.client_id = _default_client_id if account.validate(): - self.accounts.append(account) + if not account.has_oauth(): + logger.warning( + f"[{account.email}] 跳过:IMAP_NEW 仅支持 OAuth2," + f"请配置 client_id 和 refresh_token" + ) + else: + self.accounts.append(account) else: - for account_config in self.config.get("accounts", []): - account = OutlookAccount.from_config(account_config) + for ac in self.config.get("accounts", []): + account = OutlookAccount.from_config(ac) if not account.client_id and _default_client_id: account.client_id = _default_client_id if account.validate(): - self.accounts.append(account) + if not account.has_oauth(): + logger.warning( + f"[{account.email}] 跳过:IMAP_NEW 仅支持 OAuth2," + f"请配置 client_id 和 refresh_token" + ) + else: + self.accounts.append(account) if not self.accounts: - logger.warning("未配置有效的 Outlook 账户") + logger.warning("未配置有效的 Outlook 账户(需要 client_id + refresh_token)") - # 健康检查器和故障切换管理器 + # 健康检查器 self.health_checker = HealthChecker( failure_threshold=self.provider_config.health_failure_threshold, disable_duration=self.provider_config.health_disable_duration, ) - self.failover_manager = FailoverManager( - health_checker=self.health_checker, - priority_order=self.provider_priority, - ) # 邮件解析器 self.email_parser = get_email_parser() - # 提供者实例缓存: (email, provider_type) -> OutlookProvider - self._providers: Dict[tuple, OutlookProvider] = {} + # Provider 实例缓存: email -> IMAPNewProvider + self._providers: Dict[str, IMAPNewProvider] = {} self._provider_lock = threading.Lock() - # IMAP 连接限制(防止限流) + # IMAP 并发限制(最多 5 个并发) self._imap_semaphore = threading.Semaphore(5) - # 验证码去重机制 + # 邮件缓存 + self._email_cache = _EmailCache() + + # 验证码去重 self._used_codes: Dict[str, set] = {} - def _get_provider( - self, - account: OutlookAccount, - provider_type: ProviderType, - ) -> OutlookProvider: - """ - 获取或创建提供者实例 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - - Returns: - 提供者实例 - """ - cache_key = (account.email.lower(), provider_type) - + def _get_provider(self, account: OutlookAccount) -> IMAPNewProvider: + key = account.email.lower() with self._provider_lock: - if cache_key not in self._providers: - provider = self._create_provider(account, provider_type) - self._providers[cache_key] = provider + if key not in self._providers: + self._providers[key] = IMAPNewProvider(account, self.provider_config) + return self._providers[key] - return self._providers[cache_key] - - def _create_provider( + def _fetch_emails( self, account: OutlookAccount, - provider_type: ProviderType, - ) -> OutlookProvider: - """ - 创建提供者实例 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - - Returns: - 提供者实例 - """ - if provider_type == ProviderType.IMAP_OLD: - return IMAPOldProvider(account, self.provider_config) - elif provider_type == ProviderType.IMAP_NEW: - return IMAPNewProvider(account, self.provider_config) - elif provider_type == ProviderType.GRAPH_API: - return GraphAPIProvider(account, self.provider_config) - else: - raise ValueError(f"未知的提供者类型: {provider_type}") - - def _get_provider_priority_for_account(self, account: OutlookAccount) -> List[ProviderType]: - """根据账户是否有 OAuth,返回适合的提供者优先级列表""" - if account.has_oauth(): - return self.provider_priority - else: - # 无 OAuth,直接走旧版 IMAP(密码认证),跳过需要 OAuth 的提供者 - return [ProviderType.IMAP_OLD] - - def _try_providers_for_emails( - self, - account: OutlookAccount, - count: int = 20, + count: int = 15, only_unseen: bool = True, + since_minutes: Optional[int] = None, + use_cache: bool = False, + folders: Optional[List[str]] = None, ) -> List[EmailMessage]: - """ - 尝试多个提供者获取邮件 + """通过 IMAP_NEW Provider 获取邮件,可选使用内存缓存""" + if use_cache: + cached = self._email_cache.get(account.email) + if cached is not None: + return cached - Args: - account: Outlook 账户 - count: 获取数量 - only_unseen: 是否只获取未读 + if not self.health_checker.is_available(): + logger.debug(f"[{account.email}] IMAP_NEW 不可用,跳过") + return [] - Returns: - 邮件列表 - """ - errors = [] + try: + provider = self._get_provider(account) + with self._imap_semaphore: + with provider: + emails = provider.get_recent_emails( + count, only_unseen, since_minutes=since_minutes, folders=folders + ) - # 根据账户类型选择合适的提供者优先级 - priority = self._get_provider_priority_for_account(account) + if emails: + self.health_checker.record_success() + if use_cache: + self._email_cache.set(account.email, emails) + return emails - # 按优先级尝试各提供者 - for provider_type in priority: - # 检查提供者是否可用 - if not self.health_checker.is_available(provider_type): - logger.debug( - f"[{account.email}] {provider_type.value} 不可用,跳过" - ) - continue - - try: - provider = self._get_provider(account, provider_type) - - with self._imap_semaphore: - with provider: - emails = provider.get_recent_emails(count, only_unseen) - - if emails: - # 成功获取邮件 - self.health_checker.record_success(provider_type) - logger.debug( - f"[{account.email}] {provider_type.value} 获取到 {len(emails)} 封邮件" - ) - return emails - - except Exception as e: - error_msg = str(e) - errors.append(f"{provider_type.value}: {error_msg}") - self.health_checker.record_failure(provider_type, error_msg) - logger.warning( - f"[{account.email}] {provider_type.value} 获取邮件失败: {e}" - ) - - logger.error( - f"[{account.email}] 所有提供者都失败: {'; '.join(errors)}" - ) - return [] + except Exception as e: + err = str(e) + self.health_checker.record_failure(err) + logger.warning(f"[{account.email}] 获取邮件失败: {e}") + return [] def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]: - """ - 选择可用的 Outlook 账户 - - Args: - config: 配置参数(未使用) - - Returns: - 包含邮箱信息的字典 - """ + """轮询选择可用的 Outlook 账户""" if not self.accounts: self.update_status(False, EmailServiceError("没有可用的 Outlook 账户")) raise EmailServiceError("没有可用的 Outlook 账户") - # 轮询选择账户 with self._account_lock: account = self.accounts[self._current_account_index] self._current_account_index = (self._current_account_index + 1) % len(self.accounts) - email_info = { - "email": account.email, - "service_id": account.email, - "account": { - "email": account.email, - "has_oauth": account.has_oauth() - } - } - logger.info(f"选择 Outlook 账户: {account.email}") self.update_status(True) - return email_info + return { + "email": account.email, + "service_id": account.email, + "account": {"email": account.email, "has_oauth": account.has_oauth()}, + } def get_verification_code( self, @@ -299,114 +218,185 @@ class OutlookService(BaseEmailService): pattern: str = None, otp_sent_at: Optional[float] = None, ) -> Optional[str]: - """ - 从 Outlook 邮箱获取验证码 - - Args: - email: 邮箱地址 - email_id: 未使用 - timeout: 超时时间(秒) - pattern: 验证码正则表达式(未使用) - otp_sent_at: OTP 发送时间戳 - - Returns: - 验证码字符串 - """ - # 查找对应的账户 - account = None - for acc in self.accounts: - if acc.email.lower() == email.lower(): - account = acc - break - + """从 Outlook 邮箱获取验证码""" + account = next( + (a for a in self.accounts if a.email.lower() == email.lower()), None + ) if not account: - self.update_status(False, EmailServiceError(f"未找到邮箱对应的账户: {email}")) + self.update_status(False, EmailServiceError(f"未找到邮箱账户: {email}")) return None - # 获取验证码等待配置 - code_settings = get_email_code_settings() + code_settings = _get_code_settings() actual_timeout = timeout or code_settings["timeout"] poll_interval = code_settings["poll_interval"] - logger.info( - f"[{email}] 开始获取验证码,超时 {actual_timeout}s," - f"提供者优先级: {[p.value for p in self.provider_priority]}" - ) + logger.info(f"[{email}] 开始获取验证码,超时 {actual_timeout}s") - # 初始化验证码去重集合 if email not in self._used_codes: self._used_codes[email] = set() used_codes = self._used_codes[email] - # 计算最小时间戳(留出 60 秒时钟偏差) min_timestamp = (otp_sent_at - 60) if otp_sent_at else 0 + use_idle = True + try: + use_idle = get_settings().outlook_use_idle + except Exception: + pass + + if use_idle: + code = self._wait_with_idle( + account, email, actual_timeout, min_timestamp, used_codes, otp_sent_at + ) + else: + code = self._wait_with_poll( + account, email, actual_timeout, poll_interval, min_timestamp, used_codes, otp_sent_at + ) + + if code: + used_codes.add(code) + self.update_status(True) + return code + return None + + def _wait_with_poll( + self, + account: OutlookAccount, + email: str, + timeout: int, + poll_interval: int, + min_timestamp: float, + used_codes: set, + otp_sent_at: Optional[float] = None, + ) -> Optional[str]: + """轮询方式等待验证码""" start_time = time.time() poll_count = 0 - while time.time() - start_time < actual_timeout: + while time.time() - start_time < timeout: poll_count += 1 - - # 渐进式邮件检查:前 3 次只检查未读 - only_unseen = poll_count <= 3 - + # 每次动态计算 since_minutes,确保时间窗口随轮询推进而更新 + if otp_sent_at: + elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2 + since_minutes: Optional[int] = min(elapsed_since_send, 180) + only_unseen = False + else: + since_minutes = None + only_unseen = poll_count <= 3 try: - # 尝试多个提供者获取邮件 - emails = self._try_providers_for_emails( - account, - count=15, - only_unseen=only_unseen, + emails = self._fetch_emails( + account, count=15, only_unseen=only_unseen, + since_minutes=since_minutes, + folders=_OUTLOOK_SEARCH_FOLDERS, ) - if emails: - logger.debug( - f"[{email}] 第 {poll_count} 次轮询获取到 {len(emails)} 封邮件" - ) - - # 从邮件中查找验证码 code = self.email_parser.find_verification_code_in_emails( emails, target_email=email, min_timestamp=min_timestamp, used_codes=used_codes, ) - if code: - used_codes.add(code) elapsed = int(time.time() - start_time) logger.info( - f"[{email}] 找到验证码: {code}," - f"总耗时 {elapsed}s,轮询 {poll_count} 次" + f"[{email}] 找到验证码: {code},耗时 {elapsed}s,轮询 {poll_count} 次" ) - self.update_status(True) return code - except Exception as e: - logger.warning(f"[{email}] 检查出错: {e}") + logger.warning(f"[{email}] 轮询出错: {e}") - # 等待下次轮询 time.sleep(poll_interval) - elapsed = int(time.time() - start_time) - logger.warning(f"[{email}] 验证码超时 ({actual_timeout}s),共轮询 {poll_count} 次") + logger.warning(f"[{email}] 验证码超时 ({timeout}s),共轮询 {poll_count} 次") return None - def list_emails(self, **kwargs) -> List[Dict[str, Any]]: - """列出所有可用的 Outlook 账户""" - return [ - { - "email": account.email, - "id": account.email, - "has_oauth": account.has_oauth(), - "type": "outlook" - } - for account in self.accounts - ] + def _wait_with_idle( + self, + account: OutlookAccount, + email: str, + timeout: int, + min_timestamp: float, + used_codes: set, + otp_sent_at: Optional[float] = None, + ) -> Optional[str]: + """IMAP IDLE 方式等待验证码,失败时自动降级为轮询""" + if not self.health_checker.is_available(): + logger.warning(f"[{email}] IMAP_NEW 不可用,降级为轮询") + return self._wait_with_poll( + account, email, timeout, 3, min_timestamp, used_codes, otp_sent_at + ) - def delete_email(self, email_id: str) -> bool: - """删除邮箱(Outlook 不支持删除账户)""" - logger.warning(f"Outlook 服务不支持删除账户: {email_id}") - return False + # 计算 since_minutes:从发送时间前2分钟开始,最多180分钟 + since_minutes: Optional[int] = None + if otp_sent_at: + elapsed_since_send = int((time.time() - otp_sent_at) / 60) + 2 + since_minutes = min(elapsed_since_send, 180) + + start_time = time.time() + try: + provider = self._get_provider(account) + with self._imap_semaphore: + with provider: + # 先做一次即时检查 + emails = provider.get_recent_emails( + 15, only_unseen=(since_minutes is None), since_minutes=since_minutes, + folders=_OUTLOOK_SEARCH_FOLDERS, + ) + code = self.email_parser.find_verification_code_in_emails( + emails, + target_email=email, + min_timestamp=min_timestamp, + used_codes=used_codes, + ) + if code: + elapsed = int(time.time() - start_time) + logger.info(f"[{email}] 找到验证码: {code},耗时 {elapsed}s(即时检查)") + return code + + # IDLE 等待循环 + while time.time() - start_time < timeout: + remaining = int(timeout - (time.time() - start_time)) + if remaining <= 0: + break + arrived = provider.wait_for_new_email_idle(timeout=min(remaining, 25)) + # 无效化缓存,强制重新拉取 + self._email_cache.invalidate(email) + # IDLE 触发后用 since_minutes 搜索,覆盖已读邮件 + fetch_since = since_minutes + if fetch_since is None: + # 没有 otp_sent_at 时,用距当前时间2分钟内的邮件 + fetch_since = 2 + emails = provider.get_recent_emails( + 15, only_unseen=False, since_minutes=fetch_since, + folders=_OUTLOOK_SEARCH_FOLDERS, + ) + code = self.email_parser.find_verification_code_in_emails( + emails, + target_email=email, + min_timestamp=min_timestamp, + used_codes=used_codes, + ) + if code: + elapsed = int(time.time() - start_time) + logger.info( + f"[{email}] 找到验证码: {code},耗时 {elapsed}s" + f"(IDLE {'推送' if arrived else '超时检查'})" + ) + return code + + except Exception as e: + logger.warning(f"[{email}] IDLE 失败,降级为轮询: {e}") + elapsed = int(time.time() - start_time) + remaining = max(0, timeout - elapsed) + if remaining > 0: + code_settings = _get_code_settings() + return self._wait_with_poll( + account, email, remaining, + code_settings["poll_interval"], min_timestamp, used_codes, otp_sent_at + ) + + logger.warning(f"[{email}] IDLE 等待验证码超时 ({timeout}s)") + return None def check_health(self) -> bool: """检查 Outlook 服务是否可用""" @@ -414,48 +404,48 @@ class OutlookService(BaseEmailService): self.update_status(False, EmailServiceError("没有配置的账户")) return False - # 测试第一个账户的连接 - test_account = self.accounts[0] - - # 尝试任一提供者连接 - for provider_type in self.provider_priority: - try: - provider = self._get_provider(test_account, provider_type) - if provider.test_connection(): - self.update_status(True) - return True - except Exception as e: - logger.warning( - f"Outlook 健康检查失败 ({test_account.email}, {provider_type.value}): {e}" - ) + try: + provider = self._get_provider(self.accounts[0]) + if provider.test_connection(): + self.update_status(True) + return True + except Exception as e: + logger.warning(f"Outlook 健康检查失败: {e}") self.update_status(False, EmailServiceError("健康检查失败")) return False - def get_provider_status(self) -> Dict[str, Any]: - """获取提供者状态""" - return self.failover_manager.get_status() + def list_emails(self, **kwargs) -> List[Dict[str, Any]]: + return [ + { + "email": a.email, + "id": a.email, + "has_oauth": a.has_oauth(), + "type": "outlook", + } + for a in self.accounts + ] + + def delete_email(self, email_id: str) -> bool: + logger.warning(f"Outlook 服务不支持删除账户: {email_id}") + return False def get_account_stats(self) -> Dict[str, Any]: - """获取账户统计信息""" total = len(self.accounts) - oauth_count = sum(1 for acc in self.accounts if acc.has_oauth()) - + oauth_count = sum(1 for a in self.accounts if a.has_oauth()) return { "total_accounts": total, "oauth_accounts": oauth_count, "password_accounts": total - oauth_count, - "accounts": [acc.to_dict() for acc in self.accounts], - "provider_status": self.get_provider_status(), + "accounts": [a.to_dict() for a in self.accounts], + "health_status": self.health_checker.get_status(), } def add_account(self, account_config: Dict[str, Any]) -> bool: - """添加新的 Outlook 账户""" try: account = OutlookAccount.from_config(account_config) if not account.validate(): return False - self.accounts.append(account) logger.info(f"添加 Outlook 账户: {account.email}") return True @@ -464,24 +454,13 @@ class OutlookService(BaseEmailService): return False def remove_account(self, email: str) -> bool: - """移除 Outlook 账户""" - for i, acc in enumerate(self.accounts): - if acc.email.lower() == email.lower(): + for i, a in enumerate(self.accounts): + if a.email.lower() == email.lower(): self.accounts.pop(i) logger.info(f"移除 Outlook 账户: {email}") return True return False - def reset_provider_health(self): - """重置所有提供者的健康状态""" - self.health_checker.reset_all() - logger.info("已重置所有提供者的健康状态") - - def force_provider(self, provider_type: ProviderType): - """强制使用指定的提供者""" - self.health_checker.force_enable(provider_type) - # 禁用其他提供者 - for pt in ProviderType: - if pt != provider_type: - self.health_checker.force_disable(pt, 60) - logger.info(f"已强制使用提供者: {provider_type.value}") + def reset_health(self): + self.health_checker.reset() + logger.info("已重置 IMAP_NEW 健康状态") diff --git a/src/services/outlook/token_manager.py b/src/services/outlook/token_manager.py index 77e54f2..0c23536 100644 --- a/src/services/outlook/token_manager.py +++ b/src/services/outlook/token_manager.py @@ -1,6 +1,6 @@ """ -Token 管理器 -支持多个 Microsoft Token 端点,自动选择合适的端点 +Token 管理器(简化版) +固定使用 consumers 端点 + IMAP scope """ import json @@ -11,153 +11,98 @@ from typing import Dict, Optional, Any from curl_cffi import requests as _requests -from .base import ProviderType, TokenEndpoint, TokenInfo +from .base import TokenInfo from .account import OutlookAccount logger = logging.getLogger(__name__) - -# 各提供者的 Scope 配置 -PROVIDER_SCOPES = { - ProviderType.IMAP_OLD: "", # 旧版 IMAP 不需要特定 scope - ProviderType.IMAP_NEW: "https://outlook.office.com/IMAP.AccessAsUser.All offline_access", - ProviderType.GRAPH_API: "https://graph.microsoft.com/.default", -} - -# 各提供者的 Token 端点 -PROVIDER_TOKEN_URLS = { - ProviderType.IMAP_OLD: TokenEndpoint.LIVE.value, - ProviderType.IMAP_NEW: TokenEndpoint.CONSUMERS.value, - ProviderType.GRAPH_API: TokenEndpoint.COMMON.value, -} +TOKEN_URL = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" +IMAP_SCOPE = "https://outlook.office.com/IMAP.AccessAsUser.All offline_access" class TokenManager: """ Token 管理器 - 支持多端点 Token 获取和缓存 + 固定 consumers 端点,缓存 key = email """ - # Token 缓存: key = (email, provider_type) -> TokenInfo - _token_cache: Dict[tuple, TokenInfo] = {} + _token_cache: Dict[str, TokenInfo] = {} _cache_lock = threading.Lock() - # 默认超时时间 DEFAULT_TIMEOUT = 30 - # Token 刷新提前时间(秒) REFRESH_BUFFER = 120 def __init__( self, account: OutlookAccount, - provider_type: ProviderType, proxy_url: Optional[str] = None, timeout: int = DEFAULT_TIMEOUT, + service_id: Optional[int] = None, ): - """ - 初始化 Token 管理器 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - proxy_url: 代理 URL(可选) - timeout: 请求超时时间 - """ self.account = account - self.provider_type = provider_type self.proxy_url = proxy_url self.timeout = timeout + self.service_id = service_id - # 获取端点和 Scope - self.token_url = PROVIDER_TOKEN_URLS.get(provider_type, TokenEndpoint.LIVE.value) - self.scope = PROVIDER_SCOPES.get(provider_type, "") + def _cache_key(self) -> str: + return self.account.email.lower() def get_cached_token(self) -> Optional[TokenInfo]: - """获取缓存的 Token""" - cache_key = (self.account.email.lower(), self.provider_type) with self._cache_lock: - token = self._token_cache.get(cache_key) + token = self._token_cache.get(self._cache_key()) if token and not token.is_expired(self.REFRESH_BUFFER): return token return None def set_cached_token(self, token: TokenInfo): - """缓存 Token""" - cache_key = (self.account.email.lower(), self.provider_type) with self._cache_lock: - self._token_cache[cache_key] = token + self._token_cache[self._cache_key()] = token def clear_cache(self): - """清除缓存""" - cache_key = (self.account.email.lower(), self.provider_type) with self._cache_lock: - self._token_cache.pop(cache_key, None) + self._token_cache.pop(self._cache_key(), None) def get_access_token(self, force_refresh: bool = False) -> Optional[str]: - """ - 获取 Access Token - - Args: - force_refresh: 是否强制刷新 - - Returns: - Access Token 字符串,失败返回 None - """ - # 检查缓存 if not force_refresh: cached = self.get_cached_token() if cached: - logger.debug(f"[{self.account.email}] 使用缓存的 Token ({self.provider_type.value})") + logger.debug(f"[{self.account.email}] 使用缓存 Token") return cached.access_token - # 刷新 Token try: token = self._refresh_token() if token: self.set_cached_token(token) return token.access_token except Exception as e: - logger.error(f"[{self.account.email}] 获取 Token 失败 ({self.provider_type.value}): {e}") + logger.error(f"[{self.account.email}] 获取 Token 失败: {e}") return None def _refresh_token(self) -> Optional[TokenInfo]: - """ - 刷新 Token - - Returns: - TokenInfo 对象,失败返回 None - """ if not self.account.client_id or not self.account.refresh_token: raise ValueError("缺少 client_id 或 refresh_token") - logger.debug(f"[{self.account.email}] 正在刷新 Token ({self.provider_type.value})...") - logger.debug(f"[{self.account.email}] Token URL: {self.token_url}") + logger.debug(f"[{self.account.email}] 正在刷新 Token...") - # 构建请求体 data = { "client_id": self.account.client_id, "refresh_token": self.account.refresh_token, "grant_type": "refresh_token", + "scope": IMAP_SCOPE, } - - # 添加 Scope(如果需要) - if self.scope: - data["scope"] = self.scope - headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", } - proxies = None if self.proxy_url: proxies = {"http": self.proxy_url, "https": self.proxy_url} try: resp = _requests.post( - self.token_url, + TOKEN_URL, data=data, headers=headers, proxies=proxies, @@ -166,74 +111,56 @@ class TokenManager: ) if resp.status_code != 200: - error_body = resp.text + body = resp.text logger.error(f"[{self.account.email}] Token 刷新失败: HTTP {resp.status_code}") - logger.debug(f"[{self.account.email}] 错误响应: {error_body[:500]}") - - if "service abuse" in error_body.lower(): + if "service abuse" in body.lower(): logger.warning(f"[{self.account.email}] 账号可能被封禁") - elif "invalid_grant" in error_body.lower(): + elif "invalid_grant" in body.lower(): logger.warning(f"[{self.account.email}] Refresh Token 已失效") - return None response_data = resp.json() - - # 解析响应 - token = TokenInfo.from_response(response_data, self.scope) + token = TokenInfo.from_response(response_data, IMAP_SCOPE) logger.info( - f"[{self.account.email}] Token 刷新成功 ({self.provider_type.value}), " + f"[{self.account.email}] Token 刷新成功," f"有效期 {int(token.expires_at - time.time())} 秒" ) + + # 若响应含新 refresh_token → 写回内存 + 持久化数据库 + new_rt = response_data.get("refresh_token", "") + if new_rt and new_rt != self.account.refresh_token: + self.account.refresh_token = new_rt + if self.service_id: + try: + from ...database.session import get_session_manager + from ...database.crud import update_outlook_refresh_token + with get_session_manager().session_scope() as db: + update_outlook_refresh_token( + db, self.service_id, self.account.email, new_rt + ) + logger.info(f"[{self.account.email}] refresh_token 已写回数据库") + except Exception as e: + logger.warning(f"[{self.account.email}] 写回 refresh_token 失败: {e}") + return token except json.JSONDecodeError as e: logger.error(f"[{self.account.email}] JSON 解析错误: {e}") return None - except Exception as e: logger.error(f"[{self.account.email}] 未知错误: {e}") return None @classmethod def clear_all_cache(cls): - """清除所有 Token 缓存""" with cls._cache_lock: cls._token_cache.clear() logger.info("已清除所有 Token 缓存") @classmethod def get_cache_stats(cls) -> Dict[str, Any]: - """获取缓存统计""" with cls._cache_lock: return { "cache_size": len(cls._token_cache), - "entries": [ - { - "email": key[0], - "provider": key[1].value, - } - for key in cls._token_cache.keys() - ], + "entries": list(cls._token_cache.keys()), } - - -def create_token_manager( - account: OutlookAccount, - provider_type: ProviderType, - proxy_url: Optional[str] = None, - timeout: int = TokenManager.DEFAULT_TIMEOUT, -) -> TokenManager: - """ - 创建 Token 管理器的工厂函数 - - Args: - account: Outlook 账户 - provider_type: 提供者类型 - proxy_url: 代理 URL - timeout: 超时时间 - - Returns: - TokenManager 实例 - """ - return TokenManager(account, provider_type, proxy_url, timeout) diff --git a/src/web/routes/accounts.py b/src/web/routes/accounts.py index 7b4aa6f..c559012 100644 --- a/src/web/routes/accounts.py +++ b/src/web/routes/accounts.py @@ -1028,9 +1028,11 @@ def _build_inbox_config(db, service_type, email: str) -> dict: EmailServiceModel.enabled == True ) if service_type == EST.OUTLOOK: - # 按 config.email 匹配账号 email - services = query.all() - svc = next((s for s in services if (s.config or {}).get("email") == email), None) + # 按 config.email 精确匹配,不受 enabled 限制(收件箱是账号自己的邮箱) + all_outlook = db.query(EmailServiceModel).filter( + EmailServiceModel.service_type == db_type + ).all() + svc = next((s for s in all_outlook if (s.config or {}).get("email", "").lower() == email.lower()), None) else: svc = query.order_by(EmailServiceModel.priority.asc()).first() diff --git a/src/web/routes/email.py b/src/web/routes/email.py index 5f0123c..fb67149 100644 --- a/src/web/routes/email.py +++ b/src/web/routes/email.py @@ -67,7 +67,7 @@ class ServiceTestResult(BaseModel): class OutlookBatchImportRequest(BaseModel): """Outlook 批量导入请求""" - data: str # 多行数据,每行格式: 邮箱----密码 或 邮箱----密码----client_id----refresh_token + data: str # 多行数据,每行格式: 邮箱----密码----client_id----refresh_token enabled: bool = True priority: int = 0 @@ -461,11 +461,8 @@ async def batch_import_outlook(request: OutlookBatchImportRequest): """ 批量导入 Outlook 邮箱账户 - 支持两种格式: - - 格式一(密码认证):邮箱----密码 - - 格式二(XOAUTH2 认证):邮箱----密码----client_id----refresh_token - - 每行一个账户,使用四个连字符(----)分隔字段 + 格式(每行):邮箱----密码----client_id----refresh_token + 使用四个连字符(----)分隔字段 """ lines = request.data.strip().split("\n") total = len(lines) @@ -484,14 +481,18 @@ async def batch_import_outlook(request: OutlookBatchImportRequest): parts = line.split("----") - # 验证格式 - if len(parts) < 2: + # 必须是四字段格式 + if len(parts) < 4: failed += 1 - errors.append(f"行 {i+1}: 格式错误,至少需要邮箱和密码") + errors.append( + f"行 {i+1}: 格式错误,必须为 邮箱----密码----client_id----refresh_token" + ) continue email = parts[0].strip() password = parts[1].strip() + client_id = parts[2].strip() + refresh_token = parts[3].strip() # 验证邮箱格式 if "@" not in email: @@ -499,6 +500,12 @@ async def batch_import_outlook(request: OutlookBatchImportRequest): errors.append(f"行 {i+1}: 无效的邮箱地址: {email}") continue + # 验证 OAuth 字段非空 + if not client_id or not refresh_token: + failed += 1 + errors.append(f"行 {i+1}: [{email}] client_id 或 refresh_token 不能为空") + continue + # 检查是否已存在 existing = db.query(EmailServiceModel).filter( EmailServiceModel.service_type == "outlook", @@ -513,17 +520,11 @@ async def batch_import_outlook(request: OutlookBatchImportRequest): # 构建配置 config = { "email": email, - "password": password + "password": password, + "client_id": client_id, + "refresh_token": refresh_token, } - # 检查是否有 OAuth 信息(格式二) - if len(parts) >= 4: - client_id = parts[2].strip() - refresh_token = parts[3].strip() - if client_id and refresh_token: - config["client_id"] = client_id - config["refresh_token"] = refresh_token - # 创建服务记录 try: service = EmailServiceModel( @@ -608,3 +609,86 @@ async def test_tempmail_service(request: TempmailTestRequest): except Exception as e: logger.error(f"测试临时邮箱失败: {e}") return {"success": False, "message": f"测试失败: {str(e)}"} + + +# ============== 收件箱 ============== + +@router.get("/{service_id}/inbox") +async def get_outlook_inbox( + service_id: int, + count: int = Query(30, ge=1, le=100), + only_unseen: bool = Query(False), +): + """获取 Outlook 收件箱邮件列表""" + with get_db() as db: + service = db.query(EmailServiceModel).filter(EmailServiceModel.id == service_id).first() + if not service: + raise HTTPException(status_code=404, detail="服务不存在") + if service.service_type != "outlook": + raise HTTPException(status_code=400, detail="仅支持 Outlook 类型服务") + + config = service.config or {} + email_addr = config.get("email", "") + client_id = config.get("client_id", "") + refresh_token = config.get("refresh_token", "") + + # client_id 为空时尝试使用全局默认值 + if not client_id: + from ...config.settings import get_settings + client_id = get_settings().outlook_default_client_id or "" + + if not client_id or not refresh_token: + raise HTTPException(status_code=400, detail="该账户缺少 OAuth 配置(client_id / refresh_token),无法读取收件箱") + + try: + from ...services.outlook.account import OutlookAccount + from ...services.outlook.token_manager import TokenManager + from ...services.outlook.providers.imap_new import IMAPNewProvider + from ...services.outlook.providers.base import ProviderConfig + + account = OutlookAccount( + email=email_addr, + password=config.get("password", ""), + client_id=client_id, + refresh_token=refresh_token, + ) + provider_config = ProviderConfig( + proxy_url=None, + timeout=30, + service_id=service_id, + ) + provider = IMAPNewProvider(account, provider_config) + + connected = provider.connect() + if not connected: + raise HTTPException(status_code=502, detail="IMAP 连接失败,请检查 OAuth 配置") + + try: + messages = provider.get_recent_emails(count=count, only_unseen=only_unseen) + finally: + provider.disconnect() + + emails = [] + for m in messages: + received_str = m.received_at.isoformat() if m.received_at else None + emails.append({ + "id": m.id or "", + "subject": m.subject or "", + "sender": m.sender or "", + "received_at": received_str, + "body_preview": m.body_preview or (m.body or "")[:200], + "body": m.body or "", + "is_read": m.is_read, + }) + + return { + "email": email_addr, + "total": len(emails), + "emails": emails, + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"获取收件箱失败 service_id={service_id}: {e}") + raise HTTPException(status_code=500, detail=f"获取收件箱失败: {str(e)}") diff --git a/src/web/routes/registration.py b/src/web/routes/registration.py index 6f5896f..603b5b3 100644 --- a/src/web/routes/registration.py +++ b/src/web/routes/registration.py @@ -15,7 +15,8 @@ from pydantic import BaseModel, Field from ...database import crud from ...database.session import get_db from ...database.models import RegistrationTask, Proxy -from ...core.register import RegistrationEngine, RegistrationResult +from ...core.login import LoginEngine +from ...core.register import RegistrationResult from ...services import EmailServiceFactory, EmailServiceType from ...config.settings import get_settings from ..task_manager import task_manager @@ -340,6 +341,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: if selected_service and selected_service.config: config = selected_service.config.copy() + config['service_id'] = selected_service.id crud.update_registration_task(db, task_uuid, email_service_id=selected_service.id) logger.info(f"使用数据库 Outlook 账户: {selected_service.name}") else: @@ -394,7 +396,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: # 创建注册引擎 - 使用 TaskManager 的日志回调 log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id) - engine = RegistrationEngine( + engine = LoginEngine( email_service=email_service, proxy_url=actual_proxy_url, callback_logger=log_callback, diff --git a/src/web/routes/settings.py b/src/web/routes/settings.py index d89a5f2..59350b3 100644 --- a/src/web/routes/settings.py +++ b/src/web/routes/settings.py @@ -720,7 +720,6 @@ async def get_outlook_settings(): return { "default_client_id": settings.outlook_default_client_id, - "provider_priority": settings.outlook_provider_priority, "health_failure_threshold": settings.outlook_health_failure_threshold, "health_disable_duration": settings.outlook_health_disable_duration, } diff --git a/src/web/task_manager.py b/src/web/task_manager.py index 31c620b..ed722d4 100644 --- a/src/web/task_manager.py +++ b/src/web/task_manager.py @@ -191,13 +191,23 @@ class TaskManager: return _log_queues.get(task_uuid, []).copy() def update_status(self, task_uuid: str, status: str, **kwargs): - """更新任务状态""" + """更新任务状态并推送到 WebSocket""" if task_uuid not in _task_status: _task_status[task_uuid] = {} _task_status[task_uuid]["status"] = status _task_status[task_uuid].update(kwargs) + # 推送状态变更到 WebSocket(线程安全,兼容同步线程调用) + if self._loop and self._loop.is_running(): + try: + asyncio.run_coroutine_threadsafe( + self.broadcast_status(task_uuid, status, **kwargs), + self._loop + ) + except Exception as e: + logger.warning(f"推送状态到 WebSocket 失败: {e}") + def get_status(self, task_uuid: str) -> Optional[dict]: """获取任务状态""" return _task_status.get(task_uuid) diff --git a/static/js/app.js b/static/js/app.js index 543dc0b..4e1132b 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -1049,7 +1049,6 @@ function resetButtons() { elements.cancelBtn.disabled = true; currentTask = null; currentBatch = null; - isBatchMode = false; // 重置完成标志 taskCompleted = false; batchCompleted = false; @@ -1275,12 +1274,13 @@ function connectBatchWebSocket(batchId) { if (!toastShown) { toastShown = true; if (data.status === 'completed') { - addLog('success', `[完成] Outlook 批量任务完成!成功: ${data.success}, 失败: ${data.failed}, 跳过: ${data.skipped || 0}`); + const batchLabel = isOutlookBatchMode ? 'Outlook 批量' : '批量'; + addLog('success', `[完成] ${batchLabel}任务完成!成功: ${data.success}, 失败: ${data.failed}, 跳过: ${data.skipped || 0}`); if (data.success > 0) { - toast.success(`Outlook 批量注册完成,成功 ${data.success} 个`); + toast.success(`${batchLabel}注册完成,成功 ${data.success} 个`); loadRecentAccounts(); } else { - toast.warning('Outlook 批量注册完成,但没有成功注册任何账号'); + toast.warning(`${batchLabel}注册完成,但没有成功注册任何账号`); } } else if (data.status === 'failed') { addLog('error', '[错误] 批量任务执行失败'); diff --git a/static/js/email_services.js b/static/js/email_services.js index fafd85b..a53e96f 100644 --- a/static/js/email_services.js +++ b/static/js/email_services.js @@ -72,6 +72,25 @@ const elements = { editOutlookForm: document.getElementById('edit-outlook-form'), closeEditOutlookModal: document.getElementById('close-edit-outlook-modal'), cancelEditOutlook: document.getElementById('cancel-edit-outlook'), + + // 收件箱模态框 + inboxModal: document.getElementById('inbox-modal'), + closeInboxModal: document.getElementById('close-inbox-modal'), + inboxRefreshBtn: document.getElementById('inbox-refresh-btn'), + inboxOnlyUnseen: document.getElementById('inbox-only-unseen'), + inboxLoading: document.getElementById('inbox-loading'), + inboxTable: document.getElementById('inbox-table'), + inboxTbody: document.getElementById('inbox-tbody'), + inboxEmpty: document.getElementById('inbox-empty'), + inboxModalEmail: document.getElementById('inbox-modal-email'), + + // 邮件正文模态框 + emailDetailModal: document.getElementById('email-detail-modal'), + closeEmailDetailModal: document.getElementById('close-email-detail-modal'), + emailDetailSubject: document.getElementById('email-detail-subject'), + emailDetailSender: document.getElementById('email-detail-sender'), + emailDetailDate: document.getElementById('email-detail-date'), + emailDetailBody: document.getElementById('email-detail-body'), }; const CUSTOM_SUBTYPE_LABELS = { @@ -164,6 +183,12 @@ function initEventListeners() { document.addEventListener('click', () => { document.querySelectorAll('.dropdown-menu.active').forEach(m => m.classList.remove('active')); }); + + // 收件箱模态框事件 + elements.closeInboxModal.addEventListener('click', () => elements.inboxModal.classList.remove('active')); + elements.closeEmailDetailModal.addEventListener('click', () => elements.emailDetailModal.classList.remove('active')); + elements.inboxRefreshBtn.addEventListener('click', () => loadInbox(currentInboxServiceId, true)); + elements.inboxOnlyUnseen.addEventListener('change', () => loadInbox(currentInboxServiceId)); } function toggleEmailMoreMenu(btn) { @@ -247,6 +272,7 @@ async function loadOutlookServices() { ${format.date(service.last_used)}
+