diff --git a/app/modules/telegram/__init__.py b/app/modules/telegram/__init__.py index a9c55a89..eedab3a5 100644 --- a/app/modules/telegram/__init__.py +++ b/app/modules/telegram/__init__.py @@ -1,5 +1,6 @@ import copy import json +import re from typing import Dict from typing import Optional, Union, List, Tuple, Any @@ -195,20 +196,25 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): text = msg.get("text") user_id = msg.get("from", {}).get("id") user_name = msg.get("from", {}).get("username") + # Extract chat_id to enable correct reply targeting + chat_id = msg.get("chat", {}).get("id") if text and user_id: logger.info(f"收到来自 {client_config.name} 的Telegram消息:" - f"userid={user_id}, username={user_name}, text={text}") + f"userid={user_id}, username={user_name}, chat_id={chat_id}, text={text}") + + # Clean bot mentions from text to ensure consistent processing + cleaned_text = TelegramModule._clean_bot_mention(text, client._bot_username if client else None) # 检查权限 admin_users = client_config.config.get("TELEGRAM_ADMINS") user_list = client_config.config.get("TELEGRAM_USERS") - chat_id = client_config.config.get("TELEGRAM_CHAT_ID") + config_chat_id = client_config.config.get("TELEGRAM_CHAT_ID") - if text.startswith("/"): + if cleaned_text.startswith("/"): if admin_users \ and str(user_id) not in admin_users.split(',') \ - and str(user_id) != chat_id: + and str(user_id) != config_chat_id: client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id) return None else: @@ -223,10 +229,38 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]): source=client_config.name, userid=user_id, username=user_name, - text=text + text=cleaned_text, # Use cleaned text + chat_id=str(chat_id) if chat_id else None ) return None + @staticmethod + def _clean_bot_mention(text: str, bot_username: Optional[str]) -> str: + """ + 清理消息中的@bot部分,确保文本处理一致性 + :param text: 原始消息文本 + :param bot_username: bot用户名 + :return: 清理后的文本 + """ + if not text or not bot_username: + return text + + # Remove @bot_username from the beginning and any position in text + cleaned = text + mention_pattern = f"@{bot_username}" + + # Remove mention at the beginning with optional following space + if cleaned.startswith(mention_pattern): + cleaned = cleaned[len(mention_pattern):].lstrip() + + # Remove mention at any other position + cleaned = cleaned.replace(mention_pattern, "").strip() + + # Clean up multiple spaces + cleaned = re.sub(r'\s+', ' ', cleaned).strip() + + return cleaned + def post_message(self, message: Notification) -> None: """ 发送消息 diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index feb6a437..afab8d98 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -25,6 +25,8 @@ class Telegram: _event = Event() _bot: telebot.TeleBot = None _callback_handlers: Dict[str, Callable] = {} # 存储回调处理器 + _user_chat_mapping: Dict[str, str] = {} # userid -> chat_id mapping for reply targeting + _bot_username: Optional[str] = None # Bot username for mention detection def __init__(self, TELEGRAM_TOKEN: Optional[str] = None, TELEGRAM_CHAT_ID: Optional[str] = None, **kwargs): """ @@ -49,6 +51,15 @@ class Telegram: _bot = telebot.TeleBot(self._telegram_token, parse_mode="Markdown") # 记录句柄 self._bot = _bot + # 获取并存储bot用户名用于@检测 + try: + bot_info = _bot.get_me() + self._bot_username = bot_info.username + logger.info(f"Telegram bot用户名: @{self._bot_username}") + except Exception as e: + logger.error(f"获取bot信息失败: {e}") + self._bot_username = None + # 标记渠道来源 if kwargs.get("name"): self._ds_url = f"{self._ds_url}&source={kwargs.get('name')}" @@ -59,7 +70,12 @@ class Telegram: @_bot.message_handler(func=lambda message: True) def echo_all(message): - RequestUtils(timeout=15).post_res(self._ds_url, json=message.json) + # Update user-chat mapping when receiving messages + self._update_user_chat_mapping(message.from_user.id, message.chat.id) + + # Check if we should process this message + if self._should_process_message(message): + RequestUtils(timeout=15).post_res(self._ds_url, json=message.json) @_bot.callback_query_handler(func=lambda call: True) def callback_query(call): @@ -67,6 +83,9 @@ class Telegram: 处理按钮点击回调 """ try: + # Update user-chat mapping for callbacks too + self._update_user_chat_mapping(call.from_user.id, call.message.chat.id) + # 解析回调数据 callback_data = call.data user_id = str(call.from_user.id) @@ -112,6 +131,68 @@ class Telegram: self._polling_thread.start() logger.info("Telegram消息接收服务启动") + def _update_user_chat_mapping(self, userid: int, chat_id: int) -> None: + """ + 更新用户与聊天的映射关系 + :param userid: 用户ID + :param chat_id: 聊天ID + """ + if userid and chat_id: + self._user_chat_mapping[str(userid)] = str(chat_id) + + def _get_user_chat_id(self, userid: str) -> Optional[str]: + """ + 获取用户对应的聊天ID + :param userid: 用户ID + :return: 聊天ID或None + """ + return self._user_chat_mapping.get(str(userid)) if userid else None + + def _should_process_message(self, message) -> bool: + """ + 判断是否应该处理这条消息 + :param message: Telegram消息对象 + :return: 是否处理 + """ + # 私聊消息总是处理 + if message.chat.type == 'private': + logger.debug(f"处理私聊消息:用户 {message.from_user.id}") + return True + + # 群聊中的命令消息总是处理(以/开头) + if message.text and message.text.startswith('/'): + logger.debug(f"处理群聊命令消息:{message.text[:20]}...") + return True + + # 群聊中检查是否@了机器人 + if message.chat.type in ['group', 'supergroup']: + if not self._bot_username: + # 如果没有获取到bot用户名,为了安全起见处理所有消息 + logger.debug("未获取到bot用户名,处理所有群聊消息") + return True + + # 检查消息文本中是否包含@bot_username + if message.text and f"@{self._bot_username}" in message.text: + logger.debug(f"检测到@{self._bot_username},处理群聊消息") + return True + + # 检查消息实体中是否有提及bot + if message.entities: + for entity in message.entities: + if entity.type == 'mention': + mention_text = message.text[entity.offset:entity.offset + entity.length] + if mention_text == f"@{self._bot_username}": + logger.debug(f"通过实体检测到@{self._bot_username},处理群聊消息") + return True + + # 群聊中没有@机器人,不处理 + logger.debug(f"群聊消息未@机器人,跳过处理:{message.text[:30] if message.text else 'No text'}...") + return False + + # 其他类型的聊天默认处理 + logger.debug(f"处理其他类型聊天消息:{message.chat.type}") + return True + def get_state(self) -> bool: """ 获取状态 @@ -153,10 +234,8 @@ class Telegram: if link: caption = f"{caption}\n[查看详情]({link})" - if userid: - chat_id = userid - else: - chat_id = self._telegram_chat_id + # Determine target chat_id with improved logic using user mapping + chat_id = self._determine_target_chat_id(userid, original_chat_id) # 创建按钮键盘 reply_markup = None @@ -175,6 +254,29 @@ class Telegram: logger.error(f"发送消息失败:{msg_e}") return False + def _determine_target_chat_id(self, userid: Optional[str] = None, + original_chat_id: Optional[str] = None) -> str: + """ + 确定目标聊天ID,使用用户映射确保回复到正确的聊天 + :param userid: 用户ID + :param original_chat_id: 原消息的聊天ID + :return: 目标聊天ID + """ + # 1. 优先使用原消息的聊天ID (编辑消息场景) + if original_chat_id: + return original_chat_id + + # 2. 如果有userid,尝试从映射中获取用户的聊天ID + if userid: + mapped_chat_id = self._get_user_chat_id(userid) + if mapped_chat_id: + return mapped_chat_id + # 如果映射中没有,回退到使用userid作为聊天ID (私聊场景) + return userid + + # 3. 最后使用默认聊天ID + return self._telegram_chat_id + def send_medias_msg(self, medias: List[MediaInfo], userid: Optional[str] = None, title: Optional[str] = None, link: Optional[str] = None, buttons: Optional[List[List[Dict]]] = None, @@ -216,10 +318,8 @@ class Telegram: if link: caption = f"{caption}\n[查看详情]({link})" - if userid: - chat_id = userid - else: - chat_id = self._telegram_chat_id + # Determine target chat_id with improved logic using user mapping + chat_id = self._determine_target_chat_id(userid, original_chat_id) # 创建按钮键盘 reply_markup = None @@ -278,10 +378,8 @@ class Telegram: if link: caption = f"{caption}\n[查看详情]({link})" - if userid: - chat_id = userid - else: - chat_id = self._telegram_chat_id + # Determine target chat_id with improved logic using user mapping + chat_id = self._determine_target_chat_id(userid, original_chat_id) # 创建按钮键盘 reply_markup = None