feat: add cloud-mail service support

This commit is contained in:
zhoukailian
2026-03-26 20:07:21 +08:00
parent ae089ee707
commit a890bc7f2b
14 changed files with 801 additions and 9 deletions

View File

@@ -38,6 +38,7 @@ class EmailServiceType(str, Enum):
DUCK_MAIL = "duck_mail"
FREEMAIL = "freemail"
IMAP_MAIL = "imap_mail"
CLOUD_MAIL = "cloud_mail"
# ============================================================================
@@ -143,7 +144,15 @@ EMAIL_SERVICE_DEFAULTS = {
"password": "",
"timeout": 30,
"max_retries": 3,
}
},
"cloud_mail": {
"base_url": "",
"admin_email": "",
"admin_password": "",
"default_domain": "",
"timeout": 30,
"max_retries": 3,
},
}
# ============================================================================

View File

@@ -17,6 +17,7 @@ from .temp_mail import TempMailService
from .duck_mail import DuckMailService
from .freemail import FreemailService
from .imap_mail import ImapMailService
from .cloud_mail import CloudMailService
# 注册服务
EmailServiceFactory.register(EmailServiceType.TEMPMAIL, TempmailService)
@@ -26,6 +27,7 @@ EmailServiceFactory.register(EmailServiceType.TEMP_MAIL, TempMailService)
EmailServiceFactory.register(EmailServiceType.DUCK_MAIL, DuckMailService)
EmailServiceFactory.register(EmailServiceType.FREEMAIL, FreemailService)
EmailServiceFactory.register(EmailServiceType.IMAP_MAIL, ImapMailService)
EmailServiceFactory.register(EmailServiceType.CLOUD_MAIL, CloudMailService)
# 导出 Outlook 模块的额外内容
from .outlook.base import (
@@ -59,6 +61,7 @@ __all__ = [
'DuckMailService',
'FreemailService',
'ImapMailService',
'CloudMailService',
# Outlook 模块
'ProviderType',
'EmailMessage',

312
src/services/cloud_mail.py Normal file
View File

@@ -0,0 +1,312 @@
"""
Cloud Mail 邮箱服务实现
基于 maillab/cloud-mail 的 public API
"""
import logging
import random
import string
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from .base import BaseEmailService, EmailServiceError, EmailServiceType, RateLimitedEmailServiceError
from ..config.constants import OTP_CODE_PATTERN
from ..core.http_client import HTTPClient, RequestConfig
logger = logging.getLogger(__name__)
OTP_SENT_AT_TOLERANCE_SECONDS = 2
class CloudMailService(BaseEmailService):
"""Cloud Mail 邮箱服务"""
def __init__(self, config: Dict[str, Any] = None, name: str = None):
super().__init__(EmailServiceType.CLOUD_MAIL, name)
required_keys = ["base_url", "admin_email", "admin_password", "default_domain"]
missing_keys = [key for key in required_keys if not (config or {}).get(key)]
if missing_keys:
raise ValueError(f"缺少必需配置: {missing_keys}")
default_config = {
"timeout": 30,
"max_retries": 3,
"password_length": 16,
}
self.config = {**default_config, **(config or {})}
self.config["base_url"] = str(self.config["base_url"]).rstrip("/")
self.config["default_domain"] = str(self.config["default_domain"]).strip().lstrip("@")
http_config = RequestConfig(
timeout=self.config["timeout"],
max_retries=self.config["max_retries"],
)
self.http_client = HTTPClient(proxy_url=None, config=http_config)
self._email_cache: Dict[str, Dict[str, Any]] = {}
def _build_headers(
self,
token: Optional[str] = None,
extra_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if token:
headers["Authorization"] = token
if extra_headers:
headers.update(extra_headers)
return headers
def _unwrap_result(self, payload: Any) -> Any:
if not isinstance(payload, dict) or "code" not in payload:
return payload
if payload.get("code") != 200:
raise EmailServiceError(str(payload.get("message") or "Cloud Mail API 返回失败"))
return payload.get("data")
def _make_request(
self,
method: str,
path: str,
token: Optional[str] = None,
**kwargs,
) -> Any:
url = f"{self.config['base_url']}/api{path}"
kwargs["headers"] = self._build_headers(token=token, extra_headers=kwargs.get("headers"))
try:
response = self.http_client.request(method, url, **kwargs)
if response.status_code >= 400:
error_msg = f"请求失败: {response.status_code}"
try:
error_data = response.json()
error_msg = f"{error_msg} - {error_data}"
except Exception:
error_msg = f"{error_msg} - {response.text[:200]}"
retry_after = None
if response.status_code == 429:
retry_after_header = response.headers.get("Retry-After")
if retry_after_header:
try:
retry_after = max(1, int(retry_after_header))
except ValueError:
retry_after = None
error = RateLimitedEmailServiceError(error_msg, retry_after=retry_after)
else:
error = EmailServiceError(error_msg)
self.update_status(False, error)
raise error
try:
payload = response.json()
except Exception:
payload = {"raw_response": response.text}
data = self._unwrap_result(payload)
return data
except Exception as e:
self.update_status(False, e)
if isinstance(e, EmailServiceError):
raise
raise EmailServiceError(f"请求失败: {method} {path} - {e}")
def _get_public_token(self) -> str:
data = self._make_request(
"POST",
"/public/genToken",
json={
"email": self.config["admin_email"],
"password": self.config["admin_password"],
},
)
if isinstance(data, dict):
token = str(data.get("token") or "").strip()
else:
token = str(data or "").strip()
if not token:
raise EmailServiceError("Cloud Mail 未返回 public token")
return token
def _generate_local_part(self) -> str:
first = random.choice(string.ascii_lowercase)
rest = "".join(random.choices(string.ascii_lowercase + string.digits, k=7))
return f"{first}{rest}"
def _generate_password(self) -> str:
length = max(8, int(self.config.get("password_length") or 16))
alphabet = string.ascii_letters + string.digits
return "".join(random.choices(alphabet, k=length))
def _parse_message_time(self, value: Any) -> Optional[float]:
if value is None or value == "":
return None
if isinstance(value, (int, float)):
timestamp = float(value)
else:
text = str(value).strip()
if not text:
return None
try:
timestamp = float(text)
except ValueError:
normalized = text.replace("Z", "+00:00")
if "T" not in normalized and "+" not in normalized[10:] and normalized.count(":") >= 2:
normalized = normalized.replace(" ", "T", 1) + "+00:00"
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
timestamp = parsed.astimezone(timezone.utc).timestamp()
while timestamp > 1e11:
timestamp /= 1000.0
return timestamp if timestamp > 0 else None
def _get_received_timestamp(self, mail: Dict[str, Any]) -> Optional[float]:
for field_name in ("createTime", "createdAt", "receivedAt", "timestamp", "time"):
timestamp = self._parse_message_time(mail.get(field_name))
if timestamp is not None:
return timestamp
return None
def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
request_config = config or {}
local_part = str(request_config.get("name") or self._generate_local_part()).strip()
domain = str(
request_config.get("default_domain")
or request_config.get("domain")
or self.config["default_domain"]
).strip().lstrip("@")
address = f"{local_part}@{domain}"
password = str(request_config.get("password") or self._generate_password())
token = self._get_public_token()
self._make_request(
"POST",
"/public/addUser",
token=token,
json={
"list": [{
"email": address,
"password": password,
}]
},
)
email_info = {
"email": address,
"password": password,
"service_id": address,
"id": address,
"created_at": time.time(),
}
self._email_cache[address.lower()] = email_info
self.update_status(True)
logger.info(f"成功创建 Cloud Mail 邮箱: {address}")
return email_info
def get_verification_code(
self,
email: str,
email_id: str = None,
timeout: int = 120,
pattern: str = OTP_CODE_PATTERN,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
logger.info(f"正在从 Cloud Mail 邮箱 {email} 获取验证码...")
start_time = time.time()
seen_mail_ids: set = set()
while time.time() - start_time < timeout:
try:
token = self._get_public_token()
mails = self._make_request(
"POST",
"/public/emailList",
token=token,
json={
"toEmail": email,
"num": 1,
"size": 20,
},
)
if isinstance(mails, dict) and isinstance(mails.get("list"), list):
mails = mails["list"]
if not isinstance(mails, list):
time.sleep(3)
continue
for mail in mails:
msg_timestamp = self._get_received_timestamp(mail)
if otp_sent_at is not None:
min_allowed_timestamp = otp_sent_at - OTP_SENT_AT_TOLERANCE_SECONDS
if msg_timestamp is None or msg_timestamp <= min_allowed_timestamp:
continue
mail_id = mail.get("emailId") or mail.get("id")
if mail_id in seen_mail_ids:
continue
if mail_id is not None:
seen_mail_ids.add(mail_id)
sender = str(mail.get("sendEmail") or mail.get("sender") or "")
sender_name = str(mail.get("sendName") or mail.get("name") or "")
subject = str(mail.get("subject") or "")
text_body = str(mail.get("text") or "")
content = str(mail.get("content") or "")
search_text = "\n".join(
part for part in [sender, sender_name, subject, text_body, content] if part
).strip()
if "openai" not in search_text.lower():
continue
code = self._extract_otp_from_text(search_text, pattern)
if code:
self.update_status(True)
logger.info(f"从 Cloud Mail 邮箱 {email} 找到验证码: {code}")
return code
except Exception as e:
logger.debug(f"检查 Cloud Mail 邮件时出错: {e}")
time.sleep(3)
logger.warning(f"等待 Cloud Mail 验证码超时: {email}")
return None
def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
return list(self._email_cache.values())
def delete_email(self, email_id: str) -> bool:
self._email_cache.pop(str(email_id).strip().lower(), None)
self.update_status(True)
return True
def check_health(self) -> bool:
try:
self._get_public_token()
self.update_status(True)
return True
except Exception as e:
logger.warning(f"Cloud Mail 健康检查失败: {e}")
self.update_status(False, e)
return False

View File

@@ -1540,6 +1540,7 @@ def _build_inbox_config(db, service_type, email: str) -> dict:
EST.DUCK_MAIL: "duck_mail",
EST.FREEMAIL: "freemail",
EST.IMAP_MAIL: "imap_mail",
EST.CLOUD_MAIL: "cloud_mail",
EST.OUTLOOK: "outlook",
}
db_type = type_map.get(service_type)

View File

@@ -84,7 +84,7 @@ class OutlookBatchImportResponse(BaseModel):
# ============== Helper Functions ==============
# 敏感字段列表,返回响应时需要过滤
SENSITIVE_FIELDS = {'password', 'api_key', 'refresh_token', 'access_token', 'admin_token'}
SENSITIVE_FIELDS = {'password', 'api_key', 'refresh_token', 'access_token', 'admin_token', 'admin_password'}
def filter_sensitive_config(config: Dict[str, Any]) -> Dict[str, Any]:
"""过滤敏感配置信息"""
@@ -147,6 +147,7 @@ async def get_email_services_stats():
'duck_mail_count': 0,
'freemail_count': 0,
'imap_mail_count': 0,
'cloud_mail_count': 0,
'tempmail_available': True, # 临时邮箱始终可用
'enabled_count': enabled_count
}
@@ -164,6 +165,8 @@ async def get_email_services_stats():
stats['freemail_count'] = count
elif service_type == 'imap_mail':
stats['imap_mail_count'] = count
elif service_type == 'cloud_mail':
stats['cloud_mail_count'] = count
return stats
@@ -235,6 +238,17 @@ async def get_service_types():
{"name": "domain", "label": "邮箱域名", "required": False, "placeholder": "example.com"},
]
},
{
"value": "cloud_mail",
"label": "Cloud Mail",
"description": "cloud-mail 自部署邮箱服务,使用公开 API",
"config_fields": [
{"name": "base_url", "label": "站点地址", "required": True, "placeholder": "https://mail.example.com"},
{"name": "admin_email", "label": "管理员邮箱", "required": True, "placeholder": "admin@example.com"},
{"name": "admin_password", "label": "管理员密码", "required": True, "secret": True},
{"name": "default_domain", "label": "默认域名", "required": True, "placeholder": "mail.example.com"},
]
},
{
"value": "imap_mail",
"label": "IMAP 邮箱",

View File

@@ -285,6 +285,9 @@ def _normalize_email_service_config(
elif service_type == EmailServiceType.DUCK_MAIL:
if 'domain' in normalized and 'default_domain' not in normalized:
normalized['default_domain'] = normalized.pop('domain')
elif service_type == EmailServiceType.CLOUD_MAIL:
if 'domain' in normalized and 'default_domain' not in normalized:
normalized['default_domain'] = normalized.pop('domain')
if proxy_url and 'proxy_url' not in normalized:
normalized['proxy_url'] = proxy_url
@@ -527,6 +530,10 @@ def _build_email_service_candidates(
append_database_candidates("imap_mail")
if not candidates:
raise ValueError("没有可用的 IMAP 邮箱服务,请先在邮箱服务中添加")
elif service_type == EmailServiceType.CLOUD_MAIL:
append_database_candidates("cloud_mail")
if not candidates:
raise ValueError("没有可用的 Cloud Mail 邮箱服务,请先在邮箱服务页面添加服务")
else:
append_candidate(service_type, email_service_config or {})
@@ -1783,6 +1790,11 @@ async def get_available_email_services():
"available": False,
"count": 0,
"services": []
},
"cloud_mail": {
"available": False,
"count": 0,
"services": []
}
}
@@ -1911,6 +1923,24 @@ async def get_available_email_services():
result["imap_mail"]["count"] = len(imap_mail_services)
result["imap_mail"]["available"] = len(imap_mail_services) > 0
cloud_mail_services = db.query(EmailServiceModel).filter(
EmailServiceModel.service_type == "cloud_mail",
EmailServiceModel.enabled == True
).order_by(EmailServiceModel.priority.asc()).all()
for service in cloud_mail_services:
config = service.config or {}
result["cloud_mail"]["services"].append({
"id": service.id,
"name": service.name,
"type": "cloud_mail",
"default_domain": config.get("default_domain"),
"priority": service.priority
})
result["cloud_mail"]["count"] = len(cloud_mail_services)
result["cloud_mail"]["available"] = len(cloud_mail_services) > 0
return result

View File

@@ -24,7 +24,8 @@ let availableServices = {
moe_mail: { available: false, services: [] },
temp_mail: { available: false, services: [] },
duck_mail: { available: false, services: [] },
freemail: { available: false, services: [] }
freemail: { available: false, services: [] },
cloud_mail: { available: false, services: [] }
};
// WebSocket 相关变量
@@ -376,6 +377,23 @@ function updateEmailServiceOptions() {
select.appendChild(optgroup);
}
// Cloud Mail
if (availableServices.cloud_mail && availableServices.cloud_mail.available) {
const optgroup = document.createElement('optgroup');
optgroup.label = `☁️ Cloud Mail (${availableServices.cloud_mail.count} 个服务)`;
availableServices.cloud_mail.services.forEach(service => {
const option = document.createElement('option');
option.value = `cloud_mail:${service.id}`;
option.textContent = service.name + (service.default_domain ? ` (@${service.default_domain})` : '');
option.dataset.type = 'cloud_mail';
option.dataset.serviceId = service.id;
optgroup.appendChild(option);
});
select.appendChild(optgroup);
}
}
// 处理邮箱服务切换
@@ -426,6 +444,11 @@ function handleServiceChange(e) {
if (service) {
addLog('info', `[系统] 已选择 Freemail 服务: ${service.name}`);
}
} else if (type === 'cloud_mail') {
const service = availableServices.cloud_mail.services.find(s => s.id == id);
if (service) {
addLog('info', `[系统] 已选择 Cloud Mail 服务: ${service.name}`);
}
}
}

