fix(harden): isolate resource cleanup and self-healing flow

This commit is contained in:
Mison
2026-03-23 11:25:51 +08:00
parent cf571d37c1
commit 43149ff079
7 changed files with 1542 additions and 103 deletions

View File

@@ -6,6 +6,7 @@ import asyncio
import logging
import uuid
import random
import re
from datetime import datetime
from typing import List, Optional, Dict, Tuple, Any
@@ -32,7 +33,16 @@ batch_tasks: Dict[str, dict] = {}
# ============== Proxy Helper Functions ==============
def get_proxy_for_registration(db) -> Tuple[Optional[str], Optional[int]]:
RETRYABLE_PROXY_ERROR_PATTERN = re.compile(
r"(?:curl(?:[^0-9]{0,8})?(35|56)\b|curl:\s*\((35|56)\))",
re.IGNORECASE,
)
def get_proxy_for_registration(
db,
exclude_proxy_ids: Optional[List[int]] = None,
) -> Tuple[Optional[str], Optional[int]]:
"""
获取用于注册的代理
@@ -45,7 +55,7 @@ def get_proxy_for_registration(db) -> Tuple[Optional[str], Optional[int]]:
Tuple[proxy_url, proxy_id]: 代理 URL 和代理 ID如果来自代理列表
"""
# 先尝试从代理列表中获取
proxy = crud.get_random_proxy(db)
proxy = crud.get_random_proxy(db, exclude_ids=exclude_proxy_ids)
if proxy:
return proxy.proxy_url, proxy.id
@@ -64,6 +74,27 @@ def update_proxy_usage(db, proxy_id: Optional[int]):
crud.update_proxy_last_used(db, proxy_id)
def is_retryable_proxy_error(error_message: Optional[str]) -> bool:
"""判断是否属于可通过切换代理自愈的 curl 网络错误。"""
message = str(error_message or "").strip()
if not message:
return False
return RETRYABLE_PROXY_ERROR_PATTERN.search(message) is not None
def disable_proxy_for_network_error(db, proxy_id: Optional[int], reason: str) -> bool:
"""将当前数据库代理标记为失效,避免后续再次被选中。"""
if not proxy_id:
return False
proxy = crud.update_proxy(db, proxy_id, enabled=False)
if not proxy:
return False
logger.warning(f"代理 {proxy_id} 因网络错误已自动禁用: {reason}")
return True
# ============== Pydantic Models ==============
class RegistrationTaskCreate(BaseModel):
@@ -246,51 +277,44 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
logger.error(f"任务不存在: {task_uuid}")
return
# 确定使用的代理
# 如果前端传入了代理参数,使用传入的
# 否则从代理列表或系统设置中获取
actual_proxy_url = proxy
proxy_id = None
resolved_email_service_id = email_service_id or task.email_service_id
if not actual_proxy_url:
actual_proxy_url, proxy_id = get_proxy_for_registration(db)
if actual_proxy_url:
logger.info(f"任务 {task_uuid} 使用代理: {actual_proxy_url[:50]}...")
# 更新任务的代理记录
crud.update_registration_task(db, task_uuid, proxy=actual_proxy_url)
# 创建邮箱服务
service_type = EmailServiceType(email_service_type)
# 更新 TaskManager 状态
task_manager.update_status(task_uuid, "running")
settings = get_settings()
log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id)
# 优先使用数据库中配置的邮箱服务
if email_service_id:
from ...database.models import EmailService as EmailServiceModel
db_service = db.query(EmailServiceModel).filter(
EmailServiceModel.id == email_service_id,
EmailServiceModel.enabled == True
).first()
def build_email_service(active_proxy_url: Optional[str]):
requested_service_type = EmailServiceType(email_service_type)
if db_service:
service_type = EmailServiceType(db_service.service_type)
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
# 更新任务关联的邮箱服务
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库邮箱服务: {db_service.name} (ID: {db_service.id}, 类型: {service_type.value})")
else:
raise ValueError(f"邮箱服务不存在或已禁用: {email_service_id}")
else:
# 使用默认配置或传入的配置
if resolved_email_service_id:
from ...database.models import EmailService as EmailServiceModel
db_service = db.query(EmailServiceModel).filter(
EmailServiceModel.id == resolved_email_service_id,
EmailServiceModel.enabled == True
).first()
if db_service:
selected_service_type = EmailServiceType(db_service.service_type)
config = _normalize_email_service_config(selected_service_type, db_service.config, active_proxy_url)
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(
f"使用数据库邮箱服务: {db_service.name} "
f"(ID: {db_service.id}, 类型: {selected_service_type.value})"
)
email_service = EmailServiceFactory.create(selected_service_type, config)
return email_service, selected_service_type
raise ValueError(f"邮箱服务不存在或已禁用: {resolved_email_service_id}")
service_type = requested_service_type
if service_type == EmailServiceType.TEMPMAIL:
config = {
"base_url": settings.tempmail_base_url,
"timeout": settings.tempmail_timeout,
"max_retries": settings.tempmail_max_retries,
"proxy_url": actual_proxy_url,
"proxy_url": active_proxy_url,
}
elif service_type == EmailServiceType.MOE_MAIL:
# 检查数据库中是否有可用的自定义域名服务
from ...database.models import EmailService as EmailServiceModel
db_service = db.query(EmailServiceModel).filter(
EmailServiceModel.service_type == "moe_mail",
@@ -298,21 +322,19 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
).order_by(EmailServiceModel.priority.asc()).first()
if db_service and db_service.config:
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
config = _normalize_email_service_config(service_type, db_service.config, active_proxy_url)
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库自定义域名服务: {db_service.name}")
elif settings.custom_domain_base_url and settings.custom_domain_api_key:
config = {
"base_url": settings.custom_domain_base_url,
"api_key": settings.custom_domain_api_key.get_secret_value() if settings.custom_domain_api_key else "",
"proxy_url": actual_proxy_url,
"proxy_url": active_proxy_url,
}
else:
raise ValueError("没有可用的自定义域名邮箱服务,请先在设置中配置")
elif service_type == EmailServiceType.OUTLOOK:
# 检查数据库中是否有可用的 Outlook 账户
from ...database.models import EmailService as EmailServiceModel, Account
# 获取所有启用的 Outlook 服务
outlook_services = db.query(EmailServiceModel).filter(
EmailServiceModel.service_type == "outlook",
EmailServiceModel.enabled == True
@@ -327,14 +349,12 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
email = svc.config.get("email") if svc.config else None
if not email:
continue
# 检查是否已在 accounts 表中注册
existing = db.query(Account).filter(Account.email == email).first()
if not existing:
selected_service = svc
logger.info(f"选择未注册的 Outlook 账户: {email}")
break
else:
logger.info(f"跳过已注册的 Outlook 账户: {email}")
logger.info(f"跳过已注册的 Outlook 账户: {email}")
if selected_service and selected_service.config:
config = selected_service.config.copy()
@@ -352,7 +372,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
).order_by(EmailServiceModel.priority.asc()).first()
if db_service and db_service.config:
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
config = _normalize_email_service_config(service_type, db_service.config, active_proxy_url)
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库 DuckMail 服务: {db_service.name}")
else:
@@ -366,7 +386,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
).order_by(EmailServiceModel.priority.asc()).first()
if db_service and db_service.config:
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
config = _normalize_email_service_config(service_type, db_service.config, active_proxy_url)
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库 Freemail 服务: {db_service.name}")
else:
@@ -380,7 +400,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
).order_by(EmailServiceModel.priority.asc()).first()
if db_service and db_service.config:
config = _normalize_email_service_config(service_type, db_service.config, actual_proxy_url)
config = _normalize_email_service_config(service_type, db_service.config, active_proxy_url)
crud.update_registration_task(db, task_uuid, email_service_id=db_service.id)
logger.info(f"使用数据库 IMAP 邮箱服务: {db_service.name}")
else:
@@ -388,23 +408,56 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
else:
config = email_service_config or {}
email_service = EmailServiceFactory.create(service_type, config)
email_service = EmailServiceFactory.create(service_type, config)
return email_service, service_type
# 在 WebSocket 状态里附带邮箱服务类型,前端可同步更新任务卡片
task_manager.update_status(task_uuid, "running", email_service=service_type.value)
requested_proxy = proxy
exhausted_proxy_ids = set()
result = None
active_service_type = EmailServiceType(email_service_type)
# 创建注册引擎 - 使用 TaskManager 的日志回调
log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id)
while True:
actual_proxy_url = requested_proxy
proxy_id = None
engine = LoginEngine(
email_service=email_service,
proxy_url=actual_proxy_url,
callback_logger=log_callback,
task_uuid=task_uuid
)
if not actual_proxy_url:
actual_proxy_url, proxy_id = get_proxy_for_registration(
db,
exclude_proxy_ids=list(exhausted_proxy_ids),
)
if actual_proxy_url:
logger.info(f"任务 {task_uuid} 使用代理: {actual_proxy_url[:50]}...")
# 执行注册
result = engine.run()
crud.update_registration_task(db, task_uuid, proxy=actual_proxy_url)
email_service, active_service_type = build_email_service(actual_proxy_url)
task_manager.update_status(task_uuid, "running", email_service=active_service_type.value)
engine = LoginEngine(
email_service=email_service,
proxy_url=actual_proxy_url,
callback_logger=log_callback,
task_uuid=task_uuid
)
result = engine.run()
if result.success:
break
if is_retryable_proxy_error(result.error_message):
log_callback(f"[代理] 检测到可重试网络错误: {result.error_message}")
if proxy_id and disable_proxy_for_network_error(db, proxy_id, result.error_message):
exhausted_proxy_ids.add(proxy_id)
log_callback(f"[代理] 当前代理已标记失效并从代理池移除: {proxy_id}")
next_proxy_url, next_proxy_id = get_proxy_for_registration(
db,
exclude_proxy_ids=list(exhausted_proxy_ids),
)
if next_proxy_url and (next_proxy_url != actual_proxy_url or next_proxy_id != proxy_id):
requested_proxy = None
log_callback(f"[代理] 切换到新代理后重试注册: {next_proxy_url[:50]}...")
continue
break
if result.success:
# 更新代理使用时间
@@ -506,7 +559,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
completed_at=datetime.utcnow(),
result={
**result.to_dict(),
"email_service": service_type.value,
"email_service": active_service_type.value,
}
)
@@ -515,7 +568,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
task_uuid,
"completed",
email=result.email,
email_service=service_type.value,
email_service=active_service_type.value,
)
logger.info(f"注册任务完成: {task_uuid}, 邮箱: {result.email}")
@@ -533,7 +586,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
task_uuid,
"failed",
error=result.error_message,
email_service=service_type.value,
email_service=active_service_type.value,
)
logger.warning(f"注册任务失败: {task_uuid}, 原因: {result.error_message}")