mirror of
https://github.com/dreamhunter2333/cloudflare_temp_email.git
synced 2026-05-07 08:12:46 +08:00
refactor: modularize IMAP server with fixes and E2E tests

- Modularize IMAP server into imap_server, imap_mailbox, imap_message, imap_http_client, parse_email, config, models
- Support dual login: JWT token and address+password via backend
- Add STARTTLS support with configurable TLS cert/key
- Fix FETCH/STORE returning UID instead of sequence number (RFC 3501)
- Implement IMessageFile.open() for correct BODY[] raw MIME delivery
- Add UIDNEXT to SELECT response via _cbSelectWork override
- Use per-restart UIDVALIDITY to force client resync
- Pass raw MIME to SimpleMessage for accurate RFC822.SIZE
- Fix SENT mailbox returning empty source
- Handle CREATE command gracefully for Thunderbird compatibility
- Add IMAP E2E tests: auth, LIST, SELECT, STATUS, FETCH, SEARCH, STORE, UID FETCH, BODY[] integrity, size, seq numbers, SENT mailbox
- Add SMTP E2E tests using nodemailer: send plain/HTML, auth failure, sendbox verification
- Add sendTestMail helper using admin/send_mail

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
84 lines
2.8 KiB
Python
84 lines
2.8 KiB
Python
import datetime
|
|
import json
|
|
import logging
|
|
import email
|
|
|
|
from email.message import Message
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
|
|
|
|
from models import EmailModel
|
|
|
|
# Module-level logger named after this module; its threshold is set to INFO,
# so DEBUG records logged through it are discarded.
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
|
|
|
|
|
|
def get_email_model(msg: Message):
    """Recursively convert a parsed ``Message`` into an ``EmailModel`` tree.

    A multipart message contributes an empty body and one child model per
    payload part.  A leaf part has its payload transfer-decoded and then
    decoded to text with the declared charset ("utf-8" when none is
    declared or the declared one is unknown), replacing undecodable bytes.

    The reported ``size`` is the length of the decoded body text (in
    characters, not bytes) plus the sizes of all child models.
    """
    if msg.is_multipart():
        children = [get_email_model(part) for part in msg.get_payload()]
        text = ""
    else:
        children = []
        payload = msg.get_payload(decode=True) or b""
        encoding = msg.get_content_charset() or "utf-8"
        try:
            text = payload.decode(encoding, errors="replace")
        except LookupError:
            # The declared charset is not a codec Python knows about.
            text = payload.decode("utf-8", errors="replace")

    return EmailModel(
        # Repeated header names collapse to the last occurrence.
        headers=dict(msg.items()),
        body=text,
        content_type=msg.get_content_type(),
        size=len(text) + sum(child.size for child in children),
        subparts=children,
    )
|
|
|
|
|
|
def parse_email(raw: str) -> EmailModel:
    """Parse a raw MIME string into an ``EmailModel``.

    Parsing is best-effort: any exception raised while parsing or while
    building the model is logged, and a plain-text placeholder model
    with the body "could not parse email" is returned instead.
    """
    placeholder = "could not parse email"
    try:
        return get_email_model(email.message_from_string(raw))
    except Exception as e:
        _logger.error(f"Could not parse email: {e}")

    # Only reached when parsing failed above.
    return EmailModel(
        headers={},
        body=placeholder,
        content_type="text/plain",
        size=len(placeholder),
        subparts=[],
    )
|
|
|
|
|
|
def _format_address(name, address):
    """Return a mailbox string (``Name <addr>``) for a header value.

    An empty or missing *name* yields the bare *address*.  Otherwise the
    display name is quoted or RFC 2047-encoded as required, so names that
    contain commas, quotes or non-ASCII characters cannot corrupt the
    address list.
    """
    from email.utils import formataddr

    if not name:
        return address
    try:
        return formataddr((name, address))
    except (UnicodeError, AttributeError, TypeError):
        # formataddr() rejects non-ASCII addresses and non-string values;
        # fall back to plain interpolation instead of failing the message.
        return f"{name} <{address}>"


def generate_email_model(item: dict) -> tuple[EmailModel, str]:
    """Build an EmailModel from a sendbox item.

    ``item["raw"]`` is a JSON document in one of two layouts:

    * ``"version": "v2"`` -- flat keys ``from_name``, ``to_name``,
      ``to_mail``, ``content``, ``is_html``; the sender address is taken
      from ``item["address"]``.
    * anything else -- nested ``from``, ``personalizations[0]["to"]`` and
      ``content[0]`` (``type`` / ``value``) entries.

    ``item["created_at"]`` must look like ``YYYY-MM-DD HH:MM:SS``; it is
    emitted as the Date header with a ``+0000`` offset.

    Returns (EmailModel, raw_mime_string) so callers can pass the
    synthesised MIME to SimpleMessage for correct BODY[] responses.

    Raises:
        KeyError: a required key is missing from *item* or its JSON.
        ValueError: ``raw`` is not valid JSON or ``created_at`` does not
            match the expected format.
    """
    from email.utils import format_datetime

    email_json = json.loads(item["raw"])
    message = MIMEMultipart()

    if email_json.get("version") == "v2":
        message["From"] = _format_address(
            email_json.get("from_name"), item["address"]
        )
        message["To"] = _format_address(
            email_json.get("to_name"), email_json["to_mail"]
        )
        content = email_json["content"]
        subtype = "html" if email_json.get("is_html") else "plain"
    else:
        sender = email_json["from"]
        # Display names are optional here, as they are in the v2 layout.
        message["From"] = _format_address(sender.get("name"), sender["email"])
        message["To"] = ", ".join(
            _format_address(to.get("name"), to["email"])
            for to in email_json["personalizations"][0]["to"]
        )
        first_part = email_json["content"][0]
        content = first_part["value"]
        subtype = "html" if "html" in first_part["type"] else "plain"

    message.attach(MIMEText(content, subtype))
    message["Subject"] = email_json["subject"]

    created = datetime.datetime.strptime(
        item["created_at"], "%Y-%m-%d %H:%M:%S"
    )
    # format_datetime() always emits English day/month names, unlike
    # strftime("%a ... %b"), which follows the process locale.  Marking
    # the value as UTC keeps the "+0000" suffix.
    message["Date"] = format_datetime(
        created.replace(tzinfo=datetime.timezone.utc)
    )

    raw_mime = message.as_string()
    return parse_email(raw_mime), raw_mime
|