fix(imap): fix mojibake in nested emails, empty headers, and date handling (#909)

* fix(imap): fix mojibake in nested emails, empty headers, and date handling

- Add line-by-line mojibake fix fallback for complex emails with mixed content
- Apply empty header cleanup globally to fix nested message/rfc822 parts
- Add locale-independent date formatting (format_imap_date, format_rfc2822_date)
- Fill missing Date header from created_at field
- Fix getSubPart for non-multipart messages
- Accept CREATE requests from clients (e.g. Gmail creating Drafts)
- Strip whitespace from IMAP password
- Use MIMEText instead of MIMEMultipart for sent mail generation
- Keep body in original CTE encoding for correct BODYSTRUCTURE
- Update CHANGELOG (zh/en)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: consolidate IMAP changelog entries into single line

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dream Hunter
2026-03-22 20:52:18 +08:00
committed by GitHub
parent 64d11799b3
commit 03965f3612
7 changed files with 143 additions and 37 deletions

View File

@@ -17,7 +17,7 @@ class BackendClient:
"""
def __init__(self, password: str):
self.password = password
self.password = password.strip()
self._client = httpx.Client(
base_url=settings.proxy_url,
headers={

View File

@@ -10,7 +10,7 @@ from zope.interface import implementer
from config import settings
from imap_http_client import BackendClient
from imap_message import SimpleMessage
from parse_email import generate_email_model, parse_email
from parse_email import generate_email_model, parse_email, clean_raw_headers, fix_mojibake
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
@@ -246,6 +246,8 @@ class SimpleMailbox:
try:
if self.name == "INBOX":
raw = item.get("raw", "")
raw = fix_mojibake(raw)
raw = clean_raw_headers(raw)
email_model = parse_email(raw)
elif self.name == "SENT":
email_model, raw = generate_email_model(item)
@@ -256,7 +258,8 @@ class SimpleMailbox:
self._flags[uid_val] = {r"\Seen"}
flags = self._flags[uid_val]
msg = SimpleMessage(
uid_val, email_model, flags=flags, raw=raw
uid_val, email_model, flags=flags, raw=raw,
created_at=item.get("created_at"),
)
self._cache.put(uid_val, msg)
except Exception as e:

View File

@@ -1,28 +1,72 @@
from io import BytesIO
from datetime import datetime, timezone
from twisted.mail import imap4
from zope.interface import implementer
from models import EmailModel
# Locale-independent English names for IMAP date formatting
_MONTHS = ('', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
_DAYS = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')
_CREATED_AT_FMTS = (
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%d %H:%M:%S.%f",
)
def parse_created_at(created_at: str) -> datetime | None:
"""Parse created_at string into datetime, returns None on failure."""
for fmt in _CREATED_AT_FMTS:
try:
return datetime.strptime(created_at, fmt)
except ValueError:
continue
return None
def format_imap_date(dt: datetime) -> str:
"""Format datetime as IMAP INTERNALDATE: '21-Mar-2026 13:04:59 +0000'."""
return (f"{dt.day:02d}-{_MONTHS[dt.month]}-{dt.year} "
f"{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d} +0000")
def format_rfc2822_date(dt: datetime) -> str:
"""Format datetime as RFC 2822: 'Thu, 13 Mar 2026 11:15:57 +0000'."""
return (f"{_DAYS[dt.weekday()]}, {dt.day:02d} {_MONTHS[dt.month]} {dt.year} "
f"{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d} +0000")
@implementer(imap4.IMessage, imap4.IMessageFile)
class SimpleMessage:
def __init__(self, uid: int, email_model: EmailModel,
flags: set[str] = None, raw: str = None):
flags: set[str] = None, raw: str = None, created_at: str = None):
self.uid = uid
self.email = email_model
self.subparts = self.email.subparts
self._flags = flags if flags is not None else set()
self._raw = raw
self._created_at = created_at
self._fill_date_header()
def _fill_date_header(self):
"""Fill empty/missing Date header from created_at."""
date_val = self.email.headers.get("Date", "").strip()
if date_val or not self._created_at:
return
dt = parse_created_at(self._created_at)
if dt:
self.email.headers["Date"] = format_rfc2822_date(dt)
def getUID(self):
    """Return the UID this message was constructed with."""
    return self.uid
def getHeaders(self, negate, *names):
# Twisted passes header names as bytes (e.g. b"SUBJECT");
# normalize to lowercase str for comparison.
names_lower = set()
for n in names:
if isinstance(n, bytes):
@@ -47,6 +91,10 @@ class SimpleMessage:
return len(self.subparts) > 0
def getSubPart(self, part):
    """Return sub-part number ``part`` wrapped in a new SimpleMessage.

    When the message has no subparts, part 0 resolves to the message's own
    email model and any other index raises IndexError. Otherwise ``part``
    indexes ``self.subparts`` directly. The returned message shares this
    message's UID and flags set.
    """
    if self.subparts:
        target = self.subparts[part]
    elif part == 0:
        target = self.email
    else:
        raise IndexError(part)
    return SimpleMessage(self.uid, target, flags=self._flags)
def getBodyFile(self):
@@ -61,6 +109,10 @@ class SimpleMessage:
return list(self._flags)
def getInternalDate(self):
    """Return the date string reported as this message's internal date.

    Prefers created_at, rendered with format_imap_date, when it is set and
    parseable. Otherwise falls back to the message's Date header, or to a
    fixed 1900 placeholder when that header is absent.
    """
    parsed = parse_created_at(self._created_at) if self._created_at else None
    if parsed:
        return format_imap_date(parsed)
    return self.email.headers.get("Date", "Mon, 1 Jan 1900 00:00:00 +0000")
# IMessageFile

View File

@@ -81,13 +81,18 @@ class Account(imap4.MemoryAccount):
"""Custom account that initializes mailbox UID index on select."""
def _emptyMailbox(self, name, id):
"""Ignore CREATE for unknown mailboxes instead of crashing."""
return None
"""Return a dummy mailbox for CREATE requests (e.g. Gmail creating Drafts)."""
_logger.debug("Accepting CREATE request for %s", name)
return SimpleMailbox(name, self._client)
def create(self, pathspec):
"""Silently ignore mailbox creation requests from clients."""
_logger.debug("Ignoring CREATE request for %s", pathspec)
return False
"""Accept CREATE silently without actually creating mailboxes."""
_logger.debug("Ignoring CREATE for %s", pathspec)
return True
def listMailboxes(self, ref, wildcard):
    """Report only the INBOX and SENT mailboxes.

    ``ref`` and ``wildcard`` are accepted but not consulted, so any other
    entry in ``self.mailboxes`` (e.g. one created by a client) is never
    listed. Returns a list of ``(name, mailbox)`` tuples, INBOX first.
    """
    visible = ("INBOX", "SENT")
    return [(name, self.mailboxes[name]) for name in visible]
@defer.inlineCallbacks
def select(self, name, readwrite=1):
@@ -110,6 +115,7 @@ class SimpleRealm:
sent = SimpleMailbox("SENT", client)
account = Account(username)
account._client = client
account.mailboxes = {"INBOX": inbox, "SENT": sent}
account.subscriptions = ["INBOX", "SENT"]

View File

@@ -1,18 +1,22 @@
import datetime
import json
import logging
import email
from email.message import Message
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from models import EmailModel
from imap_message import parse_created_at, format_rfc2822_date
import re
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
# Matches an empty header value (header name with no value).
# re.MULTILINE makes `^` anchor at every line start, so the pattern applies to
# header-like lines anywhere in the text, not only in the top header block.
# NOTE(review): `\s*` also matches line breaks, so a match can extend past the
# header's own line ending and swallow blank lines that follow it.
_EMPTY_HEADER_RE = re.compile(r'^([A-Za-z][A-Za-z0-9-]*):\s*\r?\n', re.MULTILINE)
def get_email_model(msg: Message):
subparts = [
@@ -22,23 +26,62 @@ def get_email_model(msg: Message):
if msg.is_multipart():
body = ""
else:
raw_body = msg.get_payload(decode=True) or b""
charset = msg.get_content_charset() or "utf-8"
try:
body = raw_body.decode(charset, errors="replace")
except LookupError:
body = raw_body.decode("utf-8", errors="replace")
# Keep body in its original CTE encoding (base64/QP/7bit/8bit)
# so it matches the Content-Transfer-Encoding header.
# The IMAP client will decode CTE itself based on BODYSTRUCTURE.
body = msg.get_payload(decode=False) or ""
return EmailModel(
headers={k: v for k, v in msg.items()},
body=body,
content_type=msg.get_content_type(),
size=len(body) + sum(subpart.size for subpart in subparts),
size=len(body.encode("utf-8") if isinstance(body, str) else body) + sum(subpart.size for subpart in subparts),
subparts=subparts,
)
# A header-name line whose value is empty. Stricter than _EMPTY_HEADER_RE:
# only spaces/tabs may follow the colon, so the match never runs past the
# header's own line ending (which would swallow the blank header/body
# separator), and the lookahead skips headers whose value starts on a folded
# continuation line (next line begins with a space or tab).
_EMPTY_HEADER_LINE_RE = re.compile(
    r'^[A-Za-z][A-Za-z0-9-]*:[ \t]*\r?\n(?![ \t])', re.MULTILINE)


def clean_raw_headers(raw: str) -> str:
    """Remove empty header lines that break Python email parser.

    Some emails (e.g. from Gmail via Cloudflare) have duplicate headers
    like 'Content-Type: \\n' (empty) followed by the real Content-Type.
    The empty one confuses email.message_from_string().

    Applies globally so nested message/rfc822 parts are also cleaned.
    Exactly one line is removed per empty header: following blank lines
    and folded continuation lines are left in place.
    """
    return _EMPTY_HEADER_LINE_RE.sub('', raw)
def fix_mojibake(raw: str) -> str:
"""Fix UTF-8 mojibake where upstream stored UTF-8 bytes as cp1252/latin-1.
Tries whole-string fix first (fast path). If that fails (e.g. complex
emails with mixed binary/text content), falls back to line-by-line fix.
"""
# Fast path: fix entire string at once
for enc in ("cp1252", "latin-1"):
try:
return raw.encode(enc).decode("utf-8")
except (UnicodeDecodeError, UnicodeEncodeError):
continue
# Slow path: fix line by line (tolerates mixed content)
lines = raw.split('\n')
fixed = []
for line in lines:
fixed_line = line
for enc in ("cp1252", "latin-1"):
try:
fixed_line = line.encode(enc).decode("utf-8")
break
except (UnicodeDecodeError, UnicodeEncodeError):
continue
fixed.append(fixed_line)
return '\n'.join(fixed)
def parse_email(raw: str) -> EmailModel:
try:
raw = clean_raw_headers(raw)
msg = email.message_from_string(raw)
return get_email_model(msg)
except Exception as e:
@@ -52,6 +95,7 @@ def parse_email(raw: str) -> EmailModel:
)
def generate_email_model(item: dict) -> tuple[EmailModel, str]:
"""Build an EmailModel from a sendbox item.
@@ -59,25 +103,24 @@ def generate_email_model(item: dict) -> tuple[EmailModel, str]:
synthesised MIME to SimpleMessage for correct BODY[] responses.
"""
email_json = json.loads(item["raw"])
message = MIMEMultipart()
if email_json.get("version") == "v2":
message['From'] = f'{email_json["from_name"]} <{item["address"]}>' if email_json.get("from_name") else item["address"]
message['To'] = f'{email_json["to_name"]} <{email_json["to_mail"]}>' if email_json.get("to_name") else email_json["to_mail"]
message.attach(MIMEText(
email_json["content"],
"html" if email_json.get("is_html") else "plain"
))
from_addr = f'{email_json["from_name"]} <{item["address"]}>' if email_json.get("from_name") else item["address"]
to_addr = f'{email_json["to_name"]} <{email_json["to_mail"]}>' if email_json.get("to_name") else email_json["to_mail"]
content = email_json["content"]
subtype = "html" if email_json.get("is_html") else "plain"
else:
message['From'] = f'{email_json["from"]["name"]} <{email_json["from"]["email"]}>'
message['To'] = ", ".join(
from_addr = f'{email_json["from"]["name"]} <{email_json["from"]["email"]}>'
to_addr = ", ".join(
[f"{to['name']} <{to['email']}>" for to in email_json["personalizations"][0]["to"]])
message.attach(MIMEText(
email_json["content"][0]["value"],
"html" if "html" in email_json["content"][0]["type"] else "plain"
))
content = email_json["content"][0]["value"]
subtype = "html" if "html" in email_json["content"][0]["type"] else "plain"
message = MIMEText(content, subtype, "utf-8")
message['From'] = from_addr
message['To'] = to_addr
message['Subject'] = email_json["subject"]
message["Date"] = datetime.datetime.strptime(
item["created_at"], "%Y-%m-%d %H:%M:%S"
).strftime("%a, %d %b %Y %H:%M:%S +0000")
dt = parse_created_at(item["created_at"])
if dt:
message["Date"] = format_rfc2822_date(dt)
raw_mime = message.as_string()
return parse_email(raw_mime), raw_mime