Files
cloudflare_temp_email/smtp_proxy_server/parse_email.py
2024-05-22 20:48:03 +08:00

63 lines
1.8 KiB
Python

import datetime
import json
import logging
import email
from email.message import Message
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from models import EmailModel
# Module-level logger named after this module; used to report parse failures.
_logger = logging.getLogger(__name__)
# Emit INFO and above from this logger regardless of the root logger's level.
_logger.setLevel(logging.INFO)
def get_email_model(msg: Message):
    """Recursively convert a parsed message into an ``EmailModel`` tree.

    Args:
        msg: The message (or MIME sub-part) to convert.

    Returns:
        An ``EmailModel`` carrying the part's headers, content type, body and
        converted sub-parts. A multipart container has an empty body and one
        sub-part model per child; a leaf part has its payload as body and no
        sub-parts. ``size`` is ``len(body)`` plus the sizes of all sub-parts.
    """
    if msg.is_multipart():
        # For a multipart message get_payload() returns the list of children.
        subparts = [get_email_model(part) for part in msg.get_payload()]
        body = ""
    else:
        subparts = []
        # Use the public accessor rather than the private ``_payload``
        # attribute. It returns None when no payload was ever set, which
        # would otherwise make ``len(body)`` below raise TypeError.
        body = msg.get_payload()
        if body is None:
            body = ""
    return EmailModel(
        # A plain dict: if a header name repeats, the last value wins.
        headers=dict(msg.items()),
        body=body,
        content_type=msg.get_content_type(),
        size=len(body) + sum(subpart.size for subpart in subparts),
        subparts=subparts,
    )
def parse_email(raw: str) -> EmailModel:
    """Parse raw message text into an ``EmailModel``, never raising on bad input.

    Args:
        raw: The complete message source as a string.

    Returns:
        The model produced by ``get_email_model`` for the parsed message. If
        parsing or conversion raises any ``Exception``, the error is logged
        and a ``text/plain`` placeholder model with no headers and no
        sub-parts is returned instead.
    """
    try:
        return get_email_model(email.message_from_string(raw))
    except Exception as e:
        # Best-effort boundary: report the failure and fall through to the
        # placeholder below rather than propagating to the caller.
        _logger.error(f"Could not parse email: {e}")

    placeholder = "could not parse email"
    return EmailModel(
        headers={},
        body=placeholder,
        content_type="text/plain",
        size=len(placeholder),
        subparts=[],
    )
def generate_email_model(item: dict) -> EmailModel:
    """Build an ``EmailModel`` from a stored record describing a sent mail.

    The record's JSON payload is turned into a MIME message, serialized, and
    run through ``parse_email`` so the result has the same shape as a parsed
    incoming message.

    Args:
        item: Mapping with two keys read here:
            ``"raw"`` - a JSON string containing ``from`` (``name``,
            ``email``), ``personalizations[0].to`` (list of ``name`` /
            ``email`` entries), ``subject`` and ``content[0]`` (``type``,
            ``value``);
            ``"created_at"`` - a timestamp formatted ``%Y-%m-%d %H:%M:%S``.

    Returns:
        The ``EmailModel`` produced by ``parse_email`` for the built message.

    Raises:
        json.JSONDecodeError: If ``item["raw"]`` is not valid JSON.
        KeyError, IndexError: If an expected field is missing.
        ValueError: If ``created_at`` does not match the expected format.
    """
    # Function-scope import keeps this block self-contained.
    from email.utils import format_datetime

    email_json = json.loads(item["raw"])
    message = MIMEMultipart()

    # Distinct inner/outer quotes and single-line replacement fields keep
    # these f-strings valid on interpreters older than Python 3.12.
    sender = email_json["from"]
    message['From'] = f"{sender['name']} <{sender['email']}>"
    message['To'] = ", ".join(
        f"{to['name']} <{to['email']}>"
        for to in email_json["personalizations"][0]["to"]
    )
    message['Subject'] = email_json["subject"]

    # format_datetime always emits English day/month names, unlike
    # strftime("%a"/"%b") which follows the process locale. Marking the value
    # as UTC reproduces the "+0000" suffix the header has always carried.
    # NOTE(review): this presumes created_at is stored in UTC - confirm.
    created_at = datetime.datetime.strptime(
        item["created_at"], "%Y-%m-%d %H:%M:%S"
    ).replace(tzinfo=datetime.timezone.utc)
    message["Date"] = format_datetime(created_at)

    # Only the first content entry is used; any type mentioning "html" is
    # attached as text/html, everything else as text/plain.
    content = email_json["content"][0]
    subtype = "html" if "html" in content["type"] else "plain"
    message.attach(MIMEText(content["value"], subtype))

    return parse_email(message.as_string())