feat: trigger another worker (#547)

This commit is contained in:
刘志聪
2025-01-08 20:02:48 +08:00
committed by GitHub
parent 92620cdedb
commit 5bfa588f70
14 changed files with 302 additions and 9 deletions

View File

@@ -1,7 +1,12 @@
<!-- markdownlint-disable-file MD004 MD024 MD034 MD036 -->
# CHANGE LOG
## main(v0.8.4)
## main(v0.8.5)
- feat: 增加调用其他 worker功能解耦
- feat: 增加 worker 配置 `ENABLE_ANOTHER_WORKER``ANOTHER_WORKER_LIST` ,用于调用其他 worker 的 rpc 接口
## v0.8.4
- fix: |UI| 修复 admin portal 无收件人邮箱删除调用api 错误
- feat: |Telegram Bot| 增加 telegram bot 清理无效地址凭证命令

View File

@@ -138,6 +138,7 @@ function sidebarGuide(): DefaultTheme.SidebarItem[] {
{ text: '配置 webhook', link: 'feature/webhook' },
{ text: '新建邮箱地址 API', link: 'feature/new-address-api' },
{ text: 'Oauth2 第三方登录', link: 'feature/user-oauth2' },
{ text: '配置其他worker增强', link: 'feature/another-worker-enhanced' },
]
},
{

Binary file not shown.

After

Width:  |  Height:  |  Size: 98 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 121 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 231 KiB

View File

@@ -108,6 +108,23 @@ ENABLE_AUTO_REPLY = false
# ENABLE_CHECK_JUNK_MAIL = false
# 垃圾邮件检查配置, 任何一项不存在或者不通过则被判定为垃圾邮件
# JUNK_MAIL_FORCE_PASS_LIST = ["spf", "dkim", "dmarc"]
# 是否开启其他 worker 处理邮件
# ENABLE_ANOTHER_WORKER = false
# 其他 worker 处理邮件的配置,可以配置多个其他 worker。
# 通过关键词筛选,调用对应绑定的 worker 的方法(默认方法名为 rpcEmail
# keywords必填否则 worker 将不会被触发
#ANOTHER_WORKER_LIST ="""
#[
# {
# "binding":"AUTH_INBOX",
# "method":"rpcEmail",
# "keywords":[
# "验证码","激活码","激活链接","确认链接","验证邮箱","确认邮件","账号激活","邮件验证","账户确认","安全码","认证码","安全验证","登陆码","确认码","启用账户","激活账户","账号验证","注册确认",
# "account","activation","verify","verification","activate","confirmation","email","code","validate","registration","login","code","expire","confirm"
# ]
# }
#]
#"""
# D1 数据库的名称和 ID 可以在 cloudflare 控制台查看
[[d1_databases]]
@@ -127,6 +144,11 @@ database_id = "xxx" # D1 数据库 ID
# namespace_id = "1001"
# # 10 requests per minute
# simple = { limit = 10, period = 60 }
# 绑定其他 worker 处理邮件,例如通过 auth-inbox ai 能力解析验证码或激活链接
# [[services]]
# binding = "AUTH_INBOX"
# service = "auth-inbox"
```
## Telegram Bot 配置

View File

@@ -0,0 +1,144 @@
# 通过其他 worker 增强
> 临时邮箱的核心能力在邮件的管理,通过其他 worker 可以增强临时邮箱的功能,例如通过 auth-inbox ai 能力解析验证码或激活链接
> 该功能仅触发其他 worker ,在 webhook 后执行
> [!NOTE]
> 如果要使用 worker 增强,请提前创建可以 rpc 调用的 worker具体下文详述
> 参考:
> - https://developers.cloudflare.com/workers/runtime-apis/bindings/service-bindings/rpc/
> - https://developers.cloudflare.com/workers/runtime-apis/rpc/
> - auth-inbox 项目https://github.com/TooonyChen/AuthInbox
## 创建其他 worker以 auth-inbox 项目ai解析验证码为例子
### worker 改造为继承 WorkerEntrypoint
一个简单,作为被调用方,提供 rpc 方法调用的worker代码如下rpcEmail 方法为样例)
(使用已经修改好的项目 https://github.com/oneisall8955/AuthInbox-fork
src/index.ts 文件
```js
import { WorkerEntrypoint } from "cloudflare:workers";
interface Env {
DB: D1Database;
// ...
}
export default class extends WorkerEntrypoint<Env> {
async fetch(request: Request): Promise<Response> {
console.log("原本fetch接口入参是request,env,ctx");
console.log("修改为WorkerEntrypoint风格后只有一个入参request获取环境变量和上下文有小改动");
// 环境变量及上下文改动详见:
// https://developers.cloudflare.com/workers/runtime-apis/bindings/service-bindings/rpc/#bindings-env
// https://developers.cloudflare.com/workers/runtime-apis/bindings/service-bindings/rpc/#lifecycle-methods-ctx
const env: Env = this.env;
const ctx: ExecutionContext = this.ctx;
console.log("后续逻辑不变");
return new Response('ok', { status: 200 });
}
// 主要功能
async email(message: ForwardableEmailMessage): Promise<void> {
console.log("原本fetch接口入参是message,env,ctx");
console.log("修改为WorkerEntrypoint风格后只有一个入参message获取环境变量和上下文和fetch方法一样");
const env: Env = this.env;
const ctx: ExecutionContext = this.ctx;
console.log("接受email routing请求后后续逻辑不变");
}
// 暴露rpc接口处理来自其他worker的邮件请求
async rpcEmail(requestBody: string): Promise<void> {
console.log(`接受其他worker临时邮件服务cloudflare_temp_email的请求request body: ${requestBody}`);
// requestBody json 格式,由临时邮件服务发送,格式如下
// type RPCEmailMessage = {
// from: string | undefined | null,
// to: string | undefined | null,
// rawEmail: string | undefined | null,
// headers: Map<string, string>,
// }
// ... todo ...
}
}
```
### 部署其他 worker
修改好或者使用 以auth-inbox 为例,部署到 cloudflare worker 上,详见 https://github.com/TooonyChen/AuthInbox ,或者使用已经修改好的项目 https://github.com/oneisall8955/AuthInbox-fork
## 配置临时邮件服务,使用指定其他 worker 增强
## 绑定服务
### 通过 wrangler.toml 配置
```toml
[[services]]
binding = "AUTH_INBOX"
service = "auth-inbox"
```
这里的 `binding = "AUTH_INBOX"` 可以自定义,可以是任何字符串,`service = "auth-inbox"` 是部署好的提供rpc接口调用的worker名称。
### 用户界面配置
在设置-绑定,添加绑定,选择绑定服务。
变量名称填写自定义的名称,可以任意字符串 ,例如 `AUTH_INBOX`
服务绑定选择上一步创建好的服务,例如 `auth-inbox`
![another-worker-enhanced-01.png](/feature/another-worker-enhanced-01.png)
![another-worker-enhanced-02.png](/feature/another-worker-enhanced-02.png)
## 环境变量配置
### 通过 wrangler.toml 配置
```toml
ENABLE_ANOTHER_WORKER = true
ANOTHER_WORKER_LIST ="""
[
{
"binding":"AUTH_INBOX",
"method":"rpcEmail",
"keywords":[
"","","","","","","","","","","","","","","","","","",
"account","activation","verify","verification","activate","confirmation","email","code","validate","registration","login","code","expire","confirm"
]
}
]
"""
```
环境变量解释:
- ENABLE_ANOTHER_WORKER = true默认为falsetrue则开启其他 worker 处理邮件
- ANOTHER_WORKER_LIST 是一个JOSN数组每个对象包3个字段
- binding: *必填必须与services部分指定的 binding = "XXX" 保持一致*,例子中为 AUTH_INBOX
- method: 可选,默认 rpcEmail指的是调用这个 worker 的哪一个 rpc 方法处理
- keywords: 关键词数组,忽略大小写。用于过滤,如果*解析后邮件文本*匹配到这些关键词,触发这个 worker并且调用这个 worker 的 `method` 方法
### 用户界面配置
在设置-环境变量,添加环境变量
- ENABLE_ANOTHER_WORKER = true
- ANOTHER_WORKER_LIST 为上面提及的JSON数组字符串不再复述详细介绍看上文
```json
[
{
"binding":"AUTH_INBOX",
"method":"rpcEmail",
"keywords":[
"验证码","激活码","激活链接","确认链接","验证邮箱","确认邮件","账号激活","邮件验证","账户确认","安全码","认证码","安全验证","登陆码","确认码","启用账户","激活账户","账号验证","注册确认",
"account","activation","verify","verification","activate","confirmation","email","code","validate","registration","login","code","expire","confirm"
]
}
]
```
![another-worker-enhanced-03.png](/feature/another-worker-enhanced-03.png)
## 测试
发送一个邮件到临时邮箱观察worker日志到或者到 auth-inbox 提供的面板上查看验证码
![another-worker-enhanced-04.png](/feature/another-worker-enhanced-04.png)

View File

@@ -1,7 +1,7 @@
import { Context } from 'hono';
import { HonoCustomType } from '../types';
import { getAdminPasswords, getBooleanValue, getDefaultDomains, getDomains, getIntValue, getPasswords, getStringArray, getStringValue, getUserRoles } from '../utils';
import { getAdminPasswords, getBooleanValue, getDefaultDomains, getDomains, getIntValue, getPasswords, getStringArray, getStringValue, getUserRoles, getAnotherWorkerList } from '../utils';
import { CONSTANTS } from '../constants';
import { isS3Enabled } from '../mails_api/s3_attachment';
@@ -44,6 +44,9 @@ export default {
"DISABLE_ADMIN_PASSWORD_CHECK": getBooleanValue(c.env.DISABLE_ADMIN_PASSWORD_CHECK),
"ENABLE_CHECK_JUNK_MAIL": getBooleanValue(c.env.ENABLE_CHECK_JUNK_MAIL),
"JUNK_MAIL_FORCE_PASS_LIST": getStringArray(c.env.JUNK_MAIL_FORCE_PASS_LIST),
"ENABLE_ANOTHER_WORKER": getBooleanValue(c.env.ENABLE_ANOTHER_WORKER),
"ANOTHER_WORKER_LIST": getAnotherWorkerList(c),
})
}
}

View File

@@ -1,8 +1,8 @@
import { Context } from 'hono';
import { Jwt } from 'hono/utils/jwt'
import { getBooleanValue, getDomains, getStringValue, getIntValue, getUserRoles, getDefaultDomains, getJsonSetting } from './utils';
import { HonoCustomType, UserRole } from './types';
import { getBooleanValue, getDomains, getStringValue, getIntValue, getUserRoles, getDefaultDomains, getJsonSetting, getAnotherWorkerList } from './utils';
import { HonoCustomType, UserRole, AnotherWorker, RPCEmailMessage } from './types';
import { unbindTelegramByAddress } from './telegram_api/common';
import { CONSTANTS } from './constants';
import { AdminWebhookSettings, WebhookMail, WebhookSettings } from './models';
@@ -360,7 +360,7 @@ export async function triggerWebhook(
address: string,
raw_mail: string,
message_id: string | null
): Promise<void> {
): Promise<string | undefined | null> {
if (!c.env.KV || !getBooleanValue(c.env.ENABLE_WEBHOOK)) {
return
}
@@ -408,4 +408,49 @@ export async function triggerWebhook(
console.error(res.message);
}
}
return webhookMail.parsedText
}
export async function triggerAnotherWorker(
c: Context<HonoCustomType>,
rpcEmailMessage: RPCEmailMessage,
parsedText: string | undefined | null
): Promise<void> {
if (!parsedText) {
return;
}
const anotherWorkerList: AnotherWorker[] = getAnotherWorkerList(c);
if (!getBooleanValue(c.env.ENABLE_ANOTHER_WORKER) || anotherWorkerList.length === 0) {
console.log(`another worker disabled or anotherWorkerList is empty`);
return;
}
const parsedTextLowercase: string = parsedText.toLowerCase();
for (const worker of anotherWorkerList) {
const keywords = worker?.keywords ?? [];
const bindingName = worker?.binding ?? "";
const methodName = worker.method ?? "rpcEmail";
const serviceBinding = (c.env as any)[bindingName] ?? {};
const method = serviceBinding[methodName];
if (!method || typeof method !== "function") {
console.log(`method = ${methodName} not found or not function`);
continue;
}
if (!keywords.some(keyword => keyword && parsedTextLowercase.includes(keyword.toLowerCase()))) {
console.log(`worker.binding = ${bindingName} not match keywords, parsedText = ${parsedText}`);
continue;
}
try {
const requestBody = JSON.stringify(rpcEmailMessage);
await method(requestBody);
} catch (e1) {
console.error(`execute method = ${methodName} error`, e1);
}
}
}

View File

@@ -2,10 +2,10 @@ import { Context } from "hono";
import { getEnvStringList } from "../utils";
import { sendMailToTelegram } from "../telegram_api";
import { Bindings, HonoCustomType } from "../types";
import { Bindings, HonoCustomType, RPCEmailMessage } from "../types";
import { auto_reply } from "./auto_reply";
import { isBlocked } from "./black_list";
import { triggerWebhook } from "../common";
import { triggerWebhook, triggerAnotherWorker, commonParseMail} from "../common";
import { check_if_junk_mail } from "./check_junk";
@@ -61,8 +61,9 @@ async function email(message: ForwardableEmailMessage, env: Bindings, ctx: Execu
}
// send webhook
let parsedText;
try {
await triggerWebhook(
parsedText = await triggerWebhook(
{ env: env } as Context<HonoCustomType>,
message.to, rawEmail, message_id
);
@@ -70,6 +71,26 @@ async function email(message: ForwardableEmailMessage, env: Bindings, ctx: Execu
console.log("send webhook error", error);
}
// trigger another worker
try {
const headersMap = new Map<string, string>();
if(message.headers) {
message.headers.forEach((value, key) => {headersMap.set(key, value);});
}
if (!parsedText){
parsedText = (await commonParseMail(rawEmail))?.text ?? ""
}
const rpcEmail: RPCEmailMessage = {
from: message.from,
to: message.to,
rawEmail: rawEmail,
headers: headersMap
}
await triggerAnotherWorker({ env: env } as Context<HonoCustomType>, rpcEmail, parsedText);
} catch (error) {
console.error("trigger another worker error", error);
}
// auto reply email
await auto_reply(message, env);
}

17
worker/src/types.d.ts vendored
View File

@@ -46,6 +46,10 @@ export type Bindings = {
ENABLE_CHECK_JUNK_MAIL: string | boolean | undefined
JUNK_MAIL_FORCE_PASS_LIST: string | string[] | undefined
ENABLE_ANOTHER_WORKER: string | boolean | undefined
ANOTHER_WORKER_LIST: string | AnotherWorker[] | undefined
// s3 config
S3_ENDPOINT: string | undefined
S3_ACCESS_KEY_ID: string | undefined
@@ -92,3 +96,16 @@ type HonoCustomType = {
"Bindings": Bindings;
"Variables": Variables;
}
type AnotherWorker = {
binding: string | undefined | null,
method: string | undefined | null,
keywords: string[] | undefined | null
}
type RPCEmailMessage = {
from: string | undefined | null,
to: string | undefined | null,
rawEmail: string | undefined | null,
headers: Map<string, string>,
}

View File

@@ -1,6 +1,6 @@
import { Context } from "hono";
import { createMimeMessage } from "mimetext";
import { HonoCustomType, UserRole } from "./types";
import { HonoCustomType, UserRole,AnotherWorker } from "./types";
export const getJsonObjectValue = <T = any>(
value: string | any
@@ -156,6 +156,22 @@ export const getUserRoles = (c: Context<HonoCustomType>): UserRole[] => {
return c.env.USER_ROLES;
}
export const getAnotherWorkerList = (c: Context<HonoCustomType>): AnotherWorker[] => {
if (!c.env.ANOTHER_WORKER_LIST) {
return [];
}
// check if ANOTHER_WORKER_LIST is an array, if not use json.parse
if (!Array.isArray(c.env.ANOTHER_WORKER_LIST)) {
try {
return JSON.parse(c.env.ANOTHER_WORKER_LIST);
} catch (e) {
console.error("Failed to parse ANOTHER_WORKER_LIST", e);
return [];
}
}
return c.env.ANOTHER_WORKER_LIST;
}
export const getPasswords = (c: Context<HonoCustomType>): string[] => {
if (!c.env.PASSWORDS) {
return [];

View File

@@ -78,6 +78,20 @@ ENABLE_AUTO_REPLY = false
# ENABLE_CHECK_JUNK_MAIL = false
# junk mail force check pass list, if no status or status is not pass, will be marked as junk mail
# JUNK_MAIL_FORCE_PASS_LIST = ["spf", "dkim", "dmarc"]
# Calling other woker to process email
#ENABLE_ANOTHER_WORKER = false
#ANOTHER_WORKER_LIST ="""
#[
# {
# "binding":"AUTH_INBOX",
# "method":"rpcEmail",
# "keywords":[
# "验证码","激活码","激活链接","确认链接","验证邮箱","确认邮件","账号激活","邮件验证","账户确认","安全码","认证码","安全验证","登陆码","确认码","启用账户","激活账户","账号验证","注册确认",
# "account","activation","verify","verification","activate","confirmation","email","code","validate","registration","login","code","expire","confirm"
# ]
# }
#]
#"""
[[d1_databases]]
binding = "DB"
@@ -96,3 +110,8 @@ database_id = "xxx"
# namespace_id = "1001"
# # 10 requests per minute
# simple = { limit = 10, period = 60 }
# binding another worker service (parse the code or link), e.g. auth-inbox
# [[services]]
# binding = "AUTH_INBOX"
# service = "auth-inbox"