Expand image and edit support across messaging channels

This commit is contained in:
jxxghp
2026-04-11 22:10:54 +08:00
parent bf2d2cbd03
commit 2f53fd3108
12 changed files with 952 additions and 45 deletions

View File

@@ -1,4 +1,6 @@
import copy
import json
import re
import xml.dom.minidom
from typing import Optional, Union, List, Tuple, Any, Dict
@@ -103,7 +105,7 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
if not client_config:
return None
if self._is_bot_mode(client_config.config):
return None
return self._parse_bot_message(source=source, body=body, client_config=client_config)
client: WeChat = self.get_instance(client_config.name)
# URL参数
sVerifyMsgSig = args.get("msg_signature")
@@ -163,6 +165,8 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
logger.warn(f"解析不到消息类型和用户ID")
return None
# 解析消息内容
content = None
images = None
if msg_type == "event" and event == "click":
# 校验用户有权限执行交互命令
if client_config.config.get('WECHAT_ADMINS'):
@@ -178,17 +182,85 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
# 文本消息
content = DomUtils.tag_value(root_node, "Content", default="")
logger.info(f"收到来自 {client_config.name} 的微信消息userid={user_id}, text={content}")
elif msg_type == "image":
media_id = DomUtils.tag_value(root_node, "MediaId")
pic_url = DomUtils.tag_value(root_node, "PicUrl")
if media_id:
images = [f"wxwork://media_id/{media_id}"]
elif pic_url:
images = [pic_url]
logger.info(
f"收到来自 {client_config.name} 的微信图片消息userid={user_id}, images={len(images) if images else 0}"
)
else:
return None
if content:
if content or images:
# 处理消息内容
return CommingMessage(channel=MessageChannel.Wechat, source=client_config.name,
userid=user_id, username=user_id, text=content)
userid=user_id, username=user_id, text=content or "",
images=images)
except Exception as err:
logger.error(f"微信消息处理发生错误:{str(err)}")
return None
def _parse_bot_message(self, source: str, body: Any, client_config) -> Optional[CommingMessage]:
try:
if isinstance(body, bytes):
msg_json = json.loads(body)
elif isinstance(body, dict):
msg_json = body
else:
msg_json = json.loads(body)
while isinstance(msg_json, str):
msg_json = json.loads(msg_json)
except Exception as err:
logger.debug(f"解析企业微信智能机器人消息失败:{err}")
return None
if not isinstance(msg_json, dict):
return None
payload_body = msg_json.get("body") or {}
sender = ((payload_body.get("from") or {}).get("userid") or "").strip()
if not sender:
return None
if payload_body.get("chattype") == "group":
return None
text = WeChatBot._extract_text_from_body(payload_body)
images = WeChatBot._extract_images_from_body(payload_body)
if text:
text = re.sub(r"@\S+", "", text).strip()
if text and text.startswith("/") and client_config.config.get('WECHAT_ADMINS'):
wechat_admins = [
admin.strip()
for admin in client_config.config.get('WECHAT_ADMINS', '').split(',')
if admin.strip()
]
if wechat_admins and sender not in wechat_admins:
client: WeChatBot = self.get_instance(client_config.name)
if client:
client.send_msg(title="只有管理员才有权限执行此命令", userid=sender)
return None
if not text and not images:
return None
logger.info(
f"收到来自 {client_config.name} 的企业微信智能机器人消息:"
f"userid={sender}, text={text}, images={len(images) if images else 0}"
)
return CommingMessage(
channel=MessageChannel.Wechat,
source=client_config.name,
userid=sender,
username=sender,
text=text or "",
images=images,
)
def post_message(self, message: Notification, **kwargs) -> None:
"""
发送消息
@@ -210,6 +282,25 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
client.send_msg(title=message.title, text=message.text,
image=message.image, userid=userid, link=message.link)
def download_wechat_image_to_data_url(self, image_ref: str, source: str) -> Optional[str]:
"""
下载企业微信渠道图片并转换为 data URL
"""
if not image_ref:
return None
client_config = self.get_config(source)
if not client_config:
return None
client = self.get_instance(client_config.name)
if not client:
return None
if image_ref.startswith("wxwork://media_id/") and hasattr(client, "download_media_to_data_url"):
media_id = image_ref.replace("wxwork://media_id/", "", 1)
return client.download_media_to_data_url(media_id)
if image_ref.startswith("wxbot://image/") and hasattr(client, "download_image_to_data_url"):
return client.download_image_to_data_url(image_ref)
return None
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
发送媒体信息选择列表