feat(message-processing-status): unified processing status indicator for Telegram, Slack, Discord, Feishu

- Add ChannelCapability.PROCESSING_STATUS and capability detection for supported channels
- Implement mark_message_processing_started/finished in Telegram, Slack, Discord, Feishu modules
  - Telegram: manage typing lifecycle with max duration and explicit stop
  - Slack: add/remove reaction as processing indicator
  - Discord: start/stop typing indicator with async task management
  - Feishu: add/remove reaction for processing status
- Refactor message chain to invoke processing status hooks for supported channels
- Ensure processing status is properly finished on sync and async message handling paths
- Add tests for processing status lifecycle and capability detection across channels
This commit is contained in:
jxxghp
2026-05-15 12:45:41 +08:00
parent 5a06e7b8bc
commit b2a18f9ae4
13 changed files with 1101 additions and 92 deletions

View File

@@ -596,6 +596,57 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
return True
return False
def mark_message_processing_started(
self,
channel: MessageChannel,
source: str,
userid: Optional[Union[str, int]] = None,
message_id: Optional[Union[str, int]] = None,
chat_id: Optional[Union[str, int]] = None,
text: Optional[str] = None,
) -> Optional[dict]:
"""
标记 Telegram 消息正在处理。
入站侧已经启动 typing 任务,这里只返回可用于统一收口的上下文。
"""
if channel != self._channel:
return None
if not text:
return None
return {
"channel": channel.value,
"source": source,
"userid": userid,
"message_id": message_id,
"chat_id": chat_id,
"metadata": {"kind": "typing"},
}
def mark_message_processing_finished(
self,
channel: MessageChannel,
source: str,
userid: Optional[Union[str, int]] = None,
message_id: Optional[Union[str, int]] = None,
chat_id: Optional[Union[str, int]] = None,
status: Optional[dict] = None,
) -> Optional[bool]:
"""
结束 Telegram typing 状态。
"""
if channel != self._channel:
return None
if status:
chat_id = status.get("chat_id") or chat_id
userid = status.get("userid") or userid
client_config = self.get_config(source)
if not client_config:
return False
client: Telegram = self.get_instance(client_config.name)
if not client:
return False
return client.stop_typing(chat_id=chat_id, userid=userid)
def send_direct_message(self, message: Notification) -> Optional[MessageResponse]:
"""
直接发送消息并返回消息ID等信息

View File

@@ -43,7 +43,12 @@ class Telegram:
] = {} # userid -> chat_id mapping for reply targeting
_bot_username: Optional[str] = None # Bot username for mention detection
_typing_tasks: Dict[str, threading.Thread] = {} # chat_id -> typing任务
_typing_stop_flags: Dict[str, bool] = {} # chat_id -> 停止标志
_typing_stop_flags: Dict[str, threading.Event] = {} # chat_id -> 停止信号
_typing_lock = threading.RLock()
_typing_interval_seconds = 5
_typing_max_duration_seconds = 5 * 60
_typing_command_max_duration_seconds = 30
_typing_callback_max_duration_seconds = 60
def __init__(
self,
@@ -54,13 +59,13 @@ class Telegram:
"""
初始化参数
"""
# 即使配置不完整也保留基础属性,便于测试和未启用实例安全调用发送方法。
self._telegram_token = TELEGRAM_TOKEN
self._telegram_chat_id = TELEGRAM_CHAT_ID
self._polling_thread = None
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
logger.error("Telegram配置不完整")
return
# Token
self._telegram_token = TELEGRAM_TOKEN
# Chat Id
self._telegram_chat_id = TELEGRAM_CHAT_ID
# 初始化机器人
if self._telegram_token and self._telegram_chat_id:
# telegram bot api 地址格式https://api.telegram.org
@@ -114,22 +119,42 @@ class Telegram:
# Check if we should process this message
if self._should_process_message(message):
# 启动持续发送正在输入状态
self._start_typing_task(message.chat.id)
message_text = message.text or message.caption or ""
max_duration = (
self._typing_command_max_duration_seconds
if (
message_text.startswith("/")
and not message_text.lower().startswith("/ai")
)
else None
)
self._start_typing_task(
message.chat.id, max_duration_seconds=max_duration
)
payload = self._serialize_update_payload(message)
if not payload:
logger.warn("Telegram消息序列化失败跳过转发")
self._stop_typing_task(message.chat.id)
return
RequestUtils(timeout=15).post_res(self._ds_url, json=payload)
response = RequestUtils(timeout=15).post_res(
self._ds_url, json=payload
)
if not response or response.status_code >= 400:
logger.warn("Telegram消息转发失败停止typing状态")
self._stop_typing_task(message.chat.id)
@_bot.callback_query_handler(func=lambda call: True)
def callback_query(call):
"""
处理按钮点击回调
"""
chat_id = None
typing_started = False
try:
# Update user-chat mapping for callbacks too
chat_id = call.message.chat.id
self._update_user_chat_mapping(
call.from_user.id, call.message.chat.id
call.from_user.id, chat_id
)
# 解析回调数据
@@ -146,7 +171,7 @@ class Telegram:
"message": {
"message_id": call.message.message_id,
"chat": {
"id": call.message.chat.id,
"id": chat_id,
},
},
"data": callback_data,
@@ -157,13 +182,24 @@ class Telegram:
_bot.answer_callback_query(call.id)
# 启动持续发送正在输入状态
self._start_typing_task(call.message.chat.id)
self._start_typing_task(
chat_id,
max_duration_seconds=self._typing_callback_max_duration_seconds,
)
typing_started = True
# 发送给主程序处理
RequestUtils(timeout=15).post_res(self._ds_url, json=callback_json)
response = RequestUtils(timeout=15).post_res(
self._ds_url, json=callback_json
)
if not response or response.status_code >= 400:
logger.warn("Telegram按钮回调转发失败停止typing状态")
self._stop_typing_task(chat_id)
except Exception as err:
logger.error(f"处理按钮回调失败:{str(err)}")
if typing_started and chat_id is not None:
self._stop_typing_task(chat_id)
_bot.answer_callback_query(call.id, "处理失败,请重试")
def run_polling():
@@ -326,46 +362,85 @@ class Telegram:
"""
return self._bot is not None
def _start_typing_task(self, chat_id: Union[str, int]) -> None:
def _start_typing_task(
self,
chat_id: Union[str, int],
max_duration_seconds: Optional[float] = None,
) -> None:
"""
启动持续发送正在输入状态的任务
"""
chat_id_str = str(chat_id)
# 如果已有任务在运行,先停止
if chat_id_str in self._typing_tasks:
self._stop_typing_task(chat_id_str)
self._stop_typing_task(chat_id_str)
# 设置停止标
self._typing_stop_flags[chat_id_str] = False
# 使用独立 Event 避免同一 chat 新旧 typing 线程互相误改停止标记。
stop_event = threading.Event()
max_duration = max_duration_seconds or self._typing_max_duration_seconds
def typing_worker():
"""定期发送typing状态的后台线程"""
while not self._typing_stop_flags.get(chat_id_str, True):
try:
if self._bot:
self._bot.send_chat_action(chat_id, "typing")
except Exception as e:
logger.debug(f"发送typing状态失败: {e}")
# 每5秒发送一次Telegram客户端会在约5-6秒后消失状态
for _ in range(50):
if self._typing_stop_flags.get(chat_id_str, True):
started_at = time.monotonic()
try:
while not stop_event.is_set():
if time.monotonic() - started_at >= max_duration:
logger.warning(
"Telegram typing状态超过最大续期自动停止: chat_id=%s",
chat_id_str,
)
break
time.sleep(0.1)
try:
if self._bot:
self._bot.send_chat_action(chat_id, "typing")
except Exception as e:
logger.debug(f"发送typing状态失败: {e}")
# Telegram 客户端约 5-6 秒后会隐藏 typing需要周期性续发。
stop_event.wait(self._typing_interval_seconds)
finally:
with self._typing_lock:
current = self._typing_tasks.get(chat_id_str)
if current is threading.current_thread():
self._typing_tasks.pop(chat_id_str, None)
self._typing_stop_flags.pop(chat_id_str, None)
thread = threading.Thread(target=typing_worker, daemon=True)
with self._typing_lock:
self._typing_stop_flags[chat_id_str] = stop_event
self._typing_tasks[chat_id_str] = thread
thread.start()
self._typing_tasks[chat_id_str] = thread
def _stop_typing_task(self, chat_id: Union[str, int]) -> None:
"""
停止正在输入状态的任务
"""
chat_id_str = str(chat_id)
self._typing_stop_flags[chat_id_str] = True
if chat_id_str in self._typing_tasks:
with self._typing_lock:
stop_event = self._typing_stop_flags.pop(chat_id_str, None)
task = self._typing_tasks.pop(chat_id_str, None)
if task and task.is_alive():
task.join(timeout=1)
if stop_event:
stop_event.set()
if task and task.is_alive() and task is not threading.current_thread():
task.join(timeout=1)
def stop_typing(
self,
chat_id: Optional[Union[str, int]] = None,
userid: Optional[Union[str, int]] = None,
) -> bool:
"""
外部链路主动停止 typing 状态。
"""
if chat_id:
target_chat_id = chat_id
elif userid:
target_chat_id = self._get_user_chat_id(str(userid)) or str(userid)
else:
target_chat_id = None
target_chat_id = target_chat_id or (str(userid) if userid else None)
if not target_chat_id:
return False
self._stop_typing_task(target_chat_id)
return True
def send_msg(
self,
@@ -395,12 +470,12 @@ class Telegram:
if not self._telegram_token or not self._telegram_chat_id:
return None
if not title and not text:
logger.warn("标题和内容不能同时为空")
return {"success": False}
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
if not title and not text:
logger.warn("标题和内容不能同时为空")
self._stop_typing_task(chat_id)
return {"success": False}
try:
# 标准化标题后再加粗,避免**符号被显示为文本
@@ -483,6 +558,7 @@ class Telegram:
voice_file = Path(voice_path)
if not voice_file.exists():
logger.error(f"语音文件不存在: {voice_file}")
self._stop_typing_task(chat_id)
return {"success": False}
try:
@@ -526,12 +602,13 @@ class Telegram:
if not self._bot or not file_path:
return None
chat_id = self._determine_target_chat_id(userid, original_chat_id)
local_file = Path(file_path)
if not local_file.exists() or not local_file.is_file():
logger.error(f"附件文件不存在: {local_file}")
self._stop_typing_task(chat_id)
return {"success": False}
chat_id = self._determine_target_chat_id(userid, original_chat_id)
send_name = file_name or local_file.name
suffix = local_file.suffix.lower()
is_image = suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp"}
@@ -622,6 +699,8 @@ class Telegram:
if not self._telegram_token or not self._telegram_chat_id:
return None
# 列表消息也可能是一次交互的最终响应,需要确保 typing 状态在发送后结束。
chat_id = self._determine_target_chat_id(userid, original_chat_id)
try:
index, image, caption = 1, "", "*%s*" % title
for media in medias:
@@ -649,9 +728,6 @@ class Telegram:
if link:
caption = f"{caption}\n[查看详情]({link})"
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
if buttons:
@@ -675,6 +751,8 @@ class Telegram:
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
finally:
self._stop_typing_task(chat_id)
def send_torrents_msg(
self,
@@ -699,6 +777,8 @@ class Telegram:
if not self._telegram_token or not self._telegram_chat_id:
return None
# 资源列表是搜索交互的常见出口,也必须统一释放 typing 状态。
chat_id = self._determine_target_chat_id(userid, original_chat_id)
try:
index, caption = 1, "*%s*" % title
image = torrents[0].media_info.get_message_image()
@@ -725,9 +805,6 @@ class Telegram:
if link:
caption = f"{caption}\n[查看详情]({link})"
# Determine target chat_id with improved logic using user mapping
chat_id = self._determine_target_chat_id(userid, original_chat_id)
# 创建按钮键盘
reply_markup = None
if buttons:
@@ -751,6 +828,8 @@ class Telegram:
except Exception as msg_e:
logger.error(f"发送消息失败:{msg_e}")
return False
finally:
self._stop_typing_task(chat_id)
@staticmethod
def _create_inline_keyboard(buttons: List[List[Dict]]) -> InlineKeyboardMarkup:
@@ -872,6 +951,8 @@ class Telegram:
except Exception as e:
logger.error(f"编辑Telegram消息异常: {str(e)}")
return False
finally:
self._stop_typing_task(chat_id)
def __edit_message(
self,