diff --git a/app/modules/wechat/wechat.py b/app/modules/wechat/wechat.py index 4c675f9d..5ee719b0 100644 --- a/app/modules/wechat/wechat.py +++ b/app/modules/wechat/wechat.py @@ -10,6 +10,7 @@ from app.log import logger from app.utils.common import retry from app.utils.http import RequestUtils from app.utils.string import StringUtils +from app.utils.url import UrlUtils lock = threading.Lock() @@ -31,14 +32,16 @@ class WeChat: _proxy = None # 企业微信发送消息URL - _send_msg_url = "/cgi-bin/message/send?access_token=%s" + _send_msg_url = "/cgi-bin/message/send?access_token={access_token}" # 企业微信获取TokenURL - _token_url = "/cgi-bin/gettoken?corpid=%s&corpsecret=%s" - # 企业微信创新菜单URL - _create_menu_url = "/cgi-bin/menu/create?access_token=%s&agentid=%s" + _token_url = "/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}" + # 企业微信创建菜单URL + _create_menu_url = "/cgi-bin/menu/create?access_token={access_token}&agentid={agentid}" + # 企业微信删除菜单URL + _delete_menu_url = "/cgi-bin/menu/delete?access_token={access_token}&agentid={agentid}" - def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None, WECHAT_APP_ID: str = None, - WECHAT_PROXY: str = None, **kwargs): + def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None, + WECHAT_APP_ID: str = None, WECHAT_PROXY: str = None, **kwargs): """ 初始化 """ @@ -51,10 +54,10 @@ class WeChat: self._proxy = WECHAT_PROXY or "https://qyapi.weixin.qq.com" if self._proxy: - self._proxy = self._proxy.rstrip("/") - self._send_msg_url = f"{self._proxy}/{self._send_msg_url}" - self._token_url = f"{self._proxy}/{self._token_url}" - self._create_menu_url = f"{self._proxy}/{self._create_menu_url}" + self._send_msg_url = UrlUtils.adapt_request_url(self._proxy, self._send_msg_url) + self._token_url = UrlUtils.adapt_request_url(self._proxy, self._token_url) + self._create_menu_url = UrlUtils.adapt_request_url(self._proxy, self._create_menu_url) + self._delete_menu_url = UrlUtils.adapt_request_url(self._proxy, self._delete_menu_url) if self._corpid and self._appsecret and self._appid: self.__get_access_token() @@ -82,13 +85,13 @@ class WeChat: if not self._corpid or not self._appsecret: return None try: - token_url = self._token_url % (self._corpid, self._appsecret) + token_url = self._token_url.format(corpid=self._corpid, corpsecret=self._appsecret) res = RequestUtils().get_res(token_url) if res: ret_json = res.json() - if ret_json.get('errcode') == 0: - self._access_token = ret_json.get('access_token') - self._expires_in = ret_json.get('expires_in') + if ret_json.get("errcode") == 0: + self._access_token = ret_json.get("access_token") + self._expires_in = ret_json.get("expires_in") self._access_token_time = datetime.now() elif res is not None: logger.error(f"获取微信access_token失败,错误码:{res.status_code},错误原因:{res.reason}") @@ -100,8 +103,64 @@ class WeChat: return None return self._access_token + @staticmethod + def __split_content(content: str, max_bytes: int = 2048) -> List[str]: + """ + 将内容分块为不超过 max_bytes 字节的块 + :param content: 待拆分的内容 + :param max_bytes: 最大字节数 + :return: 分块后的内容列表 + """ + content_chunks = [] + current_chunk = bytearray() + + for line in content.splitlines(): + encoded_line = (line + "\n").encode("utf-8") + line_length = len(encoded_line) + + if line_length > max_bytes: + # 在处理长行之前,先将 current_chunk 添加到 content_chunks + if current_chunk: + content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip()) + current_chunk = bytearray() + + # 处理长行,拆分为多个不超过 max_bytes 的块 + start = 0 + while start < line_length: + end = start + max_bytes # 不再需要为 "..." 预留空间 + if end >= line_length: + end = line_length + else: + # 调整以避免拆分多字节字符 + while end > start and (encoded_line[end] & 0xC0) == 0x80: + end -= 1 + if end == start: + # 单个字符超过了 max_bytes,强制包含整个字符 + end = start + 1 + while end < line_length and (encoded_line[end] & 0xC0) == 0x80: + end += 1 + truncated_line = encoded_line[start:end].decode("utf-8", errors="replace") + content_chunks.append(truncated_line.strip()) + start = end + continue # 继续处理下一行 + + # 检查添加当前行后是否会超过 max_bytes + if len(current_chunk) + line_length > max_bytes: + # 将 current_chunk 添加到 content_chunks + content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip()) + current_chunk = bytearray() + + # 将当前行添加到 current_chunk + current_chunk += encoded_line + + # 处理剩余的 current_chunk + if current_chunk: + content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip()) + + return content_chunks + def __send_message(self, title: str, text: str = None, - userid: str = None, link: str = None) -> Optional[bool]: + userid: str = None, link: str = None) -> bool: """ 发送文本消息 :param title: 消息标题 @@ -110,9 +169,14 @@ class WeChat: :param link: 跳转链接 :return: 发送状态,错误信息 """ - message_url = self._send_msg_url % self.__get_access_token() + if not title: + logger.error("消息标题不能为空") + return False + + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if text: - content = "%s\n%s" % (title, text.replace("\n\n", "\n")) + formatted_text = text.replace("\n\n", "\n") + content = f"{title}\n{formatted_text}" else: content = title @@ -122,50 +186,28 @@ class WeChat: if not userid: userid = "@all" - # Check if content exceeds 2048 bytes and split if necessary - if len(content.encode('utf-8')) > 2048: - content_chunks = [] - current_chunk = "" - for line in content.splitlines(): - if len(current_chunk.encode('utf-8')) + len(line.encode('utf-8')) > 2048: - content_chunks.append(current_chunk.strip()) - current_chunk = "" - current_chunk += line + "\n" - if current_chunk: - content_chunks.append(current_chunk.strip()) + # 分块处理逻辑 + content_chunks = self.__split_content(content) - # Send each chunk as a separate message - result = True - for chunk in content_chunks: - req_json = { - "touser": userid, - "msgtype": "text", - "agentid": self._appid, - "text": { - "content": chunk - }, - "safe": 0, - "enable_id_trans": 0, - "enable_duplicate_check": 0 - } - result = self.__post_request(message_url, req_json) - if not result: - return False - else: + # 逐块发送消息 + for chunk in content_chunks: req_json = { "touser": userid, "msgtype": "text", "agentid": self._appid, "text": { - "content": content + "content": chunk }, "safe": 0, "enable_id_trans": 0, "enable_duplicate_check": 0 } result = self.__post_request(message_url, req_json) + if not result: + logger.error(f"发送消息块失败: {chunk}") + return False - return result + return True def __send_image_message(self, title: str, text: str, image_url: str, userid: str = None, link: str = None) -> Optional[bool]: @@ -178,7 +220,7 @@ class WeChat: :param link: 跳转链接 :return: 发送状态,错误信息 """ - message_url = self._send_msg_url % self.__get_access_token() + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if text: text = text.replace("\n\n", "\n") if not userid: @@ -230,7 +272,7 @@ class WeChat: logger.error("获取微信access_token失败,请检查参数配置") return None - message_url = self._send_msg_url % self.__get_access_token() + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if not userid: userid = "@all" articles = [] @@ -272,7 +314,7 @@ class WeChat: self.__send_message(title=title, userid=userid, link=link) # 发送列表 - message_url = self._send_msg_url % self.__get_access_token() + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if not userid: userid = "@all" articles = [] @@ -289,11 +331,11 @@ class WeChat: f"{StringUtils.str_filesize(torrent.size)} " \ f"{torrent.volume_factor} " \ f"{torrent.seeders}↑" - title = re.sub(r"\s+", " ", title).strip() + torrent_title = re.sub(r"\s+", " ", torrent_title).strip() articles.append({ "title": torrent_title, - "description": torrent.description if index == 1 else '', - "picurl": mediainfo.get_message_image() if index == 1 else '', + "description": torrent.description if index == 1 else "", + "picurl": mediainfo.get_message_image() if index == 1 else "", "url": torrent.page_url }) index += 1 @@ -313,16 +355,16 @@ class WeChat: 向微信发送请求 """ try: - res = RequestUtils(content_type='application/json').post( + res = RequestUtils(content_type="application/json").post( message_url, - data=json.dumps(req_json, ensure_ascii=False).encode('utf-8') + data=json.dumps(req_json, ensure_ascii=False).encode("utf-8") ) if res and res.status_code == 200: ret_json = res.json() - if ret_json.get('errcode') == 0: + if ret_json.get("errcode") == 0: return True else: - if ret_json.get('errcode') == 42001: + if ret_json.get("errcode") == 42001: self.__get_access_token(force=True) logger.error(f"发送请求失败,错误信息:{ret_json.get('errmsg')}") return False @@ -376,12 +418,12 @@ class WeChat: } """ # 请求URL - req_url = self._create_menu_url % (self.__get_access_token(), self._appid) + req_url = self._create_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) # 对commands按category分组 category_dict = {} for key, value in commands.items(): - category: Dict[str, dict] = value.get("category") + category: str = value.get("category") if category: if not category_dict.get(category): category_dict[category] = {} @@ -408,3 +450,12 @@ class WeChat: self.__post_request(req_url, { "button": buttons[:3] }) + + def delete_menus(self): + """ + 删除微信菜单 + """ + # 请求URL + req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) + # 发送请求 + RequestUtils().get(req_url)