Files
cloudflare_temp_email/smtp_proxy_server/parse_email.py
Dream Hunter 03965f3612 fix(imap): fix mojibake in nested emails, empty headers, and date handling (#909)
* fix(imap): fix mojibake in nested emails, empty headers, and date handling

- Add line-by-line mojibake fix fallback for complex emails with mixed content
- Apply empty header cleanup globally to fix nested message/rfc822 parts
- Add locale-independent date formatting (format_imap_date, format_rfc2822_date)
- Fill missing Date header from created_at field
- Fix getSubPart for non-multipart messages
- Accept CREATE requests from clients (e.g. Gmail creating Drafts)
- Strip whitespace from IMAP password
- Use MIMEText instead of MIMEMultipart for sent mail generation
- Keep body in original CTE encoding for correct BODYSTRUCTURE
- Update CHANGELOG (zh/en)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: consolidate IMAP changelog entries into single line

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 20:52:18 +08:00

127 lines
4.3 KiB
Python

import json
import logging
import email
from email.message import Message
from email.mime.text import MIMEText
from models import EmailModel
from imap_message import parse_created_at, format_rfc2822_date
import re
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
# Matches an empty header value (header name with no value)
_EMPTY_HEADER_RE = re.compile(r'^([A-Za-z][A-Za-z0-9-]*):\s*\r?\n', re.MULTILINE)
def get_email_model(msg: Message):
subparts = [
get_email_model(subpart)
for subpart in msg.get_payload()
] if msg.is_multipart() else []
if msg.is_multipart():
body = ""
else:
# Keep body in its original CTE encoding (base64/QP/7bit/8bit)
# so it matches the Content-Transfer-Encoding header.
# The IMAP client will decode CTE itself based on BODYSTRUCTURE.
body = msg.get_payload(decode=False) or ""
return EmailModel(
headers={k: v for k, v in msg.items()},
body=body,
content_type=msg.get_content_type(),
size=len(body.encode("utf-8") if isinstance(body, str) else body) + sum(subpart.size for subpart in subparts),
subparts=subparts,
)
def clean_raw_headers(raw: str) -> str:
"""Remove empty header lines that break Python email parser.
Some emails (e.g. from Gmail via Cloudflare) have duplicate headers
like 'Content-Type: \\n' (empty) followed by the real Content-Type.
The empty one confuses email.message_from_string().
Applies globally so nested message/rfc822 parts are also cleaned.
"""
return _EMPTY_HEADER_RE.sub('', raw)
def fix_mojibake(raw: str) -> str:
"""Fix UTF-8 mojibake where upstream stored UTF-8 bytes as cp1252/latin-1.
Tries whole-string fix first (fast path). If that fails (e.g. complex
emails with mixed binary/text content), falls back to line-by-line fix.
"""
# Fast path: fix entire string at once
for enc in ("cp1252", "latin-1"):
try:
return raw.encode(enc).decode("utf-8")
except (UnicodeDecodeError, UnicodeEncodeError):
continue
# Slow path: fix line by line (tolerates mixed content)
lines = raw.split('\n')
fixed = []
for line in lines:
fixed_line = line
for enc in ("cp1252", "latin-1"):
try:
fixed_line = line.encode(enc).decode("utf-8")
break
except (UnicodeDecodeError, UnicodeEncodeError):
continue
fixed.append(fixed_line)
return '\n'.join(fixed)
def parse_email(raw: str) -> EmailModel:
try:
raw = clean_raw_headers(raw)
msg = email.message_from_string(raw)
return get_email_model(msg)
except Exception as e:
_logger.error(f"Could not parse email: {e}")
return EmailModel(
headers={},
body="could not parse email",
content_type="text/plain",
size=len("could not parse email"),
subparts=[],
)
def generate_email_model(item: dict) -> tuple[EmailModel, str]:
"""Build an EmailModel from a sendbox item.
Returns (EmailModel, raw_mime_string) so callers can pass the
synthesised MIME to SimpleMessage for correct BODY[] responses.
"""
email_json = json.loads(item["raw"])
if email_json.get("version") == "v2":
from_addr = f'{email_json["from_name"]} <{item["address"]}>' if email_json.get("from_name") else item["address"]
to_addr = f'{email_json["to_name"]} <{email_json["to_mail"]}>' if email_json.get("to_name") else email_json["to_mail"]
content = email_json["content"]
subtype = "html" if email_json.get("is_html") else "plain"
else:
from_addr = f'{email_json["from"]["name"]} <{email_json["from"]["email"]}>'
to_addr = ", ".join(
[f"{to['name']} <{to['email']}>" for to in email_json["personalizations"][0]["to"]])
content = email_json["content"][0]["value"]
subtype = "html" if "html" in email_json["content"][0]["type"] else "plain"
message = MIMEText(content, subtype, "utf-8")
message['From'] = from_addr
message['To'] = to_addr
message['Subject'] = email_json["subject"]
dt = parse_created_at(item["created_at"])
if dt:
message["Date"] = format_rfc2822_date(dt)
raw_mime = message.as_string()
return parse_email(raw_mime), raw_mime