Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96ffd6b492 | ||
|
|
06bcf8f15c | ||
|
|
78e7a88a8c | ||
|
|
54b12c7a04 |
@@ -1,4 +1,4 @@
|
||||
# 🤖 Telegram Private Chatbot (v5.1)
|
||||
# 🤖 Telegram Private Chatbot (v5.3)
|
||||
|
||||
[](https://deploy.workers.cloudflare.com/?url=https://github.com/jikssha/telegram_private_chatbot)
|
||||

|
||||
|
||||
655
worker.js
655
worker.js
@@ -1,4 +1,4 @@
|
||||
// Cloudflare Worker:Telegram 双向机器人 v5.1
|
||||
// Cloudflare Worker:Telegram 双向机器人 v5.3
|
||||
|
||||
// --- 配置常量 ---
|
||||
const CONFIG = {
|
||||
@@ -7,6 +7,9 @@ const CONFIG = {
|
||||
VERIFIED_EXPIRE_SECONDS: 2592000, // 30天
|
||||
MEDIA_GROUP_EXPIRE_SECONDS: 60,
|
||||
MEDIA_GROUP_DELAY_MS: 3000, // 3秒(从2秒增加)
|
||||
PENDING_MAX_MESSAGES: 10, // 验证期间最多暂存的消息数
|
||||
ADMIN_CACHE_TTL_SECONDS: 300, // 管理员权限缓存 5 分钟
|
||||
NEEDS_REVERIFY_TTL_SECONDS: 600, // 标记需重新验证的 TTL(用于并发兜底)
|
||||
RATE_LIMIT_MESSAGE: 45,
|
||||
RATE_LIMIT_VERIFY: 3,
|
||||
RATE_LIMIT_WINDOW: 60,
|
||||
@@ -16,12 +19,17 @@ const CONFIG = {
|
||||
API_TIMEOUT_MS: 10000,
|
||||
CLEANUP_BATCH_SIZE: 10,
|
||||
MAX_CLEANUP_DISPLAY: 20,
|
||||
CLEANUP_LOCK_TTL_SECONDS: 1800, // /cleanup 防并发锁 30 分钟
|
||||
MAX_RETRY_ATTEMPTS: 3,
|
||||
THREAD_HEALTH_TTL_MS: 60000
|
||||
};
|
||||
|
||||
// 线程健康检查缓存,减少频繁探测请求
|
||||
const threadHealthCache = new Map();
|
||||
// 同一实例内的并发保护:避免同一用户短时间内重复创建话题
|
||||
const topicCreateInFlight = new Map();
|
||||
// 管理员权限缓存(实例内)
|
||||
const adminStatusCache = new Map();
|
||||
|
||||
// --- 本地题库 (15条) ---
|
||||
const LOCAL_QUESTIONS = [
|
||||
@@ -143,6 +151,187 @@ async function safeGetJSON(env, key, defaultValue = null) {
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeTgDescription(description) {
|
||||
return (description || "").toString().toLowerCase();
|
||||
}
|
||||
|
||||
function isTopicMissingOrDeleted(description) {
|
||||
const desc = normalizeTgDescription(description);
|
||||
return desc.includes("thread not found") ||
|
||||
desc.includes("topic not found") ||
|
||||
desc.includes("message thread not found") ||
|
||||
desc.includes("topic deleted") ||
|
||||
desc.includes("thread deleted") ||
|
||||
desc.includes("forum topic not found") ||
|
||||
desc.includes("topic closed permanently");
|
||||
}
|
||||
|
||||
function isTestMessageInvalid(description) {
|
||||
const desc = normalizeTgDescription(description);
|
||||
return desc.includes("message text is empty") ||
|
||||
desc.includes("bad request: message text is empty");
|
||||
}
|
||||
|
||||
async function getOrCreateUserTopicRec(from, key, env, userId) {
|
||||
const existing = await safeGetJSON(env, key, null);
|
||||
if (existing && existing.thread_id) return existing;
|
||||
|
||||
const inflight = topicCreateInFlight.get(String(userId));
|
||||
if (inflight) return await inflight;
|
||||
|
||||
const p = (async () => {
|
||||
// 并发下二次确认,避免已被其他请求创建却读到旧值
|
||||
const again = await safeGetJSON(env, key, null);
|
||||
if (again && again.thread_id) return again;
|
||||
return await createTopic(from, key, env, userId);
|
||||
})();
|
||||
|
||||
topicCreateInFlight.set(String(userId), p);
|
||||
try {
|
||||
return await p;
|
||||
} finally {
|
||||
if (topicCreateInFlight.get(String(userId)) === p) {
|
||||
topicCreateInFlight.delete(String(userId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function withMessageThreadId(body, threadId) {
|
||||
if (threadId === undefined || threadId === null) return body;
|
||||
return { ...body, message_thread_id: threadId };
|
||||
}
|
||||
|
||||
async function probeForumThread(env, expectedThreadId, { userId, reason, doubleCheckOnMissingThreadId = true } = {}) {
|
||||
const attemptOnce = async () => {
|
||||
const res = await tgCall(env, "sendMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_thread_id: expectedThreadId,
|
||||
text: "🔎"
|
||||
});
|
||||
|
||||
const actualThreadId = res.result?.message_thread_id;
|
||||
const probeMessageId = res.result?.message_id;
|
||||
|
||||
// 尽可能清理探测消息(无论落到哪个话题/General)
|
||||
if (res.ok && probeMessageId) {
|
||||
try {
|
||||
await tgCall(env, "deleteMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_id: probeMessageId
|
||||
});
|
||||
} catch (e) {
|
||||
// 删除失败不影响主流程
|
||||
}
|
||||
}
|
||||
|
||||
if (!res.ok) {
|
||||
if (isTopicMissingOrDeleted(res.description)) {
|
||||
return { status: "missing", description: res.description };
|
||||
}
|
||||
if (isTestMessageInvalid(res.description)) {
|
||||
return { status: "probe_invalid", description: res.description };
|
||||
}
|
||||
return { status: "unknown_error", description: res.description };
|
||||
}
|
||||
|
||||
// 关键:有些情况下 Telegram 会返回 ok 但不带 message_thread_id(常见于 General)
|
||||
if (actualThreadId === undefined || actualThreadId === null) {
|
||||
return { status: "missing_thread_id" };
|
||||
}
|
||||
|
||||
if (Number(actualThreadId) !== Number(expectedThreadId)) {
|
||||
return { status: "redirected", actualThreadId };
|
||||
}
|
||||
|
||||
return { status: "ok" };
|
||||
};
|
||||
|
||||
const first = await attemptOnce();
|
||||
if (first.status !== "missing_thread_id" || !doubleCheckOnMissingThreadId) return first;
|
||||
|
||||
// 二次探测:避免偶发字段缺失导致误判并触发重建
|
||||
const second = await attemptOnce();
|
||||
if (second.status === "missing_thread_id") {
|
||||
Logger.warn('thread_probe_missing_thread_id', { userId, expectedThreadId, reason });
|
||||
}
|
||||
return second;
|
||||
}
|
||||
|
||||
async function resetUserVerificationAndRequireReverify(env, { userId, userKey, oldThreadId, pendingMsgId, reason }) {
|
||||
// 清理旧映射与验证状态:用户需要重新做人机验证
|
||||
await env.TOPIC_MAP.delete(`verified:${userId}`);
|
||||
await env.TOPIC_MAP.put(`needs_verify:${userId}`, "1", { expirationTtl: CONFIG.NEEDS_REVERIFY_TTL_SECONDS });
|
||||
await env.TOPIC_MAP.delete(`retry:${userId}`);
|
||||
|
||||
if (userKey) {
|
||||
await env.TOPIC_MAP.delete(userKey);
|
||||
}
|
||||
|
||||
if (oldThreadId !== undefined && oldThreadId !== null) {
|
||||
await env.TOPIC_MAP.delete(`thread:${oldThreadId}`);
|
||||
await env.TOPIC_MAP.delete(`thread_ok:${oldThreadId}`);
|
||||
threadHealthCache.delete(oldThreadId);
|
||||
}
|
||||
|
||||
Logger.info('verification_reset_due_to_topic_loss', {
|
||||
userId,
|
||||
oldThreadId,
|
||||
pendingMsgId,
|
||||
reason
|
||||
});
|
||||
|
||||
await sendVerificationChallenge(userId, env, pendingMsgId || null);
|
||||
}
|
||||
|
||||
function parseAdminIdAllowlist(env) {
|
||||
const raw = (env.ADMIN_IDS || "").toString().trim();
|
||||
if (!raw) return null;
|
||||
const ids = raw.split(/[,;\s]+/g).map(s => s.trim()).filter(Boolean);
|
||||
const set = new Set();
|
||||
for (const id of ids) {
|
||||
const n = Number(id);
|
||||
if (!Number.isFinite(n)) continue;
|
||||
set.add(String(n));
|
||||
}
|
||||
return set.size > 0 ? set : null;
|
||||
}
|
||||
|
||||
async function isAdminUser(env, userId) {
|
||||
const allowlist = parseAdminIdAllowlist(env);
|
||||
if (allowlist && allowlist.has(String(userId))) return true;
|
||||
|
||||
const cacheKey = String(userId);
|
||||
const now = Date.now();
|
||||
const cached = adminStatusCache.get(cacheKey);
|
||||
if (cached && (now - cached.ts < CONFIG.ADMIN_CACHE_TTL_SECONDS * 1000)) {
|
||||
return cached.isAdmin;
|
||||
}
|
||||
|
||||
const kvKey = `admin:${userId}`;
|
||||
const kvVal = await env.TOPIC_MAP.get(kvKey);
|
||||
if (kvVal === "1" || kvVal === "0") {
|
||||
const isAdmin = kvVal === "1";
|
||||
adminStatusCache.set(cacheKey, { ts: now, isAdmin });
|
||||
return isAdmin;
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await tgCall(env, "getChatMember", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
user_id: userId
|
||||
});
|
||||
|
||||
const status = res.result?.status;
|
||||
const isAdmin = res.ok && (status === "creator" || status === "administrator");
|
||||
await env.TOPIC_MAP.put(kvKey, isAdmin ? "1" : "0", { expirationTtl: CONFIG.ADMIN_CACHE_TTL_SECONDS });
|
||||
adminStatusCache.set(cacheKey, { ts: now, isAdmin });
|
||||
return isAdmin;
|
||||
} catch (e) {
|
||||
Logger.warn('admin_check_failed', { userId });
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// 获取所有 KV keys(处理分页)
|
||||
async function getAllKeys(env, prefix) {
|
||||
const allKeys = [];
|
||||
@@ -257,7 +446,9 @@ export default {
|
||||
}
|
||||
// 【修复】支持 General 话题和普通话题
|
||||
// General 话题的 message_thread_id 可能不存在,或者等于 1
|
||||
if (msg.message_thread_id || msg.text) {
|
||||
const text = (msg.text || "").trim();
|
||||
const isCommand = !!text && text.startsWith("/");
|
||||
if (msg.message_thread_id || isCommand) {
|
||||
await handleAdminReply(msg, normalizedEnv, ctx);
|
||||
return new Response("OK");
|
||||
}
|
||||
@@ -294,16 +485,6 @@ async function handlePrivateMessage(msg, env, ctx) {
|
||||
const verified = await env.TOPIC_MAP.get(`verified:${userId}`);
|
||||
|
||||
if (!verified) {
|
||||
// 验证请求速率限制
|
||||
const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300);
|
||||
if (!verifyLimit.allowed) {
|
||||
await tgCall(env, "sendMessage", {
|
||||
chat_id: userId,
|
||||
text: "⚠️ 验证请求过于频繁,请5分钟后再试。"
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const isStart = msg.text && msg.text.trim() === "/start";
|
||||
const pendingMsgId = isStart ? null : msg.message_id;
|
||||
await sendVerificationChallenge(userId, env, pendingMsgId);
|
||||
@@ -314,6 +495,13 @@ async function handlePrivateMessage(msg, env, ctx) {
|
||||
}
|
||||
|
||||
async function forwardToTopic(msg, userId, key, env, ctx) {
|
||||
// 并发兜底:如果已被标记为需要重新验证,直接发起验证并暂停转发/建话题
|
||||
const needsVerify = await env.TOPIC_MAP.get(`needs_verify:${userId}`);
|
||||
if (needsVerify) {
|
||||
await sendVerificationChallenge(userId, env, msg.message_id || null);
|
||||
return;
|
||||
}
|
||||
|
||||
// 【修复 #4】使用安全的 JSON 解析
|
||||
let rec = await safeGetJSON(env, key, null);
|
||||
|
||||
@@ -336,7 +524,7 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
|
||||
}
|
||||
|
||||
if (!rec || !rec.thread_id) {
|
||||
rec = await createTopic(msg.from, key, env, userId);
|
||||
rec = await getOrCreateUserTopicRec(msg.from, key, env, userId);
|
||||
if (!rec || !rec.thread_id) {
|
||||
throw new Error("创建话题失败");
|
||||
}
|
||||
@@ -359,72 +547,44 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
|
||||
const withinTTL = cached && (now - cached.ts < CONFIG.THREAD_HEALTH_TTL_MS);
|
||||
|
||||
if (!withinTTL) {
|
||||
const testRes = await tgCall(env, "sendMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_thread_id: rec.thread_id,
|
||||
text: "", // 零宽度字符,对用户不可见
|
||||
});
|
||||
// 跨节点缓存:避免由于 Workers 多 PoP 导致每次都做健康探测
|
||||
const kvHealthKey = `thread_ok:${rec.thread_id}`;
|
||||
const kvHealthOk = await env.TOPIC_MAP.get(kvHealthKey);
|
||||
if (kvHealthOk === "1") {
|
||||
threadHealthCache.set(cacheKey, { ts: now, ok: true });
|
||||
} else {
|
||||
const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "health_check" });
|
||||
|
||||
const actualThreadId = testRes.result?.message_thread_id;
|
||||
const expectedThreadId = rec.thread_id;
|
||||
|
||||
// 只要未返回 thread_id,或 thread_id 不匹配,均视为话题失效/被重定向
|
||||
const redirectedOrMissing = !testRes.ok || !actualThreadId || Number(actualThreadId) !== Number(expectedThreadId);
|
||||
|
||||
if (redirectedOrMissing) {
|
||||
const desc = (testRes.description || "").toLowerCase();
|
||||
const isTopicDeleted = redirectedOrMissing ||
|
||||
desc.includes("thread not found") ||
|
||||
desc.includes("topic not found") ||
|
||||
desc.includes("message thread not found") ||
|
||||
desc.includes("topic deleted") ||
|
||||
desc.includes("thread deleted") ||
|
||||
desc.includes("forum topic not found") ||
|
||||
desc.includes("topic closed permanently");
|
||||
|
||||
if (isTopicDeleted) {
|
||||
await env.TOPIC_MAP.put(retryKey, String(retryCount + 1), { expirationTtl: 60 });
|
||||
|
||||
Logger.info('topic_auto_repair', {
|
||||
if (probe.status === "redirected" || probe.status === "missing" || probe.status === "missing_thread_id") {
|
||||
await resetUserVerificationAndRequireReverify(env, {
|
||||
userId,
|
||||
userKey: key,
|
||||
oldThreadId: rec.thread_id,
|
||||
attempt: retryCount + 1,
|
||||
maxAttempts: CONFIG.MAX_RETRY_ATTEMPTS,
|
||||
errorDescription: testRes.description
|
||||
pendingMsgId: msg.message_id,
|
||||
reason: `health_check:${probe.status}`
|
||||
});
|
||||
return;
|
||||
} else if (probe.status === "probe_invalid") {
|
||||
Logger.warn('topic_health_probe_invalid_message', {
|
||||
userId,
|
||||
threadId: rec.thread_id,
|
||||
errorDescription: probe.description
|
||||
});
|
||||
|
||||
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
|
||||
threadHealthCache.delete(cacheKey);
|
||||
|
||||
rec = await createTopic(msg.from, key, env, userId);
|
||||
|
||||
if (!rec || !rec.thread_id) {
|
||||
throw new Error("重建话题失败");
|
||||
}
|
||||
|
||||
Logger.info('topic_recreated', {
|
||||
userId,
|
||||
newThreadId: rec.thread_id
|
||||
});
|
||||
} else {
|
||||
Logger.warn('topic_test_failed_unknown', {
|
||||
userId,
|
||||
threadId: rec.thread_id,
|
||||
errorDescription: testRes.description
|
||||
});
|
||||
}
|
||||
// 仍然设置短 TTL,避免每条消息都探测(并误触发重建)
|
||||
threadHealthCache.set(cacheKey, { ts: now, ok: true });
|
||||
await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) });
|
||||
} else if (probe.status === "unknown_error") {
|
||||
Logger.warn('topic_test_failed_unknown', {
|
||||
userId,
|
||||
threadId: rec.thread_id,
|
||||
errorDescription: probe.description
|
||||
});
|
||||
} else {
|
||||
await env.TOPIC_MAP.delete(retryKey);
|
||||
threadHealthCache.set(cacheKey, { ts: now, ok: true });
|
||||
|
||||
try {
|
||||
await tgCall(env, "deleteMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_id: testRes.result.message_id
|
||||
});
|
||||
} catch (e) {
|
||||
// 删除失败不影响主流程
|
||||
}
|
||||
await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -447,18 +607,13 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
|
||||
|
||||
// 检测 Telegram 静默重定向到 General 的情况
|
||||
const resThreadId = res.result?.message_thread_id;
|
||||
if (res.ok && (!resThreadId || Number(resThreadId) !== Number(rec.thread_id))) {
|
||||
if (res.ok && resThreadId !== undefined && resThreadId !== null && Number(resThreadId) !== Number(rec.thread_id)) {
|
||||
Logger.warn('forward_redirected_to_general', {
|
||||
userId,
|
||||
expectedThreadId: rec.thread_id,
|
||||
actualThreadId: resThreadId
|
||||
});
|
||||
|
||||
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
|
||||
threadHealthCache.delete(rec.thread_id);
|
||||
|
||||
const newRec = await createTopic(msg.from, key, env, userId);
|
||||
|
||||
// 删除误投到 General 的消息
|
||||
if (res.result?.message_id) {
|
||||
try {
|
||||
@@ -470,32 +625,65 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
|
||||
// 删除失败不影响重发
|
||||
}
|
||||
}
|
||||
|
||||
await tgCall(env, "copyMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
from_chat_id: userId,
|
||||
message_id: msg.message_id,
|
||||
message_thread_id: newRec.thread_id
|
||||
await resetUserVerificationAndRequireReverify(env, {
|
||||
userId,
|
||||
userKey: key,
|
||||
oldThreadId: rec.thread_id,
|
||||
pendingMsgId: msg.message_id,
|
||||
reason: "forward_redirected_to_general"
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// 兜底:部分情况下 Telegram 返回 ok 但不带 message_thread_id(可能已落入 General)
|
||||
if (res.ok && (resThreadId === undefined || resThreadId === null)) {
|
||||
const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "forward_result_missing_thread_id" });
|
||||
if (probe.status !== "ok") {
|
||||
Logger.warn('forward_suspected_redirect_or_missing', {
|
||||
userId,
|
||||
expectedThreadId: rec.thread_id,
|
||||
probeStatus: probe.status,
|
||||
probeDescription: probe.description
|
||||
});
|
||||
|
||||
// 尽量删除误投消息(通常在 General)
|
||||
if (res.result?.message_id) {
|
||||
try {
|
||||
await tgCall(env, "deleteMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_id: res.result.message_id
|
||||
});
|
||||
} catch (e) {
|
||||
// 删除失败不影响重发
|
||||
}
|
||||
}
|
||||
await resetUserVerificationAndRequireReverify(env, {
|
||||
userId,
|
||||
userKey: key,
|
||||
oldThreadId: rec.thread_id,
|
||||
pendingMsgId: msg.message_id,
|
||||
reason: `forward_missing_thread_id:${probe.status}`
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 【修复2】增强错误处理,双重保险
|
||||
// 如果上面的测试没有捕获到,这里再次检测
|
||||
if (!res.ok) {
|
||||
const desc = (res.description || "").toLowerCase();
|
||||
if (desc.includes("thread not found") ||
|
||||
desc.includes("topic not found") ||
|
||||
desc.includes("message thread not found")) {
|
||||
console.log(`[二次修复] 转发失败,话题不存在,正在重建...`);
|
||||
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
|
||||
threadHealthCache.delete(rec.thread_id);
|
||||
const newRec = await createTopic(msg.from, key, env, userId);
|
||||
await tgCall(env, "forwardMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
from_chat_id: userId,
|
||||
message_id: msg.message_id,
|
||||
message_thread_id: newRec.thread_id,
|
||||
const desc = normalizeTgDescription(res.description);
|
||||
if (isTopicMissingOrDeleted(desc)) {
|
||||
Logger.warn('forward_failed_topic_missing', {
|
||||
userId,
|
||||
threadId: rec.thread_id,
|
||||
errorDescription: res.description
|
||||
});
|
||||
await resetUserVerificationAndRequireReverify(env, {
|
||||
userId,
|
||||
userKey: key,
|
||||
oldThreadId: rec.thread_id,
|
||||
pendingMsgId: msg.message_id,
|
||||
reason: "forward_failed_topic_missing"
|
||||
});
|
||||
return;
|
||||
}
|
||||
@@ -516,10 +704,17 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
|
||||
async function handleAdminReply(msg, env, ctx) {
|
||||
const threadId = msg.message_thread_id;
|
||||
const text = (msg.text || "").trim();
|
||||
const senderId = msg.from?.id;
|
||||
|
||||
// 仅允许管理员在群内操作与回信,防止任意群成员向用户私聊注入消息
|
||||
if (!senderId || !(await isAdminUser(env, senderId))) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 【修复】允许在任何话题执行 /cleanup 命令
|
||||
if (text === "/cleanup") {
|
||||
await handleCleanupCommand(threadId, env);
|
||||
// /cleanup 可能处理较久,使用 waitUntil 防止 webhook 请求超时导致“卡住”
|
||||
ctx.waitUntil(handleCleanupCommand(threadId, env));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -576,6 +771,7 @@ async function handleAdminReply(msg, env, ctx) {
|
||||
|
||||
if (text === "/trust") {
|
||||
await env.TOPIC_MAP.put(`verified:${userId}`, "trusted");
|
||||
await env.TOPIC_MAP.delete(`needs_verify:${userId}`);
|
||||
await tgCall(env, "sendMessage", { chat_id: env.SUPERGROUP_ID, message_thread_id: threadId, text: "🌟 **已设置永久信任**", parse_mode: "Markdown" });
|
||||
return;
|
||||
}
|
||||
@@ -605,7 +801,7 @@ async function handleAdminReply(msg, env, ctx) {
|
||||
|
||||
// 转发管理员消息给用户
|
||||
if (msg.media_group_id) {
|
||||
await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: null });
|
||||
await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: undefined });
|
||||
return;
|
||||
}
|
||||
await tgCall(env, "copyMessage", { chat_id: userId, from_chat_id: env.SUPERGROUP_ID, message_id: msg.message_id });
|
||||
@@ -617,7 +813,44 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
|
||||
// 【修复 #1】检查是否已有进行中的验证
|
||||
const existingChallenge = await env.TOPIC_MAP.get(`user_challenge:${userId}`);
|
||||
if (existingChallenge) {
|
||||
Logger.info('verification_duplicate_skipped', { userId });
|
||||
// 有正在进行的验证:仅将新消息加入待发送队列,避免重复下发题目/触发验证限速
|
||||
const chalKey = `chal:${existingChallenge}`;
|
||||
const state = await safeGetJSON(env, chalKey, null);
|
||||
|
||||
// KV 可能存在不一致/过期:自愈清理后重新下发
|
||||
if (!state || state.userId !== userId) {
|
||||
await env.TOPIC_MAP.delete(`user_challenge:${userId}`);
|
||||
} else {
|
||||
if (pendingMsgId) {
|
||||
let pendingIds = [];
|
||||
if (Array.isArray(state.pending_ids)) {
|
||||
pendingIds = state.pending_ids.slice();
|
||||
} else if (state.pending) {
|
||||
pendingIds = [state.pending];
|
||||
}
|
||||
|
||||
if (!pendingIds.includes(pendingMsgId)) {
|
||||
pendingIds.push(pendingMsgId);
|
||||
if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) {
|
||||
pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES);
|
||||
}
|
||||
state.pending_ids = pendingIds;
|
||||
delete state.pending;
|
||||
await env.TOPIC_MAP.put(chalKey, JSON.stringify(state), { expirationTtl: CONFIG.VERIFY_EXPIRE_SECONDS });
|
||||
}
|
||||
}
|
||||
Logger.debug('verification_duplicate_skipped', { userId, verifyId: existingChallenge, hasPending: !!pendingMsgId });
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 验证请求速率限制:仅在需要创建新挑战时检查
|
||||
const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300);
|
||||
if (!verifyLimit.allowed) {
|
||||
await tgCall(env, "sendMessage", {
|
||||
chat_id: userId,
|
||||
text: "⚠️ 验证请求过于频繁,请5分钟后再试。"
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -638,7 +871,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
|
||||
const state = {
|
||||
answerIndex: answerIndex, // 存储索引
|
||||
options: challenge.options, // 存储完整选项列表
|
||||
pending: pendingMsgId,
|
||||
pending_ids: pendingMsgId ? [pendingMsgId] : [],
|
||||
userId: userId // 添加用户ID验证
|
||||
};
|
||||
|
||||
@@ -651,7 +884,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
|
||||
userId,
|
||||
verifyId,
|
||||
question: q.question,
|
||||
hasPending: !!pendingMsgId
|
||||
pendingCount: state.pending_ids.length
|
||||
});
|
||||
|
||||
// 【修复 #6】按钮使用索引而非文本
|
||||
@@ -741,6 +974,7 @@ async function handleCallbackQuery(query, env, ctx) {
|
||||
|
||||
// 30天有效期 - 使用配置常量
|
||||
await env.TOPIC_MAP.put(`verified:${userId}`, "1", { expirationTtl: CONFIG.VERIFIED_EXPIRE_SECONDS });
|
||||
await env.TOPIC_MAP.delete(`needs_verify:${userId}`);
|
||||
|
||||
// 【修复 #1】清理所有相关挑战
|
||||
await env.TOPIC_MAP.delete(`chal:${verifyId}`);
|
||||
@@ -753,36 +987,48 @@ async function handleCallbackQuery(query, env, ctx) {
|
||||
parse_mode: "Markdown"
|
||||
});
|
||||
|
||||
if (state.pending) {
|
||||
const hasPending = (Array.isArray(state.pending_ids) && state.pending_ids.length > 0) || !!state.pending;
|
||||
if (hasPending) {
|
||||
try {
|
||||
// 【修复 #3】检查消息是否已经被转发过
|
||||
const forwardedKey = `forwarded:${userId}:${state.pending}`;
|
||||
const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey);
|
||||
|
||||
if (alreadyForwarded) {
|
||||
Logger.info('message_forward_duplicate_skipped', {
|
||||
userId,
|
||||
messageId: state.pending
|
||||
});
|
||||
return;
|
||||
let pendingIds = [];
|
||||
if (Array.isArray(state.pending_ids)) {
|
||||
pendingIds = state.pending_ids.slice();
|
||||
} else if (state.pending) {
|
||||
pendingIds = [state.pending];
|
||||
}
|
||||
|
||||
const fakeMsg = {
|
||||
message_id: state.pending,
|
||||
chat: { id: userId, type: "private" },
|
||||
from: query.from,
|
||||
};
|
||||
// 限制一次性转发量,避免用户恶意堆积导致执行超时
|
||||
if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) {
|
||||
pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES);
|
||||
}
|
||||
|
||||
await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx);
|
||||
let forwardedCount = 0;
|
||||
for (const pendingId of pendingIds) {
|
||||
if (!pendingId) continue;
|
||||
const forwardedKey = `forwarded:${userId}:${pendingId}`;
|
||||
const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey);
|
||||
if (alreadyForwarded) {
|
||||
Logger.info('message_forward_duplicate_skipped', { userId, messageId: pendingId });
|
||||
continue;
|
||||
}
|
||||
|
||||
// 【修复 #3】标记已转发
|
||||
await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 });
|
||||
const fakeMsg = {
|
||||
message_id: pendingId,
|
||||
chat: { id: userId, type: "private" },
|
||||
from: query.from,
|
||||
};
|
||||
|
||||
await tgCall(env, "sendMessage", {
|
||||
chat_id: userId,
|
||||
text: "📩 刚才的消息已帮您送达。",
|
||||
reply_to_message_id: state.pending
|
||||
});
|
||||
await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx);
|
||||
await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 });
|
||||
forwardedCount++;
|
||||
}
|
||||
|
||||
if (forwardedCount > 0) {
|
||||
await tgCall(env, "sendMessage", {
|
||||
chat_id: userId,
|
||||
text: `📩 刚才的 ${forwardedCount} 条消息已帮您送达。`
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
Logger.error('pending_message_forward_failed', e, { userId });
|
||||
await tgCall(env, "sendMessage", {
|
||||
@@ -837,53 +1083,60 @@ async function handleCallbackQuery(query, env, ctx) {
|
||||
* @param {object} env - 环境变量对象
|
||||
*/
|
||||
async function handleCleanupCommand(threadId, env) {
|
||||
const lockKey = "cleanup:lock";
|
||||
const locked = await env.TOPIC_MAP.get(lockKey);
|
||||
if (locked) {
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
text: "⏳ **已有清理任务正在运行,请稍后再试。**",
|
||||
parse_mode: "Markdown"
|
||||
}, threadId));
|
||||
return;
|
||||
}
|
||||
|
||||
await env.TOPIC_MAP.put(lockKey, "1", { expirationTtl: CONFIG.CLEANUP_LOCK_TTL_SECONDS });
|
||||
|
||||
// 发送处理中的消息
|
||||
await tgCall(env, "sendMessage", {
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_thread_id: threadId,
|
||||
text: "🔄 **正在扫描需要清理的用户...**",
|
||||
parse_mode: "Markdown"
|
||||
});
|
||||
}, threadId));
|
||||
|
||||
let cleanedCount = 0;
|
||||
let errorCount = 0;
|
||||
const cleanedUsers = [];
|
||||
let scannedCount = 0;
|
||||
|
||||
try {
|
||||
// 【修复 #11】获取所有用户记录(处理分页)
|
||||
const allKeys = await getAllKeys(env, "user:");
|
||||
// 逐页扫描,避免一次性拉取全部 keys 导致超时/内存膨胀
|
||||
let cursor = undefined;
|
||||
do {
|
||||
const result = await env.TOPIC_MAP.list({ prefix: "user:", cursor });
|
||||
const names = (result.keys || []).map(k => k.name);
|
||||
scannedCount += names.length;
|
||||
|
||||
// 【修复 #8】批量并发处理(限制并发数)
|
||||
for (let i = 0; i < allKeys.length; i += CONFIG.CLEANUP_BATCH_SIZE) {
|
||||
const batch = allKeys.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE);
|
||||
// 批量并发处理(限制并发数)
|
||||
for (let i = 0; i < names.length; i += CONFIG.CLEANUP_BATCH_SIZE) {
|
||||
const batch = names.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE);
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
batch.map(async ({ name }) => {
|
||||
const rec = await safeGetJSON(env, name, null);
|
||||
const results = await Promise.allSettled(
|
||||
batch.map(async (name) => {
|
||||
const rec = await safeGetJSON(env, name, null);
|
||||
if (!rec || !rec.thread_id) return null;
|
||||
|
||||
const userId = name.slice(5);
|
||||
const topicThreadId = rec.thread_id;
|
||||
|
||||
// 检测话题是否存在:尝试向话题发送测试消息
|
||||
const testRes = await tgCall(env, "sendMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_thread_id: topicThreadId,
|
||||
text: "", // 零宽度字符
|
||||
const probe = await probeForumThread(env, topicThreadId, {
|
||||
userId,
|
||||
reason: "cleanup_check",
|
||||
doubleCheckOnMissingThreadId: false
|
||||
});
|
||||
|
||||
const actualThreadId = testRes.result?.message_thread_id;
|
||||
|
||||
// 同时检测 Telegram 静默重定向到 General 的场景
|
||||
if (!testRes.ok || (actualThreadId && Number(actualThreadId) !== Number(topicThreadId))) {
|
||||
const desc = (testRes.description || "").toLowerCase();
|
||||
const redirected = actualThreadId && Number(actualThreadId) !== Number(topicThreadId);
|
||||
const isTopicDeleted = redirected ||
|
||||
desc.includes("thread not found") ||
|
||||
desc.includes("topic not found") ||
|
||||
desc.includes("message thread not found");
|
||||
|
||||
if (isTopicDeleted) {
|
||||
// cleanup 要求更保守:仅在明确缺失/重定向时清理,避免误删有效记录
|
||||
if (probe.status === "redirected" || probe.status === "missing") {
|
||||
await env.TOPIC_MAP.delete(name);
|
||||
await env.TOPIC_MAP.delete(`verified:${userId}`);
|
||||
await env.TOPIC_MAP.delete(`thread:${topicThreadId}`);
|
||||
@@ -893,19 +1146,20 @@ async function handleCleanupCommand(threadId, env) {
|
||||
threadId: topicThreadId,
|
||||
title: rec.title || "未知"
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// 话题存在或检测后,尝试删除测试消息
|
||||
if (testRes.ok && testRes.result?.message_id) {
|
||||
try {
|
||||
await tgCall(env, "deleteMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_id: testRes.result.message_id
|
||||
});
|
||||
} catch (e) {
|
||||
// 删除失败不影响主流程
|
||||
}
|
||||
} else if (probe.status === "probe_invalid") {
|
||||
Logger.warn('cleanup_probe_invalid_message', {
|
||||
userId,
|
||||
threadId: topicThreadId,
|
||||
errorDescription: probe.description
|
||||
});
|
||||
} else if (probe.status === "unknown_error") {
|
||||
Logger.warn('cleanup_probe_failed_unknown', {
|
||||
userId,
|
||||
threadId: topicThreadId,
|
||||
errorDescription: probe.description
|
||||
});
|
||||
} else if (probe.status === "missing_thread_id") {
|
||||
Logger.warn('cleanup_probe_missing_thread_id', { userId, threadId: topicThreadId });
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -927,15 +1181,24 @@ async function handleCleanupCommand(threadId, env) {
|
||||
}
|
||||
});
|
||||
|
||||
// 防止速率限制
|
||||
if (i + CONFIG.CLEANUP_BATCH_SIZE < allKeys.length) {
|
||||
await new Promise(r => setTimeout(r, 1000));
|
||||
// 防止速率限制
|
||||
if (i + CONFIG.CLEANUP_BATCH_SIZE < names.length) {
|
||||
await new Promise(r => setTimeout(r, 600));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cursor = result.list_complete ? undefined : result.cursor;
|
||||
|
||||
// 在分页之间让出时间片,降低单次执行压力
|
||||
if (cursor) {
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
}
|
||||
} while (cursor);
|
||||
|
||||
// 生成并发送清理报告
|
||||
let reportText = `✅ **清理完成**\n\n`;
|
||||
reportText += `📊 **统计信息**\n`;
|
||||
reportText += `- 扫描用户数: ${scannedCount}\n`;
|
||||
reportText += `- 已清理用户数: ${cleanedCount}\n`;
|
||||
reportText += `- 错误数: ${errorCount}\n\n`;
|
||||
|
||||
@@ -955,24 +1218,24 @@ async function handleCleanupCommand(threadId, env) {
|
||||
Logger.info('cleanup_completed', {
|
||||
cleanedCount,
|
||||
errorCount,
|
||||
totalUsers: allKeys.length
|
||||
totalUsers: scannedCount
|
||||
});
|
||||
|
||||
await tgCall(env, "sendMessage", {
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_thread_id: threadId,
|
||||
text: reportText,
|
||||
parse_mode: "Markdown"
|
||||
});
|
||||
}, threadId));
|
||||
|
||||
} catch (e) {
|
||||
Logger.error('cleanup_failed', e, { threadId });
|
||||
await tgCall(env, "sendMessage", {
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
message_thread_id: threadId,
|
||||
text: `❌ **清理过程出错**\n\n错误信息: \`${e.message}\``,
|
||||
parse_mode: "Markdown"
|
||||
});
|
||||
}, threadId));
|
||||
} finally {
|
||||
await env.TOPIC_MAP.delete(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -995,9 +1258,22 @@ async function createTopic(from, key, env, userId) {
|
||||
// 【修复 #2】更新话题状态 - 修复异步操作未等待
|
||||
async function updateThreadStatus(threadId, isClosed, env) {
|
||||
try {
|
||||
const allKeys = await getAllKeys(env, "user:");
|
||||
const mappedUser = await env.TOPIC_MAP.get(`thread:${threadId}`);
|
||||
if (mappedUser) {
|
||||
const userKey = `user:${mappedUser}`;
|
||||
const rec = await safeGetJSON(env, userKey, null);
|
||||
if (rec && Number(rec.thread_id) === Number(threadId)) {
|
||||
rec.closed = isClosed;
|
||||
await env.TOPIC_MAP.put(userKey, JSON.stringify(rec));
|
||||
Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: 1 });
|
||||
return;
|
||||
}
|
||||
|
||||
// 收集所有更新操作
|
||||
// 映射失效:清理后降级全量扫描
|
||||
await env.TOPIC_MAP.delete(`thread:${threadId}`);
|
||||
}
|
||||
|
||||
const allKeys = await getAllKeys(env, "user:");
|
||||
const updates = [];
|
||||
|
||||
for (const { name } of allKeys) {
|
||||
@@ -1008,14 +1284,8 @@ async function updateThreadStatus(threadId, isClosed, env) {
|
||||
}
|
||||
}
|
||||
|
||||
// 等待所有更新完成
|
||||
await Promise.all(updates);
|
||||
|
||||
Logger.info('thread_status_updated', {
|
||||
threadId,
|
||||
isClosed,
|
||||
updatedCount: updates.length
|
||||
});
|
||||
Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: updates.length });
|
||||
} catch (e) {
|
||||
Logger.error('thread_status_update_failed', e, { threadId, isClosed });
|
||||
throw e;
|
||||
@@ -1119,11 +1389,15 @@ async function handleMediaGroup(msg, env, ctx, { direction, targetChat, threadId
|
||||
const key = `mg:${direction}:${groupId}`;
|
||||
const item = extractMedia(msg);
|
||||
if (!item) {
|
||||
await tgCall(env, "copyMessage", { chat_id: targetChat, from_chat_id: msg.chat.id, message_id: msg.message_id, message_thread_id: threadId });
|
||||
await tgCall(env, "copyMessage", withMessageThreadId({
|
||||
chat_id: targetChat,
|
||||
from_chat_id: msg.chat.id,
|
||||
message_id: msg.message_id
|
||||
}, threadId));
|
||||
return;
|
||||
}
|
||||
let rec = await safeGetJSON(env, key, null);
|
||||
if (!rec) rec = { direction, targetChat, threadId, items: [], last_ts: Date.now() };
|
||||
if (!rec) rec = { direction, targetChat, threadId: (threadId === null ? undefined : threadId), items: [], last_ts: Date.now() };
|
||||
rec.items.push({ ...item, msg_id: msg.message_id });
|
||||
rec.last_ts = Date.now();
|
||||
await env.TOPIC_MAP.put(key, JSON.stringify(rec), { expirationTtl: CONFIG.MEDIA_GROUP_EXPIRE_SECONDS });
|
||||
@@ -1226,7 +1500,7 @@ async function delaySend(env, key, ts) {
|
||||
}
|
||||
// 【修复 #28】限制 caption 长度
|
||||
const caption = i === 0 ? (it.cap || "").substring(0, 1024) : "";
|
||||
return {
|
||||
return {
|
||||
type: it.type,
|
||||
media: it.id,
|
||||
caption
|
||||
@@ -1235,11 +1509,10 @@ async function delaySend(env, key, ts) {
|
||||
|
||||
if (media.length > 0) {
|
||||
try {
|
||||
const result = await tgCall(env, "sendMediaGroup", {
|
||||
const result = await tgCall(env, "sendMediaGroup", withMessageThreadId({
|
||||
chat_id: rec.targetChat,
|
||||
message_thread_id: rec.threadId,
|
||||
media
|
||||
});
|
||||
}, rec.threadId));
|
||||
|
||||
if (!result.ok) {
|
||||
Logger.error('media_group_send_failed', result.description, {
|
||||
|
||||
Reference in New Issue
Block a user