Files
cloudflare_temp_email/smtp_proxy_server/imap_server.py
Dream Hunter 03965f3612 fix(imap): fix mojibake in nested emails, empty headers, and date handling (#909)
* fix(imap): fix mojibake in nested emails, empty headers, and date handling

- Add line-by-line mojibake fix fallback for complex emails with mixed content
- Apply empty header cleanup globally to fix nested message/rfc822 parts
- Add locale-independent date formatting (format_imap_date, format_rfc2822_date)
- Fill missing Date header from created_at field
- Fix getSubPart for non-multipart messages
- Accept CREATE requests from clients (e.g. Gmail creating Drafts)
- Strip whitespace from IMAP password
- Use MIMEText instead of MIMEMultipart for sent mail generation
- Keep body in original CTE encoding for correct BODYSTRUCTURE
- Update CHANGELOG (zh/en)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: consolidate IMAP changelog entries into single line

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 20:52:18 +08:00

214 lines
7.2 KiB
Python

import json
import logging
import httpx
from twisted.mail import imap4
from zope.interface import implementer
from twisted.cred.portal import Portal, IRealm
from twisted.internet import protocol, reactor, defer, ssl, threads
from twisted.cred import error as cred_error
from twisted.cred.checkers import ICredentialsChecker, IUsernamePassword
from config import settings
from imap_http_client import BackendClient
from imap_mailbox import SimpleMailbox
# Configure root logging at DEBUG, then create this module's logger and
# pin it to DEBUG as well so the protocol traces below are emitted.
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
class SimpleIMAPServer(imap4.IMAP4Server):
    """IMAP4 server protocol with traffic logging and UIDNEXT on SELECT.

    NOTE(review): client lines are logged verbatim at DEBUG level, which
    would include the arguments of LOGIN commands -- confirm this is
    acceptable for the deployments this runs in.
    """

    def __init__(self, context_factory=None):
        """Set up the server with LOGIN and PLAIN challengers.

        Args:
            context_factory: Optional TLS context factory, forwarded to
                ``IMAP4Server`` as ``contextFactory``.
        """
        chal = {
            b"LOGIN": imap4.LOGINCredentials,
            b"PLAIN": imap4.PLAINCredentials,
        }
        imap4.IMAP4Server.__init__(
            self, chal=chal, contextFactory=context_factory
        )

    def lineReceived(self, line):
        """Log a line received from the client, then process it normally."""
        _logger.debug("C: %s", line)
        return imap4.IMAP4Server.lineReceived(self, line)

    def sendLine(self, line):
        """Log a line about to be sent to the client, then send it."""
        _logger.debug("S: %s", line)
        return imap4.IMAP4Server.sendLine(self, line)

    def connectionMade(self):
        """Wrap transport to log raw data sent to client."""
        imap4.IMAP4Server.connectionMade(self)
        real_write_seq = self.transport.writeSequence

        def logging_write_seq(data):
            # Materialize first: joining a one-shot iterable for logging
            # would otherwise leave nothing for the real write below.
            chunks = list(data)
            for line in b"".join(chunks).split(b"\r\n"):
                if line:
                    # Truncate so large literals do not flood the log.
                    _logger.debug("S-RAW: %s", line[:300])
            return real_write_seq(chunks)

        self.transport.writeSequence = logging_write_seq

    def _cbSelectWork(self, mbox, cmdName, tag):
        """Override to add UIDNEXT in SELECT response (RFC 3501).

        Args:
            mbox: The mailbox resolved for the request, or ``None`` when no
                such mailbox exists.
            cmdName: Command name as bytes, echoed in the final response.
            tag: Tag of the client command being answered.
        """
        if mbox is None:
            self.sendNegativeResponse(tag, b"No such mailbox")
            return
        mailbox_flags = mbox.getFlags()
        if any(flag.lower() == "\\noselect" for flag in mailbox_flags):
            # The message must be bytes: the response line is assembled by
            # joining bytes, so a str here would raise TypeError.
            self.sendNegativeResponse(tag, b"Mailbox cannot be selected")
            return

        flags = [imap4.networkString(flag) for flag in mailbox_flags]
        self.sendUntaggedResponse(b"%d EXISTS" % (mbox.getMessageCount(),))
        self.sendUntaggedResponse(b"%d RECENT" % (mbox.getRecentCount(),))
        self.sendUntaggedResponse(b"FLAGS (" + b" ".join(flags) + b")")
        self.sendPositiveResponse(
            None, b"[UIDVALIDITY %d]" % (mbox.getUIDValidity(),)
        )
        self.sendPositiveResponse(
            None, b"[UIDNEXT %d]" % (mbox.getUIDNext(),)
        )

        s = b"READ-WRITE" if mbox.isWriteable() else b"READ-ONLY"
        mbox.addListener(self)
        self.sendPositiveResponse(
            tag, b"[" + s + b"] " + cmdName + b" successful"
        )
        self.state = "select"
        self.mbox = mbox
class Account(imap4.MemoryAccount):
    """Account that prepares a mailbox's UID index whenever it is selected."""

    def _emptyMailbox(self, name, id):
        """Build a placeholder mailbox for CREATE (e.g. Gmail making Drafts)."""
        _logger.debug("Accepting CREATE request for %s", name)
        placeholder = SimpleMailbox(name, self._client)
        return placeholder

    def create(self, pathspec):
        """Report success for CREATE without creating any mailbox."""
        _logger.debug("Ignoring CREATE for %s", pathspec)
        return True

    def listMailboxes(self, ref, wildcard):
        """List only INBOX and SENT; ``ref`` and ``wildcard`` are not used."""
        return [(label, self.mailboxes[label]) for label in ("INBOX", "SENT")]

    @defer.inlineCallbacks
    def select(self, name, readwrite=1):
        """Look up the mailbox by upper-cased name and build its UID index.

        The result is the mailbox, or ``None`` when the name is unknown.
        """
        key = imap4._parseMbox(name.upper())
        mbox = self.mailboxes.get(key)
        if mbox is None:
            return None
        # Wait for the index build before handing the mailbox back
        # (presumably _build_uid_index returns a Deferred -- confirm).
        yield mbox._build_uid_index()
        return mbox
@implementer(IRealm)
class SimpleRealm:
    """Realm that builds an INBOX/SENT account from a JSON avatar id."""

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Create the account described by ``avatarId``.

        ``avatarId`` is a JSON object with ``username`` and ``password``
        keys; the password value is handed to ``BackendClient``.

        Returns:
            A ``(imap4.IAccount, account, logout)`` tuple, where ``logout``
            closes the backend client.
        """
        payload = json.loads(avatarId)
        username = payload["username"]
        password = payload["password"]

        client = BackendClient(password)
        boxes = {
            label: SimpleMailbox(label, client) for label in ("INBOX", "SENT")
        }

        account = Account(username)
        account._client = client
        account.mailboxes = boxes
        account.subscriptions = ["INBOX", "SENT"]

        def logout():
            return client.close()

        return imap4.IAccount, account, logout
class IMAPFactory(protocol.Factory):
    """Factory producing ``SimpleIMAPServer`` protocols bound to one portal."""

    def __init__(self, portal, context_factory=None):
        """Remember the portal and the optional TLS context factory."""
        self._context_factory = context_factory
        self.portal = portal

    def buildProtocol(self, addr):
        """Create a server protocol for a new connection; ``addr`` is unused."""
        server = SimpleIMAPServer(context_factory=self._context_factory)
        server.portal = self.portal
        return server
@implementer(ICredentialsChecker)
class CustomChecker:
    """Credentials checker accepting a JWT or an address+password pair."""

    credentialInterfaces = (IUsernamePassword,)

    @staticmethod
    def _is_jwt(token: str) -> bool:
        """Return True when ``token`` has three dot-separated parts starting with ``eyJ``."""
        header, *remainder = token.split(".")
        return len(remainder) == 2 and header.startswith("eyJ")

    def requestAvatarId(self, credentials):
        """Resolve credentials to a JSON avatar id, wrapped in a Deferred.

        A JWT-shaped password is accepted as-is; anything else is exchanged
        for a JWT through the backend on a worker thread.
        """
        username = credentials.username.decode()
        password = credentials.password.decode()

        if not self._is_jwt(password):
            # Not a JWT -- try address+password login via backend
            _logger.info("Login via address+password")
            return threads.deferToThread(
                self._login_with_password, username, password
            )

        _logger.info("Login via JWT token")
        avatar_id = json.dumps({
            "username": username,
            "password": password,
        })
        return defer.succeed(avatar_id)

    @staticmethod
    def _login_with_password(username: str, password: str) -> str:
        """Exchange address+password for a JWT via backend.

        Returns:
            A JSON string carrying ``username`` and the JWT as ``password``.

        Raises:
            cred_error.UnauthorizedLogin: If the backend does not answer 200
                or the response carries no ``jwt`` value.
        """
        url = f"{settings.proxy_url}/api/address_login"
        body = {"email": username, "password": password}
        request_headers = {
            "x-custom-auth": settings.basic_password,
            "Content-Type": "application/json",
        }
        res = httpx.post(
            url,
            json=body,
            headers=request_headers,
            timeout=settings.imap_http_timeout,
        )

        token = res.json().get("jwt") if res.status_code == 200 else None
        if not token:
            raise cred_error.UnauthorizedLogin(
                f"address_login failed: {res.status_code}"
            )
        return json.dumps({
            "username": username,
            "password": token,
        })
def start_imap_server():
    """Start the IMAP listener and run the reactor.

    TLS is offered only when both ``settings.imap_tls_cert`` and
    ``settings.imap_tls_key`` are set.

    Raises:
        ValueError: If exactly one of the two TLS settings is set.
    """
    _logger.info("Starting IMAP server on port %s", settings.imap_port)

    cert_configured = bool(settings.imap_tls_cert)
    key_configured = bool(settings.imap_tls_key)
    if cert_configured ^ key_configured:
        raise ValueError(
            "Both imap_tls_cert and imap_tls_key must be set together"
        )

    tls_context = None
    # Past the check above the two flags agree, so testing one is enough.
    if cert_configured:
        _logger.info("TLS enabled for IMAP (STARTTLS)")
        tls_context = ssl.DefaultOpenSSLContextFactory(
            settings.imap_tls_key,
            settings.imap_tls_cert,
        )

    portal = Portal(SimpleRealm(), [CustomChecker()])
    reactor.listenTCP(
        settings.imap_port,
        IMAPFactory(portal, context_factory=tls_context),
    )
    reactor.run()
if __name__ == "__main__":
    # Script entry point: log the effective configuration, then serve.
    proxy_url = settings.proxy_url
    port = settings.imap_port
    tls_enabled = bool(settings.imap_tls_cert and settings.imap_tls_key)
    _logger.info(
        "Starting IMAP server proxy_url=%s port=%s tls=%s",
        proxy_url, port, tls_enabled,
    )
    start_imap_server()