refactor(wechat): optimize message handling and add menu deletion

This commit is contained in:
InfinityPacer
2024-10-24 20:27:41 +08:00
parent 1d252f4eb2
commit ac086a7640

View File

@@ -10,6 +10,7 @@ from app.log import logger
from app.utils.common import retry
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.utils.url import UrlUtils
lock = threading.Lock()
@@ -31,14 +32,16 @@ class WeChat:
_proxy = None
# 企业微信发送消息URL
_send_msg_url = "/cgi-bin/message/send?access_token=%s"
_send_msg_url = "/cgi-bin/message/send?access_token={access_token}"
# 企业微信获取TokenURL
_token_url = "/cgi-bin/gettoken?corpid=%s&corpsecret=%s"
# 企业微信创菜单URL
_create_menu_url = "/cgi-bin/menu/create?access_token=%s&agentid=%s"
_token_url = "/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
# 企业微信创菜单URL
_create_menu_url = "/cgi-bin/menu/create?access_token={access_token}&agentid={agentid}"
# 企业微信删除菜单URL
_delete_menu_url = "/cgi-bin/menu/delete?access_token={access_token}&agentid={agentid}"
def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None, WECHAT_APP_ID: str = None,
WECHAT_PROXY: str = None, **kwargs):
def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None,
WECHAT_APP_ID: str = None, WECHAT_PROXY: str = None, **kwargs):
"""
初始化
"""
@@ -51,10 +54,10 @@ class WeChat:
self._proxy = WECHAT_PROXY or "https://qyapi.weixin.qq.com"
if self._proxy:
self._proxy = self._proxy.rstrip("/")
self._send_msg_url = f"{self._proxy}/{self._send_msg_url}"
self._token_url = f"{self._proxy}/{self._token_url}"
self._create_menu_url = f"{self._proxy}/{self._create_menu_url}"
self._send_msg_url = UrlUtils.adapt_request_url(self._proxy, self._send_msg_url)
self._token_url = UrlUtils.adapt_request_url(self._proxy, self._token_url)
self._create_menu_url = UrlUtils.adapt_request_url(self._proxy, self._create_menu_url)
self._delete_menu_url = UrlUtils.adapt_request_url(self._proxy, self._delete_menu_url)
if self._corpid and self._appsecret and self._appid:
self.__get_access_token()
@@ -82,13 +85,13 @@ class WeChat:
if not self._corpid or not self._appsecret:
return None
try:
token_url = self._token_url % (self._corpid, self._appsecret)
token_url = self._token_url.format(corpid=self._corpid, corpsecret=self._appsecret)
res = RequestUtils().get_res(token_url)
if res:
ret_json = res.json()
if ret_json.get('errcode') == 0:
self._access_token = ret_json.get('access_token')
self._expires_in = ret_json.get('expires_in')
if ret_json.get("errcode") == 0:
self._access_token = ret_json.get("access_token")
self._expires_in = ret_json.get("expires_in")
self._access_token_time = datetime.now()
elif res is not None:
logger.error(f"获取微信access_token失败错误码{res.status_code},错误原因:{res.reason}")
@@ -100,8 +103,64 @@ class WeChat:
return None
return self._access_token
@staticmethod
def __split_content(content: str, max_bytes: int = 2048) -> List[str]:
"""
将内容分块为不超过 max_bytes 字节的块
:param content: 待拆分的内容
:param max_bytes: 最大字节数
:return: 分块后的内容列表
"""
content_chunks = []
current_chunk = bytearray()
for line in content.splitlines():
encoded_line = (line + "\n").encode("utf-8")
line_length = len(encoded_line)
if line_length > max_bytes:
# 在处理长行之前,先将 current_chunk 添加到 content_chunks
if current_chunk:
content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip())
current_chunk = bytearray()
# 处理长行,拆分为多个不超过 max_bytes 的块
start = 0
while start < line_length:
end = start + max_bytes # 不再需要为 "..." 预留空间
if end >= line_length:
end = line_length
else:
# 调整以避免拆分多字节字符
while end > start and (encoded_line[end] & 0xC0) == 0x80:
end -= 1
if end == start:
# 单个字符超过了 max_bytes强制包含整个字符
end = start + 1
while end < line_length and (encoded_line[end] & 0xC0) == 0x80:
end += 1
truncated_line = encoded_line[start:end].decode("utf-8", errors="replace")
content_chunks.append(truncated_line.strip())
start = end
continue # 继续处理下一行
# 检查添加当前行后是否会超过 max_bytes
if len(current_chunk) + line_length > max_bytes:
# 将 current_chunk 添加到 content_chunks
content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip())
current_chunk = bytearray()
# 将当前行添加到 current_chunk
current_chunk += encoded_line
# 处理剩余的 current_chunk
if current_chunk:
content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip())
return content_chunks
def __send_message(self, title: str, text: str = None,
userid: str = None, link: str = None) -> Optional[bool]:
userid: str = None, link: str = None) -> bool:
"""
发送文本消息
:param title: 消息标题
@@ -110,9 +169,14 @@ class WeChat:
:param link: 跳转链接
:return: 发送状态,错误信息
"""
message_url = self._send_msg_url % self.__get_access_token()
if not title:
logger.error("消息标题不能为空")
return False
message_url = self._send_msg_url.format(access_token=self.__get_access_token())
if text:
content = "%s\n%s" % (title, text.replace("\n\n", "\n"))
formatted_text = text.replace("\n\n", "\n")
content = f"{title}\n{formatted_text}"
else:
content = title
@@ -122,50 +186,28 @@ class WeChat:
if not userid:
userid = "@all"
# Check if content exceeds 2048 bytes and split if necessary
if len(content.encode('utf-8')) > 2048:
content_chunks = []
current_chunk = ""
for line in content.splitlines():
if len(current_chunk.encode('utf-8')) + len(line.encode('utf-8')) > 2048:
content_chunks.append(current_chunk.strip())
current_chunk = ""
current_chunk += line + "\n"
if current_chunk:
content_chunks.append(current_chunk.strip())
# 分块处理逻辑
content_chunks = self.__split_content(content)
# Send each chunk as a separate message
result = True
for chunk in content_chunks:
req_json = {
"touser": userid,
"msgtype": "text",
"agentid": self._appid,
"text": {
"content": chunk
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
result = self.__post_request(message_url, req_json)
if not result:
return False
else:
# 逐块发送消息
for chunk in content_chunks:
req_json = {
"touser": userid,
"msgtype": "text",
"agentid": self._appid,
"text": {
"content": content
"content": chunk
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
result = self.__post_request(message_url, req_json)
if not result:
logger.error(f"发送消息块失败: {chunk}")
return False
return result
return True
def __send_image_message(self, title: str, text: str, image_url: str,
userid: str = None, link: str = None) -> Optional[bool]:
@@ -178,7 +220,7 @@ class WeChat:
:param link: 跳转链接
:return: 发送状态,错误信息
"""
message_url = self._send_msg_url % self.__get_access_token()
message_url = self._send_msg_url.format(access_token=self.__get_access_token())
if text:
text = text.replace("\n\n", "\n")
if not userid:
@@ -230,7 +272,7 @@ class WeChat:
logger.error("获取微信access_token失败请检查参数配置")
return None
message_url = self._send_msg_url % self.__get_access_token()
message_url = self._send_msg_url.format(access_token=self.__get_access_token())
if not userid:
userid = "@all"
articles = []
@@ -272,7 +314,7 @@ class WeChat:
self.__send_message(title=title, userid=userid, link=link)
# 发送列表
message_url = self._send_msg_url % self.__get_access_token()
message_url = self._send_msg_url.format(access_token=self.__get_access_token())
if not userid:
userid = "@all"
articles = []
@@ -289,11 +331,11 @@ class WeChat:
f"{StringUtils.str_filesize(torrent.size)} " \
f"{torrent.volume_factor} " \
f"{torrent.seeders}"
title = re.sub(r"\s+", " ", title).strip()
torrent_title = re.sub(r"\s+", " ", torrent_title).strip()
articles.append({
"title": torrent_title,
"description": torrent.description if index == 1 else '',
"picurl": mediainfo.get_message_image() if index == 1 else '',
"description": torrent.description if index == 1 else "",
"picurl": mediainfo.get_message_image() if index == 1 else "",
"url": torrent.page_url
})
index += 1
@@ -313,16 +355,16 @@ class WeChat:
向微信发送请求
"""
try:
res = RequestUtils(content_type='application/json').post(
res = RequestUtils(content_type="application/json").post(
message_url,
data=json.dumps(req_json, ensure_ascii=False).encode('utf-8')
data=json.dumps(req_json, ensure_ascii=False).encode("utf-8")
)
if res and res.status_code == 200:
ret_json = res.json()
if ret_json.get('errcode') == 0:
if ret_json.get("errcode") == 0:
return True
else:
if ret_json.get('errcode') == 42001:
if ret_json.get("errcode") == 42001:
self.__get_access_token(force=True)
logger.error(f"发送请求失败,错误信息:{ret_json.get('errmsg')}")
return False
@@ -376,12 +418,12 @@ class WeChat:
}
"""
# 请求URL
req_url = self._create_menu_url % (self.__get_access_token(), self._appid)
req_url = self._create_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid)
# 对commands按category分组
category_dict = {}
for key, value in commands.items():
category: Dict[str, dict] = value.get("category")
category: str = value.get("category")
if category:
if not category_dict.get(category):
category_dict[category] = {}
@@ -408,3 +450,12 @@ class WeChat:
self.__post_request(req_url, {
"button": buttons[:3]
})
def delete_menus(self):
"""
删除微信菜单
"""
# 请求URL
req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid)
# 发送请求
RequestUtils().get(req_url)