"""SMTP front-end that relays authenticated mail to an HTTP send-mail API.

Each accepted message is parsed, reduced to a single text body (HTML is
preferred over plain text) and POSTed as JSON to
``{settings.proxy_url}/external/api/send_mail``. The SMTP AUTH password is
forwarded as the API ``token``.
"""
import asyncio
import email
import email.errors
import email.header
import email.utils
import logging
import ssl

import httpx
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP, Session, Envelope, AuthResult, LoginPassword

from config import settings

_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)

# Content types that can be forwarded as the mail body.
_TEXT_TYPES = ("text/plain", "text/html")

# Upper bound on upstream error text echoed back in an SMTP reply line.
_MAX_REPLY_TEXT = 200


def _safe_decode_payload(payload, charset):
    """Decode *payload* bytes to text, never raising on bad input.

    Args:
        payload: Raw bytes of a message part, or ``None``.
        charset: Charset name declared by the part; may be ``None`` or
            unknown to Python.

    Returns:
        The decoded text. ``None`` payloads give ``""``; undecodable bytes
        are replaced; an unknown charset falls back to UTF-8.
    """
    if payload is None:
        return ""
    try:
        return payload.decode(charset or "utf-8", errors="replace")
    except LookupError:
        return payload.decode("utf-8", errors="replace")


def _decode_header(value):
    """Return header *value* with RFC 2047 encoded-words decoded.

    A missing header (``None``) yields ``""``. If the encoded-words are
    malformed or name an unknown charset, the raw value is returned rather
    than rejecting the whole message.
    """
    if value is None:
        return ""
    try:
        return str(email.header.make_header(email.header.decode_header(value)))
    except (LookupError, UnicodeError, email.errors.HeaderParseError):
        return str(value)


def _part_text(part):
    """Return the text of a single ``text/*`` message part.

    Parts declared ``8bit`` are returned undecoded (the payload is already
    a ``str`` because the server is started with ``decode_data=True``);
    everything else goes through the transfer decoding and charset decoding.
    """
    cte = str(part.get('content-transfer-encoding', '')).lower()
    if cte == "8bit":
        return part.get_payload(decode=False)
    return _safe_decode_payload(
        part.get_payload(decode=True), part.get_content_charset()
    )


def _collect_text_bodies(msg):
    """Return ``[{"type": ..., "value": ...}]`` for every non-empty text part.

    ``msg.walk()`` yields the message itself when it is not multipart, so
    single-part and multipart messages share this one code path.
    """
    bodies = []
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type not in _TEXT_TYPES:
            _logger.warning("Skipping %s", content_type)
            continue
        value = _part_text(part)
        _logger.debug(
            "Parsed content charset=%s", part.get_content_charset()
        )
        if value:
            bodies.append({"type": content_type, "value": value})
    return bodies


def _recipient_names(msg):
    """Map each address in the ``To`` header(s) to its decoded display name.

    Addresses are parsed before names are decoded, so a comma inside a
    quoted or encoded display name does not split the address list.
    """
    raw_values = [str(value) for value in msg.get_all('To', [])]
    return {
        address: _decode_header(name)
        for name, address in email.utils.getaddresses(raw_values)
    }


def _single_line(text, limit=_MAX_REPLY_TEXT):
    """Collapse all whitespace runs in *text* and truncate to *limit* chars.

    Used for text placed in an SMTP reply, which must not contain CR/LF.
    """
    return " ".join(text.split())[:limit]


