Enhance cleanup command and thread probing logic
Added doubleCheckOnMissingThreadId option to probeForumThread function and improved cleanup command handling.
This commit is contained in:
69
worker.js
69
worker.js
@@ -19,6 +19,7 @@ const CONFIG = {
|
||||
API_TIMEOUT_MS: 10000,
|
||||
CLEANUP_BATCH_SIZE: 10,
|
||||
MAX_CLEANUP_DISPLAY: 20,
|
||||
CLEANUP_LOCK_TTL_SECONDS: 1800, // /cleanup 防并发锁 30 分钟
|
||||
MAX_RETRY_ATTEMPTS: 3,
|
||||
THREAD_HEALTH_TTL_MS: 60000
|
||||
};
|
||||
@@ -200,7 +201,7 @@ function withMessageThreadId(body, threadId) {
|
||||
return { ...body, message_thread_id: threadId };
|
||||
}
|
||||
|
||||
async function probeForumThread(env, expectedThreadId, { userId, reason } = {}) {
|
||||
async function probeForumThread(env, expectedThreadId, { userId, reason, doubleCheckOnMissingThreadId = true } = {}) {
|
||||
const attemptOnce = async () => {
|
||||
const res = await tgCall(env, "sendMessage", {
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
@@ -246,7 +247,7 @@ async function probeForumThread(env, expectedThreadId, { userId, reason } = {})
|
||||
};
|
||||
|
||||
const first = await attemptOnce();
|
||||
if (first.status !== "missing_thread_id") return first;
|
||||
if (first.status !== "missing_thread_id" || !doubleCheckOnMissingThreadId) return first;
|
||||
|
||||
// 二次探测:避免偶发字段缺失导致误判并触发重建
|
||||
const second = await attemptOnce();
|
||||
@@ -712,7 +713,8 @@ async function handleAdminReply(msg, env, ctx) {
|
||||
|
||||
// 【修复】允许在任何话题执行 /cleanup 命令
|
||||
if (text === "/cleanup") {
|
||||
await handleCleanupCommand(threadId, env);
|
||||
// /cleanup 可能处理较久,使用 waitUntil 防止 webhook 请求超时导致“卡住”
|
||||
ctx.waitUntil(handleCleanupCommand(threadId, env));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1081,6 +1083,19 @@ async function handleCallbackQuery(query, env, ctx) {
|
||||
* @param {object} env - 环境变量对象
|
||||
*/
|
||||
async function handleCleanupCommand(threadId, env) {
|
||||
const lockKey = "cleanup:lock";
|
||||
const locked = await env.TOPIC_MAP.get(lockKey);
|
||||
if (locked) {
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
text: "⏳ **已有清理任务正在运行,请稍后再试。**",
|
||||
parse_mode: "Markdown"
|
||||
}, threadId));
|
||||
return;
|
||||
}
|
||||
|
||||
await env.TOPIC_MAP.put(lockKey, "1", { expirationTtl: CONFIG.CLEANUP_LOCK_TTL_SECONDS });
|
||||
|
||||
// 发送处理中的消息
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
chat_id: env.SUPERGROUP_ID,
|
||||
@@ -1091,25 +1106,34 @@ async function handleCleanupCommand(threadId, env) {
|
||||
let cleanedCount = 0;
|
||||
let errorCount = 0;
|
||||
const cleanedUsers = [];
|
||||
let scannedCount = 0;
|
||||
|
||||
try {
|
||||
// 【修复 #11】获取所有用户记录(处理分页)
|
||||
const allKeys = await getAllKeys(env, "user:");
|
||||
// 逐页扫描,避免一次性拉取全部 keys 导致超时/内存膨胀
|
||||
let cursor = undefined;
|
||||
do {
|
||||
const result = await env.TOPIC_MAP.list({ prefix: "user:", cursor });
|
||||
const names = (result.keys || []).map(k => k.name);
|
||||
scannedCount += names.length;
|
||||
|
||||
// 【修复 #8】批量并发处理(限制并发数)
|
||||
for (let i = 0; i < allKeys.length; i += CONFIG.CLEANUP_BATCH_SIZE) {
|
||||
const batch = allKeys.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE);
|
||||
// 批量并发处理(限制并发数)
|
||||
for (let i = 0; i < names.length; i += CONFIG.CLEANUP_BATCH_SIZE) {
|
||||
const batch = names.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE);
|
||||
|
||||
const results = await Promise.allSettled(
|
||||
batch.map(async ({ name }) => {
|
||||
const rec = await safeGetJSON(env, name, null);
|
||||
const results = await Promise.allSettled(
|
||||
batch.map(async (name) => {
|
||||
const rec = await safeGetJSON(env, name, null);
|
||||
if (!rec || !rec.thread_id) return null;
|
||||
|
||||
const userId = name.slice(5);
|
||||
const topicThreadId = rec.thread_id;
|
||||
|
||||
// 检测话题是否存在:尝试向话题发送测试消息
|
||||
const probe = await probeForumThread(env, topicThreadId, { userId, reason: "cleanup_check" });
|
||||
const probe = await probeForumThread(env, topicThreadId, {
|
||||
userId,
|
||||
reason: "cleanup_check",
|
||||
doubleCheckOnMissingThreadId: false
|
||||
});
|
||||
|
||||
// cleanup 要求更保守:仅在明确缺失/重定向时清理,避免误删有效记录
|
||||
if (probe.status === "redirected" || probe.status === "missing") {
|
||||
@@ -1157,15 +1181,24 @@ async function handleCleanupCommand(threadId, env) {
|
||||
}
|
||||
});
|
||||
|
||||
// 防止速率限制
|
||||
if (i + CONFIG.CLEANUP_BATCH_SIZE < allKeys.length) {
|
||||
await new Promise(r => setTimeout(r, 1000));
|
||||
// 防止速率限制
|
||||
if (i + CONFIG.CLEANUP_BATCH_SIZE < names.length) {
|
||||
await new Promise(r => setTimeout(r, 600));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cursor = result.list_complete ? undefined : result.cursor;
|
||||
|
||||
// 在分页之间让出时间片,降低单次执行压力
|
||||
if (cursor) {
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
}
|
||||
} while (cursor);
|
||||
|
||||
// 生成并发送清理报告
|
||||
let reportText = `✅ **清理完成**\n\n`;
|
||||
reportText += `📊 **统计信息**\n`;
|
||||
reportText += `- 扫描用户数: ${scannedCount}\n`;
|
||||
reportText += `- 已清理用户数: ${cleanedCount}\n`;
|
||||
reportText += `- 错误数: ${errorCount}\n\n`;
|
||||
|
||||
@@ -1185,7 +1218,7 @@ async function handleCleanupCommand(threadId, env) {
|
||||
Logger.info('cleanup_completed', {
|
||||
cleanedCount,
|
||||
errorCount,
|
||||
totalUsers: allKeys.length
|
||||
totalUsers: scannedCount
|
||||
});
|
||||
|
||||
await tgCall(env, "sendMessage", withMessageThreadId({
|
||||
@@ -1201,6 +1234,8 @@ async function handleCleanupCommand(threadId, env) {
|
||||
text: `❌ **清理过程出错**\n\n错误信息: \`${e.message}\``,
|
||||
parse_mode: "Markdown"
|
||||
}, threadId));
|
||||
} finally {
|
||||
await env.TOPIC_MAP.delete(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user