diff --git a/app/modules/feishu/feishu.py b/app/modules/feishu/feishu.py index 607d6ec0..6da10e05 100644 --- a/app/modules/feishu/feishu.py +++ b/app/modules/feishu/feishu.py @@ -234,6 +234,7 @@ class Feishu: return "", None, None, None message_type = getattr(message, "message_type", None) + message_id = str(getattr(message, "message_id", None) or "").strip() text = content.get("text", "").strip() if isinstance(content.get("text"), str) else "" images = None audio_refs = None @@ -241,7 +242,6 @@ class Feishu: if message_type == "image": image_key = str(content.get("image_key") or "").strip() - message_id = str(getattr(message, "message_id", None) or "").strip() if image_key: if message_id: images = [CommingMessage.MessageImage(ref=f"feishu://image/{message_id}/{image_key}")] @@ -250,7 +250,6 @@ class Feishu: elif message_type in {"audio", "media", "file"}: file_key = str(content.get("file_key") or "").strip() file_name = str(content.get("file_name") or "").strip() or None - message_id = str(getattr(message, "message_id", None) or "").strip() if file_key: if message_type == "audio": resource_path = f"{message_id}/{file_key}" if message_id else file_key @@ -263,9 +262,99 @@ class Feishu: name=file_name, ) ] + elif message_type == "post" and not text: + text, images = Feishu._parse_post_message_content( + content=content, + message_id=message_id, + ) return text, images, audio_refs, files + @staticmethod + def _resolve_post_message_body(content: dict) -> Optional[dict]: + """解析飞书富文本消息在事件和 webhook 结构中的正文节点。""" + if isinstance(content.get("content"), list): + return content + + post = content.get("post") + if isinstance(post, dict): + preferred_locales = ("zh_cn", "en_us", "ja_jp") + for locale in preferred_locales: + locale_body = post.get(locale) + if isinstance(locale_body, dict): + return locale_body + for locale_body in post.values(): + if isinstance(locale_body, dict): + return locale_body + + return None + + @staticmethod + def _parse_post_element_text(element: dict) -> str: + """将飞书富文本元素转换为消息链可消费的纯文本片段。""" + tag = str(element.get("tag") or "").strip() + if tag in {"text", "plain_text"}: + return str(element.get("text") or element.get("content") or "") + if tag == "a": + link_text = str(element.get("text") or "").strip() + href = str(element.get("href") or element.get("url") or "").strip() + if link_text and href and link_text != href: + return f"{link_text} {href}" + return link_text or href + if tag == "at": + user_name = str(element.get("user_name") or element.get("name") or "").strip() + user_id = str(element.get("user_id") or "").strip() + target = user_name or user_id + return f" @{target}" if target else "" + if tag in {"code_block", "pre"}: + code = str(element.get("text") or element.get("content") or "").strip() + language = str(element.get("language") or "").strip() + if not code: + return "" + return f"```{language}\n{code}\n```" if language else f"```\n{code}\n```" + return str(element.get("text") or element.get("content") or "") + + @staticmethod + def _parse_post_message_content( + content: dict, + message_id: Optional[str] = None, + ) -> Tuple[str, Optional[List[CommingMessage.MessageImage]]]: + """从飞书富文本消息中提取可转发的文本和图片引用。""" + post_body = Feishu._resolve_post_message_body(content) + if not post_body: + return "", None + + lines = [] + title = str(post_body.get("title") or "").strip() + if title: + lines.append(title) + + images = [] + post_content = post_body.get("content") + if isinstance(post_content, list): + for row in post_content: + if not isinstance(row, list): + continue + row_parts = [] + for element in row: + if not isinstance(element, dict): + continue + image_key = str(element.get("image_key") or "").strip() + if element.get("tag") == "img" and image_key: + if message_id: + images.append(CommingMessage.MessageImage(ref=f"feishu://image/{message_id}/{image_key}")) + else: + images.append(CommingMessage.MessageImage(ref=f"feishu://image/{image_key}")) + element_text = Feishu._parse_post_element_text(element) + if element_text: + row_parts.append(element_text) + row_text = "".join(row_parts).strip() + if row_text: + lines.append(row_text) + + text = "\n".join(lines).strip() + return text, images or None + def _remember_target(self, userid: Optional[str], chat_id: Optional[str]) -> None: """记录最近互动的用户与会话映射,便于后续主动回复。""" normalized_userid = (userid or "").strip() diff --git a/tests/test_feishu_post_message.py b/tests/test_feishu_post_message.py new file mode 100644 index 00000000..b1c541c1 --- /dev/null +++ b/tests/test_feishu_post_message.py @@ -0,0 +1,100 @@ +import json +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +from app.testing.bootstrap import ensure_optional_stub + +ensure_optional_stub("psutil") +ensure_optional_stub("dateparser") +ensure_optional_stub("Pinyin2Hanzi", is_pinyin=lambda value: False) + +from app.modules.feishu.feishu import Feishu + + +def _build_feishu_client() -> Feishu: + """构造不会启动飞书长连接的测试客户端。""" + with ( + patch.object(Feishu, "_build_api_client", return_value=MagicMock()), + patch.object(Feishu, "_start_ws_client"), + ): + return Feishu( + FEISHU_APP_ID="test_app_id", + FEISHU_APP_SECRET="test_app_secret", + name="feishu-test", + ) + + +def test_on_message_extracts_localized_post_text_and_images(): + """飞书富文本事件应提取标题、正文、链接和图片引用。""" + client = _build_feishu_client() + message = SimpleNamespace( + message_id="om_post_evt", + chat_id="oc_chat_evt", + chat_type="p2p", + message_type="post", + content=json.dumps( + { + "post": { + "zh_cn": { + "title": "搜索请求", + "content": [ + [ + {"tag": "text", "text": "/search "}, + { + "tag": "a", + "text": "MoviePilot", + "href": "https://example.com/moviepilot", + }, + ], + [ + {"tag": "at", "user_name": "管理员"}, + {"tag": "text", "text": " 请处理"}, + ], + [{"tag": "img", "image_key": "img_v2_post"}], + ], + } + } + }, + ensure_ascii=False, + ), + ) + sender = SimpleNamespace( + sender_id=SimpleNamespace(open_id="ou_user_evt", user_id=None) + ) + event = SimpleNamespace(sender=sender, message=message) + + with patch.object(client, "_forward_to_message_chain") as forward: + client._on_message(SimpleNamespace(event=event)) + + payload = forward.call_args.args[0] + assert payload["text"] == ( + "搜索请求\n" + "/search MoviePilot https://example.com/moviepilot\n" + "@管理员 请处理" + ) + assert payload["images"][0]["ref"] == "feishu://image/om_post_evt/img_v2_post" + + +def test_parse_message_content_supports_direct_post_body(): + """飞书富文本直接正文结构应被转换为普通文本。""" + message = SimpleNamespace( + message_id="om_post_direct", + message_type="post", + content=json.dumps( + { + "title": "", + "content": [ + [{"tag": "text", "text": "/help"}], + [{"tag": "text", "text": "第二行"}], + ], + }, + ensure_ascii=False, + ), + ) + + text, images, audio_refs, files = Feishu._parse_message_content(message) + + assert text == "/help\n第二行" + assert images is None + assert audio_refs is None + assert files is None