mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-22 07:54:06 +08:00
fix: parse feishu post messages
This commit is contained in:
@@ -234,6 +234,7 @@ class Feishu:
|
||||
return "", None, None, None
|
||||
|
||||
message_type = getattr(message, "message_type", None)
|
||||
message_id = str(getattr(message, "message_id", None) or "").strip()
|
||||
text = content.get("text", "").strip() if isinstance(content.get("text"), str) else ""
|
||||
images = None
|
||||
audio_refs = None
|
||||
@@ -241,7 +242,6 @@ class Feishu:
|
||||
|
||||
if message_type == "image":
|
||||
image_key = str(content.get("image_key") or "").strip()
|
||||
message_id = str(getattr(message, "message_id", None) or "").strip()
|
||||
if image_key:
|
||||
if message_id:
|
||||
images = [CommingMessage.MessageImage(ref=f"feishu://image/{message_id}/{image_key}")]
|
||||
@@ -250,7 +250,6 @@ class Feishu:
|
||||
elif message_type in {"audio", "media", "file"}:
|
||||
file_key = str(content.get("file_key") or "").strip()
|
||||
file_name = str(content.get("file_name") or "").strip() or None
|
||||
message_id = str(getattr(message, "message_id", None) or "").strip()
|
||||
if file_key:
|
||||
if message_type == "audio":
|
||||
resource_path = f"{message_id}/{file_key}" if message_id else file_key
|
||||
@@ -263,9 +262,99 @@ class Feishu:
|
||||
name=file_name,
|
||||
)
|
||||
]
|
||||
elif message_type == "post" and not text:
|
||||
text, images = Feishu._parse_post_message_content(
|
||||
content=content,
|
||||
message_id=message_id,
|
||||
)
|
||||
|
||||
return text, images, audio_refs, files
|
||||
|
||||
@staticmethod
|
||||
def _resolve_post_message_body(content: dict) -> Optional[dict]:
|
||||
"""解析飞书富文本消息在事件和 webhook 结构中的正文节点。"""
|
||||
if isinstance(content.get("content"), list):
|
||||
return content
|
||||
|
||||
post = content.get("post")
|
||||
if isinstance(post, dict):
|
||||
preferred_locales = ("zh_cn", "en_us", "ja_jp")
|
||||
for locale in preferred_locales:
|
||||
locale_body = post.get(locale)
|
||||
if isinstance(locale_body, dict):
|
||||
return locale_body
|
||||
for locale_body in post.values():
|
||||
if isinstance(locale_body, dict):
|
||||
return locale_body
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_post_element_text(element: dict) -> str:
|
||||
"""将飞书富文本元素转换为消息链可消费的纯文本片段。"""
|
||||
tag = str(element.get("tag") or "").strip()
|
||||
if tag in {"text", "plain_text"}:
|
||||
return str(element.get("text") or element.get("content") or "")
|
||||
if tag == "a":
|
||||
link_text = str(element.get("text") or "").strip()
|
||||
href = str(element.get("href") or element.get("url") or "").strip()
|
||||
if link_text and href and link_text != href:
|
||||
return f"{link_text} {href}"
|
||||
return link_text or href
|
||||
if tag == "at":
|
||||
user_name = str(element.get("user_name") or element.get("name") or "").strip()
|
||||
user_id = str(element.get("user_id") or "").strip()
|
||||
target = user_name or user_id
|
||||
return f" @{target}" if target else ""
|
||||
if tag in {"code_block", "pre"}:
|
||||
code = str(element.get("text") or element.get("content") or "").strip()
|
||||
language = str(element.get("language") or "").strip()
|
||||
if not code:
|
||||
return ""
|
||||
return f"```{language}\n{code}\n```" if language else f"```\n{code}\n```"
|
||||
return str(element.get("text") or element.get("content") or "")
|
||||
|
||||
@staticmethod
|
||||
def _parse_post_message_content(
|
||||
content: dict,
|
||||
message_id: Optional[str] = None,
|
||||
) -> Tuple[str, Optional[List[CommingMessage.MessageImage]]]:
|
||||
"""从飞书富文本消息中提取可转发的文本和图片引用。"""
|
||||
post_body = Feishu._resolve_post_message_body(content)
|
||||
if not post_body:
|
||||
return "", None
|
||||
|
||||
lines = []
|
||||
title = str(post_body.get("title") or "").strip()
|
||||
if title:
|
||||
lines.append(title)
|
||||
|
||||
images = []
|
||||
post_content = post_body.get("content")
|
||||
if isinstance(post_content, list):
|
||||
for row in post_content:
|
||||
if not isinstance(row, list):
|
||||
continue
|
||||
row_parts = []
|
||||
for element in row:
|
||||
if not isinstance(element, dict):
|
||||
continue
|
||||
image_key = str(element.get("image_key") or "").strip()
|
||||
if element.get("tag") == "img" and image_key:
|
||||
if message_id:
|
||||
images.append(CommingMessage.MessageImage(ref=f"feishu://image/{message_id}/{image_key}"))
|
||||
else:
|
||||
images.append(CommingMessage.MessageImage(ref=f"feishu://image/{image_key}"))
|
||||
element_text = Feishu._parse_post_element_text(element)
|
||||
if element_text:
|
||||
row_parts.append(element_text)
|
||||
row_text = "".join(row_parts).strip()
|
||||
if row_text:
|
||||
lines.append(row_text)
|
||||
|
||||
text = "\n".join(lines).strip()
|
||||
return text, images or None
|
||||
|
||||
def _remember_target(self, userid: Optional[str], chat_id: Optional[str]) -> None:
|
||||
"""记录最近互动的用户与会话映射,便于后续主动回复。"""
|
||||
normalized_userid = (userid or "").strip()
|
||||
|
||||
100
tests/test_feishu_post_message.py
Normal file
100
tests/test_feishu_post_message.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from app.testing.bootstrap import ensure_optional_stub
|
||||
|
||||
ensure_optional_stub("psutil")
|
||||
ensure_optional_stub("dateparser")
|
||||
ensure_optional_stub("Pinyin2Hanzi", is_pinyin=lambda value: False)
|
||||
|
||||
from app.modules.feishu.feishu import Feishu
|
||||
|
||||
|
||||
def _build_feishu_client() -> Feishu:
|
||||
"""构造不会启动飞书长连接的测试客户端。"""
|
||||
with (
|
||||
patch.object(Feishu, "_build_api_client", return_value=MagicMock()),
|
||||
patch.object(Feishu, "_start_ws_client"),
|
||||
):
|
||||
return Feishu(
|
||||
FEISHU_APP_ID="test_app_id",
|
||||
FEISHU_APP_SECRET="test_app_secret",
|
||||
name="feishu-test",
|
||||
)
|
||||
|
||||
|
||||
def test_on_message_extracts_localized_post_text_and_images():
|
||||
"""飞书富文本事件应提取标题、正文、链接和图片引用。"""
|
||||
client = _build_feishu_client()
|
||||
message = SimpleNamespace(
|
||||
message_id="om_post_evt",
|
||||
chat_id="oc_chat_evt",
|
||||
chat_type="p2p",
|
||||
message_type="post",
|
||||
content=json.dumps(
|
||||
{
|
||||
"post": {
|
||||
"zh_cn": {
|
||||
"title": "搜索请求",
|
||||
"content": [
|
||||
[
|
||||
{"tag": "text", "text": "/search "},
|
||||
{
|
||||
"tag": "a",
|
||||
"text": "MoviePilot",
|
||||
"href": "https://example.com/moviepilot",
|
||||
},
|
||||
],
|
||||
[
|
||||
{"tag": "at", "user_name": "管理员"},
|
||||
{"tag": "text", "text": " 请处理"},
|
||||
],
|
||||
[{"tag": "img", "image_key": "img_v2_post"}],
|
||||
],
|
||||
}
|
||||
}
|
||||
},
|
||||
ensure_ascii=False,
|
||||
),
|
||||
)
|
||||
sender = SimpleNamespace(
|
||||
sender_id=SimpleNamespace(open_id="ou_user_evt", user_id=None)
|
||||
)
|
||||
event = SimpleNamespace(sender=sender, message=message)
|
||||
|
||||
with patch.object(client, "_forward_to_message_chain") as forward:
|
||||
client._on_message(SimpleNamespace(event=event))
|
||||
|
||||
payload = forward.call_args.args[0]
|
||||
assert payload["text"] == (
|
||||
"搜索请求\n"
|
||||
"/search MoviePilot https://example.com/moviepilot\n"
|
||||
"@管理员 请处理"
|
||||
)
|
||||
assert payload["images"][0]["ref"] == "feishu://image/om_post_evt/img_v2_post"
|
||||
|
||||
|
||||
def test_parse_message_content_supports_direct_post_body():
|
||||
"""飞书富文本直接正文结构应被转换为普通文本。"""
|
||||
message = SimpleNamespace(
|
||||
message_id="om_post_direct",
|
||||
message_type="post",
|
||||
content=json.dumps(
|
||||
{
|
||||
"title": "",
|
||||
"content": [
|
||||
[{"tag": "text", "text": "/help"}],
|
||||
[{"tag": "text", "text": "第二行"}],
|
||||
],
|
||||
},
|
||||
ensure_ascii=False,
|
||||
),
|
||||
)
|
||||
|
||||
text, images, audio_refs, files = Feishu._parse_message_content(message)
|
||||
|
||||
assert text == "/help\n第二行"
|
||||
assert images is None
|
||||
assert audio_refs is None
|
||||
assert files is None
|
||||
Reference in New Issue
Block a user