From 7c6d0d7c8a81e9b034d14faa972883f804ea50ac Mon Sep 17 00:00:00 2001 From: Dream Hunter Date: Sat, 4 Apr 2026 18:46:39 +0800 Subject: [PATCH] feat(mail): support gzip compressed email storage via ENABLE_MAIL_GZIP (#933) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(mail): support gzip compressed email storage in D1 raw_blob column Add ENABLE_MAIL_GZIP env var to optionally gzip-compress incoming emails into a new raw_blob BLOB column, saving D1 storage space. Reading is backward-compatible: prioritizes raw_blob (decompress) with fallback to plaintext raw field. Includes DB migration v0.0.7, docs, and changelogs. Co-Authored-By: Claude Opus 4.6 * fix: gzip fallback on missing column + decouple resolve from handleListQuery - email/index.ts: gzip INSERT failure now falls back to plaintext INSERT instead of silently losing the email (P1: data loss prevention) - common.ts: add handleMailListQuery for raw_mails-specific list queries with resolveRawEmailList, keeping handleListQuery generic - Replace handleListQuery → handleMailListQuery in mails_api, admin_mail_api, user_mail_api (only raw_mails callers) - Add e2e test infrastructure: worker-gzip service, wrangler.toml.e2e.gzip, api-gzip playwright project, mail-gzip.spec.ts with 4 test cases Co-Authored-By: Claude Opus 4.6 * fix: address CodeRabbit review feedback for gzip feature - Use destructuring in resolveRawEmailRow to truly remove raw_blob key - Narrow fallback scope: only fallback to plaintext on compression failure or missing raw_blob column, re-throw other DB errors - Clean unused imports in e2e gzip test Co-Authored-By: Claude Opus 4.6 * fix: add try-catch in resolveRawEmail to prevent single corrupt blob from failing entire list A corrupted raw_blob would cause decompressBlob to throw, which with Promise.all in resolveRawEmailList would reject the entire batch query. Now catches decompression errors and falls back to row.raw field. Co-Authored-By: Claude Opus 4.6 * fix(mail): align sendAdminInternalMail with gzip storage path sendAdminInternalMail now respects ENABLE_MAIL_GZIP: compresses to raw_blob when enabled, with fallback to plaintext on failure. Added e2e test verifying admin internal mail is readable under gzip. Co-Authored-By: Claude Opus 4.6 * fix(e2e): match admin internal mail by body content instead of encoded subject mimetext base64-encodes the Subject header, so the raw MIME string does not contain the literal subject text. Match on body content (balance: 99) which is plaintext. Co-Authored-By: Claude Opus 4.6 * fix(e2e): add WORKER_GZIP_URL guard and length assertions in gzip tests Address CodeRabbit feedback: - Skip gzip tests when WORKER_GZIP_URL is not set to prevent false positives - Assert results array length before accessing [0] for clearer error messages Co-Authored-By: Claude Opus 4.6 * fix(mail): narrow gzip fallback scope and fix webhook query compatibility - sendAdminInternalMail: separate compress vs DB error handling, only fallback to plaintext on compression failure or missing raw_blob column, rethrow other DB errors (aligns with email/index.ts) - Webhook test endpoints: use SELECT * instead of explicit raw_blob column reference, so pre-migration databases don't 500 - Docs/changelog: clarify that db_migration must run before enabling ENABLE_MAIL_GZIP Co-Authored-By: Claude Opus 4.6 * fix(telegram): use generic Record type for raw_mails query result Align with other query sites — avoid hardcoding raw_blob in the TypeScript type annotation so the query works with or without the column after migration. Co-Authored-By: Claude Opus 4.6 * refactor(models): add RawMailRow type and unify raw_mails query typing Add RawMailRow type to models with raw_blob as optional field, replacing ad-hoc Record and inline type annotations across webhook test endpoints, telegram API, and gzip utilities. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- CHANGELOG.md | 1 + CHANGELOG_EN.md | 1 + db/2026-04-03-raw-blob.sql | 2 + db/schema.sql | 1 + e2e/Dockerfile.worker | 3 +- e2e/docker-compose.yml | 27 +- e2e/fixtures/test-helpers.ts | 1 + e2e/fixtures/wrangler.toml.e2e.gzip | 34 +++ e2e/playwright.config.ts | 8 + e2e/scripts/docker-entrypoint.sh | 22 ++ e2e/tests/api-gzip/mail-gzip.spec.ts | 242 ++++++++++++++++++ vitepress-docs/docs/en/guide/worker-vars.md | 1 + vitepress-docs/docs/zh/guide/worker-vars.md | 1 + worker/src/admin_api/admin_mail_api.ts | 6 +- worker/src/admin_api/db_api.ts | 13 + worker/src/admin_api/mail_webhook_settings.ts | 13 +- worker/src/common.ts | 27 ++ worker/src/constants.ts | 2 +- worker/src/email/index.ts | 51 +++- worker/src/gzip.ts | 48 ++++ worker/src/mails_api/index.ts | 8 +- worker/src/mails_api/webhook_settings.ts | 13 +- worker/src/models/index.ts | 11 + worker/src/telegram_api/miniapp.ts | 10 +- worker/src/telegram_api/telegram.ts | 9 +- worker/src/types.d.ts | 3 + worker/src/user_api/user_mail_api.ts | 4 +- worker/src/utils.ts | 41 ++- 28 files changed, 563 insertions(+), 40 deletions(-) create mode 100644 db/2026-04-03-raw-blob.sql create mode 100644 e2e/fixtures/wrangler.toml.e2e.gzip create mode 100644 e2e/tests/api-gzip/mail-gzip.spec.ts create mode 100644 worker/src/gzip.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c1bf0e82..569e0c1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - feat: |自动回复| 发件人过滤支持正则表达式匹配,使用 `/pattern/` 语法(如 `/@example\.com$/`),同时保持前缀匹配的向后兼容 - feat: |Turnstile| 新增全局登录表单 Turnstile 人机验证,通过 `ENABLE_GLOBAL_TURNSTILE_CHECK` 环境变量控制(#767) - feat: |Telegram| Telegram 推送支持发送邮件附件(单文件限制 50MB),多附件通过 `sendMediaGroup` 批量发送,通过 `ENABLE_TG_PUSH_ATTACHMENT` 环境变量开启(#894) +- feat: |邮件存储| 支持 Gzip 压缩存储邮件原文到 D1 数据库 `raw_blob` 字段,通过 `ENABLE_MAIL_GZIP` 环境变量开启,兼容已有明文 `raw` 字段的双字段读取。**启用前需先执行数据库迁移**(#823) ### Bug Fixes diff --git a/CHANGELOG_EN.md b/CHANGELOG_EN.md index 5e774ef5..a14f8833 100644 --- a/CHANGELOG_EN.md +++ b/CHANGELOG_EN.md @@ -16,6 +16,7 @@ - feat: |Auto Reply| Add regex matching support for sender filter using `/pattern/` syntax (e.g. `/@example\.com$/`), backward compatible with prefix matching - feat: |Turnstile| Add global Turnstile CAPTCHA for all login forms via `ENABLE_GLOBAL_TURNSTILE_CHECK` env var (#767) - feat: |Telegram| Support sending email attachments in Telegram push (50MB per file limit), multiple attachments sent via `sendMediaGroup`, controlled by `ENABLE_TG_PUSH_ATTACHMENT` env var (#894) +- feat: |Mail Storage| Support gzip-compressed email storage in D1 `raw_blob` BLOB column via `ENABLE_MAIL_GZIP` env var, with backward-compatible dual-field reading from `raw` and `raw_blob`. **Run database migration before enabling** (#823) ### Bug Fixes diff --git a/db/2026-04-03-raw-blob.sql b/db/2026-04-03-raw-blob.sql new file mode 100644 index 00000000..6b72a28b --- /dev/null +++ b/db/2026-04-03-raw-blob.sql @@ -0,0 +1,2 @@ +-- Add raw_blob BLOB column for gzip-compressed email storage +ALTER TABLE raw_mails ADD COLUMN raw_blob BLOB; diff --git a/db/schema.sql b/db/schema.sql index 6160f757..321d9bfb 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -4,6 +4,7 @@ CREATE TABLE IF NOT EXISTS raw_mails ( source TEXT, address TEXT, raw TEXT, + raw_blob BLOB, metadata TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); diff --git a/e2e/Dockerfile.worker b/e2e/Dockerfile.worker index 52c54476..a3a31619 100644 --- a/e2e/Dockerfile.worker +++ b/e2e/Dockerfile.worker @@ -11,7 +11,8 @@ RUN pnpm install --frozen-lockfile || (echo "WARN: frozen-lockfile failed, falli COPY worker/src/ src/ COPY worker/tsconfig.json ./ -COPY e2e/fixtures/wrangler.toml.e2e wrangler.toml +ARG WRANGLER_TOML=e2e/fixtures/wrangler.toml.e2e +COPY ${WRANGLER_TOML} wrangler.toml EXPOSE 8787 diff --git a/e2e/docker-compose.yml b/e2e/docker-compose.yml index c7950c14..3807ac79 100644 --- a/e2e/docker-compose.yml +++ b/e2e/docker-compose.yml @@ -41,10 +41,28 @@ services: context: .. dockerfile: e2e/Dockerfile.worker ports: - - "8788:8788" - command: ["pnpm", "exec", "wrangler", "dev", "--port", "8788", "--ip", "0.0.0.0"] + - "8790:8790" + command: ["pnpm", "exec", "wrangler", "dev", "--port", "8790", "--ip", "0.0.0.0"] volumes: - ./fixtures/wrangler.toml.e2e.env-off:/app/worker/wrangler.toml:ro + depends_on: + - mailpit + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8790/health_check"] + interval: 3s + timeout: 5s + start_period: 10s + retries: 20 + + worker-gzip: + build: + context: .. + dockerfile: e2e/Dockerfile.worker + args: + WRANGLER_TOML: e2e/fixtures/wrangler.toml.e2e.gzip + ports: + - "8788:8788" + command: ["pnpm", "exec", "wrangler", "dev", "--port", "8788", "--ip", "0.0.0.0"] depends_on: - mailpit healthcheck: @@ -108,7 +126,8 @@ services: environment: WORKER_URL: http://worker:8787 WORKER_URL_SUBDOMAIN: http://worker-subdomain:8789 - WORKER_URL_ENV_OFF: http://worker-env-off:8788 + WORKER_URL_ENV_OFF: http://worker-env-off:8790 + WORKER_GZIP_URL: http://worker-gzip:8788 FRONTEND_URL: https://frontend:5173 MAILPIT_API: http://mailpit:8025/api SMTP_PROXY_HOST: smtp-proxy @@ -125,6 +144,8 @@ services: condition: service_healthy worker-env-off: condition: service_healthy + worker-gzip: + condition: service_healthy frontend: condition: service_started smtp-proxy: diff --git a/e2e/fixtures/test-helpers.ts b/e2e/fixtures/test-helpers.ts index 839b100f..320d9c68 100644 --- a/e2e/fixtures/test-helpers.ts +++ b/e2e/fixtures/test-helpers.ts @@ -4,6 +4,7 @@ import WebSocket from 'ws'; export const WORKER_URL = process.env.WORKER_URL!; export const WORKER_URL_SUBDOMAIN = process.env.WORKER_URL_SUBDOMAIN || ''; export const WORKER_URL_ENV_OFF = process.env.WORKER_URL_ENV_OFF || ''; +export const WORKER_GZIP_URL = process.env.WORKER_GZIP_URL || ''; export const FRONTEND_URL = process.env.FRONTEND_URL!; export const MAILPIT_API = process.env.MAILPIT_API!; export const TEST_DOMAIN = 'test.example.com'; diff --git a/e2e/fixtures/wrangler.toml.e2e.gzip b/e2e/fixtures/wrangler.toml.e2e.gzip new file mode 100644 index 00000000..f52fb311 --- /dev/null +++ b/e2e/fixtures/wrangler.toml.e2e.gzip @@ -0,0 +1,34 @@ +name = "cloudflare_temp_email_gzip" +main = "src/worker.ts" +compatibility_date = "2025-04-01" +compatibility_flags = [ "nodejs_compat" ] +keep_vars = true + +[vars] +PREFIX = "tmp" +DEFAULT_DOMAINS = ["test.example.com"] +DOMAINS = ["test.example.com"] +JWT_SECRET = "e2e-test-secret-key" +BLACK_LIST = "" +ENABLE_USER_CREATE_EMAIL = true +ENABLE_USER_DELETE_EMAIL = true +ENABLE_AUTO_REPLY = true +DEFAULT_SEND_BALANCE = 10 +ENABLE_ADDRESS_PASSWORD = true +DISABLE_ADMIN_PASSWORD_CHECK = true +ADMIN_PASSWORDS = '["e2e-admin-pass"]' +ENABLE_WEBHOOK = true +E2E_TEST_MODE = true +ENABLE_MAIL_GZIP = true +SMTP_CONFIG = """ +{"test.example.com":{"host":"mailpit","port":1025,"secure":false}} +""" + +[[kv_namespaces]] +binding = "KV" +id = "e2e-test-kv-gzip-00000000-0000-0000-0000-000000000000" + +[[d1_databases]] +binding = "DB" +database_name = "e2e-temp-email-gzip" +database_id = "e2e-test-db-gzip-00000000-0000-0000-0000-000000000000" diff --git a/e2e/playwright.config.ts b/e2e/playwright.config.ts index d7c7aea4..c6d172c0 100644 --- a/e2e/playwright.config.ts +++ b/e2e/playwright.config.ts @@ -1,6 +1,7 @@ import { defineConfig, devices } from '@playwright/test'; const WORKER_BASE = process.env.WORKER_URL!; +const WORKER_GZIP_BASE = process.env.WORKER_GZIP_URL || ''; const FRONTEND_BASE = process.env.FRONTEND_URL!; export default defineConfig({ @@ -16,6 +17,13 @@ export default defineConfig({ baseURL: WORKER_BASE, }, }, + { + name: 'api-gzip', + testDir: './tests/api-gzip', + use: { + baseURL: WORKER_GZIP_BASE, + }, + }, { name: 'smtp-proxy', testDir: './tests/smtp-proxy', diff --git a/e2e/scripts/docker-entrypoint.sh b/e2e/scripts/docker-entrypoint.sh index 40211cae..a745319b 100755 --- a/e2e/scripts/docker-entrypoint.sh +++ b/e2e/scripts/docker-entrypoint.sh @@ -44,6 +44,21 @@ if [ -n "${WORKER_URL_ENV_OFF:-}" ]; then done fi +if [ -n "${WORKER_GZIP_URL:-}" ]; then + echo "==> Waiting for worker-gzip at $WORKER_GZIP_URL ..." + for i in $(seq 1 60); do + if curl -sf "$WORKER_GZIP_URL/health_check" > /dev/null 2>&1; then + echo " Worker-gzip ready after ${i}s" + break + fi + if [ "$i" -eq 60 ]; then + echo "ERROR: Worker-gzip not ready after 60s" + exit 1 + fi + sleep 1 + done +fi + echo "==> Waiting for frontend at $FRONTEND_URL ..." for i in $(seq 1 60); do if curl -skf "$FRONTEND_URL" > /dev/null 2>&1; then @@ -88,5 +103,12 @@ if [ -n "${WORKER_URL_ENV_OFF:-}" ]; then echo " Env-off database initialized" fi +if [ -n "${WORKER_GZIP_URL:-}" ]; then + echo "==> Initializing gzip worker database" + curl -sf -X POST "$WORKER_GZIP_URL/admin/db_initialize" > /dev/null + curl -sf -X POST "$WORKER_GZIP_URL/admin/db_migration" > /dev/null + echo " Gzip worker database initialized" +fi + echo "==> Running Playwright tests" exec npx playwright test "$@" diff --git a/e2e/tests/api-gzip/mail-gzip.spec.ts b/e2e/tests/api-gzip/mail-gzip.spec.ts new file mode 100644 index 00000000..88af5bb1 --- /dev/null +++ b/e2e/tests/api-gzip/mail-gzip.spec.ts @@ -0,0 +1,242 @@ +import { test, expect } from '@playwright/test'; +import { WORKER_GZIP_URL, TEST_DOMAIN } from '../../fixtures/test-helpers'; + +/** + * These tests run against a worker instance with ENABLE_MAIL_GZIP=true. + * They verify gzip-compressed storage and backward-compatible reading. + */ + +// Helper: create address on the gzip worker +async function createGzipAddress(ctx: any, name: string) { + const uniqueName = `${name}${Date.now()}`; + const res = await ctx.post(`${WORKER_GZIP_URL}/api/new_address`, { + data: { name: uniqueName, domain: TEST_DOMAIN }, + }); + if (!res.ok()) throw new Error(`Failed to create address: ${res.status()} ${await res.text()}`); + const body = await res.json(); + return { jwt: body.jwt, address: body.address, address_id: body.address_id }; +} + +// Helper: seed mail via receiveMail (goes through email() handler → gzip compression) +async function receiveGzipMail( + ctx: any, address: string, + opts: { subject?: string; html?: string; text?: string; from?: string } +) { + const from = opts.from || `sender@${TEST_DOMAIN}`; + const subject = opts.subject || 'Test Email'; + const boundary = `----E2E${Date.now()}`; + const htmlPart = opts.html || `

${opts.text || 'Hello from E2E'}

`; + const textPart = opts.text || 'Hello from E2E'; + const messageId = ``; + + const raw = [ + `From: ${from}`, + `To: ${address}`, + `Subject: ${subject}`, + `Message-ID: ${messageId}`, + `MIME-Version: 1.0`, + `Content-Type: multipart/alternative; boundary="${boundary}"`, + ``, + `--${boundary}`, + `Content-Type: text/plain; charset=utf-8`, + ``, + textPart, + `--${boundary}`, + `Content-Type: text/html; charset=utf-8`, + ``, + htmlPart, + `--${boundary}--`, + ].join('\r\n'); + + const res = await ctx.post(`${WORKER_GZIP_URL}/admin/test/receive_mail`, { + data: { from, to: address, raw }, + }); + if (!res.ok()) throw new Error(`Failed to receive mail: ${res.status()} ${await res.text()}`); + const body = await res.json(); + if (!body.success) throw new Error(`Mail was rejected: ${body.rejected || 'unknown reason'}`); +} + +// Helper: seed mail via seedMail (direct INSERT → plaintext raw, no gzip) +async function seedPlaintextMail( + ctx: any, address: string, + opts: { subject?: string; text?: string; from?: string } +) { + const from = opts.from || `sender@${TEST_DOMAIN}`; + const subject = opts.subject || 'Plaintext Mail'; + const messageId = ``; + const raw = [ + `From: ${from}`, + `To: ${address}`, + `Subject: ${subject}`, + `Message-ID: ${messageId}`, + `Content-Type: text/plain; charset=utf-8`, + ``, + opts.text || 'Hello plaintext from E2E', + ].join('\r\n'); + + const res = await ctx.post(`${WORKER_GZIP_URL}/admin/test/seed_mail`, { + data: { address, source: from, raw, message_id: messageId }, + }); + if (!res.ok()) throw new Error(`Failed to seed mail: ${res.status()} ${await res.text()}`); +} + +// Helper: delete address on gzip worker +async function deleteGzipAddress(ctx: any, jwt: string) { + await ctx.delete(`${WORKER_GZIP_URL}/api/delete_address`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); +} + +test.describe('Mail Gzip Storage', () => { + test.beforeEach(() => { + test.skip(!WORKER_GZIP_URL, 'WORKER_GZIP_URL not set — skipping gzip tests'); + }); + + test('gzip-compressed mail is readable in list', async ({ request }) => { + const { jwt, address } = await createGzipAddress(request, 'gzip-list'); + try { + await receiveGzipMail(request, address, { + subject: 'Gzip List Test', + text: 'compressed content here', + }); + + const res = await request.get(`${WORKER_GZIP_URL}/api/mails?limit=10&offset=0`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + expect(res.ok()).toBe(true); + const { results } = await res.json(); + expect(results).toHaveLength(1); + expect(results[0].raw).toContain('Gzip List Test'); + expect(results[0].raw).toContain('compressed content here'); + } finally { + await deleteGzipAddress(request, jwt); + } + }); + + test('gzip-compressed mail is readable in detail', async ({ request }) => { + const { jwt, address } = await createGzipAddress(request, 'gzip-detail'); + try { + await receiveGzipMail(request, address, { + subject: 'Gzip Detail Test', + html: 'bold gzip', + }); + + const listRes = await request.get(`${WORKER_GZIP_URL}/api/mails?limit=10&offset=0`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + const { results } = await listRes.json(); + expect(results.length).toBeGreaterThanOrEqual(1); + const mailId = results[0].id; + + const detailRes = await request.get(`${WORKER_GZIP_URL}/api/mail/${mailId}`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + expect(detailRes.ok()).toBe(true); + const mail = await detailRes.json(); + expect(mail.raw).toContain('Gzip Detail Test'); + expect(mail.raw).toContain('bold gzip'); + } finally { + await deleteGzipAddress(request, jwt); + } + }); + + test('mixed: plaintext seed + gzip receive both readable in same list', async ({ request }) => { + const { jwt, address } = await createGzipAddress(request, 'gzip-mixed'); + try { + // 1. Direct INSERT plaintext (simulates pre-gzip data) + await seedPlaintextMail(request, address, { + subject: 'Old Plaintext Mail', + text: 'legacy plain content', + }); + + // 2. receiveMail → goes through email() handler → gzip compressed + await receiveGzipMail(request, address, { + subject: 'New Gzip Mail', + text: 'new compressed content', + }); + + const res = await request.get(`${WORKER_GZIP_URL}/api/mails?limit=10&offset=0`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + expect(res.ok()).toBe(true); + const { results } = await res.json(); + expect(results).toHaveLength(2); + + // Both mails should have readable raw content + const subjects = results.map((r: any) => r.raw); + expect(subjects.some((r: string) => r.includes('Old Plaintext Mail'))).toBe(true); + expect(subjects.some((r: string) => r.includes('New Gzip Mail'))).toBe(true); + expect(subjects.some((r: string) => r.includes('legacy plain content'))).toBe(true); + expect(subjects.some((r: string) => r.includes('new compressed content'))).toBe(true); + } finally { + await deleteGzipAddress(request, jwt); + } + }); + + test('admin internal mail (sendAdminInternalMail) is gzip-compressed and readable', async ({ request }) => { + const { jwt, address } = await createGzipAddress(request, 'gzip-admin-mail'); + try { + // 1. Request send access → creates address_sender row + const reqAccessRes = await request.post(`${WORKER_GZIP_URL}/api/request_send_mail_access`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + expect(reqAccessRes.ok()).toBe(true); + + // 2. Get address_sender id + const senderListRes = await request.get( + `${WORKER_GZIP_URL}/admin/address_sender?limit=10&offset=0&address=${encodeURIComponent(address)}`, + ); + expect(senderListRes.ok()).toBe(true); + const senderList = await senderListRes.json(); + expect(senderList.results.length).toBeGreaterThanOrEqual(1); + const senderId = senderList.results[0].id; + + // 3. Update send access via admin API → triggers sendAdminInternalMail + const updateRes = await request.post(`${WORKER_GZIP_URL}/admin/address_sender`, { + data: { address, address_id: senderId, balance: 99, enabled: true }, + }); + expect(updateRes.ok()).toBe(true); + + // 4. Verify the internal mail is readable + const mailsRes = await request.get(`${WORKER_GZIP_URL}/api/mails?limit=10&offset=0`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + expect(mailsRes.ok()).toBe(true); + const { results } = await mailsRes.json(); + expect(results.length).toBeGreaterThanOrEqual(1); + + // mimetext base64-encodes the Subject header, so match on body content instead + const internalMail = results.find((m: any) => m.raw?.includes('balance: 99')); + expect(internalMail).toBeDefined(); + expect(internalMail.raw).toContain('admin@internal'); + expect(internalMail.raw).toContain('balance: 99'); + expect(internalMail).not.toHaveProperty('raw_blob'); + } finally { + await deleteGzipAddress(request, jwt); + } + }); + + test('raw_blob field is not exposed in API response', async ({ request }) => { + const { jwt, address } = await createGzipAddress(request, 'gzip-noblob'); + try { + await receiveGzipMail(request, address, { subject: 'No Blob Leak' }); + + // Check list response + const listRes = await request.get(`${WORKER_GZIP_URL}/api/mails?limit=10&offset=0`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + const { results } = await listRes.json(); + expect(results.length).toBeGreaterThanOrEqual(1); + expect(results[0]).not.toHaveProperty('raw_blob'); + + // Check detail response + const detailRes = await request.get(`${WORKER_GZIP_URL}/api/mail/${results[0].id}`, { + headers: { Authorization: `Bearer ${jwt}` }, + }); + const mail = await detailRes.json(); + expect(mail).not.toHaveProperty('raw_blob'); + } finally { + await deleteGzipAddress(request, jwt); + } + }); +}); diff --git a/vitepress-docs/docs/en/guide/worker-vars.md b/vitepress-docs/docs/en/guide/worker-vars.md index 68a7434f..2caab161 100644 --- a/vitepress-docs/docs/en/guide/worker-vars.md +++ b/vitepress-docs/docs/en/guide/worker-vars.md @@ -70,6 +70,7 @@ | `FORWARD_ADDRESS_LIST` | JSON | Global forward address list, disabled if not configured, all emails will be forwarded to listed addresses when enabled | `["xxx@xxx.com"]` | | `REMOVE_EXCEED_SIZE_ATTACHMENT` | Text/JSON | If attachment exceeds 2MB, remove it, email may lose some information due to parsing | `true` | | `REMOVE_ALL_ATTACHMENT` | Text/JSON | Remove all attachments, email may lose some information due to parsing | `true` | +| `ENABLE_MAIL_GZIP` | Text/JSON | When enabled, new emails are gzip-compressed and stored in `raw_blob` column to save D1 database space. Existing plaintext `raw` data is automatically compatible for reading. **Run database migration first (Admin → DB Migration) to ensure `raw_blob` column exists before enabling** | `true` | > [!NOTE] > `Junk mail checking` and `attachment removal` require email parsing, free tier CPU is limited, may cause large email parsing timeout diff --git a/vitepress-docs/docs/zh/guide/worker-vars.md b/vitepress-docs/docs/zh/guide/worker-vars.md index 38a9721e..81fac676 100644 --- a/vitepress-docs/docs/zh/guide/worker-vars.md +++ b/vitepress-docs/docs/zh/guide/worker-vars.md @@ -66,6 +66,7 @@ | `FORWARD_ADDRESS_LIST` | JSON | 全局转发地址列表,如果不配置则不启用,启用后所有邮件都会转发到列表中的地址 | `["xxx@xxx.com"]` | | `REMOVE_EXCEED_SIZE_ATTACHMENT` | 文本/JSON | 如果附件大小超过 2MB,则删除附件,邮件可能由于解析而丢失一些信息 | `true` | | `REMOVE_ALL_ATTACHMENT` | 文本/JSON | 移除所有附件,邮件可能由于解析而丢失一些信息 | `true` | +| `ENABLE_MAIL_GZIP` | 文本/JSON | 启用后新邮件将 Gzip 压缩存储到 `raw_blob` 字段,可节省 D1 数据库空间。已有明文 `raw` 数据自动兼容读取。**启用前请先执行数据库迁移(管理后台 → 数据库迁移),确保 `raw_blob` 列已创建** | `true` | > [!NOTE] > `垃圾邮件检查` 和 `移除附件功能` 需要解析邮件,免费版 CPU 有限,可能会导致大邮件解析超时 diff --git a/worker/src/admin_api/admin_mail_api.ts b/worker/src/admin_api/admin_mail_api.ts index 2524b704..8257b891 100644 --- a/worker/src/admin_api/admin_mail_api.ts +++ b/worker/src/admin_api/admin_mail_api.ts @@ -1,5 +1,5 @@ import { Context } from "hono"; -import { handleListQuery } from "../common"; +import { handleMailListQuery } from "../common"; export default { getMails: async (c: Context) => { @@ -9,7 +9,7 @@ export default { const filterQuerys = [addressQuery].filter((item) => item).join(" and "); const finalQuery = filterQuerys.length > 0 ? `where ${filterQuerys}` : ""; const filterParams = [...addressParams] - return await handleListQuery(c, + return await handleMailListQuery(c, `SELECT * FROM raw_mails ${finalQuery}`, `SELECT count(*) as count FROM raw_mails ${finalQuery}`, filterParams, limit, offset @@ -17,7 +17,7 @@ export default { }, getUnknowMails: async (c: Context) => { const { limit, offset } = c.req.query(); - return await handleListQuery(c, + return await handleMailListQuery(c, `SELECT * FROM raw_mails where address NOT IN (select name from address) `, `SELECT count(*) as count FROM raw_mails` + ` where address NOT IN (select name from address) `, diff --git a/worker/src/admin_api/db_api.ts b/worker/src/admin_api/db_api.ts index c229de6a..e75c6411 100644 --- a/worker/src/admin_api/db_api.ts +++ b/worker/src/admin_api/db_api.ts @@ -9,6 +9,7 @@ CREATE TABLE IF NOT EXISTS raw_mails ( source TEXT, address TEXT, raw TEXT, + raw_blob BLOB, metadata TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); @@ -184,6 +185,18 @@ export default { // migration to v0.0.6: add message_id index on raw_mails await c.env.DB.exec(`CREATE INDEX IF NOT EXISTS idx_raw_mails_message_id ON raw_mails(message_id);`); } + if (version && version <= "v0.0.6") { + // migration to v0.0.7: add raw_blob column for gzip compressed email storage + const tableInfo = await c.env.DB.prepare( + `PRAGMA table_info(raw_mails)` + ).all(); + const hasRawBlob = tableInfo.results?.some( + (col: any) => col.name === 'raw_blob' + ); + if (!hasRawBlob) { + await c.env.DB.exec(`ALTER TABLE raw_mails ADD COLUMN raw_blob BLOB;`); + } + } if (version != CONSTANTS.DB_VERSION) { // remove all \r and \n characters from the query string // split by ; and join with a ;\n diff --git a/worker/src/admin_api/mail_webhook_settings.ts b/worker/src/admin_api/mail_webhook_settings.ts index 71b55c15..a854e2cc 100644 --- a/worker/src/admin_api/mail_webhook_settings.ts +++ b/worker/src/admin_api/mail_webhook_settings.ts @@ -1,7 +1,8 @@ import { Context } from "hono"; import { CONSTANTS } from "../constants"; -import { WebhookSettings } from "../models"; +import { WebhookSettings, RawMailRow } from "../models"; import { commonParseMail, sendWebhook } from "../common"; +import { resolveRawEmail } from "../gzip"; async function getWebhookSettings(c: Context): Promise { const settings = await c.env.KV.get( @@ -21,10 +22,12 @@ async function saveWebhookSettings(c: Context): Promise): Promise { const settings = await c.req.json(); // random raw email - const { id: mailId, raw } = await c.env.DB.prepare( - `SELECT id, raw FROM raw_mails ORDER BY RANDOM() LIMIT 1` - ).first<{ id: string, raw: string }>() || {}; - const parsedEmailContext: ParsedEmailContext = { rawEmail: raw || "" }; + const mailRow = await c.env.DB.prepare( + `SELECT * FROM raw_mails ORDER BY RANDOM() LIMIT 1` + ).first(); + const mailId = mailRow?.id; + const raw = mailRow ? await resolveRawEmail(mailRow) : ""; + const parsedEmailContext: ParsedEmailContext = { rawEmail: raw }; const parsedEmail = await commonParseMail(parsedEmailContext); const res = await sendWebhook(settings, { id: mailId || "0", diff --git a/worker/src/common.ts b/worker/src/common.ts index dd82a1ef..4eb24737 100644 --- a/worker/src/common.ts +++ b/worker/src/common.ts @@ -622,6 +622,33 @@ export const handleListQuery = async ( return c.json({ results, count }); } +/** + * handleListQuery variant for raw_mails: resolves raw_blob → raw after query. + */ +export const handleMailListQuery = async ( + c: Context, + query: string, countQuery: string, params: string[], + limit: string | number | undefined | null, + offset: string | number | undefined | null, + orderBy?: string +): Promise => { + const { resolveRawEmailList } = await import('./gzip'); + const msgs = i18n.getMessagesbyContext(c); + if (typeof limit === "string") limit = parseInt(limit); + if (typeof offset === "string") offset = parseInt(offset); + if (!limit || limit < 0 || limit > 100) return c.text(msgs.InvalidLimitMsg, 400); + if (offset == null || offset == undefined || offset < 0) return c.text(msgs.InvalidOffsetMsg, 400); + const orderClause = orderBy || 'id desc'; + const resultsQuery = `${query} order by ${orderClause} limit ? offset ?`; + const { results } = await c.env.DB.prepare(resultsQuery).bind( + ...params, limit, offset + ).all(); + const resolvedResults = await resolveRawEmailList(results); + const count = offset == 0 ? await c.env.DB.prepare( + countQuery + ).bind(...params).first("count") : 0; + return c.json({ results: resolvedResults, count }); +} export const commonParseMail = async (parsedEmailContext: ParsedEmailContext): Promise<{ sender: string, diff --git a/worker/src/constants.ts b/worker/src/constants.ts index 5a6de7f3..9c89ecac 100644 --- a/worker/src/constants.ts +++ b/worker/src/constants.ts @@ -3,7 +3,7 @@ export const CONSTANTS = { // DB Version DB_VERSION_KEY: 'db_version', - DB_VERSION: "v0.0.6", + DB_VERSION: "v0.0.7", // DB settings ADDRESS_BLOCK_LIST_KEY: 'address_block_list', diff --git a/worker/src/email/index.ts b/worker/src/email/index.ts index 91d34afe..3a67ee44 100644 --- a/worker/src/email/index.ts +++ b/worker/src/email/index.ts @@ -1,6 +1,6 @@ import { Context } from "hono"; -import { getJsonSetting } from "../utils"; +import { getBooleanValue, getJsonSetting } from "../utils"; import { sendMailToTelegram } from "../telegram_api"; import { auto_reply } from "./auto_reply"; import { isBlocked } from "./black_list"; @@ -11,6 +11,7 @@ import { extractEmailInfo } from "./ai_extract"; import { forwardEmail } from "./forward"; import { EmailRuleSettings } from "../models"; import { CONSTANTS } from "../constants"; +import { compressText } from "../gzip"; async function email(message: ForwardableEmailMessage, env: Bindings, ctx: ExecutionContext) { @@ -65,11 +66,49 @@ async function email(message: ForwardableEmailMessage, env: Bindings, ctx: Execu const message_id = message.headers.get("Message-ID"); // save email try { - const { success } = await env.DB.prepare( - `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` - ).bind( - message.from, message.to, parsedEmailContext.rawEmail, message_id - ).run(); + let success = false; + if (getBooleanValue(env.ENABLE_MAIL_GZIP)) { + let compressed: ArrayBuffer | null = null; + try { + compressed = await compressText(parsedEmailContext.rawEmail); + } catch (gzipError) { + console.error("gzip compression failed, falling back to plaintext", gzipError); + } + if (compressed) { + try { + ({ success } = await env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw_blob, message_id) VALUES (?, ?, ?, ?)` + ).bind( + message.from, message.to, compressed, message_id + ).run()); + } catch (dbError) { + // Fallback to plaintext only if raw_blob column is missing (migration not applied) + const errMsg = String(dbError); + if (errMsg.includes('raw_blob') || errMsg.includes('no such column')) { + console.error("raw_blob column missing, falling back to plaintext", dbError); + ({ success } = await env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` + ).bind( + message.from, message.to, parsedEmailContext.rawEmail, message_id + ).run()); + } else { + throw dbError; + } + } + } else { + ({ success } = await env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` + ).bind( + message.from, message.to, parsedEmailContext.rawEmail, message_id + ).run()); + } + } else { + ({ success } = await env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` + ).bind( + message.from, message.to, parsedEmailContext.rawEmail, message_id + ).run()); + } if (!success) { message.setReject(`Failed save message to ${message.to}`); console.error(`Failed save message from ${message.from} to ${message.to}`); diff --git a/worker/src/gzip.ts b/worker/src/gzip.ts new file mode 100644 index 00000000..8d5e9930 --- /dev/null +++ b/worker/src/gzip.ts @@ -0,0 +1,48 @@ +/** + * Gzip compression/decompression utilities for D1 BLOB storage. + * Uses Web Standard CompressionStream/DecompressionStream (native in CF Workers). + */ + +import { RawMailRow } from "./models"; + +export async function compressText(text: string): Promise { + const stream = new Blob([text]).stream().pipeThrough(new CompressionStream('gzip')); + return new Response(stream).arrayBuffer(); +} + +export async function decompressBlob(buffer: ArrayBuffer): Promise { + const stream = new Blob([buffer]).stream().pipeThrough(new DecompressionStream('gzip')); + return new Response(stream).text(); +} + +/** + * Resolve the raw email text from either raw_blob (gzip) or raw (plaintext) field. + */ +export async function resolveRawEmail(row: RawMailRow): Promise { + if (row.raw_blob) { + try { + // D1 returns BLOB as Array, convert to ArrayBuffer for decompression + return await decompressBlob(new Uint8Array(row.raw_blob as ArrayLike).buffer); + } catch (e) { + console.error("decompressBlob failed, fallback to raw field", e); + return row.raw ?? ''; + } + } + return row.raw ?? ''; +} + +/** + * Resolve a single row: decompress raw_blob if present, strip raw_blob from result. + */ +export async function resolveRawEmailRow(row: RawMailRow): Promise { + const raw = await resolveRawEmail(row); + const { raw_blob: _, ...rest } = row; + return { ...rest, raw }; +} + +/** + * Batch resolve raw emails for list queries using Promise.all. + */ +export async function resolveRawEmailList(rows: RawMailRow[]): Promise { + return Promise.all(rows.map(row => resolveRawEmailRow(row))); +} diff --git a/worker/src/mails_api/index.ts b/worker/src/mails_api/index.ts index 3339956c..6b2bcac5 100644 --- a/worker/src/mails_api/index.ts +++ b/worker/src/mails_api/index.ts @@ -2,8 +2,9 @@ import { Context, Hono } from 'hono' import i18n from '../i18n'; import { getBooleanValue, getJsonSetting, checkCfTurnstile, getStringValue, getSplitStringListValue, isAddressCountLimitReached } from '../utils'; -import { newAddress, handleListQuery, deleteAddressWithData, getAddressPrefix, getAllowDomains, updateAddressUpdatedAt, generateRandomName } from '../common' +import { newAddress, handleMailListQuery, deleteAddressWithData, getAddressPrefix, getAllowDomains, updateAddressUpdatedAt, generateRandomName } from '../common' import { CONSTANTS } from '../constants' +import { resolveRawEmailRow } from '../gzip' import auto_reply from './auto_reply' import webhook_settings from './webhook_settings'; import s3_attachment from './s3_attachment'; @@ -28,7 +29,7 @@ api.get('/api/mails', async (c) => { } const { limit, offset } = c.req.query(); if (Number.parseInt(offset) <= 0) updateAddressUpdatedAt(c, address); - return await handleListQuery(c, + return await handleMailListQuery(c, `SELECT * FROM raw_mails where address = ?`, `SELECT count(*) as count FROM raw_mails where address = ?`, [address], limit, offset @@ -41,7 +42,8 @@ api.get('/api/mail/:mail_id', async (c) => { const result = await c.env.DB.prepare( `SELECT * FROM raw_mails where id = ? and address = ?` ).bind(mail_id, address).first(); - return c.json(result); + if (!result) return c.json(null); + return c.json(await resolveRawEmailRow(result)); }) api.delete('/api/mails/:id', async (c) => { diff --git a/worker/src/mails_api/webhook_settings.ts b/worker/src/mails_api/webhook_settings.ts index 2f704e7b..f22a8a03 100644 --- a/worker/src/mails_api/webhook_settings.ts +++ b/worker/src/mails_api/webhook_settings.ts @@ -1,7 +1,8 @@ import { Context } from "hono"; import { CONSTANTS } from "../constants"; -import { AdminWebhookSettings, WebhookSettings } from "../models"; +import { AdminWebhookSettings, WebhookSettings, RawMailRow } from "../models"; import { commonParseMail, sendWebhook } from "../common"; +import { resolveRawEmail } from "../gzip"; import i18n from "../i18n"; @@ -37,10 +38,12 @@ async function testWebhookSettings(c: Context): Promise(); const { address } = c.get("jwtPayload"); // random raw email - const { id: mailId, raw } = await c.env.DB.prepare( - `SELECT id, raw FROM raw_mails WHERE address = ? ORDER BY RANDOM() LIMIT 1` - ).bind(address).first<{ id: string, raw: string }>() || {}; - const parsedEmailContext: ParsedEmailContext = { rawEmail: raw || "" }; + const mailRow = await c.env.DB.prepare( + `SELECT * FROM raw_mails WHERE address = ? ORDER BY RANDOM() LIMIT 1` + ).bind(address).first(); + const mailId = mailRow?.id; + const raw = mailRow ? await resolveRawEmail(mailRow) : ""; + const parsedEmailContext: ParsedEmailContext = { rawEmail: raw }; const parsedEmail = await commonParseMail(parsedEmailContext); const res = await sendWebhook(settings, { id: mailId || "0", diff --git a/worker/src/models/index.ts b/worker/src/models/index.ts index 04de6e7a..c037b4b1 100644 --- a/worker/src/models/index.ts +++ b/worker/src/models/index.ts @@ -190,3 +190,14 @@ export type RoleConfig = { } export type RoleAddressConfig = Record; + +export type RawMailRow = { + id: number; + message_id?: string; + source?: string; + address?: string; + raw?: string; + raw_blob?: unknown; + metadata?: string; + created_at?: string; +} diff --git a/worker/src/telegram_api/miniapp.ts b/worker/src/telegram_api/miniapp.ts index 2c9b6e0f..884b5831 100644 --- a/worker/src/telegram_api/miniapp.ts +++ b/worker/src/telegram_api/miniapp.ts @@ -3,6 +3,7 @@ import { Jwt } from 'hono/utils/jwt' import { CONSTANTS } from "../constants"; import { bindTelegramAddress, jwtListToAddressData, tgUserNewAddress, unbindTelegramAddress } from "./common"; import { checkCfTurnstile, checkIsAdmin, getBooleanValue } from "../utils"; +import { resolveRawEmailRow } from "../gzip"; import { TelegramSettings } from "./settings"; import i18n from "../i18n"; @@ -144,7 +145,7 @@ async function getMail(c: Context): Promise { if (!result) { return c.text("Mail not found", 404); } - return c.json(result); + return c.json(await resolveRawEmailRow(result)); } const userId = await checkTelegramAuth(c, initData); const jwtList = await c.env.KV.get(`${CONSTANTS.TG_KV_PREFIX}:${userId}`, 'json') || []; @@ -152,13 +153,14 @@ async function getMail(c: Context): Promise { const result = await c.env.DB.prepare( `SELECT * FROM raw_mails where id = ?` ).bind(mailId).first(); + if (!result) return c.json(null); const settings = await c.env.KV.get(CONSTANTS.TG_KV_SETTINGS_KEY, "json"); const superUser = settings?.enableGlobalMailPush && settings?.globalMailPushList.includes(userId); if (!superUser) { - if (result?.address && !(result.address as string in addressIdMap)) { + if (!(result.address as string in addressIdMap)) { return c.text(msgs.TgNoPermissionViewMailMsg, 403); } - const address_id = addressIdMap[result?.address as string]; + const address_id = addressIdMap[result.address as string]; const db_address_id = await c.env.DB.prepare( `SELECT id FROM address where id = ? ` ).bind(address_id).first("id"); @@ -166,7 +168,7 @@ async function getMail(c: Context): Promise { return c.text(msgs.TgNoPermissionViewMailMsg, 403); } } - return c.json(result); + return c.json(await resolveRawEmailRow(result)); } catch (e) { return c.text((e as Error).message, 400); diff --git a/worker/src/telegram_api/telegram.ts b/worker/src/telegram_api/telegram.ts index 086a146b..927f1f6e 100644 --- a/worker/src/telegram_api/telegram.ts +++ b/worker/src/telegram_api/telegram.ts @@ -9,6 +9,8 @@ import { TelegramSettings } from "./settings"; import { sendTelegramAttachments } from "./tg_file_upload"; import { bindTelegramAddress, deleteTelegramAddress, jwtListToAddressData, tgUserNewAddress, unbindTelegramAddress, unbindTelegramByAddress } from "./common"; import { commonParseMail } from "../common"; +import { resolveRawEmail } from "../gzip"; +import { RawMailRow } from "../models"; import { UserFromGetMe } from "telegraf/types"; import i18n from "../i18n"; import { LocaleMessages } from "../i18n/type"; @@ -301,12 +303,15 @@ export function newTelegramBot(c: Context, token: string): Teleg if (!db_address_id) { return await ctx.reply(msgs.TgInvalidAddressMsg); } - const { raw, id: mailId, created_at } = await c.env.DB.prepare( + const mailRow = await c.env.DB.prepare( `SELECT * FROM raw_mails where address = ? ` + ` order by id desc limit 1 offset ?` ).bind( queryAddress, mailIndex - ).first<{ raw: string, id: string, created_at: string }>() || {}; + ).first(); + const raw = mailRow ? await resolveRawEmail(mailRow) : undefined; + const mailId = mailRow?.id; + const created_at = mailRow?.created_at; const { mail } = raw ? await parseMail(msgs, { rawEmail: raw }, queryAddress, created_at) : { mail: msgs.TgNoMoreMailsMsg }; const settings = await c.env.KV.get(CONSTANTS.TG_KV_SETTINGS_KEY, "json"); const miniAppButtons = [] diff --git a/worker/src/types.d.ts b/worker/src/types.d.ts index 7518cec3..3001ff50 100644 --- a/worker/src/types.d.ts +++ b/worker/src/types.d.ts @@ -98,6 +98,9 @@ type Bindings = { ENABLE_AI_EMAIL_EXTRACT: string | boolean | undefined AI_EXTRACT_MODEL: string | undefined + // gzip compression for raw_mails + ENABLE_MAIL_GZIP: string | boolean | undefined + // E2E testing E2E_TEST_MODE: string | boolean | undefined } diff --git a/worker/src/user_api/user_mail_api.ts b/worker/src/user_api/user_mail_api.ts index c8c75dff..f0bde6ea 100644 --- a/worker/src/user_api/user_mail_api.ts +++ b/worker/src/user_api/user_mail_api.ts @@ -1,5 +1,5 @@ import { Context } from "hono"; -import { handleListQuery } from "../common"; +import { handleMailListQuery } from "../common"; import UserBindAddressModule from "./bind_address"; export default { @@ -19,7 +19,7 @@ export default { const filterQuerys = [addressQuery].filter((item) => item).join(" and "); const finalQuery = filterQuerys.length > 0 ? `where ${filterQuerys}` : ""; const filterParams = [...addressParams] - return await handleListQuery(c, + return await handleMailListQuery(c, `SELECT * FROM raw_mails ${finalQuery}`, `SELECT count(*) as count FROM raw_mails ${finalQuery}`, filterParams, limit, offset diff --git a/worker/src/utils.ts b/worker/src/utils.ts index e1638f31..0691c04f 100644 --- a/worker/src/utils.ts +++ b/worker/src/utils.ts @@ -2,6 +2,7 @@ import { Context } from "hono"; import { createMimeMessage } from "mimetext"; import { UserSettings, RoleAddressConfig } from "./models"; import { CONSTANTS } from "./constants"; +import { compressText } from "./gzip"; export const getJsonObjectValue = ( value: string | any @@ -264,11 +265,41 @@ export const sendAdminInternalMail = async ( data: text }); const message_id = Math.random().toString(36).substring(2, 15); - const { success } = await c.env.DB.prepare( - `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` - ).bind( - "admin@internal", toMail, msg.asRaw(), message_id - ).run(); + const rawText = msg.asRaw(); + let success = false; + if (getBooleanValue(c.env.ENABLE_MAIL_GZIP)) { + let compressed: ArrayBuffer | null = null; + try { + compressed = await compressText(rawText); + } catch (gzipError) { + console.error("gzip compression failed, falling back to plaintext", gzipError); + } + if (compressed) { + try { + ({ success } = await c.env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw_blob, message_id) VALUES (?, ?, ?, ?)` + ).bind("admin@internal", toMail, compressed, message_id).run()); + } catch (dbError) { + const errMsg = String(dbError); + if (errMsg.includes('raw_blob') || errMsg.includes('no such column')) { + console.error("raw_blob column missing, falling back to plaintext", dbError); + ({ success } = await c.env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` + ).bind("admin@internal", toMail, rawText, message_id).run()); + } else { + throw dbError; + } + } + } else { + ({ success } = await c.env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` + ).bind("admin@internal", toMail, rawText, message_id).run()); + } + } else { + ({ success } = await c.env.DB.prepare( + `INSERT INTO raw_mails (source, address, raw, message_id) VALUES (?, ?, ?, ?)` + ).bind("admin@internal", toMail, rawText, message_id).run()); + } if (!success) { console.log(`Failed save message from admin@internal to ${toMail}`); }