Files
cloudflare_temp_email/smtp_proxy_server/smtp_server.py
Dream Hunter 8cf1150b15 feat: add STARTTLS support for SMTP proxy server (#876)
* feat: add STARTTLS support for SMTP proxy server

Add smtp_tls_cert and smtp_tls_key environment variables to enable
STARTTLS on the SMTP proxy server, matching existing IMAP TLS support.

Closes #249

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test: add E2E tests for SMTP/IMAP STARTTLS

- Add smtp-proxy-tls service with self-signed certs in docker-compose
- Add smtp-tls.spec.ts: SMTP STARTTLS send plain/HTML/auth tests
- Add imap-tls.spec.ts: IMAP STARTTLS login/list/select/fetch tests
- Register smtp-proxy project in playwright.config.ts
- Wait for TLS proxy readiness in docker-entrypoint.sh

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: enforce auth over TLS when STARTTLS is configured

- Set auth_require_tls conditionally based on tls_context presence
- Disable insecure SSLv2/SSLv3 protocols in TLS context

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: replace cert-gen service with inline cert generation

The cert-gen one-shot container was exiting immediately after
generating certificates, triggering --abort-on-container-exit
and stopping all services before tests could run.

Replace with an entrypoint script in smtp-proxy-tls that generates
the self-signed cert before starting the proxy server.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 15:05:29 +08:00

184 lines
6.3 KiB
Python

import asyncio
import logging
import email
import ssl
import httpx
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP, Session, Envelope, AuthResult, LoginPassword
from config import settings
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
def _safe_decode_payload(payload, charset):
if payload is None:
return ""
try:
return payload.decode(charset or "utf-8", errors="replace")
except LookupError:
return payload.decode("utf-8", errors="replace")
class CustomSMTPHandler:
def authenticator(self, server, session, envelope, mechanism, auth_data):
fail_nothandled = AuthResult(success=False, handled=False)
if mechanism not in ("LOGIN", "PLAIN"):
_logger.warning(f"Unsupported mechanism {mechanism}")
return fail_nothandled
if not isinstance(auth_data, LoginPassword):
_logger.warning(f"Invalid auth data {auth_data}")
return fail_nothandled
return AuthResult(success=True, auth_data=auth_data)
async def handle_DATA(self, server: SMTP, session: Session, envelope: Envelope) -> str:
_logger.info(
f"handle_DATA from {envelope.mail_from} to {envelope.rcpt_tos}"
)
if not isinstance(session.auth_data, LoginPassword):
return '530 Authentication required'
if len(envelope.rcpt_tos) != 1:
return '500 Only one recipient allowed'
# Only one recipient allowed
to_mail = envelope.rcpt_tos[0]
# Parse email
msg = email.message_from_string(envelope.content)
content_list = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
charset = part.get_content_charset()
cte = str(part.get('content-transfer-encoding', '')).lower()
if content_type not in ["text/plain", "text/html"]:
_logger.warning(f"Skipping {content_type}")
continue
if cte == "8bit":
value = part.get_payload(decode=False)
else:
payload = part.get_payload(decode=True)
value = _safe_decode_payload(payload, charset)
if not value:
continue
content_list.append({
"type": content_type,
"value": value
})
elif msg.get_content_type() in ["text/plain", "text/html"] and msg.get_payload(decode=True):
cte = str(msg.get('content-transfer-encoding', '')).lower()
charset = msg.get_content_charset()
if cte == "8bit":
value = msg.get_payload(decode=False)
else:
payload = msg.get_payload(decode=True)
value = _safe_decode_payload(payload, charset)
_logger.debug("Parsed content charset=%s", charset)
content_list.append({
"type": msg.get_content_type(),
"value": value
})
if not content_list:
return '500 Invalid content'
body = max(
content_list,
key=lambda x: (x["type"] == "text/html", len(x["value"]))
)
from_name, _ = email.utils.parseaddr(
str(email.header.make_header(
email.header.decode_header(msg['From'])
))
)
to_mail_map = {}
for to in str(email.header.make_header(
email.header.decode_header(msg['To'])
)).split(","):
tmp_to_name, tmp_to_mail = email.utils.parseaddr(to)
to_mail_map[tmp_to_mail] = tmp_to_name
_logger.info(f"Parsed mail from {from_name} to {to_mail_map}")
# Send mail
send_body = {
"token": session.auth_data.password.decode(),
"from_name": from_name,
"to_name": to_mail_map.get(to_mail),
"to_mail": to_mail,
"subject": str(email.header.make_header(
email.header.decode_header(msg['Subject'])
)),
"is_html": body["type"] == "text/html",
"content": body["value"],
}
_logger.info(f"Send mail {dict(send_body, token='***')}")
try:
res = httpx.post(
f"{settings.proxy_url}/external/api/send_mail",
json=send_body, headers={
"Content-Type": "application/json"
}
)
if res.status_code != 200:
_logger.error(
"Failed to send mail "
f"code=[{res.status_code}] text=[{res.text}]"
)
return f'500 Internal server error code=[{res.status_code}] text=[{res.text}]'
except Exception as e:
_logger.error(e)
return '500 Internal server error'
return '250 OK'
def start_smtp_server():
handler = CustomSMTPHandler()
tls_context = None
has_cert = bool(settings.smtp_tls_cert)
has_key = bool(settings.smtp_tls_key)
if has_cert != has_key:
raise ValueError(
"Both smtp_tls_cert and smtp_tls_key must be set together"
)
if has_cert and has_key:
_logger.info("TLS enabled for SMTP (STARTTLS)")
tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
tls_context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
tls_context.load_cert_chain(settings.smtp_tls_cert, settings.smtp_tls_key)
server = Controller(
handler,
hostname="",
port=settings.port,
auth_require_tls=bool(tls_context),
decode_data=True,
authenticator=handler.authenticator,
auth_exclude_mechanism=["DONT"],
tls_context=tls_context,
)
_logger.info(
"Starting SMTP server on port %s tls=%s",
settings.port, bool(tls_context),
)
server.start()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
except KeyboardInterrupt:
_logger.info("Got KeyboardInterrupt, stopping")
server.stop()
if __name__ == "__main__":
_logger.info(
"Starting SMTP server proxy_url=%s port=%s tls=%s",
settings.proxy_url, settings.port,
bool(settings.smtp_tls_cert and settings.smtp_tls_key),
)
start_smtp_server()