mirror of
https://github.com/dreamhunter2333/cloudflare_temp_email.git
synced 2026-05-07 08:12:46 +08:00
* fix(imap): fix mojibake in nested emails, empty headers, and date handling - Add line-by-line mojibake fix fallback for complex emails with mixed content - Apply empty header cleanup globally to fix nested message/rfc822 parts - Add locale-independent date formatting (format_imap_date, format_rfc2822_date) - Fill missing Date header from created_at field - Fix getSubPart for non-multipart messages - Accept CREATE requests from clients (e.g. Gmail creating Drafts) - Strip whitespace from IMAP password - Use MIMEText instead of MIMEMultipart for sent mail generation - Keep body in original CTE encoding for correct BODYSTRUCTURE - Update CHANGELOG (zh/en) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * docs: consolidate IMAP changelog entries into single line Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
127 lines
4.3 KiB
Python
127 lines
4.3 KiB
Python
import json
|
|
import logging
|
|
import email
|
|
|
|
from email.message import Message
|
|
from email.mime.text import MIMEText
|
|
|
|
from models import EmailModel
|
|
from imap_message import parse_created_at, format_rfc2822_date
|
|
|
|
import re
|
|
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
_logger.setLevel(logging.INFO)
|
|
|
|
# Matches an empty header value (header name with no value)
|
|
_EMPTY_HEADER_RE = re.compile(r'^([A-Za-z][A-Za-z0-9-]*):\s*\r?\n', re.MULTILINE)
|
|
|
|
|
|
def get_email_model(msg: Message):
|
|
subparts = [
|
|
get_email_model(subpart)
|
|
for subpart in msg.get_payload()
|
|
] if msg.is_multipart() else []
|
|
if msg.is_multipart():
|
|
body = ""
|
|
else:
|
|
# Keep body in its original CTE encoding (base64/QP/7bit/8bit)
|
|
# so it matches the Content-Transfer-Encoding header.
|
|
# The IMAP client will decode CTE itself based on BODYSTRUCTURE.
|
|
body = msg.get_payload(decode=False) or ""
|
|
return EmailModel(
|
|
headers={k: v for k, v in msg.items()},
|
|
body=body,
|
|
content_type=msg.get_content_type(),
|
|
size=len(body.encode("utf-8") if isinstance(body, str) else body) + sum(subpart.size for subpart in subparts),
|
|
subparts=subparts,
|
|
)
|
|
|
|
|
|
def clean_raw_headers(raw: str) -> str:
|
|
"""Remove empty header lines that break Python email parser.
|
|
|
|
Some emails (e.g. from Gmail via Cloudflare) have duplicate headers
|
|
like 'Content-Type: \\n' (empty) followed by the real Content-Type.
|
|
The empty one confuses email.message_from_string().
|
|
|
|
Applies globally so nested message/rfc822 parts are also cleaned.
|
|
"""
|
|
return _EMPTY_HEADER_RE.sub('', raw)
|
|
|
|
|
|
def fix_mojibake(raw: str) -> str:
|
|
"""Fix UTF-8 mojibake where upstream stored UTF-8 bytes as cp1252/latin-1.
|
|
|
|
Tries whole-string fix first (fast path). If that fails (e.g. complex
|
|
emails with mixed binary/text content), falls back to line-by-line fix.
|
|
"""
|
|
# Fast path: fix entire string at once
|
|
for enc in ("cp1252", "latin-1"):
|
|
try:
|
|
return raw.encode(enc).decode("utf-8")
|
|
except (UnicodeDecodeError, UnicodeEncodeError):
|
|
continue
|
|
|
|
# Slow path: fix line by line (tolerates mixed content)
|
|
lines = raw.split('\n')
|
|
fixed = []
|
|
for line in lines:
|
|
fixed_line = line
|
|
for enc in ("cp1252", "latin-1"):
|
|
try:
|
|
fixed_line = line.encode(enc).decode("utf-8")
|
|
break
|
|
except (UnicodeDecodeError, UnicodeEncodeError):
|
|
continue
|
|
fixed.append(fixed_line)
|
|
return '\n'.join(fixed)
|
|
|
|
|
|
def parse_email(raw: str) -> EmailModel:
|
|
try:
|
|
raw = clean_raw_headers(raw)
|
|
msg = email.message_from_string(raw)
|
|
return get_email_model(msg)
|
|
except Exception as e:
|
|
_logger.error(f"Could not parse email: {e}")
|
|
return EmailModel(
|
|
headers={},
|
|
body="could not parse email",
|
|
content_type="text/plain",
|
|
size=len("could not parse email"),
|
|
subparts=[],
|
|
)
|
|
|
|
|
|
|
|
def generate_email_model(item: dict) -> tuple[EmailModel, str]:
|
|
"""Build an EmailModel from a sendbox item.
|
|
|
|
Returns (EmailModel, raw_mime_string) so callers can pass the
|
|
synthesised MIME to SimpleMessage for correct BODY[] responses.
|
|
"""
|
|
email_json = json.loads(item["raw"])
|
|
if email_json.get("version") == "v2":
|
|
from_addr = f'{email_json["from_name"]} <{item["address"]}>' if email_json.get("from_name") else item["address"]
|
|
to_addr = f'{email_json["to_name"]} <{email_json["to_mail"]}>' if email_json.get("to_name") else email_json["to_mail"]
|
|
content = email_json["content"]
|
|
subtype = "html" if email_json.get("is_html") else "plain"
|
|
else:
|
|
from_addr = f'{email_json["from"]["name"]} <{email_json["from"]["email"]}>'
|
|
to_addr = ", ".join(
|
|
[f"{to['name']} <{to['email']}>" for to in email_json["personalizations"][0]["to"]])
|
|
content = email_json["content"][0]["value"]
|
|
subtype = "html" if "html" in email_json["content"][0]["type"] else "plain"
|
|
|
|
message = MIMEText(content, subtype, "utf-8")
|
|
message['From'] = from_addr
|
|
message['To'] = to_addr
|
|
message['Subject'] = email_json["subject"]
|
|
dt = parse_created_at(item["created_at"])
|
|
if dt:
|
|
message["Date"] = format_rfc2822_date(dt)
|
|
raw_mime = message.as_string()
|
|
return parse_email(raw_mime), raw_mime
|