diff --git a/app/modules/wechatclawbot/__init__.py b/app/modules/wechatclawbot/__init__.py index 7d39e857..a24441d8 100644 --- a/app/modules/wechatclawbot/__init__.py +++ b/app/modules/wechatclawbot/__init__.py @@ -1,6 +1,7 @@ import json from typing import Any, Dict, List, Optional, Tuple, Union +from app.core.cache import TTLCache from app.core.context import Context, MediaInfo from app.log import logger from app.modules import _MessageBase, _ModuleBase @@ -10,6 +11,16 @@ from app.schemas.types import MessageChannel, ModuleType class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): + def __init__(self): + """初始化模块级去重缓存,拦截 iLink 偶发的重复回放消息。""" + super().__init__() + # iLink 偶发会重复回放同一条 update,这里按 message_id 做渠道内幂等保护。 + self._recent_message_ids = TTLCache( + region="wechatclawbot_message_dedup", + maxsize=8192, + ttl=7 * 24 * 60 * 60, + ) + def init_module(self) -> None: """初始化模块。""" self.stop() @@ -108,6 +119,18 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): ) return normalized or None + def _is_duplicate_message( + self, source: str, message_id: Optional[Union[str, int]] + ) -> bool: + """按渠道名和消息ID判断是否重复,避免重复回放再次进入业务链路。""" + if message_id in (None, ""): + return False + cache_key = f"{source}:{message_id}" + if self._recent_message_ids.exists(cache_key): + return True + self._recent_message_ids.set(cache_key, True) + return False + def message_parser( self, source: str, body: Any, form: Any, args: Any ) -> Optional[CommingMessage]: @@ -131,6 +154,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): if not user_id: return None + message_id = message.get("message_id") text = str(message.get("text") or "").strip() username = str(message.get("username") or user_id).strip() or user_id images = CommingMessage.MessageImage.normalize_list(message.get("images")) @@ -138,6 +162,14 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): files = self._normalize_files(message.get("files")) if not text and not images and not audio_refs and not files: return None + if self._is_duplicate_message(client_config.name, message_id): + logger.info( + "忽略重复的微信 ClawBot 消息:source=%s, userid=%s, message_id=%s", + client_config.name, + user_id, + message_id, + ) + return None admins = [ admin.strip() @@ -152,7 +184,8 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): logger.info( f"收到来自 {client_config.name} 的微信 ClawBot 消息:" - f"userid={user_id}, text={text}, images={len(images) if images else 0}, " + f"userid={user_id}, message_id={message_id}, text={text}, " + f"images={len(images) if images else 0}, " f"audios={len(audio_refs) if audio_refs else 0}, files={len(files) if files else 0}" ) return CommingMessage( @@ -161,7 +194,7 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): userid=user_id, username=username, text=text, - message_id=message.get("message_id"), + message_id=message_id, chat_id=str(message.get("chat_id") or "") or None, images=images, audio_refs=audio_refs, diff --git a/app/modules/wechatclawbot/wechatclawbot.py b/app/modules/wechatclawbot/wechatclawbot.py index 47d0a8a3..f28c3098 100644 --- a/app/modules/wechatclawbot/wechatclawbot.py +++ b/app/modules/wechatclawbot/wechatclawbot.py @@ -10,7 +10,7 @@ import time import uuid from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from urllib.parse import quote from Crypto.Cipher import AES @@ -69,33 +69,13 @@ class ILinkClient: account_id: Optional[str] = None, sync_buf: Optional[str] = None, timeout: int = 20, - log_func: Optional[Callable[[str, str], None]] = None, ): + """保存 iLink 会话参数,供二维码登录、长轮询和消息发送复用。""" self.base_url = (base_url or "https://ilinkai.weixin.qq.com").rstrip("/") self.bot_token = bot_token self.account_id = account_id self.sync_buf = sync_buf self.timeout = timeout - self._log_func = log_func - - def _log(self, level: str, message: str) -> None: - if self._log_func: - try: - self._log_func(level, f"[ILinkClient] {message}") - return - except Exception: - pass - - text = f"[WechatClawBot][ILinkClient] {message}" - level_value = (level or "info").lower() - if level_value == "debug": - logger.debug(text) - elif level_value == "warning": - logger.warning(text) - elif level_value == "error": - logger.error(text) - else: - logger.info(text) def set_credentials( self, @@ -537,6 +517,12 @@ class ILinkClient: plaintext: bytes, media_types: Optional[List[int]] = None, ) -> Tuple[Optional[str], Optional[str], Optional[bytes], Optional[int], Optional[str]]: + """ + 向 iLink 申请 CDN 上传参数。 + + iLink 对同一文件类型的兼容性并不稳定,这里会按候选 media_type 依次尝试, + 成功后返回上传地址、AES 密钥和文件元信息给后续 CDN 上传流程复用。 + """ rawsize = len(plaintext) rawfilemd5 = hashlib.md5(plaintext).hexdigest() filesize = self._aes_ecb_padded_size(rawsize) @@ -581,10 +567,7 @@ class ILinkClient: filesize, filekey, ) - self._log( - "warning", - f"getuploadurl 失败: resp={self._short_text(last_payload)}", - ) + logger.warning(f"getuploadurl 失败: resp={self._short_text(last_payload)}") return None, None, None, None, None def _upload_encrypted_to_cdn( @@ -595,6 +578,7 @@ class ILinkClient: plaintext: bytes, aeskey: bytes, ) -> Tuple[Optional[str], Optional[int]]: + """将文件按协议加密后上传到微信 CDN,并返回后续发送消息所需的下载参数。""" ciphertext = self._encrypt_aes_ecb(plaintext, aeskey) if upload_full_url: upload_url = str(upload_full_url).strip() @@ -604,23 +588,23 @@ class ILinkClient: f"filekey={quote(filekey, safe='')}" ) else: - self._log("warning", "CDN 上传失败: 缺少 upload_url 参数") + logger.warning("CDN 上传失败: 缺少 upload_url 参数") return None, None resp = RequestUtils( headers={"Content-Type": "application/octet-stream"}, timeout=self.timeout, ).post(upload_url, data=ciphertext) if getattr(resp, "status_code", None) != 200: - self._log( - "warning", - f"CDN 上传失败: http={getattr(resp, 'status_code', None)}, err={self._short_text(getattr(resp, 'text', ''))}", + logger.warning( + f"CDN 上传失败: http={getattr(resp, 'status_code', None)}, " + f"err={self._short_text(getattr(resp, 'text', ''))}" ) return None, None download_param = None if resp is not None and getattr(resp, "headers", None): download_param = resp.headers.get("x-encrypted-param") if not download_param: - self._log("warning", "CDN 上传成功但缺少 x-encrypted-param") + logger.warning("CDN 上传成功但缺少 x-encrypted-param") return None, None return str(download_param), len(ciphertext) @@ -630,6 +614,12 @@ class ILinkClient: payload_candidates: List[Dict[str, Any]], url_candidates: Optional[List[str]] = None, ) -> bool: + """ + 尝试一组候选发送报文,兼容 iLink 不同接口形态。 + + 这里不会只依赖单一 URL 或单一报文结构,而是按“用户候选 -> 接口候选 -> 报文候选” + 逐层回退,尽量提高不同账号和不同版本服务端的发送成功率。 + """ url_candidates = url_candidates or [ f"{self.base_url}/ilink/bot/sendmessage", f"{self.base_url}/ilink/bot/sendmessage?bot_type=3", @@ -644,7 +634,7 @@ class ILinkClient: ).post(url, json=request_body) payload = self._json(resp) if self._is_send_success(payload) or self._is_send_http_success(resp, payload): - self._log("info", f"发送消息成功: to_user={user_id}, variant={index}") + logger.info(f"发送消息成功: to_user={user_id}, variant={index}") return True http_code = getattr(resp, "status_code", None) err_msg = ( @@ -655,16 +645,17 @@ class ILinkClient: if not err_msg and resp is not None: err_msg = self._short_text(getattr(resp, "text", "")) last_error = f"http={http_code}, err={self._short_text(err_msg)}" - self._log( - "debug", - f"发送候选失败: to_user={user_id}, variant={index}, {last_error}, req={self._short_text(request_body)}, resp={self._short_text(payload)}", + logger.debug( + f"发送候选失败: to_user={user_id}, variant={index}, " + f"{last_error}, req={self._short_text(request_body)}, " + f"resp={self._short_text(payload)}" ) - self._log("warning", f"发送消息失败: to_user={to_user}, {last_error}") + logger.warning(f"发送消息失败: to_user={to_user}, {last_error}") return False def get_qrcode(self) -> Dict[str, Any]: url = f"{self.base_url}/ilink/bot/get_bot_qrcode?bot_type=3" - self._log("debug", f"请求二维码: {url}") + logger.debug(f"请求二维码: {url}") resp = RequestUtils( headers=self._headers(auth_required=False), timeout=self.timeout ).get_res(url) @@ -782,10 +773,10 @@ class ILinkClient: def send_text(self, to_user: str, text: str, context_token: Optional[str] = None) -> bool: if not self.bot_token: - self._log("warning", "发送消息失败:bot token 未配置") + logger.warning("发送消息失败:bot token 未配置") return False if not to_user or not text: - self._log("warning", "发送消息失败:to_user 或 text 为空") + logger.warning("发送消息失败:to_user 或 text 为空") return False payload_candidates = [ self._build_protocol_text_payload( @@ -799,10 +790,10 @@ class ILinkClient: self, to_user: str, text: str, context_token: Optional[str] = None ) -> bool: if not self.bot_token: - self._log("warning", "发送 Markdown 失败:bot token 未配置") + logger.warning("发送 Markdown 失败:bot token 未配置") return False if not to_user or not text: - self._log("warning", "发送 Markdown 失败:to_user 或 text 为空") + logger.warning("发送 Markdown 失败:to_user 或 text 为空") return False payload_candidates = self._build_markdown_payloads(str(to_user), text) if self._send_payload_candidates(to_user=to_user, payload_candidates=payload_candidates): @@ -817,10 +808,10 @@ class ILinkClient: context_token: Optional[str] = None, ) -> bool: if not self.bot_token: - self._log("warning", "发送图文失败:bot token 未配置") + logger.warning("发送图文失败:bot token 未配置") return False if not to_user or not image_bytes or not text: - self._log("warning", "发送图文失败:to_user 或 image_bytes 或 text 为空") + logger.warning("发送图文失败:to_user 或 image_bytes 或 text 为空") return False for user_id in self._build_user_candidates(to_user): upload_param, upload_full_url, aeskey, _, filekey = self._request_upload_param( @@ -878,10 +869,10 @@ class ILinkClient: self, to_user: str, image_bytes: bytes, context_token: Optional[str] = None ) -> bool: if not self.bot_token: - self._log("warning", "发送图片失败:bot token 未配置") + logger.warning("发送图片失败:bot token 未配置") return False if not to_user or not image_bytes: - self._log("warning", "发送图片失败:to_user 或 image_bytes 为空") + logger.warning("发送图片失败:to_user 或 image_bytes 为空") return False for user_id in self._build_user_candidates(to_user): upload_param, upload_full_url, aeskey, _, filekey = self._request_upload_param( @@ -922,10 +913,10 @@ class ILinkClient: context_token: Optional[str] = None, ) -> bool: if not self.bot_token: - self._log("warning", "发送文件失败:bot token 未配置") + logger.warning("发送文件失败:bot token 未配置") return False if not to_user or not file_bytes: - self._log("warning", "发送文件失败:to_user 或 file_bytes 为空") + logger.warning("发送文件失败:to_user 或 file_bytes 为空") return False file_name = file_name or "attachment" mime_type = mime_type or "application/octet-stream" @@ -1013,6 +1004,12 @@ class ILinkClient: return value def _parse_incoming(self, item: Dict[str, Any]) -> Optional[ILinkIncomingMessage]: + """ + 将 getupdates 返回的原始事件归一化为 MoviePilot 可消费的入站消息。 + + iLink 返回结构存在多种嵌套和字段别名,这里集中做兼容处理,统一提取 + 用户、消息ID、文本、图片、语音和文件附件。 + """ if not isinstance(item, dict): return None message = item @@ -1088,8 +1085,10 @@ class ILinkClient: or str(user_id) ) message_id = self._pick_value( - message, ["message_id", "msg_id", "id", "client_msg_id", "msgId"] - ) or self._pick_value(item, ["message_id", "msg_id", "id", "client_msg_id", "msgId"]) + message, ["message_id", "msg_id", "id", "client_msg_id", "msgId", "seq"] + ) or self._pick_value( + item, ["message_id", "msg_id", "id", "client_msg_id", "msgId", "seq"] + ) chat_id = self._pick_value( message, ["chat_id", "conversation_id", "room_id", "chatId", "conversationId", "roomId"], @@ -1261,6 +1260,7 @@ class ILinkClient: def _extract_updates( self, payload: Dict[str, Any] ) -> Tuple[List[Dict[str, Any]], Optional[str]]: + """从轮询响应中提取消息列表和下一轮使用的 sync_buf 游标。""" data = payload.get("data") or payload.get("result") or payload sync_buf = ( data.get("get_updates_buf") @@ -1306,6 +1306,12 @@ class ILinkClient: def poll_updates( self, timeout_seconds: int = 25 ) -> Tuple[List[ILinkIncomingMessage], Optional[str], Dict[str, Any]]: + """ + 执行一次 iLink 长轮询。 + + 返回值同时包含归一化后的消息、最新游标和原始结果摘要, + 便于上层在推进 sync_buf 的同时记录调试信息。 + """ if not self.bot_token: return [], self.sync_buf, {"success": False, "message": "bot token 未配置"} url = f"{self.base_url}/ilink/bot/getupdates" @@ -1407,18 +1413,6 @@ class WechatClawBot: if self._state.get("bot_token") and self._auto_start_polling: self._start_polling() - def _log(self, level: str, message: str) -> None: - text = f"[WechatClawBot][{self._config_name}] {message}" - level_value = (level or "info").lower() - if level_value == "debug": - logger.debug(text) - elif level_value == "warning": - logger.warning(text) - elif level_value == "error": - logger.error(text) - else: - logger.info(text) - def _load_state(self) -> Dict[str, Any]: content = self._filecache.get(self._cache_key) if not content: @@ -1441,7 +1435,7 @@ class WechatClawBot: data.setdefault("base_url", self._base_url) return data except Exception as err: - self._log("warning", f"加载登录状态失败,已重置缓存:{err}") + logger.warning(f"加载登录状态失败,已重置缓存:{err}") return { "bot_token": None, "account_id": None, @@ -1460,13 +1454,13 @@ class WechatClawBot: ) def _build_client(self) -> ILinkClient: + """根据当前持久化状态构建一次性的 iLink 客户端。""" return ILinkClient( base_url=self._state.get("base_url") or self._base_url, bot_token=self._state.get("bot_token"), account_id=self._state.get("account_id"), sync_buf=self._state.get("sync_buf"), timeout=max(self._poll_timeout, 20), - log_func=self._log, ) def _update_state(self, **kwargs) -> None: @@ -1607,7 +1601,7 @@ class WechatClawBot: if not content_type or "image" in content_type: return resp.content except Exception as err: - self._log("warning", f"加载图片失败:{err}") + logger.warning(f"加载图片失败:{err}") return None def get_state(self) -> bool: @@ -1627,9 +1621,15 @@ class WechatClawBot: self._stop_event.clear() self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True) self._poll_thread.start() - self._log("info", "消息轮询线程已启动") + logger.info("消息轮询线程已启动") def _poll_loop(self) -> None: + """ + 持续拉取微信消息并转发到本地消息入口。 + + 轮询异常时按退避策略重试;本地消息入口失败只记日志,不回滚 sync_buf, + 由上层的消息ID去重兜底,避免单次本地异常导致后续消息全部阻塞。 + """ consecutive_failures = 0 backoff = [1, 2, 5, 10, 30] while not self._stop_event.is_set() and self._state.get("bot_token"): @@ -1648,20 +1648,40 @@ class WechatClawBot: username=message.username, context_token=message.context_token, ) + response = None try: - RequestUtils(timeout=15).post_res( + response = RequestUtils(timeout=15).post_res( self._message_endpoint, json=message.to_message_payload(), ) + if response is None: + logger.error( + f"转发微信 ClawBot 消息失败:message_id={message.message_id}, " + "本地消息入口无响应" + ) + elif response.status_code != 200: + logger.error( + "转发微信 ClawBot 消息失败:" + f"message_id={message.message_id}, status={response.status_code}, " + f"body={ILinkClient._short_text(response.text)}" + ) except Exception as err: - self._log("error", f"转发微信 ClawBot 消息失败:{err}") + logger.error( + f"转发微信 ClawBot 消息失败:message_id={message.message_id}, error={err}" + ) + finally: + if response is not None: + try: + response.close() + except Exception: + pass consecutive_failures = 0 except Exception as err: consecutive_failures += 1 delay = backoff[min(consecutive_failures - 1, len(backoff) - 1)] - self._log("warning", f"轮询异常,{delay}s 后重试:{err}") + logger.warning(f"轮询异常,{delay}s 后重试:{err}") if consecutive_failures >= 10: - self._log("error", "轮询连续失败,已清理登录状态") + logger.error("轮询连续失败,已清理登录状态") self._clear_login_state() break self._stop_event.wait(delay) @@ -1684,7 +1704,10 @@ class WechatClawBot: def refresh_qrcode(self) -> Dict[str, Any]: if self._state.get("bot_token"): return self.get_status(refresh_remote=False) - client = ILinkClient(base_url=self._base_url, timeout=max(self._poll_timeout, 20), log_func=self._log) + client = ILinkClient( + base_url=self._base_url, + timeout=max(self._poll_timeout, 20), + ) result = client.get_qrcode() if not result.get("success"): return result @@ -1711,7 +1734,10 @@ class WechatClawBot: self.refresh_qrcode() qrcode = self._state.get("qrcode") or {} if refresh_remote and not self._state.get("bot_token") and qrcode.get("qrcode"): - client = ILinkClient(base_url=self._base_url, timeout=max(self._poll_timeout, 20), log_func=self._log) + client = ILinkClient( + base_url=self._base_url, + timeout=max(self._poll_timeout, 20), + ) result = client.get_qrcode_status(str(qrcode.get("qrcode"))) updated_qrcode = dict(qrcode) updated_qrcode["status"] = result.get("status") or updated_qrcode.get("status") or "waiting" @@ -1919,7 +1945,7 @@ class WechatClawBot: try: resp = RequestUtils(timeout=30).get_res(download_url) except Exception as err: - self._log("error", f"下载 {kind} 失败:{err}") + logger.error(f"下载 {kind} 失败:{err}") return None if not resp or not resp.content: return None @@ -1965,7 +1991,7 @@ class WechatClawBot: ) -> Optional[bool]: targets = self._get_targets(userid=userid) if not targets: - self._log("warning", "未找到可发送的微信 ClawBot 目标") + logger.warning("未找到可发送的微信 ClawBot 目标") return False image_bytes = self._load_remote_image(image) if image else None content = self._compose_markdown(title=title, text=text, link=link) @@ -2010,7 +2036,7 @@ class WechatClawBot: ) -> Optional[bool]: path = Path(file_path) if not path.exists() or not path.is_file(): - self._log("warning", f"待发送文件不存在:{file_path}") + logger.warning(f"待发送文件不存在:{file_path}") return False file_bytes = path.read_bytes() effective_name = file_name or path.name diff --git a/tests/test_wechatclawbot.py b/tests/test_wechatclawbot.py new file mode 100644 index 00000000..240f6863 --- /dev/null +++ b/tests/test_wechatclawbot.py @@ -0,0 +1,62 @@ +import json +import unittest +from types import SimpleNamespace +from unittest.mock import patch + +from app.modules.wechatclawbot import WechatClawBotModule +from app.modules.wechatclawbot.wechatclawbot import ILinkClient + + +class WechatClawBotTest(unittest.TestCase): + def test_ilink_parse_incoming_uses_seq_as_message_id_fallback(self): + client = ILinkClient(base_url="https://ilinkai.weixin.qq.com") + + message = client._parse_incoming( + { + "seq": 123456, + "from_user_id": "wxid_user_1", + "item_list": [{"type": 1, "text_item": {"text": "你好"}}], + } + ) + + self.assertIsNotNone(message) + self.assertEqual(message.message_id, "123456") + self.assertEqual(message.text, "你好") + + def test_wechatclawbot_message_parser_deduplicates_message_id(self): + module = WechatClawBotModule() + body = json.dumps( + { + "__channel__": "wechatclawbot", + "userid": "wxid_user_1", + "username": "tester", + "message_id": "msg-1001", + "text": "刷新订阅", + } + ) + + with patch.object( + module, + "get_config", + return_value=SimpleNamespace(name="wechatclawbot-test", config={}), + ): + first = module.message_parser( + source="wechatclawbot-test", + body=body, + form={}, + args={}, + ) + second = module.message_parser( + source="wechatclawbot-test", + body=body, + form={}, + args={}, + ) + + self.assertIsNotNone(first) + self.assertEqual(first.message_id, "msg-1001") + self.assertIsNone(second) + + +if __name__ == "__main__": + unittest.main()