Refactor worker.js for improved configuration and error handling

Updated comments and added new constants for better configuration management. Enhanced error handling and improved user verification process.
This commit is contained in:
Vaghr
2026-01-06 01:23:24 +08:00
committed by GitHub
parent 9085a339ee
commit 54b12c7a04

594
worker.js
View File

@@ -1,4 +1,4 @@
// Cloudflare WorkerTelegram 双向机器人 v5.1
// Cloudflare WorkerTelegram 双向机器人 (纯本地极速版 v5.3)
// --- 配置常量 ---
const CONFIG = {
@@ -7,6 +7,9 @@ const CONFIG = {
VERIFIED_EXPIRE_SECONDS: 2592000, // 30天
MEDIA_GROUP_EXPIRE_SECONDS: 60,
MEDIA_GROUP_DELAY_MS: 3000, // 3秒从2秒增加
PENDING_MAX_MESSAGES: 10, // 验证期间最多暂存的消息数
ADMIN_CACHE_TTL_SECONDS: 300, // 管理员权限缓存 5 分钟
NEEDS_REVERIFY_TTL_SECONDS: 600, // 标记需重新验证的 TTL用于并发兜底
RATE_LIMIT_MESSAGE: 45,
RATE_LIMIT_VERIFY: 3,
RATE_LIMIT_WINDOW: 60,
@@ -22,6 +25,10 @@ const CONFIG = {
// 线程健康检查缓存,减少频繁探测请求
const threadHealthCache = new Map();
// 同一实例内的并发保护:避免同一用户短时间内重复创建话题
const topicCreateInFlight = new Map();
// 管理员权限缓存(实例内)
const adminStatusCache = new Map();
// --- 本地题库 (15条) ---
const LOCAL_QUESTIONS = [
@@ -143,6 +150,187 @@ async function safeGetJSON(env, key, defaultValue = null) {
}
}
function normalizeTgDescription(description) {
return (description || "").toString().toLowerCase();
}
function isTopicMissingOrDeleted(description) {
const desc = normalizeTgDescription(description);
return desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found") ||
desc.includes("topic deleted") ||
desc.includes("thread deleted") ||
desc.includes("forum topic not found") ||
desc.includes("topic closed permanently");
}
function isTestMessageInvalid(description) {
const desc = normalizeTgDescription(description);
return desc.includes("message text is empty") ||
desc.includes("bad request: message text is empty");
}
async function getOrCreateUserTopicRec(from, key, env, userId) {
const existing = await safeGetJSON(env, key, null);
if (existing && existing.thread_id) return existing;
const inflight = topicCreateInFlight.get(String(userId));
if (inflight) return await inflight;
const p = (async () => {
// 并发下二次确认,避免已被其他请求创建却读到旧值
const again = await safeGetJSON(env, key, null);
if (again && again.thread_id) return again;
return await createTopic(from, key, env, userId);
})();
topicCreateInFlight.set(String(userId), p);
try {
return await p;
} finally {
if (topicCreateInFlight.get(String(userId)) === p) {
topicCreateInFlight.delete(String(userId));
}
}
}
function withMessageThreadId(body, threadId) {
if (threadId === undefined || threadId === null) return body;
return { ...body, message_thread_id: threadId };
}
async function probeForumThread(env, expectedThreadId, { userId, reason } = {}) {
const attemptOnce = async () => {
const res = await tgCall(env, "sendMessage", {
chat_id: env.SUPERGROUP_ID,
message_thread_id: expectedThreadId,
text: "🔎"
});
const actualThreadId = res.result?.message_thread_id;
const probeMessageId = res.result?.message_id;
// 尽可能清理探测消息(无论落到哪个话题/General
if (res.ok && probeMessageId) {
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: probeMessageId
});
} catch (e) {
// 删除失败不影响主流程
}
}
if (!res.ok) {
if (isTopicMissingOrDeleted(res.description)) {
return { status: "missing", description: res.description };
}
if (isTestMessageInvalid(res.description)) {
return { status: "probe_invalid", description: res.description };
}
return { status: "unknown_error", description: res.description };
}
// 关键:有些情况下 Telegram 会返回 ok 但不带 message_thread_id常见于 General
if (actualThreadId === undefined || actualThreadId === null) {
return { status: "missing_thread_id" };
}
if (Number(actualThreadId) !== Number(expectedThreadId)) {
return { status: "redirected", actualThreadId };
}
return { status: "ok" };
};
const first = await attemptOnce();
if (first.status !== "missing_thread_id") return first;
// 二次探测:避免偶发字段缺失导致误判并触发重建
const second = await attemptOnce();
if (second.status === "missing_thread_id") {
Logger.warn('thread_probe_missing_thread_id', { userId, expectedThreadId, reason });
}
return second;
}
async function resetUserVerificationAndRequireReverify(env, { userId, userKey, oldThreadId, pendingMsgId, reason }) {
// 清理旧映射与验证状态:用户需要重新做人机验证
await env.TOPIC_MAP.delete(`verified:${userId}`);
await env.TOPIC_MAP.put(`needs_verify:${userId}`, "1", { expirationTtl: CONFIG.NEEDS_REVERIFY_TTL_SECONDS });
await env.TOPIC_MAP.delete(`retry:${userId}`);
if (userKey) {
await env.TOPIC_MAP.delete(userKey);
}
if (oldThreadId !== undefined && oldThreadId !== null) {
await env.TOPIC_MAP.delete(`thread:${oldThreadId}`);
await env.TOPIC_MAP.delete(`thread_ok:${oldThreadId}`);
threadHealthCache.delete(oldThreadId);
}
Logger.info('verification_reset_due_to_topic_loss', {
userId,
oldThreadId,
pendingMsgId,
reason
});
await sendVerificationChallenge(userId, env, pendingMsgId || null);
}
function parseAdminIdAllowlist(env) {
const raw = (env.ADMIN_IDS || "").toString().trim();
if (!raw) return null;
const ids = raw.split(/[,;\s]+/g).map(s => s.trim()).filter(Boolean);
const set = new Set();
for (const id of ids) {
const n = Number(id);
if (!Number.isFinite(n)) continue;
set.add(String(n));
}
return set.size > 0 ? set : null;
}
async function isAdminUser(env, userId) {
const allowlist = parseAdminIdAllowlist(env);
if (allowlist && allowlist.has(String(userId))) return true;
const cacheKey = String(userId);
const now = Date.now();
const cached = adminStatusCache.get(cacheKey);
if (cached && (now - cached.ts < CONFIG.ADMIN_CACHE_TTL_SECONDS * 1000)) {
return cached.isAdmin;
}
const kvKey = `admin:${userId}`;
const kvVal = await env.TOPIC_MAP.get(kvKey);
if (kvVal === "1" || kvVal === "0") {
const isAdmin = kvVal === "1";
adminStatusCache.set(cacheKey, { ts: now, isAdmin });
return isAdmin;
}
try {
const res = await tgCall(env, "getChatMember", {
chat_id: env.SUPERGROUP_ID,
user_id: userId
});
const status = res.result?.status;
const isAdmin = res.ok && (status === "creator" || status === "administrator");
await env.TOPIC_MAP.put(kvKey, isAdmin ? "1" : "0", { expirationTtl: CONFIG.ADMIN_CACHE_TTL_SECONDS });
adminStatusCache.set(cacheKey, { ts: now, isAdmin });
return isAdmin;
} catch (e) {
Logger.warn('admin_check_failed', { userId });
return false;
}
}
// 获取所有 KV keys处理分页
async function getAllKeys(env, prefix) {
const allKeys = [];
@@ -257,7 +445,9 @@ export default {
}
// 【修复】支持 General 话题和普通话题
// General 话题的 message_thread_id 可能不存在,或者等于 1
if (msg.message_thread_id || msg.text) {
const text = (msg.text || "").trim();
const isCommand = !!text && text.startsWith("/");
if (msg.message_thread_id || isCommand) {
await handleAdminReply(msg, normalizedEnv, ctx);
return new Response("OK");
}
@@ -294,16 +484,6 @@ async function handlePrivateMessage(msg, env, ctx) {
const verified = await env.TOPIC_MAP.get(`verified:${userId}`);
if (!verified) {
// 验证请求速率限制
const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300);
if (!verifyLimit.allowed) {
await tgCall(env, "sendMessage", {
chat_id: userId,
text: "⚠️ 验证请求过于频繁请5分钟后再试。"
});
return;
}
const isStart = msg.text && msg.text.trim() === "/start";
const pendingMsgId = isStart ? null : msg.message_id;
await sendVerificationChallenge(userId, env, pendingMsgId);
@@ -314,6 +494,13 @@ async function handlePrivateMessage(msg, env, ctx) {
}
async function forwardToTopic(msg, userId, key, env, ctx) {
// 并发兜底:如果已被标记为需要重新验证,直接发起验证并暂停转发/建话题
const needsVerify = await env.TOPIC_MAP.get(`needs_verify:${userId}`);
if (needsVerify) {
await sendVerificationChallenge(userId, env, msg.message_id || null);
return;
}
// 【修复 #4】使用安全的 JSON 解析
let rec = await safeGetJSON(env, key, null);
@@ -336,7 +523,7 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
}
if (!rec || !rec.thread_id) {
rec = await createTopic(msg.from, key, env, userId);
rec = await getOrCreateUserTopicRec(msg.from, key, env, userId);
if (!rec || !rec.thread_id) {
throw new Error("创建话题失败");
}
@@ -359,72 +546,44 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
const withinTTL = cached && (now - cached.ts < CONFIG.THREAD_HEALTH_TTL_MS);
if (!withinTTL) {
const testRes = await tgCall(env, "sendMessage", {
chat_id: env.SUPERGROUP_ID,
message_thread_id: rec.thread_id,
text: "", // 零宽度字符,对用户不可见
});
// 跨节点缓存:避免由于 Workers 多 PoP 导致每次都做健康探测
const kvHealthKey = `thread_ok:${rec.thread_id}`;
const kvHealthOk = await env.TOPIC_MAP.get(kvHealthKey);
if (kvHealthOk === "1") {
threadHealthCache.set(cacheKey, { ts: now, ok: true });
} else {
const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "health_check" });
const actualThreadId = testRes.result?.message_thread_id;
const expectedThreadId = rec.thread_id;
// 只要未返回 thread_id或 thread_id 不匹配,均视为话题失效/被重定向
const redirectedOrMissing = !testRes.ok || !actualThreadId || Number(actualThreadId) !== Number(expectedThreadId);
if (redirectedOrMissing) {
const desc = (testRes.description || "").toLowerCase();
const isTopicDeleted = redirectedOrMissing ||
desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found") ||
desc.includes("topic deleted") ||
desc.includes("thread deleted") ||
desc.includes("forum topic not found") ||
desc.includes("topic closed permanently");
if (isTopicDeleted) {
await env.TOPIC_MAP.put(retryKey, String(retryCount + 1), { expirationTtl: 60 });
Logger.info('topic_auto_repair', {
if (probe.status === "redirected" || probe.status === "missing" || probe.status === "missing_thread_id") {
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
attempt: retryCount + 1,
maxAttempts: CONFIG.MAX_RETRY_ATTEMPTS,
errorDescription: testRes.description
pendingMsgId: msg.message_id,
reason: `health_check:${probe.status}`
});
return;
} else if (probe.status === "probe_invalid") {
Logger.warn('topic_health_probe_invalid_message', {
userId,
threadId: rec.thread_id,
errorDescription: probe.description
});
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
threadHealthCache.delete(cacheKey);
rec = await createTopic(msg.from, key, env, userId);
if (!rec || !rec.thread_id) {
throw new Error("重建话题失败");
}
Logger.info('topic_recreated', {
userId,
newThreadId: rec.thread_id
});
} else {
Logger.warn('topic_test_failed_unknown', {
userId,
threadId: rec.thread_id,
errorDescription: testRes.description
});
}
// 仍然设置短 TTL避免每条消息都探测并误触发重建
threadHealthCache.set(cacheKey, { ts: now, ok: true });
await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) });
} else if (probe.status === "unknown_error") {
Logger.warn('topic_test_failed_unknown', {
userId,
threadId: rec.thread_id,
errorDescription: probe.description
});
} else {
await env.TOPIC_MAP.delete(retryKey);
threadHealthCache.set(cacheKey, { ts: now, ok: true });
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: testRes.result.message_id
});
} catch (e) {
// 删除失败不影响主流程
}
await env.TOPIC_MAP.put(kvHealthKey, "1", { expirationTtl: Math.ceil(CONFIG.THREAD_HEALTH_TTL_MS / 1000) });
}
}
}
}
@@ -447,18 +606,13 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
// 检测 Telegram 静默重定向到 General 的情况
const resThreadId = res.result?.message_thread_id;
if (res.ok && (!resThreadId || Number(resThreadId) !== Number(rec.thread_id))) {
if (res.ok && resThreadId !== undefined && resThreadId !== null && Number(resThreadId) !== Number(rec.thread_id)) {
Logger.warn('forward_redirected_to_general', {
userId,
expectedThreadId: rec.thread_id,
actualThreadId: resThreadId
});
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
threadHealthCache.delete(rec.thread_id);
const newRec = await createTopic(msg.from, key, env, userId);
// 删除误投到 General 的消息
if (res.result?.message_id) {
try {
@@ -470,32 +624,65 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
// 删除失败不影响重发
}
}
await tgCall(env, "copyMessage", {
chat_id: env.SUPERGROUP_ID,
from_chat_id: userId,
message_id: msg.message_id,
message_thread_id: newRec.thread_id
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
pendingMsgId: msg.message_id,
reason: "forward_redirected_to_general"
});
return;
}
// 兜底:部分情况下 Telegram 返回 ok 但不带 message_thread_id可能已落入 General
if (res.ok && (resThreadId === undefined || resThreadId === null)) {
const probe = await probeForumThread(env, rec.thread_id, { userId, reason: "forward_result_missing_thread_id" });
if (probe.status !== "ok") {
Logger.warn('forward_suspected_redirect_or_missing', {
userId,
expectedThreadId: rec.thread_id,
probeStatus: probe.status,
probeDescription: probe.description
});
// 尽量删除误投消息(通常在 General
if (res.result?.message_id) {
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: res.result.message_id
});
} catch (e) {
// 删除失败不影响重发
}
}
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
pendingMsgId: msg.message_id,
reason: `forward_missing_thread_id:${probe.status}`
});
return;
}
}
// 【修复2】增强错误处理双重保险
// 如果上面的测试没有捕获到,这里再次检测
if (!res.ok) {
const desc = (res.description || "").toLowerCase();
if (desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found")) {
console.log(`[二次修复] 转发失败,话题不存在,正在重建...`);
await env.TOPIC_MAP.delete(`thread:${rec.thread_id}`);
threadHealthCache.delete(rec.thread_id);
const newRec = await createTopic(msg.from, key, env, userId);
await tgCall(env, "forwardMessage", {
chat_id: env.SUPERGROUP_ID,
from_chat_id: userId,
message_id: msg.message_id,
message_thread_id: newRec.thread_id,
const desc = normalizeTgDescription(res.description);
if (isTopicMissingOrDeleted(desc)) {
Logger.warn('forward_failed_topic_missing', {
userId,
threadId: rec.thread_id,
errorDescription: res.description
});
await resetUserVerificationAndRequireReverify(env, {
userId,
userKey: key,
oldThreadId: rec.thread_id,
pendingMsgId: msg.message_id,
reason: "forward_failed_topic_missing"
});
return;
}
@@ -516,6 +703,12 @@ async function forwardToTopic(msg, userId, key, env, ctx) {
async function handleAdminReply(msg, env, ctx) {
const threadId = msg.message_thread_id;
const text = (msg.text || "").trim();
const senderId = msg.from?.id;
// 仅允许管理员在群内操作与回信,防止任意群成员向用户私聊注入消息
if (!senderId || !(await isAdminUser(env, senderId))) {
return;
}
// 【修复】允许在任何话题执行 /cleanup 命令
if (text === "/cleanup") {
@@ -576,6 +769,7 @@ async function handleAdminReply(msg, env, ctx) {
if (text === "/trust") {
await env.TOPIC_MAP.put(`verified:${userId}`, "trusted");
await env.TOPIC_MAP.delete(`needs_verify:${userId}`);
await tgCall(env, "sendMessage", { chat_id: env.SUPERGROUP_ID, message_thread_id: threadId, text: "🌟 **已设置永久信任**", parse_mode: "Markdown" });
return;
}
@@ -605,7 +799,7 @@ async function handleAdminReply(msg, env, ctx) {
// 转发管理员消息给用户
if (msg.media_group_id) {
await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: null });
await handleMediaGroup(msg, env, ctx, { direction: "t2p", targetChat: userId, threadId: undefined });
return;
}
await tgCall(env, "copyMessage", { chat_id: userId, from_chat_id: env.SUPERGROUP_ID, message_id: msg.message_id });
@@ -617,7 +811,44 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
// 【修复 #1】检查是否已有进行中的验证
const existingChallenge = await env.TOPIC_MAP.get(`user_challenge:${userId}`);
if (existingChallenge) {
Logger.info('verification_duplicate_skipped', { userId });
// 有正在进行的验证:仅将新消息加入待发送队列,避免重复下发题目/触发验证限速
const chalKey = `chal:${existingChallenge}`;
const state = await safeGetJSON(env, chalKey, null);
// KV 可能存在不一致/过期:自愈清理后重新下发
if (!state || state.userId !== userId) {
await env.TOPIC_MAP.delete(`user_challenge:${userId}`);
} else {
if (pendingMsgId) {
let pendingIds = [];
if (Array.isArray(state.pending_ids)) {
pendingIds = state.pending_ids.slice();
} else if (state.pending) {
pendingIds = [state.pending];
}
if (!pendingIds.includes(pendingMsgId)) {
pendingIds.push(pendingMsgId);
if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) {
pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES);
}
state.pending_ids = pendingIds;
delete state.pending;
await env.TOPIC_MAP.put(chalKey, JSON.stringify(state), { expirationTtl: CONFIG.VERIFY_EXPIRE_SECONDS });
}
}
Logger.debug('verification_duplicate_skipped', { userId, verifyId: existingChallenge, hasPending: !!pendingMsgId });
return;
}
}
// 验证请求速率限制:仅在需要创建新挑战时检查
const verifyLimit = await checkRateLimit(userId, env, 'verify', CONFIG.RATE_LIMIT_VERIFY, 300);
if (!verifyLimit.allowed) {
await tgCall(env, "sendMessage", {
chat_id: userId,
text: "⚠️ 验证请求过于频繁请5分钟后再试。"
});
return;
}
@@ -638,7 +869,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
const state = {
answerIndex: answerIndex, // 存储索引
options: challenge.options, // 存储完整选项列表
pending: pendingMsgId,
pending_ids: pendingMsgId ? [pendingMsgId] : [],
userId: userId // 添加用户ID验证
};
@@ -651,7 +882,7 @@ async function sendVerificationChallenge(userId, env, pendingMsgId) {
userId,
verifyId,
question: q.question,
hasPending: !!pendingMsgId
pendingCount: state.pending_ids.length
});
// 【修复 #6】按钮使用索引而非文本
@@ -741,6 +972,7 @@ async function handleCallbackQuery(query, env, ctx) {
// 30天有效期 - 使用配置常量
await env.TOPIC_MAP.put(`verified:${userId}`, "1", { expirationTtl: CONFIG.VERIFIED_EXPIRE_SECONDS });
await env.TOPIC_MAP.delete(`needs_verify:${userId}`);
// 【修复 #1】清理所有相关挑战
await env.TOPIC_MAP.delete(`chal:${verifyId}`);
@@ -753,36 +985,48 @@ async function handleCallbackQuery(query, env, ctx) {
parse_mode: "Markdown"
});
if (state.pending) {
const hasPending = (Array.isArray(state.pending_ids) && state.pending_ids.length > 0) || !!state.pending;
if (hasPending) {
try {
// 【修复 #3】检查消息是否已经被转发过
const forwardedKey = `forwarded:${userId}:${state.pending}`;
const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey);
if (alreadyForwarded) {
Logger.info('message_forward_duplicate_skipped', {
userId,
messageId: state.pending
});
return;
let pendingIds = [];
if (Array.isArray(state.pending_ids)) {
pendingIds = state.pending_ids.slice();
} else if (state.pending) {
pendingIds = [state.pending];
}
const fakeMsg = {
message_id: state.pending,
chat: { id: userId, type: "private" },
from: query.from,
};
// 限制一次性转发量,避免用户恶意堆积导致执行超时
if (pendingIds.length > CONFIG.PENDING_MAX_MESSAGES) {
pendingIds = pendingIds.slice(pendingIds.length - CONFIG.PENDING_MAX_MESSAGES);
}
await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx);
let forwardedCount = 0;
for (const pendingId of pendingIds) {
if (!pendingId) continue;
const forwardedKey = `forwarded:${userId}:${pendingId}`;
const alreadyForwarded = await env.TOPIC_MAP.get(forwardedKey);
if (alreadyForwarded) {
Logger.info('message_forward_duplicate_skipped', { userId, messageId: pendingId });
continue;
}
// 【修复 #3】标记已转发
await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 });
const fakeMsg = {
message_id: pendingId,
chat: { id: userId, type: "private" },
from: query.from,
};
await tgCall(env, "sendMessage", {
chat_id: userId,
text: "📩 刚才的消息已帮您送达。",
reply_to_message_id: state.pending
});
await forwardToTopic(fakeMsg, userId, `user:${userId}`, env, ctx);
await env.TOPIC_MAP.put(forwardedKey, "1", { expirationTtl: 3600 });
forwardedCount++;
}
if (forwardedCount > 0) {
await tgCall(env, "sendMessage", {
chat_id: userId,
text: `📩 刚才的 ${forwardedCount} 条消息已帮您送达。`
});
}
} catch (e) {
Logger.error('pending_message_forward_failed', e, { userId });
await tgCall(env, "sendMessage", {
@@ -838,12 +1082,11 @@ async function handleCallbackQuery(query, env, ctx) {
*/
async function handleCleanupCommand(threadId, env) {
// 发送处理中的消息
await tgCall(env, "sendMessage", {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
message_thread_id: threadId,
text: "🔄 **正在扫描需要清理的用户...**",
parse_mode: "Markdown"
});
}, threadId));
let cleanedCount = 0;
let errorCount = 0;
@@ -866,24 +1109,10 @@ async function handleCleanupCommand(threadId, env) {
const topicThreadId = rec.thread_id;
// 检测话题是否存在:尝试向话题发送测试消息
const testRes = await tgCall(env, "sendMessage", {
chat_id: env.SUPERGROUP_ID,
message_thread_id: topicThreadId,
text: "", // 零宽度字符
});
const probe = await probeForumThread(env, topicThreadId, { userId, reason: "cleanup_check" });
const actualThreadId = testRes.result?.message_thread_id;
// 同时检测 Telegram 静默重定向到 General 的场景
if (!testRes.ok || (actualThreadId && Number(actualThreadId) !== Number(topicThreadId))) {
const desc = (testRes.description || "").toLowerCase();
const redirected = actualThreadId && Number(actualThreadId) !== Number(topicThreadId);
const isTopicDeleted = redirected ||
desc.includes("thread not found") ||
desc.includes("topic not found") ||
desc.includes("message thread not found");
if (isTopicDeleted) {
// cleanup 要求更保守:仅在明确缺失/重定向时清理,避免误删有效记录
if (probe.status === "redirected" || probe.status === "missing") {
await env.TOPIC_MAP.delete(name);
await env.TOPIC_MAP.delete(`verified:${userId}`);
await env.TOPIC_MAP.delete(`thread:${topicThreadId}`);
@@ -893,19 +1122,20 @@ async function handleCleanupCommand(threadId, env) {
threadId: topicThreadId,
title: rec.title || "未知"
};
}
}
// 话题存在或检测后,尝试删除测试消息
if (testRes.ok && testRes.result?.message_id) {
try {
await tgCall(env, "deleteMessage", {
chat_id: env.SUPERGROUP_ID,
message_id: testRes.result.message_id
});
} catch (e) {
// 删除失败不影响主流程
}
} else if (probe.status === "probe_invalid") {
Logger.warn('cleanup_probe_invalid_message', {
userId,
threadId: topicThreadId,
errorDescription: probe.description
});
} else if (probe.status === "unknown_error") {
Logger.warn('cleanup_probe_failed_unknown', {
userId,
threadId: topicThreadId,
errorDescription: probe.description
});
} else if (probe.status === "missing_thread_id") {
Logger.warn('cleanup_probe_missing_thread_id', { userId, threadId: topicThreadId });
}
return null;
@@ -958,21 +1188,19 @@ async function handleCleanupCommand(threadId, env) {
totalUsers: allKeys.length
});
await tgCall(env, "sendMessage", {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
message_thread_id: threadId,
text: reportText,
parse_mode: "Markdown"
});
}, threadId));
} catch (e) {
Logger.error('cleanup_failed', e, { threadId });
await tgCall(env, "sendMessage", {
await tgCall(env, "sendMessage", withMessageThreadId({
chat_id: env.SUPERGROUP_ID,
message_thread_id: threadId,
text: `❌ **清理过程出错**\n\n错误信息: \`${e.message}\``,
parse_mode: "Markdown"
});
}, threadId));
}
}
@@ -995,9 +1223,22 @@ async function createTopic(from, key, env, userId) {
// 【修复 #2】更新话题状态 - 修复异步操作未等待
async function updateThreadStatus(threadId, isClosed, env) {
try {
const allKeys = await getAllKeys(env, "user:");
const mappedUser = await env.TOPIC_MAP.get(`thread:${threadId}`);
if (mappedUser) {
const userKey = `user:${mappedUser}`;
const rec = await safeGetJSON(env, userKey, null);
if (rec && Number(rec.thread_id) === Number(threadId)) {
rec.closed = isClosed;
await env.TOPIC_MAP.put(userKey, JSON.stringify(rec));
Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: 1 });
return;
}
// 收集所有更新操作
// 映射失效:清理后降级全量扫描
await env.TOPIC_MAP.delete(`thread:${threadId}`);
}
const allKeys = await getAllKeys(env, "user:");
const updates = [];
for (const { name } of allKeys) {
@@ -1008,14 +1249,8 @@ async function updateThreadStatus(threadId, isClosed, env) {
}
}
// 等待所有更新完成
await Promise.all(updates);
Logger.info('thread_status_updated', {
threadId,
isClosed,
updatedCount: updates.length
});
Logger.info('thread_status_updated', { threadId, isClosed, updatedCount: updates.length });
} catch (e) {
Logger.error('thread_status_update_failed', e, { threadId, isClosed });
throw e;
@@ -1119,11 +1354,15 @@ async function handleMediaGroup(msg, env, ctx, { direction, targetChat, threadId
const key = `mg:${direction}:${groupId}`;
const item = extractMedia(msg);
if (!item) {
await tgCall(env, "copyMessage", { chat_id: targetChat, from_chat_id: msg.chat.id, message_id: msg.message_id, message_thread_id: threadId });
await tgCall(env, "copyMessage", withMessageThreadId({
chat_id: targetChat,
from_chat_id: msg.chat.id,
message_id: msg.message_id
}, threadId));
return;
}
let rec = await safeGetJSON(env, key, null);
if (!rec) rec = { direction, targetChat, threadId, items: [], last_ts: Date.now() };
if (!rec) rec = { direction, targetChat, threadId: (threadId === null ? undefined : threadId), items: [], last_ts: Date.now() };
rec.items.push({ ...item, msg_id: msg.message_id });
rec.last_ts = Date.now();
await env.TOPIC_MAP.put(key, JSON.stringify(rec), { expirationTtl: CONFIG.MEDIA_GROUP_EXPIRE_SECONDS });
@@ -1226,7 +1465,7 @@ async function delaySend(env, key, ts) {
}
// 【修复 #28】限制 caption 长度
const caption = i === 0 ? (it.cap || "").substring(0, 1024) : "";
return {
return {
type: it.type,
media: it.id,
caption
@@ -1235,11 +1474,10 @@ async function delaySend(env, key, ts) {
if (media.length > 0) {
try {
const result = await tgCall(env, "sendMediaGroup", {
const result = await tgCall(env, "sendMediaGroup", withMessageThreadId({
chat_id: rec.targetChat,
message_thread_id: rec.threadId,
media
});
}, rec.threadId));
if (!result.ok) {
Logger.error('media_group_send_failed', result.description, {