From 06bcf8f15c011911b9afeea0094f8e66cbc83afe Mon Sep 17 00:00:00 2001 From: Vaghr <161007666+jikssha@users.noreply.github.com> Date: Tue, 6 Jan 2026 02:04:14 +0800 Subject: [PATCH] Enhance cleanup command and thread probing logic Added doubleCheckOnMissingThreadId option to probeForumThread function and improved cleanup command handling. --- worker.js | 69 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/worker.js b/worker.js index 6aaa383..9ea4dca 100644 --- a/worker.js +++ b/worker.js @@ -19,6 +19,7 @@ const CONFIG = { API_TIMEOUT_MS: 10000, CLEANUP_BATCH_SIZE: 10, MAX_CLEANUP_DISPLAY: 20, + CLEANUP_LOCK_TTL_SECONDS: 1800, // /cleanup 防并发锁 30 分钟 MAX_RETRY_ATTEMPTS: 3, THREAD_HEALTH_TTL_MS: 60000 }; @@ -200,7 +201,7 @@ function withMessageThreadId(body, threadId) { return { ...body, message_thread_id: threadId }; } -async function probeForumThread(env, expectedThreadId, { userId, reason } = {}) { +async function probeForumThread(env, expectedThreadId, { userId, reason, doubleCheckOnMissingThreadId = true } = {}) { const attemptOnce = async () => { const res = await tgCall(env, "sendMessage", { chat_id: env.SUPERGROUP_ID, @@ -246,7 +247,7 @@ async function probeForumThread(env, expectedThreadId, { userId, reason } = {}) }; const first = await attemptOnce(); - if (first.status !== "missing_thread_id") return first; + if (first.status !== "missing_thread_id" || !doubleCheckOnMissingThreadId) return first; // 二次探测:避免偶发字段缺失导致误判并触发重建 const second = await attemptOnce(); @@ -712,7 +713,8 @@ async function handleAdminReply(msg, env, ctx) { // 【修复】允许在任何话题执行 /cleanup 命令 if (text === "/cleanup") { - await handleCleanupCommand(threadId, env); + // /cleanup 可能处理较久,使用 waitUntil 防止 webhook 请求超时导致“卡住” + ctx.waitUntil(handleCleanupCommand(threadId, env)); return; } @@ -1081,6 +1083,19 @@ async function handleCallbackQuery(query, env, ctx) { * @param {object} env - 环境变量对象 */ async function handleCleanupCommand(threadId, env) { + const lockKey = "cleanup:lock"; + const locked = await env.TOPIC_MAP.get(lockKey); + if (locked) { + await tgCall(env, "sendMessage", withMessageThreadId({ + chat_id: env.SUPERGROUP_ID, + text: "⏳ **已有清理任务正在运行,请稍后再试。**", + parse_mode: "Markdown" + }, threadId)); + return; + } + + await env.TOPIC_MAP.put(lockKey, "1", { expirationTtl: CONFIG.CLEANUP_LOCK_TTL_SECONDS }); + // 发送处理中的消息 await tgCall(env, "sendMessage", withMessageThreadId({ chat_id: env.SUPERGROUP_ID, @@ -1091,25 +1106,34 @@ async function handleCleanupCommand(threadId, env) { let cleanedCount = 0; let errorCount = 0; const cleanedUsers = []; + let scannedCount = 0; try { - // 【修复 #11】获取所有用户记录(处理分页) - const allKeys = await getAllKeys(env, "user:"); + // 逐页扫描,避免一次性拉取全部 keys 导致超时/内存膨胀 + let cursor = undefined; + do { + const result = await env.TOPIC_MAP.list({ prefix: "user:", cursor }); + const names = (result.keys || []).map(k => k.name); + scannedCount += names.length; - // 【修复 #8】批量并发处理(限制并发数) - for (let i = 0; i < allKeys.length; i += CONFIG.CLEANUP_BATCH_SIZE) { - const batch = allKeys.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE); + // 批量并发处理(限制并发数) + for (let i = 0; i < names.length; i += CONFIG.CLEANUP_BATCH_SIZE) { + const batch = names.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE); - const results = await Promise.allSettled( - batch.map(async ({ name }) => { - const rec = await safeGetJSON(env, name, null); + const results = await Promise.allSettled( + batch.map(async (name) => { + const rec = await safeGetJSON(env, name, null); if (!rec || !rec.thread_id) return null; const userId = name.slice(5); const topicThreadId = rec.thread_id; // 检测话题是否存在:尝试向话题发送测试消息 - const probe = await probeForumThread(env, topicThreadId, { userId, reason: "cleanup_check" }); + const probe = await probeForumThread(env, topicThreadId, { + userId, + reason: "cleanup_check", + doubleCheckOnMissingThreadId: false + }); // cleanup 要求更保守:仅在明确缺失/重定向时清理,避免误删有效记录 if (probe.status === "redirected" || probe.status === "missing") { @@ -1157,15 +1181,24 @@ async function handleCleanupCommand(threadId, env) { } }); - // 防止速率限制 - if (i + CONFIG.CLEANUP_BATCH_SIZE < allKeys.length) { - await new Promise(r => setTimeout(r, 1000)); + // 防止速率限制 + if (i + CONFIG.CLEANUP_BATCH_SIZE < names.length) { + await new Promise(r => setTimeout(r, 600)); + } } - } + + cursor = result.list_complete ? undefined : result.cursor; + + // 在分页之间让出时间片,降低单次执行压力 + if (cursor) { + await new Promise(r => setTimeout(r, 200)); + } + } while (cursor); // 生成并发送清理报告 let reportText = `✅ **清理完成**\n\n`; reportText += `📊 **统计信息**\n`; + reportText += `- 扫描用户数: ${scannedCount}\n`; reportText += `- 已清理用户数: ${cleanedCount}\n`; reportText += `- 错误数: ${errorCount}\n\n`; @@ -1185,7 +1218,7 @@ async function handleCleanupCommand(threadId, env) { Logger.info('cleanup_completed', { cleanedCount, errorCount, - totalUsers: allKeys.length + totalUsers: scannedCount }); await tgCall(env, "sendMessage", withMessageThreadId({ @@ -1201,6 +1234,8 @@ async function handleCleanupCommand(threadId, env) { text: `❌ **清理过程出错**\n\n错误信息: \`${e.message}\``, parse_mode: "Markdown" }, threadId)); + } finally { + await env.TOPIC_MAP.delete(lockKey); } }