fix(wechat): add error handling

This commit is contained in:
InfinityPacer
2024-10-26 04:47:42 +08:00
parent 133195cc0a
commit e876ba38a7

View File

@@ -252,51 +252,55 @@ class WeChat:
:param link: 跳转链接
:return: 发送状态,错误信息
"""
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
try:
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
if image:
ret_code = self.__send_image_message(title=title, text=text, image_url=image, userid=userid, link=link)
else:
ret_code = self.__send_message(title=title, text=text, userid=userid, link=link)
if image:
ret_code = self.__send_image_message(title=title, text=text, image_url=image, userid=userid, link=link)
else:
ret_code = self.__send_message(title=title, text=text, userid=userid, link=link)
return ret_code
return ret_code
except Exception as e:
logger.error(f"发送消息失败:{e}")
return False
def send_medias_msg(self, medias: List[MediaInfo], userid: str = "") -> Optional[bool]:
"""
发送列表类消息
"""
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
if not userid:
userid = "@all"
articles = []
index = 1
for media in medias:
if media.vote_average:
title = f"{index}. {media.title_year}\n类型:{media.type.value},评分:{media.vote_average}"
else:
title = f"{index}. {media.title_year}\n类型:{media.type.value}"
articles.append({
"title": title,
"description": "",
"picurl": media.get_message_image() if index == 1 else media.get_poster_image(),
"url": media.detail_link
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
try:
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
if not userid:
userid = "@all"
articles = []
index = 1
for media in medias:
if media.vote_average:
title = f"{index}. {media.title_year}\n类型:{media.type.value},评分:{media.vote_average}"
else:
title = f"{index}. {media.title_year}\n类型:{media.type.value}"
articles.append({
"title": title,
"description": "",
"picurl": media.get_message_image() if index == 1 else media.get_poster_image(),
"url": media.detail_link
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
return self.__post_request(self._send_msg_url, req_json)
except Exception as e:
logger.error(f"发送消息失败:{e}")
@@ -307,49 +311,49 @@ class WeChat:
"""
发送列表消息
"""
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
# 先发送标题
if title:
self.__send_message(title=title, userid=userid, link=link)
# 发送列表
if not userid:
userid = "@all"
articles = []
index = 1
for context in torrents:
torrent = context.torrent_info
meta = MetaInfo(title=torrent.title, subtitle=torrent.description)
mediainfo = context.media_info
torrent_title = f"{index}.【{torrent.site_name}" \
f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group} " \
f"{StringUtils.str_filesize(torrent.size)} " \
f"{torrent.volume_factor} " \
f"{torrent.seeders}"
torrent_title = re.sub(r"\s+", " ", torrent_title).strip()
articles.append({
"title": torrent_title,
"description": torrent.description if index == 1 else "",
"picurl": mediainfo.get_message_image() if index == 1 else "",
"url": torrent.page_url
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
try:
if not self.__get_access_token():
logger.error("获取微信access_token失败请检查参数配置")
return None
# 先发送标题
if title:
self.__send_message(title=title, userid=userid, link=link)
# 发送列表
if not userid:
userid = "@all"
articles = []
index = 1
for context in torrents:
torrent = context.torrent_info
meta = MetaInfo(title=torrent.title, subtitle=torrent.description)
mediainfo = context.media_info
torrent_title = f"{index}.【{torrent.site_name}" \
f"{meta.season_episode} " \
f"{meta.resource_term} " \
f"{meta.video_term} " \
f"{meta.release_group} " \
f"{StringUtils.str_filesize(torrent.size)} " \
f"{torrent.volume_factor} " \
f"{torrent.seeders}"
torrent_title = re.sub(r"\s+", " ", torrent_title).strip()
articles.append({
"title": torrent_title,
"description": torrent.description if index == 1 else "",
"picurl": mediainfo.get_message_image() if index == 1 else "",
"url": torrent.page_url
})
index += 1
req_json = {
"touser": userid,
"msgtype": "news",
"agentid": self._appid,
"news": {
"articles": articles
}
}
return self.__post_request(self._send_msg_url, req_json)
except Exception as e:
logger.error(f"发送消息失败:{e}")
@@ -424,49 +428,53 @@ class WeChat:
]
}
"""
# 请求URL
req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid)
try:
# 请求URL
req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid)
# 对commands按category分组
category_dict = {}
for key, value in commands.items():
category: str = value.get("category")
if category:
if not category_dict.get(category):
category_dict[category] = {}
category_dict[category][key] = value
# 对commands按category分组
category_dict = {}
for key, value in commands.items():
category: str = value.get("category")
if category:
if not category_dict.get(category):
category_dict[category] = {}
category_dict[category][key] = value
# 一级菜单
buttons = []
for category, menu in category_dict.items():
# 二级菜单
sub_buttons = []
for key, value in menu.items():
sub_buttons.append({
"type": "click",
"name": value.get("description"),
"key": key
# 一级菜单
buttons = []
for category, menu in category_dict.items():
# 二级菜单
sub_buttons = []
for key, value in menu.items():
sub_buttons.append({
"type": "click",
"name": value.get("description"),
"key": key
})
buttons.append({
"name": category,
"sub_button": sub_buttons[:5]
})
buttons.append({
"name": category,
"sub_button": sub_buttons[:5]
})
if buttons:
# 发送请求
try:
if buttons:
# 发送请求
self.__post_request(req_url, {
"button": buttons[:3]
})
except Exception as e:
logger.error(f"创建菜单失败:{e}")
return False
except Exception as e:
logger.error(f"创建菜单失败:{e}")
return False
def delete_menus(self):
"""
删除微信菜单
"""
# 请求URL
req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid)
# 发送请求
RequestUtils().get(req_url)
try:
# 请求URL
req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid)
# 发送请求
RequestUtils().get(req_url)
except Exception as e:
logger.error(f"删除菜单失败:{e}")
return False