feat(telegram): 使用 telegramify_markdown 库标准化消息内容,增强长消息与复杂格式的处理能力

This commit is contained in:
Attente
2025-11-24 20:59:32 +08:00
parent c96633eb83
commit 6c1e0058c1

View File

@@ -1,19 +1,21 @@
import asyncio
import re
import threading
import uuid
from pathlib import Path
from threading import Event
from typing import Optional, List, Dict, Callable
from urllib.parse import urljoin
import telebot
from telegramify_markdown import standardize, telegramify
from telegramify_markdown.type import ContentTypes, SentType
from telebot import apihelper
from telebot.types import InputFile, InlineKeyboardMarkup, InlineKeyboardButton
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
from telebot.types import InputMediaPhoto
from app.core.config import settings
from app.core.context import MediaInfo, Context
from app.core.metainfo import MetaInfo
from app.helper.thread import ThreadHelper
from app.log import logger
from app.utils.common import retry
from app.utils.http import RequestUtils
@@ -52,7 +54,7 @@ class Telegram:
else:
apihelper.proxy = settings.PROXY
# bot
_bot = telebot.TeleBot(self._telegram_token, parse_mode="Markdown")
_bot = telebot.TeleBot(self._telegram_token, parse_mode="MarkdownV2")
# 记录句柄
self._bot = _bot
# 获取并存储bot用户名用于@检测
@@ -236,12 +238,14 @@ class Telegram:
return False
try:
if text:
# 对text进行Markdown特殊字符转义
text = re.sub(r"([_`])", r"\\\1", text)
caption = f"*{title}*\n{text}"
if title and text:
caption = f"**{title}**\n{text}"
elif title:
caption = f"**{title}**"
elif text:
caption = text
else:
caption = f"*{title}*"
caption = ""
if link:
caption = f"{caption}\n[查看详情]({link})"
@@ -499,7 +503,7 @@ class Telegram:
if image:
# 如果有图片使用edit_message_media
media = InputMediaPhoto(media=image, caption=text, parse_mode="Markdown")
media = InputMediaPhoto(media=image, caption=standardize(text), parse_mode="MarkdownV2")
self._bot.edit_message_media(
chat_id=chat_id,
message_id=message_id,
@@ -511,8 +515,8 @@ class Telegram:
self._bot.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=text,
parse_mode="Markdown",
text=standardize(text),
parse_mode="MarkdownV2",
reply_markup=reply_markup
)
return True
@@ -520,49 +524,120 @@ class Telegram:
logger.error(f"编辑消息失败:{str(e)}")
return False
@retry(RetryException, logger=logger)
def __send_request(self, userid: Optional[str] = None, image="", caption="",
reply_markup: Optional[InlineKeyboardMarkup] = None) -> bool:
"""
向Telegram发送报文
:param reply_markup: 内联键盘
"""
if image:
res = RequestUtils(proxies=settings.PROXY, ua=settings.NORMAL_USER_AGENT).get_res(image)
if res is None:
raise Exception("获取图片失败")
if res.content:
# 使用随机标识构建图片文件的完整路径,并写入图片内容到文件
image_file = Path(settings.TEMP_PATH) / "telegram" / str(uuid.uuid4())
if not image_file.parent.exists():
image_file.parent.mkdir(parents=True, exist_ok=True)
image_file.write_bytes(res.content)
photo = InputFile(image_file)
# 发送图片到Telegram
ret = self._bot.send_photo(chat_id=userid or self._telegram_chat_id,
photo=photo,
caption=caption,
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise RetryException("发送图片消息失败")
return True
# 按4096分段循环发送消息
ret = None
if len(caption) > 4095:
for i in range(0, len(caption), 4095):
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption[i:i + 4095],
parse_mode="Markdown",
reply_markup=reply_markup if i == 0 else None)
else:
ret = self._bot.send_message(chat_id=userid or self._telegram_chat_id,
text=caption,
parse_mode="Markdown",
reply_markup=reply_markup)
if ret is None:
raise RetryException("发送文本消息失败")
return True if ret else False
kwargs = {
'chat_id': userid or self._telegram_chat_id,
'parse_mode': "MarkdownV2",
'reply_markup': reply_markup
}
try:
# 处理图片
image = self.__process_image(image) if image else None
# 图片消息的标题长度限制为1024文本消息为4096
caption_limit = 1024 if image else 4096
if len(caption) < caption_limit:
ret = self.__send_short_message(image, caption, **kwargs)
else:
sent_idx = set()
ret = self.__send_long_message(image, caption, sent_idx, **kwargs)
return ret is not None
except Exception as e:
logger.error(f"发送Telegram消息失败: {e}")
return False
@retry(RetryException, logger=logger)
def __process_image(self, image_url: str) -> bytes:
"""
处理图片URL获取图片内容
"""
try:
res = RequestUtils(
proxies=settings.PROXY,
ua=settings.NORMAL_USER_AGENT
).get_res(image_url)
if not res or not res.content:
raise RetryException("获取图片失败")
return res.content
except Exception as e:
raise
@retry(RetryException, logger=logger)
def __send_short_message(self, image: Optional[bytes], caption: str, **kwargs):
"""
发送短消息
"""
try:
if image:
return self._bot.send_photo(
photo=image,
caption=standardize(caption),
**kwargs
)
else:
return self._bot.send_message(
text=standardize(caption),
**kwargs
)
except Exception as e:
raise RetryException(f"发送{'图片' if image else '文本'}消息失败")
@retry(RetryException, logger=logger)
def __send_long_message(self, image: Optional[bytes], caption: str, sent_idx: set, **kwargs):
"""
发送长消息
"""
try:
reply_markup = kwargs.pop("reply_markup", None)
boxs: SentType = ThreadHelper().submit(lambda x: asyncio.run(telegramify(x)), caption).result()
ret = None
for i, item in enumerate(boxs):
if i in sent_idx:
# 跳过已发送消息
continue
current_reply_markup = reply_markup if i == 0 else None
if item.content_type == ContentTypes.TEXT and (i != 0 or not image):
ret = self._bot.send_message(**kwargs,
text=item.content,
reply_markup=current_reply_markup
)
elif item.content_type == ContentTypes.PHOTO or (image and i == 0):
ret = self._bot.send_photo(**kwargs,
photo=(getattr(item, "file_name", ""),
getattr(item, "file_data", image)),
caption=getattr(item, "caption", item.content),
reply_markup=current_reply_markup
)
elif item.content_type == ContentTypes.FILE:
ret = self._bot.send_document(**kwargs,
document=(item.file_name, item.file_data),
caption=item.caption,
reply_markup=current_reply_markup
)
sent_idx.add(i)
return ret
except Exception as e:
try:
raise RetryException(f"消息 [{i + 1}/{len(boxs)}] 发送失败") from e
except NameError:
raise RetryException("发送长消息失败") from e
def register_commands(self, commands: Dict[str, dict]):
"""