Merge pull request #4631 from ChanningHe/fix-telegram-msg

This commit is contained in:
jxxghp
2025-07-18 21:22:17 +08:00
committed by GitHub
2 changed files with 150 additions and 18 deletions

View File

@@ -1,5 +1,6 @@
import copy
import json
import re
from typing import Dict
from typing import Optional, Union, List, Tuple, Any
@@ -195,20 +196,25 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
text = msg.get("text")
user_id = msg.get("from", {}).get("id")
user_name = msg.get("from", {}).get("username")
# Extract chat_id to enable correct reply targeting
chat_id = msg.get("chat", {}).get("id")
if text and user_id:
logger.info(f"收到来自 {client_config.name} 的Telegram消息"
f"userid={user_id}, username={user_name}, text={text}")
f"userid={user_id}, username={user_name}, chat_id={chat_id}, text={text}")
# Clean bot mentions from text to ensure consistent processing
cleaned_text = TelegramModule._clean_bot_mention(text, client._bot_username if client else None)
# 检查权限
admin_users = client_config.config.get("TELEGRAM_ADMINS")
user_list = client_config.config.get("TELEGRAM_USERS")
chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
config_chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
if text.startswith("/"):
if cleaned_text.startswith("/"):
if admin_users \
and str(user_id) not in admin_users.split(',') \
and str(user_id) != chat_id:
and str(user_id) != config_chat_id:
client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
return None
else:
@@ -223,10 +229,38 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
source=client_config.name,
userid=user_id,
username=user_name,
text=text
text=cleaned_text, # Use cleaned text
chat_id=str(chat_id) if chat_id else None
)
return None
@staticmethod
def _clean_bot_mention(text: str, bot_username: Optional[str]) -> str:
"""
清理消息中的@bot部分确保文本处理一致性
:param text: 原始消息文本
:param bot_username: bot用户名
:return: 清理后的文本
"""
if not text or not bot_username:
return text
# Remove @bot_username from the beginning and any position in text
cleaned = text
mention_pattern = f"@{bot_username}"
# Remove mention at the beginning with optional following space
if cleaned.startswith(mention_pattern):
cleaned = cleaned[len(mention_pattern):].lstrip()
# Remove mention at any other position
cleaned = cleaned.replace(mention_pattern, "").strip()
# Clean up multiple spaces
cleaned = re.sub(r'\s+', ' ', cleaned).strip()
return cleaned
def post_message(self, message: Notification) -> None:
"""
发送消息

View File

@@ -25,6 +25,8 @@ class Telegram:
_event = Event()
_bot: telebot.TeleBot = None
_callback_handlers: Dict[str, Callable] = {} # 存储回调处理器
_user_chat_mapping: Dict[str, str] = {} # userid -> chat_id mapping for reply targeting
_bot_username: Optional[str] = None # Bot username for mention detection
def __init__(self, TELEGRAM_TOKEN: Optional[str] = None, TELEGRAM_CHAT_ID: Optional[str] = None, **kwargs):
"""
@@ -49,6 +51,15 @@ class Telegram:
_bot = telebot.TeleBot(self._telegram_token, parse_mode="Markdown")
# 记录句柄
self._bot = _bot
# 获取并存储bot用户名用于@检测
try:
bot_info = _bot.get_me()
self._bot_username = bot_info.username
logger.info(f"Telegram bot用户名: @{self._bot_username}")
except Exception as e:
logger.error(f"获取bot信息失败: {e}")
self._bot_username = None
# 标记渠道来源
if kwargs.get("name"):
self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}"
@@ -59,7 +70,12 @@ class Telegram:
@_bot.message_handler(func=lambda message: True)
def echo_all(message):
RequestUtils(timeout=15).post_res(self._ds_url, json=message.json)
# Update user-chat mapping when receiving messages
self._update_user_chat_mapping(message.from_user.id, message.chat.id)
# Check if we should process this message
if self._should_process_message(message):
RequestUtils(timeout=15).post_res(self._ds_url, json=message.json)
@_bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
@@ -67,6 +83,9 @@ class Telegram:
处理按钮点击回调
"""
try:
# Update user-chat mapping for callbacks too
self._update_user_chat_mapping(call.from_user.id, call.message.chat.id)
# 解析回调数据
callback_data = call.data
user_id = str(call.from_user.id)
@@ -112,6 +131,68 @@ class Telegram:
self._polling_thread.start()
logger.info("Telegram消息接收服务启动")
def _update_user_chat_mapping(self, userid: int, chat_id: int) -> None:
"""
更新用户与聊天的映射关系
:param userid: 用户ID
:param chat_id: 聊天ID
"""
if userid and chat_id:
self._user_chat_mapping[str(userid)] = str(chat_id)
def _get_user_chat_id(self, userid: str) -> Optional[str]:
"""
获取用户对应的聊天ID
:param userid: 用户ID
:return: 聊天ID或None
"""
return self._user_chat_mapping.get(str(userid)) if userid else None
def _should_process_message(self, message) -> bool:
"""
判断是否应该处理这条消息
:param message: Telegram消息对象
:return: 是否处理
"""
# 私聊消息总是处理
if message.chat.type == 'private':
logger.debug(f"处理私聊消息:用户 {message.from_user.id}")
return True
# 群聊中的命令消息总是处理(以/开头)
if message.text and message.text.startswith('/'):
logger.debug(f"处理群聊命令消息:{message.text[:20]}...")
return True
# 群聊中检查是否@了机器人
if message.chat.type in ['group', 'supergroup']:
if not self._bot_username:
# 如果没有获取到bot用户名为了安全起见处理所有消息
logger.debug("未获取到bot用户名处理所有群聊消息")
return True
# 检查消息文本中是否包含@bot_username
if message.text and f"@{self._bot_username}" in message.text:
logger.debug(f"检测到@{self._bot_username},处理群聊消息")
return True
# 检查消息实体中是否有提及bot
if message.entities:
for entity in message.entities:
if entity.type == 'mention':
mention_text = message.text[entity.offset:entity.offset + entity.length]
if mention_text == f"@{self._bot_username}":
logger.debug(f"通过实体检测到@{self._bot_username},处理群聊消息")
return True
# 群聊中没有@机器人,不处理
logger.debug(f"群聊消息未@机器人,跳过处理:{message.text[:30] if message.text else 'No text'}...")
return False
# 其他类型的聊天默认处理
logger.debug(f"处理其他类型聊天消息:{message.chat.type}")
return True
def get_state(self) -> bool:
"""
获取状态
@@ -153,10 +234,8 @@ class Telegram:
if link:
caption = f"{caption}\n[查看详情]({link})"
if userid:
chat_id = userid
else:
chat_id = self._telegram_chat_id
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
@@ -175,6 +254,29 @@ class Telegram:
logger.error(f"发送消息失败:{msg_e}")
return False
def _determine_target_chat_id(self, userid: Optional[str] = None,
original_chat_id: Optional[str] = None) -> str:
"""
确定目标聊天ID使用用户映射确保回复到正确的聊天
:param userid: 用户ID
:param original_chat_id: 原消息的聊天ID
:return: 目标聊天ID
"""
# 1. 优先使用原消息的聊天ID (编辑消息场景)
if original_chat_id:
return original_chat_id
# 2. 如果有userid尝试从映射中获取用户的聊天ID
if userid:
mapped_chat_id = self._get_user_chat_id(userid)
if mapped_chat_id:
return mapped_chat_id
# 如果映射中没有回退到使用userid作为聊天ID (私聊场景)
return userid
# 3. 最后使用默认聊天ID
return self._telegram_chat_id
def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None,
title: Optional[str] = None, link: Optional[str] = None,
buttons: Optional[List[List[Dict]]] = None,
@@ -216,10 +318,8 @@ class Telegram:
if link:
caption = f"{caption}\n[查看详情]({link})"
if userid:
chat_id = userid
else:
chat_id = self._telegram_chat_id
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
@@ -278,10 +378,8 @@ class Telegram:
if link:
caption = f"{caption}\n[查看详情]({link})"
if userid:
chat_id = userid
else:
chat_id = self._telegram_chat_id
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None