Files
cloudflare_temp_email/smtp_proxy_server/smtp_server.py
Dream Hunter 2a52fd35d5 refactor: modularize IMAP server with dual login, STARTTLS, and test suite (#859)
refactor: modularize IMAP server with fixes and E2E tests

- Modularize IMAP server into imap_server, imap_mailbox, imap_message,
  imap_http_client, parse_email, config, models
- Support dual login: JWT token and address+password via backend
- Add STARTTLS support with configurable TLS cert/key
- Fix FETCH/STORE returning UID instead of sequence number (RFC 3501)
- Implement IMessageFile.open() for correct BODY[] raw MIME delivery
- Add UIDNEXT to SELECT response via _cbSelectWork override
- Use per-restart UIDVALIDITY to force client resync
- Pass raw MIME to SimpleMessage for accurate RFC822.SIZE
- Fix SENT mailbox returning empty source
- Handle CREATE command gracefully for Thunderbird compatibility
- Add IMAP E2E tests: auth, LIST, SELECT, STATUS, FETCH, SEARCH,
  STORE, UID FETCH, BODY[] integrity, size, seq numbers, SENT mailbox
- Add SMTP E2E tests using nodemailer: send plain/HTML, auth failure,
  sendbox verification
- Add sendTestMail helper using admin/send_mail

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 11:08:10 +08:00

163 lines
5.6 KiB
Python

import asyncio
import logging
import email
import httpx
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP, Session, Envelope, AuthResult, LoginPassword
from config import settings
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
def _safe_decode_payload(payload, charset):
if payload is None:
return ""
try:
return payload.decode(charset or "utf-8", errors="replace")
except LookupError:
return payload.decode("utf-8", errors="replace")
class CustomSMTPHandler:
def authenticator(self, server, session, envelope, mechanism, auth_data):
fail_nothandled = AuthResult(success=False, handled=False)
if mechanism not in ("LOGIN", "PLAIN"):
_logger.warning(f"Unsupported mechanism {mechanism}")
return fail_nothandled
if not isinstance(auth_data, LoginPassword):
_logger.warning(f"Invalid auth data {auth_data}")
return fail_nothandled
return AuthResult(success=True, auth_data=auth_data)
async def handle_DATA(self, server: SMTP, session: Session, envelope: Envelope) -> str:
_logger.info(
f"handle_DATA from {envelope.mail_from} to {envelope.rcpt_tos}"
)
if not isinstance(session.auth_data, LoginPassword):
return '530 Authentication required'
if len(envelope.rcpt_tos) != 1:
return '500 Only one recipient allowed'
# Only one recipient allowed
to_mail = envelope.rcpt_tos[0]
# Parse email
msg = email.message_from_string(envelope.content)
content_list = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
charset = part.get_content_charset()
cte = str(part.get('content-transfer-encoding', '')).lower()
if content_type not in ["text/plain", "text/html"]:
_logger.warning(f"Skipping {content_type}")
continue
if cte == "8bit":
value = part.get_payload(decode=False)
else:
payload = part.get_payload(decode=True)
value = _safe_decode_payload(payload, charset)
if not value:
continue
content_list.append({
"type": content_type,
"value": value
})
elif msg.get_content_type() in ["text/plain", "text/html"] and msg.get_payload(decode=True):
cte = str(msg.get('content-transfer-encoding', '')).lower()
charset = msg.get_content_charset()
if cte == "8bit":
value = msg.get_payload(decode=False)
else:
payload = msg.get_payload(decode=True)
value = _safe_decode_payload(payload, charset)
_logger.debug("Parsed content charset=%s", charset)
content_list.append({
"type": msg.get_content_type(),
"value": value
})
if not content_list:
return '500 Invalid content'
body = max(
content_list,
key=lambda x: (x["type"] == "text/html", len(x["value"]))
)
from_name, _ = email.utils.parseaddr(
str(email.header.make_header(
email.header.decode_header(msg['From'])
))
)
to_mail_map = {}
for to in str(email.header.make_header(
email.header.decode_header(msg['To'])
)).split(","):
tmp_to_name, tmp_to_mail = email.utils.parseaddr(to)
to_mail_map[tmp_to_mail] = tmp_to_name
_logger.info(f"Parsed mail from {from_name} to {to_mail_map}")
# Send mail
send_body = {
"token": session.auth_data.password.decode(),
"from_name": from_name,
"to_name": to_mail_map.get(to_mail),
"to_mail": to_mail,
"subject": str(email.header.make_header(
email.header.decode_header(msg['Subject'])
)),
"is_html": body["type"] == "text/html",
"content": body["value"],
}
_logger.info(f"Send mail {dict(send_body, token='***')}")
try:
res = httpx.post(
f"{settings.proxy_url}/external/api/send_mail",
json=send_body, headers={
"Content-Type": "application/json"
}
)
if res.status_code != 200:
_logger.error(
"Failed to send mail "
f"code=[{res.status_code}] text=[{res.text}]"
)
return f'500 Internal server error code=[{res.status_code}] text=[{res.text}]'
except Exception as e:
_logger.error(e)
return '500 Internal server error'
return '250 OK'
def start_smtp_server():
handler = CustomSMTPHandler()
server = Controller(
handler,
hostname="",
port=settings.port,
auth_require_tls=False,
decode_data=True,
authenticator=handler.authenticator,
auth_exclude_mechanism=["DONT"]
)
_logger.info("Starting SMTP server on port %s", settings.port)
server.start()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except KeyboardInterrupt:
_logger.info("Got KeyboardInterrupt, stopping")
server.stop()
if __name__ == "__main__":
_logger.info(
"Starting SMTP server proxy_url=%s port=%s",
settings.proxy_url, settings.port,
)
start_smtp_server()