diff --git a/app/modules/wechatclawbot/wechatclawbot.py b/app/modules/wechatclawbot/wechatclawbot.py index 7c0f65ed..d8282c06 100644 --- a/app/modules/wechatclawbot/wechatclawbot.py +++ b/app/modules/wechatclawbot/wechatclawbot.py @@ -1238,23 +1238,32 @@ class ILinkClient: def _extract_updates( self, payload: Dict[str, Any] ) -> Tuple[List[Dict[str, Any]], Optional[str]]: - """按官方 getupdates 协议提取顶层 msgs 与 get_updates_buf。""" - sync_buf = self._pick_present_value(payload, ["get_updates_buf"]) + """ + 提取轮询结果中的消息列表与游标。 + + 线上存在两种等价字段命名:较新的实现返回 `get_updates_buf`, + 部分实例仍然返回 `sync_buf`。两者都表示下一轮轮询应携带的游标。 + """ + sync_buf = self._pick_present_value( + payload, ["get_updates_buf", "sync_buf", "syncBuf"] + ) items = payload.get("msgs") if isinstance(items, list): return items, sync_buf return [], sync_buf - def _has_canonical_poll_shape(self, payload: Dict[str, Any]) -> bool: + @staticmethod + def _has_canonical_poll_shape(payload: Dict[str, Any]) -> bool: """官方响应至少应包含顶层 msgs 列表。""" return isinstance(payload.get("msgs"), list) - def _is_poll_success(self, payload: Dict[str, Any]) -> bool: + def _resolve_poll_success(self, payload: Dict[str, Any]) -> Optional[bool]: """ - 判断 getupdates 是否明确成功。 + 判断 getupdates 是否给出了明确的成功/失败信号。 轮询接口不能沿用“只要没有明显报错就算成功”的宽松策略,否则服务端返回旧消息列表、 但状态码其实失败时,会被误判为可消费响应,导致旧消息再次进入业务链路。 + 返回 `None` 表示响应里没有显式状态,需要交给协议结构继续判断。 """ if not payload: return False @@ -1286,7 +1295,7 @@ class ILinkClient: return True if lowered in {"failed", "fail", "error", "denied", "blocked"}: return False - return False + return None def _build_poll_result( self, @@ -1320,16 +1329,22 @@ class ILinkClient: if not self.bot_token: return [], self.sync_buf, {"success": False, "message": "bot token 未配置"} url = f"{self.base_url}/ilink/bot/getupdates" - payload = {} request_body = self._with_base_info({"get_updates_buf": self.sync_buf or ""}) resp = RequestUtils( headers=self._headers(auth_required=True), timeout=timeout_seconds + 10, ).post(url, json=request_body) payload = self._json(resp) - success = bool(payload and self._is_poll_success(payload)) + explicit_success = self._resolve_poll_success(payload) + has_canonical_shape = self._has_canonical_poll_shape(payload) + # 某些 iLink 部署不会返回 ret/success,但顶层 msgs + sync_buf 已经足够表明 + # 这是一次有效的轮询响应;只有出现显式失败信号时才应拒绝消费。 + success = bool(payload) and ( + explicit_success is True + or (explicit_success is None and has_canonical_shape) + ) last_message = None - if payload and not success: + if payload and explicit_success is False: last_message = self._find_first_value( payload, ["errmsg", "message", "error", "error_msg", "detail"] ) or self._short_text(payload) @@ -1344,7 +1359,7 @@ class ILinkClient: payload=payload, message=last_message or "轮询响应未明确成功", ) - if not self._has_canonical_poll_shape(payload): + if not has_canonical_shape: logger.warning( "getupdates 返回非官方结构,已拒绝消费: %s", self._short_text(payload), @@ -1592,7 +1607,8 @@ class WechatClawBot: return "image/gif" return "application/octet-stream" - def _load_remote_image(self, image: str) -> Optional[bytes]: + @staticmethod + def _load_remote_image(image: str) -> Optional[bytes]: image_url = str(image or "").strip() if not image_url: return None diff --git a/tests/test_wechatclawbot.py b/tests/test_wechatclawbot.py index e3d7cb9c..599fc369 100644 --- a/tests/test_wechatclawbot.py +++ b/tests/test_wechatclawbot.py @@ -108,6 +108,33 @@ class WechatClawBotTest(unittest.TestCase): self.assertEqual(client.sync_buf, "") self.assertEqual(len(messages), 1) + def test_ilink_poll_updates_accepts_canonical_payload_without_explicit_success_flag(self): + client = ILinkClient( + base_url="https://ilinkai.weixin.qq.com", + bot_token="token", + sync_buf="cursor-old", + ) + response = MagicMock() + response.json.return_value = { + "sync_buf": "cursor-new", + "msgs": [ + { + "message_id": "msg-1002", + "from_user_id": "wxid_user_2", + "item_list": [{"type": 1, "text_item": {"text": "收到"}}], + } + ], + } + + with patch("app.modules.wechatclawbot.wechatclawbot.RequestUtils.post", return_value=response): + messages, sync_buf, result = client.poll_updates() + + self.assertTrue(result["success"]) + self.assertEqual(sync_buf, "cursor-new") + self.assertEqual(client.sync_buf, "cursor-new") + self.assertEqual(len(messages), 1) + self.assertEqual(messages[0].text, "收到") + def test_ilink_poll_updates_rejects_noncanonical_nested_success_payload(self): client = ILinkClient( base_url="https://ilinkai.weixin.qq.com",