4 Commits
v5.1 ... main

Author SHA1 Message Date
Vaghr
96ffd6b492 Update version in README to v5.3 2026-01-06 02:07:22 +08:00
Vaghr
06bcf8f15c Enhance cleanup command and thread probing logic
Added doubleCheckOnMissingThreadId option to probeForumThread function and improved cleanup command handling.
2026-01-06 02:04:14 +08:00
Vaghr
78e7a88a8c Update comment for Telegram bot version 2026-01-06 01:23:40 +08:00
Vaghr
54b12c7a04 Refactor worker.js for improved configuration and error handling
Updated comments and added new constants for better configuration management. Enhanced error handling and improved user verification process.
2026-01-06 01:23:24 +08:00
2 changed files with 465 additions and 192 deletions

View File

@@ -1,4 +1,4 @@
# 🤖 Telegram Private Chatbot (v5.1)
# 🤖 Telegram Private Chatbot (v5.3)
[![Deploy to Cloudflare Workers](https://deploy.workers.cloudflare.com/button)](https://deploy.workers.cloudflare.com/?url=https://github.com/jikssha/telegram_private_chatbot)
![GitHub stars](https://img.shields.io/github/stars/jikssha/telegram_private_chatbot?style=social)

655
worker.js
View File

@@ -1,4 +1,4 @@
// Cloudflare WorkerTelegram 双向机器人 v5.1
// Cloudflare WorkerTelegram 双向机器人 v5.3
// --- 配置常量 ---
const CONFIG = {
@@ -7,6 +7,9 @@ const CONFIG = {
VERIFIED_EXPIRE_SECONDS: 2592000, // 30天
MEDIA_GROUP_EXPIRE_SECONDS: 60,
MEDIA_GROUP_DELAY_MS: 3000, // 3秒从2秒增加
PENDING_MAX_MESSAGES: 10, // 验证期间最多暂存的消息数
ADMIN_CACHE_TTL_SECONDS: 300, // 管理员权限缓存 5 分钟
NEEDS_REVERIFY_TTL_SECONDS: 600, // 标记需重新验证的 TTL用于并发兜底
RATE_LIMIT_MESSAGE: 45,
RATE_LIMIT_VERIFY: 3,
RATE_LIMIT_WINDOW: 60,
@@ -16,12 +19,17 @@ const CONFIG = {
API_TIMEOUT_MS: 10000,
CLEANUP_BATCH_SIZE: 10,
MAX_CLEANUP_DISPLAY: 20,
CLEANUP_LOCK_TTL_SECONDS: 1800, // /cleanup 防并发锁 30 分钟
MAX_RETRY_ATTEMPTS: 3,
THREAD_HEALTH_TTL_MS: 60000
};
// 线程健康检查缓存,减少频繁探测请求
const threadHealthCache = new Map();
// 同一实例内的并发保护:避免同一用户短时间内重复创建话题
const topicCreateInFlight = new Map();
// 管理员权限缓存(实例内)
const adminStatusCache = new Map();
// --- 本地题库 (15条) ---
const LOCAL_QUESTIONS = [
@@ -143,6 +151,187 @@ async function safeGetJSON(env, key, defaultValue = null) {
}
}
function normalizeTgDescription(description) {
return (description || "").toString().toLowerCase();
}
function isTopicMissingOrDeleted(description) {
const desc = normalizeTgDescription(description);
return desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found") ||
desc.includes("topic deleted") ||
desc.includes("thread deleted") ||
desc.includes("forum topic not found") ||
desc.includes("topic closed permanently");
}
function isTestMessageInvalid(description) {
const desc = normalizeTgDescription(description);
return desc.includes("message text is empty") ||
desc.includes("bad request: message text is empty");
}
async function getOrCreateUserTopicRec(from, key, env, userId) {
const existing = await safeGetJSON(env, key, null);
if (existing && existing.thread_id) return existing;
const inflight = topicCreateInFlight.get(String(userId));
if (inflight) return await inflight;
const p = (async () => {
// 并发下二次确认,避免已被其他请求创建却读到旧值
const again = await safeGetJSON(env, key, null);
if (again && again.thread_id) return again;
return await createTopic(from, key, env, userId);
})();
topicCreateInFlight.set(String(userId), p);
try {
return await p;
} finally {
if (topicCreateInFlight.get(String(userId)) === p) {
topicCreateInFlight.delete(String(userId));
}
}
}
function withMessageThreadId(body, threadId) {
if (threadId === undefined || threadId === null) return body;
return { ...body, message_thread_id: threadId };
}
async function probeForumThread(env, expectedThreadId, { userId, reason, doubleCheckOnMissingThreadId = true } = {}) {
const attemptOnce = async () => {
const res = await tgCall(env, "sendMessage", {
chat_id: env.SUPERGROUP_ID,
message_thread_id: expectedThreadId,
text: "🔎"
});
const actualThreadId = res.result?.message_thread_id;
const probeMessageId = res.result?.message_id;
// 尽可能清理探测消息(无论落到哪个话题/General
if (res.ok && probeMessageId) {
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: probeMessageId
});
} catch (e) {
// 删除失败不影响主流程
}
}
if (!res.ok) {
if (isTopicMissingOrDeleted(res.description)) {
return { status: "missing", description: res.description };
}
if (isTestMessageInvalid(res.description)) {
return { status: "probe_invalid", description: res.description };
}
return { status: "unknown_error", description: res.description };
}
// 关键:有些情况下 Telegram 会返回 ok 但不带 message_thread_id常见于 General
if (actualThreadId === undefined || actualThreadId === null) {
return { status: "missing_thread_id" };
}
if (Number(actualThreadId) !== Number(expectedThreadId)) {
return { status: "redirected", actualThreadId };
}
return { status: "ok" };
};
const first = await attemptOnce();
if (first.status !== "missing_thread_id" || !doubleCheckOnMissingThreadId) return first;
// 二次探测:避免偶发字段缺失导致误判并触发重建
const second = await attemptOnce();
if (second.status === "missing_thread_id") {
Logger.warn('thread_probe_missing_thread_id', { userId, expectedThreadId, reason });
}
return second;
}
async function resetUserVerificationAndRequireReverify(env, { userId, userKey, oldThreadId, pendingMsgId, reason }) {
// 清理旧映射与验证状态:用户需要重新做人机验证
await env.TOPIC_MAP.delete(`verified:${userId}`);
await env.TOPIC_MAP.put(`needs_verify:${userId}`, "1", { expirationTtl: CONFIG.NEEDS_REVERIFY_TTL_SECONDS });
await env.TOPIC_MAP.delete(`retry:${userId}`);
if (userKey) {
await env.TOPIC_MAP.delete(userKey);
}
if (oldThreadId !== undefined && oldThreadId !== null) {
await env.TOPIC_MAP.delete(`thread:${oldThreadId}`);
await env.TOPIC_MAP.delete(`thread_ok:${oldThreadId}`);
threadHealthCache.delete(oldThreadId);
}
Logger.info('verification_reset_due_to_topic_loss', {
userId,
oldThreadId,
pendingMsgId,
reason
});
await sendVerificationChallenge(userId, env, pendingMsgId || null);
}
function parseAdminIdAllowlist(env) {
const raw = (env.ADMIN_IDS || "").toString().trim();
if (!raw) return null;
const ids = raw.split(/[,;\s]+/g).map(s => s.trim()).filter(Boolean);
const set = new Set();
for (const id of ids) {
const n = Number(id);
if (!Number.isFinite(n)) continue;
set.add(String(n));
}
return set.size > 0 ? set : null;
}
async function isAdminUser(env, userId) {
const allowlist = parseAdminIdAllowlist(env);
if (allowlist && allowlist.has(String(userId))) return true;
const cacheKey = String(userId);
const now = Date.now();
const cached = adminStatusCache.get(cacheKey);
if (cached && (now - cached.ts < CONFIG.ADMIN_CACHE_TTL_SECONDS * 1000)) {
return cached.isAdmin;
}
const kvKey = `admin:${userId}`;
const kvVal = await env.TOPIC_MAP.get(kvKey);
if (kvVal === "1" || kvVal === "0") {
const isAdmin = kvVal === "1";
adminStatusCache.set(cacheKey, { ts: now, isAdmin });
return isAdmin;
}
try {
const res = await tgCall(env, "getChatMember", {
chat_id: env.SUPERGROUP_ID,
user_id: userId
});
const status = res.result?.status;
const isAdmin = res.ok && (status === "creator" || status === "administrator");
await env.TOPIC_MAP.put(kvKey, isAdmin ? "1" : "0", { expirationTtl: CONFIG.ADMIN_CACHE_TTL_SECONDS });
adminStatusCache.set(cacheKey, { ts: now, isAdmin });
return isAdmin;
} catch (e) {
Logger.warn('admin_check_failed', { userId });
return false;
}
}
// 获取所有 KV keys处理分页
async function getAllKeys(env, prefix) {
const allKeys = [];
@@ -257,7 +446,9 @@ export default {
}
// 【修复】支持 General 话题和普通话题
// General 话题的 message_thread_id 可能不存在,或者等于 1
if (msg.message_thread_id || msg.text) {
const text = (msg.text || "").trim();
const isCommand = !!text && text.startsWith("/");
if (msg.message_thread_id || isCommand) {
await handleAdminReply(msg, normalizedEnv, ctx);
return new Response("OK");
}
@@ -294,16 +485,6 @@ async function handlePrivateMessage(msg, env, ctx) {
const verified = await env.TOPIC_MAP.get(`verified:${userId}`);
if (!verified) {
// 验证请求速率限制
const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300);
if (!verifyLimit.allowed) {
await tgCall(env, "sendMessage", {
chat_id: userId,
text: "⚠️ 验证请求过于频繁请5分钟后再试。"
});
return;
}
const isStart = msg.text && msg.text.trim() === "/start";
const pendingMsgId = isStart ? null : msg.message_id;
await sendVerificationChallenge(userId, env, pendingMsgId);
@@ -314,6 +495,13 @@ async function handlePrivateMessage(msg, env, ctx) {
}
async function forwardToTopic(msg, userId, key, env, ctx) {
// 并发兜底:如果已被标记为需要重新验证,直接发起验证并暂停转发/建话题
const needsVerify = await env.TOPIC_MAP.get(`needs_verify:${userId}`);
if (needsVerify) {
await sendVerificationChallenge(userId, env, msg.message_id || null);
return;
}
// 【修复 #4】使用安全的 JSON 解析
let rec = await safeGetJSON(env, key, null);
@@ -336,7 +524,7 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
}
if (!rec || !rec.thread_id) {
rec = await createTopic(msg.from, key, env, userId);
rec = await getOrCreateUserTopicRec(msg.from, key, env, userId);
if (!rec || !rec.thread_id) {
throw new Error("创建话题失败");
}
@@ -359,72 +547,44 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
const withinTTL = cached && (now - cached.ts < CONFIG.THREAD_HEALTH_TTL_MS);
if (!withinTTL) {
const testRes = await tgCall(env, "sendMessage", {
chat_id: env.SUPERGROUP_ID,
message_thread_id: rec.thread_id,
text: "", // 零宽度字符,对用户不可见
});
// 跨节点缓存:避免由于 Workers 多 PoP 导致每次都做健康探测
const kvHealthKey = `thread_ok:${rec.thread_id}`;
const kvHealthOk = await env.TOPIC_MAP.get(kvHealthKey);
if (kvHealthOk === "1") {
threadHealthCache.set(cacheKey, { ts: now, ok: true });
} else {
const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "health_check" });
const actualThreadId = testRes.result?.message_thread_id;
const expectedThreadId = rec.thread_id;
// 只要未返回 thread_id或 thread_id 不匹配,均视为话题失效/被重定向
const redirectedOrMissing = !testRes.ok || !actualThreadId || Number(actualThreadId) !== Number(expectedThreadId);
if (redirectedOrMissing) {
const desc = (testRes.description || "").toLowerCase();
const isTopicDeleted = redirectedOrMissing ||
desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found") ||
desc.includes("topic deleted") ||
desc.includes("thread deleted") ||
desc.includes("forum topic not found") ||
desc.includes("topic closed permanently");
if (isTopicDeleted) {
await env.TOPIC_MAP.put(retryKey, String(retryCount + 1), { expirationTtl: 60 });
Logger.info('topic_auto_repair', {
if (probe.status === "redirected" || probe.status === "missing" || probe.status === "missing_thread_id") {
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
attempt: retryCount + 1,
maxAttempts: CONFIG.MAX_RETRY_ATTEMPTS,
errorDescription: testRes.description
pendingMsgId: msg.message_id,
reason: `health_check:${probe.status}`
});
return;
} else if (probe.status === "probe_invalid") {
Logger.warn('topic_health_probe_invalid_message', {
userId,
threadId: rec.thread_id,
errorDescription: probe.description
});
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
threadHealthCache.delete(cacheKey);
rec = await createTopic(msg.from, key, env, userId);
if (!rec || !rec.thread_id) {
throw new Error("重建话题失败");
}
Logger.info('topic_recreated', {
userId,
newThreadId: rec.thread_id
});
} else {
Logger.warn('topic_test_failed_unknown', {
userId,
threadId: rec.thread_id,
errorDescription: testRes.description
});
}
// 仍然设置短 TTL避免每条消息都探测并误触发重建
threadHealthCache.set(cacheKey, { ts: now, ok: true });
await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) });
} else if (probe.status === "unknown_error") {
Logger.warn('topic_test_failed_unknown', {
userId,
threadId: rec.thread_id,
errorDescription: probe.description
});
} else {
await env.TOPIC_MAP.delete(retryKey);
threadHealthCache.set(cacheKey, { ts: now, ok: true });
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: testRes.result.message_id
});
} catch (e) {
// 删除失败不影响主流程
}
await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) });
}
}
}
}
@@ -447,18 +607,13 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
// 检测 Telegram 静默重定向到 General 的情况
const resThreadId = res.result?.message_thread_id;
if (res.ok && (!resThreadId || Number(resThreadId) !== Number(rec.thread_id))) {
if (res.ok && resThreadId !== undefined && resThreadId !== null && Number(resThreadId) !== Number(rec.thread_id)) {
Logger.warn('forward_redirected_to_general', {
userId,
expectedThreadId: rec.thread_id,
actualThreadId: resThreadId
});
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
threadHealthCache.delete(rec.thread_id);
const newRec = await createTopic(msg.from, key, env, userId);
// 删除误投到 General 的消息
if (res.result?.message_id) {
try {
@@ -470,32 +625,65 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
// 删除失败不影响重发
}
}
await tgCall(env, "copyMessage", {
chat_id: env.SUPERGROUP_ID,
from_chat_id: userId,
message_id: msg.message_id,
message_thread_id: newRec.thread_id
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
pendingMsgId: msg.message_id,
reason: "forward_redirected_to_general"
});
return;
}
// 兜底:部分情况下 Telegram 返回 ok 但不带 message_thread_id可能已落入 General
if (res.ok && (resThreadId === undefined || resThreadId === null)) {
const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "forward_result_missing_thread_id" });
if (probe.status !== "ok") {
Logger.warn('forward_suspected_redirect_or_missing', {
userId,
expectedThreadId: rec.thread_id,
probeStatus: probe.status,
probeDescription: probe.description
});
// 尽量删除误投消息(通常在 General
if (res.result?.message_id) {
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: res.result.message_id
});
} catch (e) {
// 删除失败不影响重发
}
}
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
pendingMsgId: msg.message_id,
reason: `forward_missing_thread_id:${probe.status}`
});
return;
}
}
// 【修复2】增强错误处理双重保险
// 如果上面的测试没有捕获到,这里再次检测
if (!res.ok) {
const desc = (res.description || "").toLowerCase();
if (desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found")) {
console.log(`[二次修复] 转发失败,话题不存在,正在重建...`);
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
threadHealthCache.delete(rec.thread_id);
const newRec = await createTopic(msg.from, key, env, userId);
await tgCall(env, "forwardMessage", {
chat_id: env.SUPERGROUP_ID,
from_chat_id: userId,
message_id: msg.message_id,
message_thread_id: newRec.thread_id,
const desc = normalizeTgDescription(res.description);
if (isTopicMissingOrDeleted(desc)) {
Logger.warn('forward_failed_topic_missing', {
userId,
threadId: rec.thread_id,
errorDescription: res.description
});
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
pendingMsgId: msg.message_id,
reason: "forward_failed_topic_missing"
});
return;
}
@@ -516,10 +704,17 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
async function handleAdminReply(msg, env, ctx) {
const threadId = msg.message_thread_id;
const text = (msg.text || "").trim();
const senderId = msg.from?.id;
// 仅允许管理员在群内操作与回信,防止任意群成员向用户私聊注入消息
if (!senderId || !(await isAdminUser(env, senderId))) {
return;
}
// 【修复】允许在任何话题执行 /cleanup 命令
if (text === "/cleanup") {
await handleCleanupCommand(threadId, env);
// /cleanup 可能处理较久,使用 waitUntil 防止 webhook 请求超时导致“卡住”
ctx.waitUntil(handleCleanupCommand(threadId, env));
return;
}
@@ -576,6 +771,7 @@ async function handleAdminReply(msg, env, ctx) {
if (text === "/trust") {
await env.TOPIC_MAP.put(`verified:${userId}`, "trusted");
await env.TOPIC_MAP.delete(`needs_verify:${userId}`);
await tgCall(env, "sendMessage", { chat_id: env.SUPERGROUP_ID, message_thread_id: threadId, text: "🌟 **已设置永久信任**", parse_mode: "Markdown" });
return;
}
@@ -605,7 +801,7 @@ async function handleAdminReply(msg, env, ctx) {
// 转发管理员消息给用户
if (msg.media_group_id) {
await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: null });
await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: undefined });
return;
}
await tgCall(env, "copyMessage", { chat_id: userId, from_chat_id: env.SUPERGROUP_ID, message_id: msg.message_id });
@@ -617,7 +813,44 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
// 【修复 #1】检查是否已有进行中的验证
const existingChallenge = await env.TOPIC_MAP.get(`user_challenge:${userId}`);
if (existingChallenge) {
Logger.info('verification_duplicate_skipped', { userId });
// 有正在进行的验证:仅将新消息加入待发送队列,避免重复下发题目/触发验证限速
const chalKey = `chal:${existingChallenge}`;
const state = await safeGetJSON(env, chalKey, null);
// KV 可能存在不一致/过期:自愈清理后重新下发
if (!state || state.userId !== userId) {
await env.TOPIC_MAP.delete(`user_challenge:${userId}`);
} else {
if (pendingMsgId) {
let pendingIds = [];
if (Array.isArray(state.pending_ids)) {
pendingIds = state.pending_ids.slice();
} else if (state.pending) {
pendingIds = [state.pending];
}
if (!pendingIds.includes(pendingMsgId)) {
pendingIds.push(pendingMsgId);
if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) {
pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES);
}
state.pending_ids = pendingIds;
delete state.pending;
await env.TOPIC_MAP.put(chalKey, JSON.stringify(state), { expirationTtl: CONFIG.VERIFY_EXPIRE_SECONDS });
}
}
Logger.debug('verification_duplicate_skipped', { userId, verifyId: existingChallenge, hasPending: !!pendingMsgId });
return;
}
}
// 验证请求速率限制:仅在需要创建新挑战时检查
const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300);
if (!verifyLimit.allowed) {
await tgCall(env, "sendMessage", {
chat_id: userId,
text: "⚠️ 验证请求过于频繁请5分钟后再试。"
});
return;
}
@@ -638,7 +871,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
const state = {
answerIndex: answerIndex, // 存储索引
options: challenge.options, // 存储完整选项列表
pending: pendingMsgId,
pending_ids: pendingMsgId ? [pendingMsgId] : [],
userId: userId // 添加用户ID验证
};
@@ -651,7 +884,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
userId,
verifyId,
question: q.question,
hasPending: !!pendingMsgId
pendingCount: state.pending_ids.length
});
// 【修复 #6】按钮使用索引而非文本
@@ -741,6 +974,7 @@ async function handleCallbackQuery(query, env, ctx) {
// 30天有效期 - 使用配置常量
await env.TOPIC_MAP.put(`verified:${userId}`, "1", { expirationTtl: CONFIG.VERIFIED_EXPIRE_SECONDS });
await env.TOPIC_MAP.delete(`needs_verify:${userId}`);
// 【修复 #1】清理所有相关挑战
await env.TOPIC_MAP.delete(`chal:${verifyId}`);
@@ -753,36 +987,48 @@ async function handleCallbackQuery(query, env, ctx) {
parse_mode: "Markdown"
});
if (state.pending) {
const hasPending = (Array.isArray(state.pending_ids) && state.pending_ids.length > 0) || !!state.pending;
if (hasPending) {
try {
// 【修复 #3】检查消息是否已经被转发过
const forwardedKey = `forwarded:${userId}:${state.pending}`;
const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey);
if (alreadyForwarded) {
Logger.info('message_forward_duplicate_skipped', {
userId,
messageId: state.pending
});
return;
let pendingIds = [];
if (Array.isArray(state.pending_ids)) {
pendingIds = state.pending_ids.slice();
} else if (state.pending) {
pendingIds = [state.pending];
}
const fakeMsg = {
message_id: state.pending,
chat: { id: userId, type: "private" },
from: query.from,
};
// 限制一次性转发量,避免用户恶意堆积导致执行超时
if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) {
pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES);
}
await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx);
let forwardedCount = 0;
for (const pendingId of pendingIds) {
if (!pendingId) continue;
const forwardedKey = `forwarded:${userId}:${pendingId}`;
const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey);
if (alreadyForwarded) {
Logger.info('message_forward_duplicate_skipped', { userId, messageId: pendingId });
continue;
}
// 【修复 #3】标记已转发
await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 });
const fakeMsg = {
message_id: pendingId,
chat: { id: userId, type: "private" },
from: query.from,
};
await tgCall(env, "sendMessage", {
chat_id: userId,
text: "📩 刚才的消息已帮您送达。",
reply_to_message_id: state.pending
});
await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx);
await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 });
forwardedCount++;
}
if (forwardedCount > 0) {
await tgCall(env, "sendMessage", {
chat_id: userId,
text: `📩 刚才的 ${forwardedCount} 条消息已帮您送达。`
});
}
} catch (e) {
Logger.error('pending_message_forward_failed', e, { userId });
await tgCall(env, "sendMessage", {
@@ -837,53 +1083,60 @@ async function handleCallbackQuery(query, env, ctx) {
* @param {object} env - 环境变量对象
*/
async function handleCleanupCommand(threadId, env) {
const lockKey = "cleanup:lock";
const locked = await env.TOPIC_MAP.get(lockKey);
if (locked) {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
text: "⏳ **已有清理任务正在运行,请稍后再试。**",
parse_mode: "Markdown"
}, threadId));
return;
}
await env.TOPIC_MAP.put(lockKey, "1", { expirationTtl: CONFIG.CLEANUP_LOCK_TTL_SECONDS });
// 发送处理中的消息
await tgCall(env, "sendMessage", {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
message_thread_id: threadId,
text: "🔄 **正在扫描需要清理的用户...**",
parse_mode: "Markdown"
});
}, threadId));
let cleanedCount = 0;
let errorCount = 0;
const cleanedUsers = [];
let scannedCount = 0;
try {
// 【修复 #11】获取所有用户记录处理分页
const allKeys = await getAllKeys(env, "user:");
// 逐页扫描,避免一次性拉取全部 keys 导致超时/内存膨胀
let cursor = undefined;
do {
const result = await env.TOPIC_MAP.list({ prefix: "user:", cursor });
const names = (result.keys || []).map(k => k.name);
scannedCount += names.length;
// 【修复 #8】批量并发处理(限制并发数)
for (let i = 0; i < allKeys.length; i += CONFIG.CLEANUP_BATCH_SIZE) {
const batch = allKeys.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE);
// 批量并发处理(限制并发数)
for (let i = 0; i < names.length; i += CONFIG.CLEANUP_BATCH_SIZE) {
const batch = names.slice(i, i + CONFIG.CLEANUP_BATCH_SIZE);
const results = await Promise.allSettled(
batch.map(async ({ name }) => {
const rec = await safeGetJSON(env, name, null);
const results = await Promise.allSettled(
batch.map(async (name) => {
const rec = await safeGetJSON(env, name, null);
if (!rec || !rec.thread_id) return null;
const userId = name.slice(5);
const topicThreadId = rec.thread_id;
// 检测话题是否存在:尝试向话题发送测试消息
const testRes = await tgCall(env, "sendMessage", {
chat_id: env.SUPERGROUP_ID,
message_thread_id: topicThreadId,
text: "", // 零宽度字符
const probe = await probeForumThread(env, topicThreadId, {
userId,
reason: "cleanup_check",
doubleCheckOnMissingThreadId: false
});
const actualThreadId = testRes.result?.message_thread_id;
// 同时检测 Telegram 静默重定向到 General 的场景
if (!testRes.ok || (actualThreadId && Number(actualThreadId) !== Number(topicThreadId))) {
const desc = (testRes.description || "").toLowerCase();
const redirected = actualThreadId && Number(actualThreadId) !== Number(topicThreadId);
const isTopicDeleted = redirected ||
desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found");
if (isTopicDeleted) {
// cleanup 要求更保守:仅在明确缺失/重定向时清理,避免误删有效记录
if (probe.status === "redirected" || probe.status === "missing") {
await env.TOPIC_MAP.delete(name);
await env.TOPIC_MAP.delete(`verified:${userId}`);
await env.TOPIC_MAP.delete(`thread:${topicThreadId}`);
@@ -893,19 +1146,20 @@ async function handleCleanupCommand(threadId, env) {
threadId: topicThreadId,
title: rec.title || "未知"
};
}
}
// 话题存在或检测后,尝试删除测试消息
if (testRes.ok && testRes.result?.message_id) {
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: testRes.result.message_id
});
} catch (e) {
// 删除失败不影响主流程
}
} else if (probe.status === "probe_invalid") {
Logger.warn('cleanup_probe_invalid_message', {
userId,
threadId: topicThreadId,
errorDescription: probe.description
});
} else if (probe.status === "unknown_error") {
Logger.warn('cleanup_probe_failed_unknown', {
userId,
threadId: topicThreadId,
errorDescription: probe.description
});
} else if (probe.status === "missing_thread_id") {
Logger.warn('cleanup_probe_missing_thread_id', { userId, threadId: topicThreadId });
}
return null;
@@ -927,15 +1181,24 @@ async function handleCleanupCommand(threadId, env) {
}
});
// 防止速率限制
if (i + CONFIG.CLEANUP_BATCH_SIZE < allKeys.length) {
await new Promise(r => setTimeout(r, 1000));
// 防止速率限制
if (i + CONFIG.CLEANUP_BATCH_SIZE < names.length) {
await new Promise(r => setTimeout(r, 600));
}
}
}
cursor = result.list_complete ? undefined : result.cursor;
// 在分页之间让出时间片,降低单次执行压力
if (cursor) {
await new Promise(r => setTimeout(r, 200));
}
} while (cursor);
// 生成并发送清理报告
let reportText = `✅ **清理完成**\n\n`;
reportText += `📊 **统计信息**\n`;
reportText += `- 扫描用户数: ${scannedCount}\n`;
reportText += `- 已清理用户数: ${cleanedCount}\n`;
reportText += `- 错误数: ${errorCount}\n\n`;
@@ -955,24 +1218,24 @@ async function handleCleanupCommand(threadId, env) {
Logger.info('cleanup_completed', {
cleanedCount,
errorCount,
totalUsers: allKeys.length
totalUsers: scannedCount
});
await tgCall(env, "sendMessage", {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
message_thread_id: threadId,
text: reportText,
parse_mode: "Markdown"
});
}, threadId));
} catch (e) {
Logger.error('cleanup_failed', e, { threadId });
await tgCall(env, "sendMessage", {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
message_thread_id: threadId,
text: `❌ **清理过程出错**\n\n错误信息: \`${e.message}\``,
parse_mode: "Markdown"
});
}, threadId));
} finally {
await env.TOPIC_MAP.delete(lockKey);
}
}
@@ -995,9 +1258,22 @@ async function createTopic(from, key, env, userId) {
// 【修复 #2】更新话题状态 - 修复异步操作未等待
async function updateThreadStatus(threadId, isClosed, env) {
try {
const allKeys = await getAllKeys(env, "user:");
const mappedUser = await env.TOPIC_MAP.get(`thread:${threadId}`);
if (mappedUser) {
const userKey = `user:${mappedUser}`;
const rec = await safeGetJSON(env, userKey, null);
if (rec && Number(rec.thread_id) === Number(threadId)) {
rec.closed = isClosed;
await env.TOPIC_MAP.put(userKey, JSON.stringify(rec));
Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: 1 });
return;
}
// 收集所有更新操作
// 映射失效:清理后降级全量扫描
await env.TOPIC_MAP.delete(`thread:${threadId}`);
}
const allKeys = await getAllKeys(env, "user:");
const updates = [];
for (const { name } of allKeys) {
@@ -1008,14 +1284,8 @@ async function updateThreadStatus(threadId, isClosed, env) {
}
}
// 等待所有更新完成
await Promise.all(updates);
Logger.info('thread_status_updated', {
threadId,
isClosed,
updatedCount: updates.length
});
Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: updates.length });
} catch (e) {
Logger.error('thread_status_update_failed', e, { threadId, isClosed });
throw e;
@@ -1119,11 +1389,15 @@ async function handleMediaGroup(msg, env, ctx, { direction, targetChat, threadId
const key = `mg:${direction}:${groupId}`;
const item = extractMedia(msg);
if (!item) {
await tgCall(env, "copyMessage", { chat_id: targetChat, from_chat_id: msg.chat.id, message_id: msg.message_id, message_thread_id: threadId });
await tgCall(env, "copyMessage", withMessageThreadId({
chat_id: targetChat,
from_chat_id: msg.chat.id,
message_id: msg.message_id
}, threadId));
return;
}
let rec = await safeGetJSON(env, key, null);
if (!rec) rec = { direction, targetChat, threadId, items: [], last_ts: Date.now() };
if (!rec) rec = { direction, targetChat, threadId: (threadId === null ? undefined : threadId), items: [], last_ts: Date.now() };
rec.items.push({ ...item, msg_id: msg.message_id });
rec.last_ts = Date.now();
await env.TOPIC_MAP.put(key, JSON.stringify(rec), { expirationTtl: CONFIG.MEDIA_GROUP_EXPIRE_SECONDS });
@@ -1226,7 +1500,7 @@ async function delaySend(env, key, ts) {
}
// 【修复 #28】限制 caption 长度
const caption = i === 0 ? (it.cap || "").substring(0, 1024) : "";
return {
return {
type: it.type,
media: it.id,
caption
@@ -1235,11 +1509,10 @@ async function delaySend(env, key, ts) {
if (media.length > 0) {
try {
const result = await tgCall(env, "sendMediaGroup", {
const result = await tgCall(env, "sendMediaGroup", withMessageThreadId({
chat_id: rec.targetChat,
message_thread_id: rec.threadId,
media
});
}, rec.threadId));
if (!result.ok) {
Logger.error('media_group_send_failed', result.description, {