View File

@@ -4,7 +4,7 @@
// 状态
let outlookServices = [];
let customServices = []; // 合并 moe_mail + temp_mail + duck_mail + freemail + imap_mail
let customServices = []; // 合并 moe_mail + temp_mail + duck_mail + freemail + cloud_mail + imap_mail
let selectedOutlook = new Set();
let selectedCustom = new Set();
@@ -52,6 +52,7 @@ const elements = {
addTempmailFields: document.getElementById('add-tempmail-fields'),
addDuckmailFields: document.getElementById('add-duckmail-fields'),
addFreemailFields: document.getElementById('add-freemail-fields'),
addCloudmailFields: document.getElementById('add-cloudmail-fields'),
addImapFields: document.getElementById('add-imap-fields'),
// 编辑自定义域名模态框
@@ -63,6 +64,7 @@ const elements = {
editTempmailFields: document.getElementById('edit-tempmail-fields'),
editDuckmailFields: document.getElementById('edit-duckmail-fields'),
editFreemailFields: document.getElementById('edit-freemail-fields'),
editCloudmailFields: document.getElementById('edit-cloudmail-fields'),
editImapFields: document.getElementById('edit-imap-fields'),
editCustomTypeBadge: document.getElementById('edit-custom-type-badge'),
editCustomSubTypeHidden: document.getElementById('edit-custom-sub-type-hidden'),
@@ -79,6 +81,7 @@ const CUSTOM_SUBTYPE_LABELS = {
tempmail: '📮 TempMail自部署 Cloudflare Worker',
duckmail: '🦆 DuckMailDuckMail API',
freemail: 'Freemail自部署 Cloudflare Worker',
cloudmail: '☁️ Cloud Mail公开 API',
imap: '📧 IMAP 邮箱Gmail/QQ/163等'
};
@@ -185,6 +188,7 @@ function switchAddSubType(subType) {
elements.addTempmailFields.style.display = subType === 'tempmail' ? '' : 'none';
elements.addDuckmailFields.style.display = subType === 'duckmail' ? '' : 'none';
elements.addFreemailFields.style.display = subType === 'freemail' ? '' : 'none';
elements.addCloudmailFields.style.display = subType === 'cloudmail' ? '' : 'none';
elements.addImapFields.style.display = subType === 'imap' ? '' : 'none';
}
@@ -195,6 +199,7 @@ function switchEditSubType(subType) {
elements.editTempmailFields.style.display = subType === 'tempmail' ? '' : 'none';
elements.editDuckmailFields.style.display = subType === 'duckmail' ? '' : 'none';
elements.editFreemailFields.style.display = subType === 'freemail' ? '' : 'none';
elements.editCloudmailFields.style.display = subType === 'cloudmail' ? '' : 'none';
elements.editImapFields.style.display = subType === 'imap' ? '' : 'none';
elements.editCustomTypeBadge.textContent = CUSTOM_SUBTYPE_LABELS[subType] || CUSTOM_SUBTYPE_LABELS.moemail;
}
@@ -204,7 +209,7 @@ async function loadStats() {
try {
const data = await api.get('/email-services/stats');
elements.outlookCount.textContent = data.outlook_count || 0;
elements.customCount.textContent = (data.custom_count || 0) + (data.temp_mail_count || 0) + (data.duck_mail_count || 0) + (data.freemail_count || 0) + (data.imap_mail_count || 0);
elements.customCount.textContent = (data.custom_count || 0) + (data.temp_mail_count || 0) + (data.duck_mail_count || 0) + (data.freemail_count || 0) + (data.cloud_mail_count || 0) + (data.imap_mail_count || 0);
elements.tempmailStatus.textContent = data.tempmail_available ? '可用' : '不可用';
elements.totalEnabled.textContent = data.enabled_count || 0;
} catch (error) {
@@ -289,6 +294,9 @@ function getCustomServiceTypeBadge(subType) {
if (subType === 'freemail') {
return '<span class="status-badge" style="background-color:#9c27b0;color:white;">Freemail</span>';
}
if (subType === 'cloudmail') {
return '<span class="status-badge" style="background-color:#1976d2;color:white;">Cloud Mail</span>';
}
return '<span class="status-badge" style="background-color:#0288d1;color:white;">IMAP</span>';
}
@@ -306,14 +314,15 @@ function getCustomServiceAddress(service) {
return `${escapeHtml(baseUrl)}<div style="color: var(--text-muted); margin-top: 4px;">默认域名:@${escapeHtml(domain)}</div>`;
}
// 加载自定义邮箱服务moe_mail + temp_mail + duck_mail + freemail 合并)
// 加载自定义邮箱服务moe_mail + temp_mail + duck_mail + freemail + cloud_mail 合并)
async function loadCustomServices() {
try {
const [r1, r2, r3, r4, r5] = await Promise.all([
const [r1, r2, r3, r4, r5, r6] = await Promise.all([
api.get('/email-services?service_type=moe_mail'),
api.get('/email-services?service_type=temp_mail'),
api.get('/email-services?service_type=duck_mail'),
api.get('/email-services?service_type=freemail'),
api.get('/email-services?service_type=cloud_mail'),
api.get('/email-services?service_type=imap_mail')
]);
customServices = [
@@ -321,7 +330,8 @@ async function loadCustomServices() {
...(r2.services || []).map(s => ({ ...s, _subType: 'tempmail' })),
...(r3.services || []).map(s => ({ ...s, _subType: 'duckmail' })),
...(r4.services || []).map(s => ({ ...s, _subType: 'freemail' })),
...(r5.services || []).map(s => ({ ...s, _subType: 'imap' }))
...(r5.services || []).map(s => ({ ...s, _subType: 'cloudmail' })),
...(r6.services || []).map(s => ({ ...s, _subType: 'imap' }))
];
if (customServices.length === 0) {
@@ -466,6 +476,14 @@ async function handleAddCustom(e) {
admin_token: formData.get('fm_admin_token'),
domain: formData.get('fm_domain')
};
} else if (subType === 'cloudmail') {
serviceType = 'cloud_mail';
config = {
base_url: formData.get('cm_base_url'),
admin_email: formData.get('cm_admin_email'),
admin_password: formData.get('cm_admin_password'),
default_domain: formData.get('cm_domain')
};
} else {
serviceType = 'imap_mail';
config = {
@@ -617,6 +635,8 @@ async function editCustomService(id, subType) {
? 'duckmail'
: service.service_type === 'freemail'
? 'freemail'
: service.service_type === 'cloud_mail'
? 'cloudmail'
: service.service_type === 'imap_mail'
? 'imap'
: 'moemail'
@@ -650,6 +670,12 @@ async function editCustomService(id, subType) {
document.getElementById('edit-fm-admin-token').value = '';
document.getElementById('edit-fm-admin-token').placeholder = service.config?.admin_token ? '已设置,留空保持不变' : '请输入 Admin Token';
document.getElementById('edit-fm-domain').value = service.config?.domain || '';
} else if (resolvedSubType === 'cloudmail') {
document.getElementById('edit-cm-base-url').value = service.config?.base_url || '';
document.getElementById('edit-cm-admin-email').value = service.config?.admin_email || '';
document.getElementById('edit-cm-admin-password').value = '';
document.getElementById('edit-cm-admin-password').placeholder = service.config?.admin_password ? '已设置,留空保持不变' : '请输入管理员密码';
document.getElementById('edit-cm-domain').value = service.config?.default_domain || '';
} else {
document.getElementById('edit-imap-host').value = service.config?.host || '';
document.getElementById('edit-imap-port').value = service.config?.port || 993;
@@ -703,6 +729,14 @@ async function handleEditCustom(e) {
};
const token = formData.get('fm_admin_token');
if (token && token.trim()) config.admin_token = token.trim();
} else if (subType === 'cloudmail') {
config = {
base_url: formData.get('cm_base_url'),
admin_email: formData.get('cm_admin_email'),
default_domain: formData.get('cm_domain')
};
const pwd = formData.get('cm_admin_password');
if (pwd && pwd.trim()) config.admin_password = pwd.trim();
} else {
config = {
host: formData.get('imap_host'),

View File

@@ -372,7 +372,8 @@ const statusMap = {
temp_mail: 'Temp-Mail自部署',
duck_mail: 'DuckMail',
freemail: 'Freemail',
imap_mail: 'IMAP 邮箱'
imap_mail: 'IMAP 邮箱',
cloud_mail: 'Cloud Mail'
}
};

View File

@@ -110,6 +110,7 @@
<option value="tempmail">Tempmail</option>
<option value="outlook">Outlook</option>
<option value="moe_mail">MoeMail</option>
<option value="cloud_mail">Cloud Mail</option>
</select>
<input type="text" id="search-input" class="form-input" placeholder="🔍 搜索邮箱..." style="min-width: 200px;">

View File

@@ -211,6 +211,7 @@
<option value="tempmail">TempMail自部署 Cloudflare Worker</option>
<option value="duckmail">DuckMailDuckMail API</option>
<option value="freemail">Freemail自部署 Cloudflare Worker</option>
<option value="cloudmail">Cloud Mail公开 API</option>
<option value="imap">标准 IMAPGmail/QQ/163等</option>
</select>
</div>
@@ -278,6 +279,25 @@
<input type="text" id="custom-fm-domain" name="fm_domain" placeholder="example.com">
</div>
</div>
<!-- Cloud Mail 字段 -->
<div id="add-cloudmail-fields" style="display:none;">
<div class="form-group">
<label for="custom-cm-base-url">站点地址</label>
<input type="text" id="custom-cm-base-url" name="cm_base_url" placeholder="https://mail.example.com">
</div>
<div class="form-group">
<label for="custom-cm-admin-email">管理员邮箱</label>
<input type="email" id="custom-cm-admin-email" name="cm_admin_email" placeholder="admin@example.com">
</div>
<div class="form-group">
<label for="custom-cm-admin-password">管理员密码</label>
<input type="password" id="custom-cm-admin-password" name="cm_admin_password" placeholder="用于获取 public token">
</div>
<div class="form-group">
<label for="custom-cm-domain">默认域名</label>
<input type="text" id="custom-cm-domain" name="cm_domain" placeholder="mail.example.com">
</div>
</div>
<!-- IMAP 字段 -->
<div id="add-imap-fields" style="display:none;">
<div class="form-group">
@@ -416,6 +436,26 @@
<input type="text" id="edit-fm-domain" name="fm_domain" placeholder="example.com">
</div>
</div>
<!-- Cloud Mail 字段 -->
<div id="edit-cloudmail-fields" style="display:none;">
<div class="form-group">
<label for="edit-cm-base-url">站点地址</label>
<input type="text" id="edit-cm-base-url" name="cm_base_url" placeholder="https://mail.example.com">
</div>
<div class="form-group">
<label for="edit-cm-admin-email">管理员邮箱</label>
<input type="email" id="edit-cm-admin-email" name="cm_admin_email" placeholder="admin@example.com">
</div>
<div class="form-group">
<label for="edit-cm-admin-password">管理员密码</label>
<input type="password" id="edit-cm-admin-password" name="cm_admin_password" placeholder="留空则不修改">
<small style="color: var(--text-muted);">留空则保持原值不变</small>
</div>
<div class="form-group">
<label for="edit-cm-domain">默认域名</label>
<input type="text" id="edit-cm-domain" name="cm_domain" placeholder="mail.example.com">
</div>
</div>
<!-- IMAP 字段 -->
<div id="edit-imap-fields" style="display:none;">
<div class="form-group">

View File

@@ -181,6 +181,7 @@
<option value="moe_mail">MoeMail</option>
<option value="temp_mail">Temp-Mail 自部署</option>
<option value="duck_mail">DuckMail</option>
<option value="cloud_mail">Cloud Mail</option>
<option value="imap_mail">IMAP 邮箱</option>
<option value="outlook_batch:all">Outlook 批量注册</option>
</select>

View File

@@ -0,0 +1,177 @@
from datetime import datetime, timezone
from src.services.cloud_mail import CloudMailService
class FakeResponse:
def __init__(self, status_code=200, payload=None, text=""):
self.status_code = status_code
self._payload = payload
self.text = text
self.headers = {}
def json(self):
if self._payload is None:
raise ValueError("no json payload")
return self._payload
class FakeHTTPClient:
def __init__(self, responses):
self.responses = list(responses)
self.calls = []
def request(self, method, url, **kwargs):
self.calls.append({
"method": method,
"url": url,
"kwargs": kwargs,
})
if not self.responses:
raise AssertionError(f"未准备响应: {method} {url}")
return self.responses.pop(0)
def _to_timestamp(value: str) -> float:
normalized = value.replace(" ", "T")
return datetime.fromisoformat(normalized.replace("Z", "+00:00")).astimezone(timezone.utc).timestamp()
def test_cloud_mail_creates_address_via_public_api():
service = CloudMailService({
"base_url": "https://mail.example.com",
"admin_email": "admin@example.com",
"admin_password": "admin-secret",
"default_domain": "mail.example.com",
})
service.http_client = FakeHTTPClient([
FakeResponse(
payload={
"code": 200,
"message": "success",
"data": {"token": "public-token"},
}
),
FakeResponse(
payload={
"code": 200,
"message": "success",
"data": None,
}
),
])
result = service.create_email()
assert result["email"].endswith("@mail.example.com")
assert result["service_id"] == result["email"]
assert result["id"] == result["email"]
assert result["password"]
first_call = service.http_client.calls[0]
second_call = service.http_client.calls[1]
assert first_call["method"] == "POST"
assert first_call["url"] == "https://mail.example.com/api/public/genToken"
assert first_call["kwargs"]["json"] == {
"email": "admin@example.com",
"password": "admin-secret",
}
assert second_call["method"] == "POST"
assert second_call["url"] == "https://mail.example.com/api/public/addUser"
assert second_call["kwargs"]["headers"]["Authorization"] == "public-token"
assert second_call["kwargs"]["json"]["list"][0]["email"] == result["email"]
def test_cloud_mail_extracts_openai_verification_code_from_public_email_list():
service = CloudMailService({
"base_url": "https://mail.example.com",
"admin_email": "admin@example.com",
"admin_password": "admin-secret",
"default_domain": "mail.example.com",
})
service.http_client = FakeHTTPClient([
FakeResponse(
payload={
"code": 200,
"message": "success",
"data": {"token": "public-token"},
}
),
FakeResponse(
payload={
"code": 200,
"message": "success",
"data": [
{
"emailId": 1,
"sendEmail": "noreply@openai.com",
"sendName": "OpenAI",
"subject": "Your OpenAI verification code",
"text": "Your OpenAI verification code is 654321",
"content": "",
}
],
}
),
])
code = service.get_verification_code(
email="tester@mail.example.com",
timeout=1,
)
assert code == "654321"
def test_cloud_mail_ignores_messages_received_before_otp_sent_at():
service = CloudMailService({
"base_url": "https://mail.example.com",
"admin_email": "admin@example.com",
"admin_password": "admin-secret",
"default_domain": "mail.example.com",
})
service.http_client = FakeHTTPClient([
FakeResponse(
payload={
"code": 200,
"message": "success",
"data": {"token": "public-token"},
}
),
FakeResponse(
payload={
"code": 200,
"message": "success",
"data": [
{
"emailId": 1,
"sendEmail": "noreply@openai.com",
"sendName": "OpenAI",
"subject": "Old code",
"text": "111111",
"content": "",
"createTime": "2026-03-23 10:00:00",
},
{
"emailId": 2,
"sendEmail": "noreply@openai.com",
"sendName": "OpenAI",
"subject": "New code",
"text": "222222",
"content": "",
"createTime": "2026-03-23 10:00:05",
},
],
}
),
])
code = service.get_verification_code(
email="tester@mail.example.com",
timeout=1,
otp_sent_at=_to_timestamp("2026-03-23T10:00:02Z"),
)
assert code == "222222"

View File

@@ -0,0 +1,146 @@
import asyncio
from contextlib import contextmanager
from pathlib import Path
from src.config.constants import EmailServiceType
from src.database.models import Base, EmailService
from src.database.session import DatabaseSessionManager
from src.services.base import EmailServiceFactory
from src.web.routes import email as email_routes
from src.web.routes import registration as registration_routes
class DummySettings:
custom_domain_base_url = ""
custom_domain_api_key = None
tempmail_base_url = "https://api.tempmail.lol/v2"
tempmail_timeout = 30
tempmail_max_retries = 3
def test_cloud_mail_service_registered():
service_type = EmailServiceType("cloud_mail")
service_class = EmailServiceFactory.get_service_class(service_type)
assert service_class is not None
assert service_class.__name__ == "CloudMailService"
def test_email_service_types_include_cloud_mail():
result = asyncio.run(email_routes.get_service_types())
cloud_mail_type = next(item for item in result["types"] if item["value"] == "cloud_mail")
assert cloud_mail_type["label"] == "Cloud Mail"
field_names = [field["name"] for field in cloud_mail_type["config_fields"]]
assert "base_url" in field_names
assert "admin_email" in field_names
assert "admin_password" in field_names
assert "default_domain" in field_names
def test_filter_sensitive_config_marks_cloud_mail_admin_password():
filtered = email_routes.filter_sensitive_config({
"base_url": "https://mail.example.com",
"admin_email": "admin@example.com",
"admin_password": "admin-secret",
"default_domain": "mail.example.com",
})
assert filtered["base_url"] == "https://mail.example.com"
assert filtered["admin_email"] == "admin@example.com"
assert filtered["default_domain"] == "mail.example.com"
assert filtered["has_admin_password"] is True
assert "admin_password" not in filtered
def test_registration_available_services_include_cloud_mail(monkeypatch):
runtime_dir = Path("tests_runtime")
runtime_dir.mkdir(exist_ok=True)
db_path = runtime_dir / "cloudmail_routes.db"
if db_path.exists():
db_path.unlink()
manager = DatabaseSessionManager(f"sqlite:///{db_path}")
Base.metadata.create_all(bind=manager.engine)
with manager.session_scope() as session:
session.add(
EmailService(
service_type="cloud_mail",
name="Cloud Mail 主服务",
config={
"base_url": "https://mail.example.com",
"admin_email": "admin@example.com",
"admin_password": "admin-secret",
"default_domain": "mail.example.com",
},
enabled=True,
priority=0,
)
)
@contextmanager
def fake_get_db():
session = manager.SessionLocal()
try:
yield session
finally:
session.close()
monkeypatch.setattr(registration_routes, "get_db", fake_get_db)
import src.config.settings as settings_module
monkeypatch.setattr(settings_module, "get_settings", lambda: DummySettings())
monkeypatch.setattr(registration_routes, "get_settings", lambda: DummySettings())
result = asyncio.run(registration_routes.get_available_email_services())
assert result["cloud_mail"]["available"] is True
assert result["cloud_mail"]["count"] == 1
assert result["cloud_mail"]["services"][0]["name"] == "Cloud Mail 主服务"
assert result["cloud_mail"]["services"][0]["type"] == "cloud_mail"
assert result["cloud_mail"]["services"][0]["default_domain"] == "mail.example.com"
def test_build_email_service_candidates_supports_cloud_mail(monkeypatch):
runtime_dir = Path("tests_runtime")
runtime_dir.mkdir(exist_ok=True)
db_path = runtime_dir / "cloudmail_candidates.db"
if db_path.exists():
db_path.unlink()
manager = DatabaseSessionManager(f"sqlite:///{db_path}")
Base.metadata.create_all(bind=manager.engine)
with manager.session_scope() as session:
session.add(
EmailService(
service_type="cloud_mail",
name="Cloud Mail 主服务",
config={
"base_url": "https://mail.example.com",
"admin_email": "admin@example.com",
"admin_password": "admin-secret",
"default_domain": "mail.example.com",
},
enabled=True,
priority=0,
)
)
registration_routes.email_service_circuit_breakers.clear()
monkeypatch.setattr(registration_routes, "get_settings", lambda: DummySettings())
with manager.session_scope() as session:
candidates = registration_routes._build_email_service_candidates(
db=session,
service_type=EmailServiceType("cloud_mail"),
actual_proxy_url=None,
email_service_id=None,
email_service_config=None,
)
assert len(candidates) == 1
assert candidates[0]["service_type"] == EmailServiceType("cloud_mail")
assert candidates[0]["config"]["base_url"] == "https://mail.example.com"
assert candidates[0]["db_service"].name == "Cloud Mail 主服务"