From 54b12c7a0435bf923cdc5b086f3b2f0d311dc848 Mon Sep 17 00:00:00 2001 From: Vaghr <161007666+jikssha@users.noreply.github.com> Date: Tue, 6 Jan 2026 01:23:24 +0800 Subject: [PATCH] Refactor worker.js for improved configuration and error handling Updated comments and added new constants for better configuration management. Enhanced error handling and improved user verification process. --- worker.js | 594 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 416 insertions(+), 178 deletions(-) diff --git a/worker.js b/worker.js index f76fadd..b0380b2 100644 --- a/worker.js +++ b/worker.js @@ -1,4 +1,4 @@ -// Cloudflare Worker:Telegram 双向机器人 v5.1 +// Cloudflare Worker:Telegram 双向机器人 (纯本地极速版 v5.3) // --- 配置常量 --- const CONFIG = { @@ -7,6 +7,9 @@ const CONFIG = { VERIFIED_EXPIRE_SECONDS: 2592000, // 30天 MEDIA_GROUP_EXPIRE_SECONDS: 60, MEDIA_GROUP_DELAY_MS: 3000, // 3秒(从2秒增加) + PENDING_MAX_MESSAGES: 10, // 验证期间最多暂存的消息数 + ADMIN_CACHE_TTL_SECONDS: 300, // 管理员权限缓存 5 分钟 + NEEDS_REVERIFY_TTL_SECONDS: 600, // 标记需重新验证的 TTL(用于并发兜底) RATE_LIMIT_MESSAGE: 45, RATE_LIMIT_VERIFY: 3, RATE_LIMIT_WINDOW: 60, @@ -22,6 +25,10 @@ const CONFIG = { // 线程健康检查缓存,减少频繁探测请求 const threadHealthCache = new Map(); +// 同一实例内的并发保护:避免同一用户短时间内重复创建话题 +const topicCreateInFlight = new Map(); +// 管理员权限缓存(实例内) +const adminStatusCache = new Map(); // --- 本地题库 (15条) --- const LOCAL_QUESTIONS = [ @@ -143,6 +150,187 @@ async function safeGetJSON(env, key, defaultValue = null) { } } +function normalizeTgDescription(description) { + return (description || "").toString().toLowerCase(); +} + +function isTopicMissingOrDeleted(description) { + const desc = normalizeTgDescription(description); + return desc.includes("thread not found") || + desc.includes("topic not found") || + desc.includes("message thread not found") || + desc.includes("topic deleted") || + desc.includes("thread deleted") || + desc.includes("forum topic not found") || + desc.includes("topic closed permanently"); +} + +function isTestMessageInvalid(description) { + const desc = normalizeTgDescription(description); + return desc.includes("message text is empty") || + desc.includes("bad request: message text is empty"); +} + +async function getOrCreateUserTopicRec(from, key, env, userId) { + const existing = await safeGetJSON(env, key, null); + if (existing && existing.thread_id) return existing; + + const inflight = topicCreateInFlight.get(String(userId)); + if (inflight) return await inflight; + + const p = (async () => { + // 并发下二次确认,避免已被其他请求创建却读到旧值 + const again = await safeGetJSON(env, key, null); + if (again && again.thread_id) return again; + return await createTopic(from, key, env, userId); + })(); + + topicCreateInFlight.set(String(userId), p); + try { + return await p; + } finally { + if (topicCreateInFlight.get(String(userId)) === p) { + topicCreateInFlight.delete(String(userId)); + } + } +} + +function withMessageThreadId(body, threadId) { + if (threadId === undefined || threadId === null) return body; + return { ...body, message_thread_id: threadId }; +} + +async function probeForumThread(env, expectedThreadId, { userId, reason } = {}) { + const attemptOnce = async () => { + const res = await tgCall(env, "sendMessage", { + chat_id: env.SUPERGROUP_ID, + message_thread_id: expectedThreadId, + text: "🔎" + }); + + const actualThreadId = res.result?.message_thread_id; + const probeMessageId = res.result?.message_id; + + // 尽可能清理探测消息(无论落到哪个话题/General) + if (res.ok && probeMessageId) { + try { + await tgCall(env, "deleteMessage", { + chat_id: env.SUPERGROUP_ID, + message_id: probeMessageId + }); + } catch (e) { + // 删除失败不影响主流程 + } + } + + if (!res.ok) { + if (isTopicMissingOrDeleted(res.description)) { + return { status: "missing", description: res.description }; + } + if (isTestMessageInvalid(res.description)) { + return { status: "probe_invalid", description: res.description }; + } + return { status: "unknown_error", description: res.description }; + } + + // 关键:有些情况下 Telegram 会返回 ok 但不带 message_thread_id(常见于 General) + if (actualThreadId === undefined || actualThreadId === null) { + return { status: "missing_thread_id" }; + } + + if (Number(actualThreadId) !== Number(expectedThreadId)) { + return { status: "redirected", actualThreadId }; + } + + return { status: "ok" }; + }; + + const first = await attemptOnce(); + if (first.status !== "missing_thread_id") return first; + + // 二次探测:避免偶发字段缺失导致误判并触发重建 + const second = await attemptOnce(); + if (second.status === "missing_thread_id") { + Logger.warn('thread_probe_missing_thread_id', { userId, expectedThreadId, reason }); + } + return second; +} + +async function resetUserVerificationAndRequireReverify(env, { userId, userKey, oldThreadId, pendingMsgId, reason }) { + // 清理旧映射与验证状态:用户需要重新做人机验证 + await env.TOPIC_MAP.delete(`verified:${userId}`); + await env.TOPIC_MAP.put(`needs_verify:${userId}`, "1", { expirationTtl: CONFIG.NEEDS_REVERIFY_TTL_SECONDS }); + await env.TOPIC_MAP.delete(`retry:${userId}`); + + if (userKey) { + await env.TOPIC_MAP.delete(userKey); + } + + if (oldThreadId !== undefined && oldThreadId !== null) { + await env.TOPIC_MAP.delete(`thread:${oldThreadId}`); + await env.TOPIC_MAP.delete(`thread_ok:${oldThreadId}`); + threadHealthCache.delete(oldThreadId); + } + + Logger.info('verification_reset_due_to_topic_loss', { + userId, + oldThreadId, + pendingMsgId, + reason + }); + + await sendVerificationChallenge(userId, env, pendingMsgId || null); +} + +function parseAdminIdAllowlist(env) { + const raw = (env.ADMIN_IDS || "").toString().trim(); + if (!raw) return null; + const ids = raw.split(/[,;\s]+/g).map(s => s.trim()).filter(Boolean); + const set = new Set(); + for (const id of ids) { + const n = Number(id); + if (!Number.isFinite(n)) continue; + set.add(String(n)); + } + return set.size > 0 ? set : null; +} + +async function isAdminUser(env, userId) { + const allowlist = parseAdminIdAllowlist(env); + if (allowlist && allowlist.has(String(userId))) return true; + + const cacheKey = String(userId); + const now = Date.now(); + const cached = adminStatusCache.get(cacheKey); + if (cached && (now - cached.ts < CONFIG.ADMIN_CACHE_TTL_SECONDS * 1000)) { + return cached.isAdmin; + } + + const kvKey = `admin:${userId}`; + const kvVal = await env.TOPIC_MAP.get(kvKey); + if (kvVal === "1" || kvVal === "0") { + const isAdmin = kvVal === "1"; + adminStatusCache.set(cacheKey, { ts: now, isAdmin }); + return isAdmin; + } + + try { + const res = await tgCall(env, "getChatMember", { + chat_id: env.SUPERGROUP_ID, + user_id: userId + }); + + const status = res.result?.status; + const isAdmin = res.ok && (status === "creator" || status === "administrator"); + await env.TOPIC_MAP.put(kvKey, isAdmin ? "1" : "0", { expirationTtl: CONFIG.ADMIN_CACHE_TTL_SECONDS }); + adminStatusCache.set(cacheKey, { ts: now, isAdmin }); + return isAdmin; + } catch (e) { + Logger.warn('admin_check_failed', { userId }); + return false; + } +} + // 获取所有 KV keys(处理分页) async function getAllKeys(env, prefix) { const allKeys = []; @@ -257,7 +445,9 @@ export default { } // 【修复】支持 General 话题和普通话题 // General 话题的 message_thread_id 可能不存在,或者等于 1 - if (msg.message_thread_id || msg.text) { + const text = (msg.text || "").trim(); + const isCommand = !!text && text.startsWith("/"); + if (msg.message_thread_id || isCommand) { await handleAdminReply(msg, normalizedEnv, ctx); return new Response("OK"); } @@ -294,16 +484,6 @@ async function handlePrivateMessage(msg, env, ctx) { const verified = await env.TOPIC_MAP.get(`verified:${userId}`); if (!verified) { - // 验证请求速率限制 - const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300); - if (!verifyLimit.allowed) { - await tgCall(env, "sendMessage", { - chat_id: userId, - text: "⚠️ 验证请求过于频繁,请5分钟后再试。" - }); - return; - } - const isStart = msg.text && msg.text.trim() === "/start"; const pendingMsgId = isStart ? null : msg.message_id; await sendVerificationChallenge(userId, env, pendingMsgId); @@ -314,6 +494,13 @@ async function handlePrivateMessage(msg, env, ctx) { } async function forwardToTopic(msg, userId, key, env, ctx) { + // 并发兜底:如果已被标记为需要重新验证,直接发起验证并暂停转发/建话题 + const needsVerify = await env.TOPIC_MAP.get(`needs_verify:${userId}`); + if (needsVerify) { + await sendVerificationChallenge(userId, env, msg.message_id || null); + return; + } + // 【修复 #4】使用安全的 JSON 解析 let rec = await safeGetJSON(env, key, null); @@ -336,7 +523,7 @@ async function forwardToTopic(msg, userId, key, env, ctx) { } if (!rec || !rec.thread_id) { - rec = await createTopic(msg.from, key, env, userId); + rec = await getOrCreateUserTopicRec(msg.from, key, env, userId); if (!rec || !rec.thread_id) { throw new Error("创建话题失败"); } @@ -359,72 +546,44 @@ async function forwardToTopic(msg, userId, key, env, ctx) { const withinTTL = cached && (now - cached.ts < CONFIG.THREAD_HEALTH_TTL_MS); if (!withinTTL) { - const testRes = await tgCall(env, "sendMessage", { - chat_id: env.SUPERGROUP_ID, - message_thread_id: rec.thread_id, - text: "​", // 零宽度字符,对用户不可见 - }); + // 跨节点缓存:避免由于 Workers 多 PoP 导致每次都做健康探测 + const kvHealthKey = `thread_ok:${rec.thread_id}`; + const kvHealthOk = await env.TOPIC_MAP.get(kvHealthKey); + if (kvHealthOk === "1") { + threadHealthCache.set(cacheKey, { ts: now, ok: true }); + } else { + const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "health_check" }); - const actualThreadId = testRes.result?.message_thread_id; - const expectedThreadId = rec.thread_id; - - // 只要未返回 thread_id,或 thread_id 不匹配,均视为话题失效/被重定向 - const redirectedOrMissing = !testRes.ok || !actualThreadId || Number(actualThreadId) !== Number(expectedThreadId); - - if (redirectedOrMissing) { - const desc = (testRes.description || "").toLowerCase(); - const isTopicDeleted = redirectedOrMissing || - desc.includes("thread not found") || - desc.includes("topic not found") || - desc.includes("message thread not found") || - desc.includes("topic deleted") || - desc.includes("thread deleted") || - desc.includes("forum topic not found") || - desc.includes("topic closed permanently"); - - if (isTopicDeleted) { - await env.TOPIC_MAP.put(retryKey, String(retryCount + 1), { expirationTtl: 60 }); - - Logger.info('topic_auto_repair', { + if (probe.status === "redirected" || probe.status === "missing" || probe.status === "missing_thread_id") { + await resetUserVerificationAndRequireReverify(env, { userId, + userKey: key, oldThreadId: rec.thread_id, - attempt: retryCount + 1, - maxAttempts: CONFIG.MAX_RETRY_ATTEMPTS, - errorDescription: testRes.description + pendingMsgId: msg.message_id, + reason: `health_check:${probe.status}` }); + return; + } else if (probe.status === "probe_invalid") { + Logger.warn('topic_health_probe_invalid_message', { + userId, + threadId: rec.thread_id, + errorDescription: probe.description + }); - await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`); - threadHealthCache.delete(cacheKey); - - rec = await createTopic(msg.from, key, env, userId); - - if (!rec || !rec.thread_id) { - throw new Error("重建话题失败"); - } - - Logger.info('topic_recreated', { - userId, - newThreadId: rec.thread_id - }); - } else { - Logger.warn('topic_test_failed_unknown', { - userId, - threadId: rec.thread_id, - errorDescription: testRes.description - }); - } + // 仍然设置短 TTL,避免每条消息都探测(并误触发重建) + threadHealthCache.set(cacheKey, { ts: now, ok: true }); + await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) }); + } else if (probe.status === "unknown_error") { + Logger.warn('topic_test_failed_unknown', { + userId, + threadId: rec.thread_id, + errorDescription: probe.description + }); } else { await env.TOPIC_MAP.delete(retryKey); threadHealthCache.set(cacheKey, { ts: now, ok: true }); - - try { - await tgCall(env, "deleteMessage", { - chat_id: env.SUPERGROUP_ID, - message_id: testRes.result.message_id - }); - } catch (e) { - // 删除失败不影响主流程 - } + await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) }); + } } } } @@ -447,18 +606,13 @@ async function forwardToTopic(msg, userId, key, env, ctx) { // 检测 Telegram 静默重定向到 General 的情况 const resThreadId = res.result?.message_thread_id; - if (res.ok && (!resThreadId || Number(resThreadId) !== Number(rec.thread_id))) { + if (res.ok && resThreadId !== undefined && resThreadId !== null && Number(resThreadId) !== Number(rec.thread_id)) { Logger.warn('forward_redirected_to_general', { userId, expectedThreadId: rec.thread_id, actualThreadId: resThreadId }); - await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`); - threadHealthCache.delete(rec.thread_id); - - const newRec = await createTopic(msg.from, key, env, userId); - // 删除误投到 General 的消息 if (res.result?.message_id) { try { @@ -470,32 +624,65 @@ async function forwardToTopic(msg, userId, key, env, ctx) { // 删除失败不影响重发 } } - - await tgCall(env, "copyMessage", { - chat_id: env.SUPERGROUP_ID, - from_chat_id: userId, - message_id: msg.message_id, - message_thread_id: newRec.thread_id + await resetUserVerificationAndRequireReverify(env, { + userId, + userKey: key, + oldThreadId: rec.thread_id, + pendingMsgId: msg.message_id, + reason: "forward_redirected_to_general" }); return; } + // 兜底:部分情况下 Telegram 返回 ok 但不带 message_thread_id(可能已落入 General) + if (res.ok && (resThreadId === undefined || resThreadId === null)) { + const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "forward_result_missing_thread_id" }); + if (probe.status !== "ok") { + Logger.warn('forward_suspected_redirect_or_missing', { + userId, + expectedThreadId: rec.thread_id, + probeStatus: probe.status, + probeDescription: probe.description + }); + + // 尽量删除误投消息(通常在 General) + if (res.result?.message_id) { + try { + await tgCall(env, "deleteMessage", { + chat_id: env.SUPERGROUP_ID, + message_id: res.result.message_id + }); + } catch (e) { + // 删除失败不影响重发 + } + } + await resetUserVerificationAndRequireReverify(env, { + userId, + userKey: key, + oldThreadId: rec.thread_id, + pendingMsgId: msg.message_id, + reason: `forward_missing_thread_id:${probe.status}` + }); + return; + } + } + // 【修复2】增强错误处理,双重保险 // 如果上面的测试没有捕获到,这里再次检测 if (!res.ok) { - const desc = (res.description || "").toLowerCase(); - if (desc.includes("thread not found") || - desc.includes("topic not found") || - desc.includes("message thread not found")) { - console.log(`[二次修复] 转发失败,话题不存在,正在重建...`); - await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`); - threadHealthCache.delete(rec.thread_id); - const newRec = await createTopic(msg.from, key, env, userId); - await tgCall(env, "forwardMessage", { - chat_id: env.SUPERGROUP_ID, - from_chat_id: userId, - message_id: msg.message_id, - message_thread_id: newRec.thread_id, + const desc = normalizeTgDescription(res.description); + if (isTopicMissingOrDeleted(desc)) { + Logger.warn('forward_failed_topic_missing', { + userId, + threadId: rec.thread_id, + errorDescription: res.description + }); + await resetUserVerificationAndRequireReverify(env, { + userId, + userKey: key, + oldThreadId: rec.thread_id, + pendingMsgId: msg.message_id, + reason: "forward_failed_topic_missing" }); return; } @@ -516,6 +703,12 @@ async function forwardToTopic(msg, userId, key, env, ctx) { async function handleAdminReply(msg, env, ctx) { const threadId = msg.message_thread_id; const text = (msg.text || "").trim(); + const senderId = msg.from?.id; + + // 仅允许管理员在群内操作与回信,防止任意群成员向用户私聊注入消息 + if (!senderId || !(await isAdminUser(env, senderId))) { + return; + } // 【修复】允许在任何话题执行 /cleanup 命令 if (text === "/cleanup") { @@ -576,6 +769,7 @@ async function handleAdminReply(msg, env, ctx) { if (text === "/trust") { await env.TOPIC_MAP.put(`verified:${userId}`, "trusted"); + await env.TOPIC_MAP.delete(`needs_verify:${userId}`); await tgCall(env, "sendMessage", { chat_id: env.SUPERGROUP_ID, message_thread_id: threadId, text: "🌟 **已设置永久信任**", parse_mode: "Markdown" }); return; } @@ -605,7 +799,7 @@ async function handleAdminReply(msg, env, ctx) { // 转发管理员消息给用户 if (msg.media_group_id) { - await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: null }); + await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: undefined }); return; } await tgCall(env, "copyMessage", { chat_id: userId, from_chat_id: env.SUPERGROUP_ID, message_id: msg.message_id }); @@ -617,7 +811,44 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) { // 【修复 #1】检查是否已有进行中的验证 const existingChallenge = await env.TOPIC_MAP.get(`user_challenge:${userId}`); if (existingChallenge) { - Logger.info('verification_duplicate_skipped', { userId }); + // 有正在进行的验证:仅将新消息加入待发送队列,避免重复下发题目/触发验证限速 + const chalKey = `chal:${existingChallenge}`; + const state = await safeGetJSON(env, chalKey, null); + + // KV 可能存在不一致/过期:自愈清理后重新下发 + if (!state || state.userId !== userId) { + await env.TOPIC_MAP.delete(`user_challenge:${userId}`); + } else { + if (pendingMsgId) { + let pendingIds = []; + if (Array.isArray(state.pending_ids)) { + pendingIds = state.pending_ids.slice(); + } else if (state.pending) { + pendingIds = [state.pending]; + } + + if (!pendingIds.includes(pendingMsgId)) { + pendingIds.push(pendingMsgId); + if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) { + pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES); + } + state.pending_ids = pendingIds; + delete state.pending; + await env.TOPIC_MAP.put(chalKey, JSON.stringify(state), { expirationTtl: CONFIG.VERIFY_EXPIRE_SECONDS }); + } + } + Logger.debug('verification_duplicate_skipped', { userId, verifyId: existingChallenge, hasPending: !!pendingMsgId }); + return; + } + } + + // 验证请求速率限制:仅在需要创建新挑战时检查 + const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300); + if (!verifyLimit.allowed) { + await tgCall(env, "sendMessage", { + chat_id: userId, + text: "⚠️ 验证请求过于频繁,请5分钟后再试。" + }); return; } @@ -638,7 +869,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) { const state = { answerIndex: answerIndex, // 存储索引 options: challenge.options, // 存储完整选项列表 - pending: pendingMsgId, + pending_ids: pendingMsgId ? [pendingMsgId] : [], userId: userId // 添加用户ID验证 }; @@ -651,7 +882,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) { userId, verifyId, question: q.question, - hasPending: !!pendingMsgId + pendingCount: state.pending_ids.length }); // 【修复 #6】按钮使用索引而非文本 @@ -741,6 +972,7 @@ async function handleCallbackQuery(query, env, ctx) { // 30天有效期 - 使用配置常量 await env.TOPIC_MAP.put(`verified:${userId}`, "1", { expirationTtl: CONFIG.VERIFIED_EXPIRE_SECONDS }); + await env.TOPIC_MAP.delete(`needs_verify:${userId}`); // 【修复 #1】清理所有相关挑战 await env.TOPIC_MAP.delete(`chal:${verifyId}`); @@ -753,36 +985,48 @@ async function handleCallbackQuery(query, env, ctx) { parse_mode: "Markdown" }); - if (state.pending) { + const hasPending = (Array.isArray(state.pending_ids) && state.pending_ids.length > 0) || !!state.pending; + if (hasPending) { try { - // 【修复 #3】检查消息是否已经被转发过 - const forwardedKey = `forwarded:${userId}:${state.pending}`; - const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey); - - if (alreadyForwarded) { - Logger.info('message_forward_duplicate_skipped', { - userId, - messageId: state.pending - }); - return; + let pendingIds = []; + if (Array.isArray(state.pending_ids)) { + pendingIds = state.pending_ids.slice(); + } else if (state.pending) { + pendingIds = [state.pending]; } - const fakeMsg = { - message_id: state.pending, - chat: { id: userId, type: "private" }, - from: query.from, - }; + // 限制一次性转发量,避免用户恶意堆积导致执行超时 + if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) { + pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES); + } - await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx); + let forwardedCount = 0; + for (const pendingId of pendingIds) { + if (!pendingId) continue; + const forwardedKey = `forwarded:${userId}:${pendingId}`; + const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey); + if (alreadyForwarded) { + Logger.info('message_forward_duplicate_skipped', { userId, messageId: pendingId }); + continue; + } - // 【修复 #3】标记已转发 - await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 }); + const fakeMsg = { + message_id: pendingId, + chat: { id: userId, type: "private" }, + from: query.from, + }; - await tgCall(env, "sendMessage", { - chat_id: userId, - text: "📩 刚才的消息已帮您送达。", - reply_to_message_id: state.pending - }); + await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx); + await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 }); + forwardedCount++; + } + + if (forwardedCount > 0) { + await tgCall(env, "sendMessage", { + chat_id: userId, + text: `📩 刚才的 ${forwardedCount} 条消息已帮您送达。` + }); + } } catch (e) { Logger.error('pending_message_forward_failed', e, { userId }); await tgCall(env, "sendMessage", { @@ -838,12 +1082,11 @@ async function handleCallbackQuery(query, env, ctx) { */ async function handleCleanupCommand(threadId, env) { // 发送处理中的消息 - await tgCall(env, "sendMessage", { + await tgCall(env, "sendMessage", withMessageThreadId({ chat_id: env.SUPERGROUP_ID, - message_thread_id: threadId, text: "🔄 **正在扫描需要清理的用户...**", parse_mode: "Markdown" - }); + }, threadId)); let cleanedCount = 0; let errorCount = 0; @@ -866,24 +1109,10 @@ async function handleCleanupCommand(threadId, env) { const topicThreadId = rec.thread_id; // 检测话题是否存在:尝试向话题发送测试消息 - const testRes = await tgCall(env, "sendMessage", { - chat_id: env.SUPERGROUP_ID, - message_thread_id: topicThreadId, - text: "​", // 零宽度字符 - }); + const probe = await probeForumThread(env, topicThreadId, { userId, reason: "cleanup_check" }); - const actualThreadId = testRes.result?.message_thread_id; - - // 同时检测 Telegram 静默重定向到 General 的场景 - if (!testRes.ok || (actualThreadId && Number(actualThreadId) !== Number(topicThreadId))) { - const desc = (testRes.description || "").toLowerCase(); - const redirected = actualThreadId && Number(actualThreadId) !== Number(topicThreadId); - const isTopicDeleted = redirected || - desc.includes("thread not found") || - desc.includes("topic not found") || - desc.includes("message thread not found"); - - if (isTopicDeleted) { + // cleanup 要求更保守:仅在明确缺失/重定向时清理,避免误删有效记录 + if (probe.status === "redirected" || probe.status === "missing") { await env.TOPIC_MAP.delete(name); await env.TOPIC_MAP.delete(`verified:${userId}`); await env.TOPIC_MAP.delete(`thread:${topicThreadId}`); @@ -893,19 +1122,20 @@ async function handleCleanupCommand(threadId, env) { threadId: topicThreadId, title: rec.title || "未知" }; - } - } - - // 话题存在或检测后,尝试删除测试消息 - if (testRes.ok && testRes.result?.message_id) { - try { - await tgCall(env, "deleteMessage", { - chat_id: env.SUPERGROUP_ID, - message_id: testRes.result.message_id - }); - } catch (e) { - // 删除失败不影响主流程 - } + } else if (probe.status === "probe_invalid") { + Logger.warn('cleanup_probe_invalid_message', { + userId, + threadId: topicThreadId, + errorDescription: probe.description + }); + } else if (probe.status === "unknown_error") { + Logger.warn('cleanup_probe_failed_unknown', { + userId, + threadId: topicThreadId, + errorDescription: probe.description + }); + } else if (probe.status === "missing_thread_id") { + Logger.warn('cleanup_probe_missing_thread_id', { userId, threadId: topicThreadId }); } return null; @@ -958,21 +1188,19 @@ async function handleCleanupCommand(threadId, env) { totalUsers: allKeys.length }); - await tgCall(env, "sendMessage", { + await tgCall(env, "sendMessage", withMessageThreadId({ chat_id: env.SUPERGROUP_ID, - message_thread_id: threadId, text: reportText, parse_mode: "Markdown" - }); + }, threadId)); } catch (e) { Logger.error('cleanup_failed', e, { threadId }); - await tgCall(env, "sendMessage", { + await tgCall(env, "sendMessage", withMessageThreadId({ chat_id: env.SUPERGROUP_ID, - message_thread_id: threadId, text: `❌ **清理过程出错**\n\n错误信息: \`${e.message}\``, parse_mode: "Markdown" - }); + }, threadId)); } } @@ -995,9 +1223,22 @@ async function createTopic(from, key, env, userId) { // 【修复 #2】更新话题状态 - 修复异步操作未等待 async function updateThreadStatus(threadId, isClosed, env) { try { - const allKeys = await getAllKeys(env, "user:"); + const mappedUser = await env.TOPIC_MAP.get(`thread:${threadId}`); + if (mappedUser) { + const userKey = `user:${mappedUser}`; + const rec = await safeGetJSON(env, userKey, null); + if (rec && Number(rec.thread_id) === Number(threadId)) { + rec.closed = isClosed; + await env.TOPIC_MAP.put(userKey, JSON.stringify(rec)); + Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: 1 }); + return; + } - // 收集所有更新操作 + // 映射失效:清理后降级全量扫描 + await env.TOPIC_MAP.delete(`thread:${threadId}`); + } + + const allKeys = await getAllKeys(env, "user:"); const updates = []; for (const { name } of allKeys) { @@ -1008,14 +1249,8 @@ async function updateThreadStatus(threadId, isClosed, env) { } } - // 等待所有更新完成 await Promise.all(updates); - - Logger.info('thread_status_updated', { - threadId, - isClosed, - updatedCount: updates.length - }); + Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: updates.length }); } catch (e) { Logger.error('thread_status_update_failed', e, { threadId, isClosed }); throw e; @@ -1119,11 +1354,15 @@ async function handleMediaGroup(msg, env, ctx, { direction, targetChat, threadId const key = `mg:${direction}:${groupId}`; const item = extractMedia(msg); if (!item) { - await tgCall(env, "copyMessage", { chat_id: targetChat, from_chat_id: msg.chat.id, message_id: msg.message_id, message_thread_id: threadId }); + await tgCall(env, "copyMessage", withMessageThreadId({ + chat_id: targetChat, + from_chat_id: msg.chat.id, + message_id: msg.message_id + }, threadId)); return; } let rec = await safeGetJSON(env, key, null); - if (!rec) rec = { direction, targetChat, threadId, items: [], last_ts: Date.now() }; + if (!rec) rec = { direction, targetChat, threadId: (threadId === null ? undefined : threadId), items: [], last_ts: Date.now() }; rec.items.push({ ...item, msg_id: msg.message_id }); rec.last_ts = Date.now(); await env.TOPIC_MAP.put(key, JSON.stringify(rec), { expirationTtl: CONFIG.MEDIA_GROUP_EXPIRE_SECONDS }); @@ -1226,7 +1465,7 @@ async function delaySend(env, key, ts) { } // 【修复 #28】限制 caption 长度 const caption = i === 0 ? (it.cap || "").substring(0, 1024) : ""; - return { + return { type: it.type, media: it.id, caption @@ -1235,11 +1474,10 @@ async function delaySend(env, key, ts) { if (media.length > 0) { try { - const result = await tgCall(env, "sendMediaGroup", { + const result = await tgCall(env, "sendMediaGroup", withMessageThreadId({ chat_id: rec.targetChat, - message_thread_id: rec.threadId, media - }); + }, rec.threadId)); if (!result.ok) { Logger.error('media_group_send_failed', result.description, {