Files
cloudflare_temp_email/smtp_proxy_server/imap_server.py
Dream Hunter 2a52fd35d5 refactor: modularize IMAP server with dual login, STARTTLS, and test suite (#859)
refactor: modularize IMAP server with fixes and E2E tests

- Modularize IMAP server into imap_server, imap_mailbox, imap_message,
  imap_http_client, parse_email, config, models
- Support dual login: JWT token and address+password via backend
- Add STARTTLS support with configurable TLS cert/key
- Fix FETCH/STORE returning UID instead of sequence number (RFC 3501)
- Implement IMessageFile.open() for correct BODY[] raw MIME delivery
- Add UIDNEXT to SELECT response via _cbSelectWork override
- Use per-restart UIDVALIDITY to force client resync
- Pass raw MIME to SimpleMessage for accurate RFC822.SIZE
- Fix SENT mailbox returning empty source
- Handle CREATE command gracefully for Thunderbird compatibility
- Add IMAP E2E tests: auth, LIST, SELECT, STATUS, FETCH, SEARCH,
  STORE, UID FETCH, BODY[] integrity, size, seq numbers, SENT mailbox
- Add SMTP E2E tests using nodemailer: send plain/HTML, auth failure,
  sendbox verification
- Add sendTestMail helper using admin/send_mail

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 11:08:10 +08:00

208 lines
6.9 KiB
Python

import json
import logging
import httpx
from twisted.mail import imap4
from zope.interface import implementer
from twisted.cred.portal import Portal, IRealm
from twisted.internet import protocol, reactor, defer, ssl, threads
from twisted.cred import error as cred_error
from twisted.cred.checkers import ICredentialsChecker, IUsernamePassword
from config import settings
from imap_http_client import BackendClient
from imap_mailbox import SimpleMailbox
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)
class SimpleIMAPServer(imap4.IMAP4Server):
def __init__(self, context_factory=None):
chal = {
b"LOGIN": imap4.LOGINCredentials,
b"PLAIN": imap4.PLAINCredentials,
}
imap4.IMAP4Server.__init__(
self, chal=chal, contextFactory=context_factory
)
def lineReceived(self, line):
_logger.debug("C: %s", line)
return imap4.IMAP4Server.lineReceived(self, line)
def sendLine(self, line):
_logger.debug("S: %s", line)
return imap4.IMAP4Server.sendLine(self, line)
def connectionMade(self):
"""Wrap transport to log raw data sent to client."""
imap4.IMAP4Server.connectionMade(self)
real_write_seq = self.transport.writeSequence
def logging_write_seq(data):
joined = b''.join(data)
for line in joined.split(b'\r\n'):
if line:
_logger.debug("S-RAW: %s", line[:300])
return real_write_seq(data)
self.transport.writeSequence = logging_write_seq
def _cbSelectWork(self, mbox, cmdName, tag):
"""Override to add UIDNEXT in SELECT response (RFC 3501)."""
if mbox is None:
self.sendNegativeResponse(tag, b"No such mailbox")
return
if "\\noselect" in [s.lower() for s in mbox.getFlags()]:
self.sendNegativeResponse(tag, "Mailbox cannot be selected")
return
flags = [imap4.networkString(flag) for flag in mbox.getFlags()]
self.sendUntaggedResponse(b"%d EXISTS" % (mbox.getMessageCount(),))
self.sendUntaggedResponse(b"%d RECENT" % (mbox.getRecentCount(),))
self.sendUntaggedResponse(b"FLAGS (" + b" ".join(flags) + b")")
self.sendPositiveResponse(
None, b"[UIDVALIDITY %d]" % (mbox.getUIDValidity(),)
)
self.sendPositiveResponse(
None, b"[UIDNEXT %d]" % (mbox.getUIDNext(),)
)
s = mbox.isWriteable() and b"READ-WRITE" or b"READ-ONLY"
mbox.addListener(self)
self.sendPositiveResponse(
tag, b"[" + s + b"] " + cmdName + b" successful"
)
self.state = "select"
self.mbox = mbox
class Account(imap4.MemoryAccount):
"""Custom account that initializes mailbox UID index on select."""
def _emptyMailbox(self, name, id):
"""Ignore CREATE for unknown mailboxes instead of crashing."""
return None
def create(self, pathspec):
"""Silently ignore mailbox creation requests from clients."""
_logger.debug("Ignoring CREATE request for %s", pathspec)
return False
@defer.inlineCallbacks
def select(self, name, readwrite=1):
mbox = self.mailboxes.get(imap4._parseMbox(name.upper()))
if mbox is not None:
yield mbox._build_uid_index()
return mbox
@implementer(IRealm)
class SimpleRealm:
def requestAvatar(self, avatarId, mind, *interfaces):
res = json.loads(avatarId)
username = res["username"]
password = res["password"]
client = BackendClient(password)
inbox = SimpleMailbox("INBOX", client)
sent = SimpleMailbox("SENT", client)
account = Account(username)
account.mailboxes = {"INBOX": inbox, "SENT": sent}
account.subscriptions = ["INBOX", "SENT"]
return imap4.IAccount, account, lambda: client.close()
class IMAPFactory(protocol.Factory):
def __init__(self, portal, context_factory=None):
self.portal = portal
self._context_factory = context_factory
def buildProtocol(self, addr):
p = SimpleIMAPServer(context_factory=self._context_factory)
p.portal = self.portal
return p
@implementer(ICredentialsChecker)
class CustomChecker:
credentialInterfaces = (IUsernamePassword,)
@staticmethod
def _is_jwt(token: str) -> bool:
"""Check if token looks like a JWT (eyJ... with 3 dot-separated parts)."""
parts = token.split(".")
return len(parts) == 3 and parts[0].startswith("eyJ")
def requestAvatarId(self, credentials):
username = credentials.username.decode()
password = credentials.password.decode()
if self._is_jwt(password):
_logger.info("Login via JWT token")
return defer.succeed(json.dumps({
"username": username,
"password": password,
}))
# Not a JWT — try address+password login via backend
_logger.info("Login via address+password")
d = threads.deferToThread(self._login_with_password, username, password)
return d
@staticmethod
def _login_with_password(username: str, password: str) -> str:
"""Exchange address+password for a JWT via backend."""
res = httpx.post(
f"{settings.proxy_url}/api/address_login",
json={"email": username, "password": password},
headers={
"x-custom-auth": settings.basic_password,
"Content-Type": "application/json",
},
timeout=settings.imap_http_timeout,
)
if res.status_code == 200:
jwt = res.json().get("jwt")
if jwt:
return json.dumps({
"username": username,
"password": jwt,
})
raise cred_error.UnauthorizedLogin(f"address_login failed: {res.status_code}")
def start_imap_server():
_logger.info("Starting IMAP server on port %s", settings.imap_port)
context_factory = None
has_cert = bool(settings.imap_tls_cert)
has_key = bool(settings.imap_tls_key)
if has_cert != has_key:
raise ValueError(
"Both imap_tls_cert and imap_tls_key must be set together"
)
if has_cert and has_key:
_logger.info("TLS enabled for IMAP (STARTTLS)")
context_factory = ssl.DefaultOpenSSLContextFactory(
settings.imap_tls_key,
settings.imap_tls_cert,
)
portal = Portal(SimpleRealm(), [CustomChecker()])
factory = IMAPFactory(portal, context_factory=context_factory)
reactor.listenTCP(settings.imap_port, factory)
reactor.run()
if __name__ == "__main__":
_logger.info(
"Starting IMAP server proxy_url=%s port=%s tls=%s",
settings.proxy_url, settings.imap_port,
bool(settings.imap_tls_cert and settings.imap_tls_key),
)
start_imap_server()