From 6c1e0058c168c081a91abf73c354abccfe34e16b Mon Sep 17 00:00:00 2001 From: Attente <19653207+wikrin@users.noreply.github.com> Date: Mon, 24 Nov 2025 20:59:32 +0800 Subject: [PATCH] =?UTF-8?q?feat(telegram):=20=E4=BD=BF=E7=94=A8=20`telegra?= =?UTF-8?q?mify=5Fmarkdown`=20=E5=BA=93=E6=A0=87=E5=87=86=E5=8C=96?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=86=85=E5=AE=B9=EF=BC=8C=E5=A2=9E=E5=BC=BA?= =?UTF-8?q?=E9=95=BF=E6=B6=88=E6=81=AF=E4=B8=8E=E5=A4=8D=E6=9D=82=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E7=9A=84=E5=A4=84=E7=90=86=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/modules/telegram/telegram.py | 173 ++++++++++++++++++++++--------- 1 file changed, 124 insertions(+), 49 deletions(-) diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index a3c72898..b4ba7c6d 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -1,19 +1,21 @@ +import asyncio import re import threading -import uuid -from pathlib import Path from threading import Event from typing import Optional, List, Dict, Callable from urllib.parse import urljoin import telebot +from telegramify_markdown import standardize, telegramify +from telegramify_markdown.type import ContentTypes, SentType from telebot import apihelper -from telebot.types import InputFile, InlineKeyboardMarkup, InlineKeyboardButton +from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton from telebot.types import InputMediaPhoto from app.core.config import settings from app.core.context import MediaInfo, Context from app.core.metainfo import MetaInfo +from app.helper.thread import ThreadHelper from app.log import logger from app.utils.common import retry from app.utils.http import RequestUtils @@ -52,7 +54,7 @@ class Telegram: else: apihelper.proxy = settings.PROXY # bot - _bot = telebot.TeleBot(self._telegram_token, parse_mode="Markdown") + _bot = telebot.TeleBot(self._telegram_token, parse_mode="MarkdownV2") # 记录句柄 self._bot = _bot # 获取并存储bot用户名用于@检测 @@ -236,12 +238,14 @@ class Telegram: return False try: - if text: - # 对text进行Markdown特殊字符转义 - text = re.sub(r"([_`])", r"\\\1", text) - caption = f"*{title}*\n{text}" + if title and text: + caption = f"**{title}**\n{text}" + elif title: + caption = f"**{title}**" + elif text: + caption = text else: - caption = f"*{title}*" + caption = "" if link: caption = f"{caption}\n[查看详情]({link})" @@ -499,7 +503,7 @@ class Telegram: if image: # 如果有图片,使用edit_message_media - media = InputMediaPhoto(media=image, caption=text, parse_mode="Markdown") + media = InputMediaPhoto(media=image, caption=standardize(text), parse_mode="MarkdownV2") self._bot.edit_message_media( chat_id=chat_id, message_id=message_id, @@ -511,8 +515,8 @@ class Telegram: self._bot.edit_message_text( chat_id=chat_id, message_id=message_id, - text=text, - parse_mode="Markdown", + text=standardize(text), + parse_mode="MarkdownV2", reply_markup=reply_markup ) return True @@ -520,49 +524,120 @@ class Telegram: logger.error(f"编辑消息失败:{str(e)}") return False - @retry(RetryException, logger=logger) def __send_request(self, userid: Optional[str] = None, image="", caption="", reply_markup: Optional[InlineKeyboardMarkup] = None) -> bool: """ 向Telegram发送报文 :param reply_markup: 内联键盘 """ - if image: - res = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT).get_res(image) - if res is None: - raise Exception("获取图片失败") - if res.content: - # 使用随机标识构建图片文件的完整路径,并写入图片内容到文件 - image_file = Path(settings.TEMP_PATH) / "telegram" / str(uuid.uuid4()) - if not image_file.parent.exists(): - image_file.parent.mkdir(parents=True, exist_ok=True) - image_file.write_bytes(res.content) - photo = InputFile(image_file) - # 发送图片到Telegram - ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id, - photo=photo, - caption=caption, - parse_mode="Markdown", - reply_markup=reply_markup) - if ret is None: - raise RetryException("发送图片消息失败") - return True - # 按4096分段循环发送消息 - ret = None - if len(caption) > 4095: - for i in range(0, len(caption), 4095): - ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id, - text=caption[i:i + 4095], - parse_mode="Markdown", - reply_markup=reply_markup if i == 0 else None) - else: - ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id, - text=caption, - parse_mode="Markdown", - reply_markup=reply_markup) - if ret is None: - raise RetryException("发送文本消息失败") - return True if ret else False + kwargs = { + 'chat_id': userid or self._telegram_chat_id, + 'parse_mode': "MarkdownV2", + 'reply_markup': reply_markup + } + + try: + # 处理图片 + image = self.__process_image(image) if image else None + + # 图片消息的标题长度限制为1024,文本消息为4096 + caption_limit = 1024 if image else 4096 + if len(caption) < caption_limit: + ret = self.__send_short_message(image, caption, **kwargs) + else: + sent_idx = set() + ret = self.__send_long_message(image, caption, sent_idx, **kwargs) + + return ret is not None + except Exception as e: + logger.error(f"发送Telegram消息失败: {e}") + return False + + @retry(RetryException, logger=logger) + def __process_image(self, image_url: str) -> bytes: + """ + 处理图片URL,获取图片内容 + """ + try: + res = RequestUtils( + proxies=settings.PROXY, + ua=settings.NORMAL_USER_AGENT + ).get_res(image_url) + + if not res or not res.content: + raise RetryException("获取图片失败") + + return res.content + except Exception as e: + raise + + @retry(RetryException, logger=logger) + def __send_short_message(self, image: Optional[bytes], caption: str, **kwargs): + """ + 发送短消息 + """ + try: + if image: + return self._bot.send_photo( + photo=image, + caption=standardize(caption), + **kwargs + ) + else: + return self._bot.send_message( + text=standardize(caption), + **kwargs + ) + except Exception as e: + raise RetryException(f"发送{'图片' if image else '文本'}消息失败") + + @retry(RetryException, logger=logger) + def __send_long_message(self, image: Optional[bytes], caption: str, sent_idx: set, **kwargs): + """ + 发送长消息 + """ + try: + reply_markup = kwargs.pop("reply_markup", None) + + boxs: SentType = ThreadHelper().submit(lambda x: asyncio.run(telegramify(x)), caption).result() + + ret = None + for i, item in enumerate(boxs): + if i in sent_idx: + # 跳过已发送消息 + continue + + current_reply_markup = reply_markup if i == 0 else None + + if item.content_type == ContentTypes.TEXT and (i != 0 or not image): + ret = self._bot.send_message(**kwargs, + text=item.content, + reply_markup=current_reply_markup + ) + + elif item.content_type == ContentTypes.PHOTO or (image and i == 0): + ret = self._bot.send_photo(**kwargs, + photo=(getattr(item, "file_name", ""), + getattr(item, "file_data", image)), + caption=getattr(item, "caption", item.content), + reply_markup=current_reply_markup + ) + + elif item.content_type == ContentTypes.FILE: + ret = self._bot.send_document(**kwargs, + document=(item.file_name, item.file_data), + caption=item.caption, + reply_markup=current_reply_markup + ) + + sent_idx.add(i) + + return ret + except Exception as e: + try: + raise RetryException(f"消息 [{i + 1}/{len(boxs)}] 发送失败") from e + except NameError: + raise RetryException("发送长消息失败") from e def register_commands(self, commands: Dict[str, dict]): """