class CustomSMTPHandler:
    """aiosmtpd handler that forwards each message to the HTTP mail API."""

    def authenticator(self, server, session, envelope, mechanism, auth_data):
        """Accept any LOGIN/PLAIN credentials; reject everything else.

        Credentials are not verified here: ``handle_DATA`` forwards the
        password as the API token with each send request.

        Returns:
            ``AuthResult(success=True, auth_data=auth_data)`` for a
            ``LoginPassword`` supplied via LOGIN or PLAIN, otherwise an
            unsuccessful, unhandled ``AuthResult``.
        """
        fail_nothandled = AuthResult(success=False, handled=False)
        if mechanism not in ("LOGIN", "PLAIN"):
            _logger.warning("Unsupported mechanism %s", mechanism)
            return fail_nothandled
        if not isinstance(auth_data, LoginPassword):
            # Log only the type: the value may carry credential material.
            _logger.warning(
                "Invalid auth data type %s", type(auth_data).__name__
            )
            return fail_nothandled
        return AuthResult(success=True, auth_data=auth_data)

    async def handle_DATA(self, server: SMTP, session: Session,
                          envelope: Envelope) -> str:
        """Parse the received message and relay it to the send-mail API.

        Returns:
            An SMTP status line: ``250 OK`` on success, ``530`` when the
            session is not authenticated, ``500`` for more than one
            recipient, no usable text body, or an upstream failure.
        """
        _logger.info(
            "handle_DATA from %s to %s",
            envelope.mail_from, envelope.rcpt_tos,
        )
        if not isinstance(session.auth_data, LoginPassword):
            return '530 Authentication required'
        if len(envelope.rcpt_tos) != 1:
            return '500 Only one recipient allowed'
        # Only one recipient allowed
        to_mail = envelope.rcpt_tos[0]

        # Parse email
        msg = email.message_from_string(envelope.content)
        content_list = _collect_text_bodies(msg)
        if not content_list:
            return '500 Invalid content'
        # Prefer HTML over plain text, then the longest candidate.
        body = max(
            content_list,
            key=lambda x: (x["type"] == "text/html", len(x["value"]))
        )

        raw_from_name, _ = email.utils.parseaddr(str(msg.get('From', '')))
        from_name = _decode_header(raw_from_name)
        to_mail_map = _recipient_names(msg)
        _logger.info("Parsed mail from %s to %s", from_name, to_mail_map)

        # Send mail
        send_body = {
            "token": session.auth_data.password.decode(),
            "from_name": from_name,
            "to_name": to_mail_map.get(to_mail),
            "to_mail": to_mail,
            "subject": _decode_header(msg['Subject']),
            "is_html": body["type"] == "text/html",
            "content": body["value"],
        }
        _logger.info("Send mail %s", dict(send_body, token='***'))
        try:
            # Async client: a blocking request here would stall the event
            # loop that serves every other SMTP session.
            async with httpx.AsyncClient() as client:
                res = await client.post(
                    f"{settings.proxy_url}/external/api/send_mail",
                    json=send_body,
                    headers={"Content-Type": "application/json"},
                )
        except Exception:
            # Handler boundary: any transport failure becomes an SMTP error.
            _logger.exception("Failed to send mail")
            return '500 Internal server error'

        if res.status_code != 200:
            _logger.error(
                "Failed to send mail code=[%s] text=[%s]",
                res.status_code, res.text,
            )
            # The upstream body may contain newlines; keep the reply to a
            # single bounded line.
            return (
                f'500 Internal server error code=[{res.status_code}] '
                f'text=[{_single_line(res.text)}]'
            )
        return '250 OK'


def start_smtp_server():
    """Start the SMTP server and block until interrupted.

    TLS (STARTTLS) is enabled when both ``settings.smtp_tls_cert`` and
    ``settings.smtp_tls_key`` are set; AUTH then requires TLS.

    Raises:
        ValueError: If only one of the certificate/key settings is set.
    """
    handler = CustomSMTPHandler()
    tls_context = None
    has_cert = bool(settings.smtp_tls_cert)
    has_key = bool(settings.smtp_tls_key)
    if has_cert != has_key:
        raise ValueError(
            "Both smtp_tls_cert and smtp_tls_key must be set together"
        )
    if has_cert and has_key:
        _logger.info("TLS enabled for SMTP (STARTTLS)")
        tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        tls_context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        tls_context.load_cert_chain(
            settings.smtp_tls_cert, settings.smtp_tls_key
        )
    server = Controller(
        handler,
        hostname="",
        port=settings.port,
        auth_require_tls=bool(tls_context),
        decode_data=True,
        authenticator=handler.authenticator,
        auth_exclude_mechanism=["DONT"],
        tls_context=tls_context,
    )
    _logger.info(
        "Starting SMTP server on port %s tls=%s",
        settings.port,
        bool(tls_context),
    )
    server.start()
    # NOTE(review): this loop runs no tasks of its own; it appears to exist
    # only to keep the main thread alive after server.start() returns.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        _logger.info("Got KeyboardInterrupt, stopping")
    finally:
        # Always release the listening socket and the idle loop, not only
        # on Ctrl-C.
        server.stop()
        loop.close()


if __name__ == "__main__":
    _logger.info(
        "Starting SMTP server proxy_url=%s port=%s tls=%s",
        settings.proxy_url,
        settings.port,
        bool(settings.smtp_tls_cert and settings.smtp_tls_key),
    )
    start_smtp_server()