fix: handle images in Feishu streaming replies

This commit is contained in:
jxxghp
2026-05-22 20:43:54 +08:00
parent bd4d493f34
commit 052e1ca8e4
3 changed files with 494 additions and 3 deletions

View File

@@ -191,6 +191,7 @@ class FeishuModule(_ModuleBase, _MessageBase[Feishu]):
text=text,
buttons=buttons,
metadata=metadata,
chat_id=str(chat_id) if chat_id else None,
):
return True
return False

View File

@@ -1,5 +1,6 @@
import asyncio
import json
import re
import tempfile
import threading
import uuid
@@ -65,6 +66,7 @@ class Feishu:
STREAM_CARD_TITLE_ELEMENT_ID = "mp_stream_title"
STREAM_CARD_BODY_ELEMENT_ID = "mp_stream_body"
IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".ico", ".tiff", ".heic"}
MARKDOWN_IMAGE_PATTERN = re.compile(r"!\[(?P<alt>[^\]\n]*)]\((?P<target>[^)\n]*)\)")
def __init__(
self,
@@ -582,6 +584,104 @@ class Feishu:
escaped = escaped.replace(source, target)
return escaped
@classmethod
def _strip_streaming_markdown_images(cls, text: Optional[str]) -> str:
"""从流式卡片文本中剥离 Markdown 图片语法,图片由独立消息发送。"""
if not text:
return ""
normalized_text = cls._strip_trailing_incomplete_markdown_image(str(text))
parts = []
last_end = 0
for match in cls.MARKDOWN_IMAGE_PATTERN.finditer(normalized_text):
parts.append(normalized_text[last_end:match.start()])
alt_text = (match.group("alt") or "").strip()
if alt_text:
parts.append(alt_text)
last_end = match.end()
parts.append(normalized_text[last_end:])
return "".join(parts)
@classmethod
def _strip_trailing_incomplete_markdown_image(cls, text: str) -> str:
"""隐藏末尾尚未闭合的 Markdown 图片片段,等流式累计完整后再处理。"""
if not text:
return ""
start = text.rfind("![")
if start < 0:
return text
fragment = text[start:]
if "\n" in fragment or "\r" in fragment or cls.MARKDOWN_IMAGE_PATTERN.fullmatch(fragment):
return text
if ")" not in fragment:
return text[:start].rstrip()
return text
@classmethod
def _extract_markdown_image_urls(cls, text: Optional[str]) -> List[str]:
"""提取 Markdown 图片中的外部 URL供 Agent 流式回复单独发送图片。"""
if not text:
return []
urls = []
for match in cls.MARKDOWN_IMAGE_PATTERN.finditer(str(text)):
image_url = (match.group("target") or "").strip()
if image_url and cls._is_external_image_url(image_url):
urls.append(image_url)
return urls
@staticmethod
def _is_external_image_url(image_url: str) -> bool:
"""判断图片地址是否可以按远程图片下载上传。"""
normalized_url = (image_url or "").strip().lower()
return normalized_url.startswith(("http://", "https://", "feishu://image/"))
@classmethod
def _is_supported_remote_image_response(
cls,
image_url: str,
content_type: Optional[str] = None,
content: Optional[bytes] = None,
) -> bool:
"""校验远程响应是否像图片,避免把普通网页链接上传到飞书图片接口。"""
normalized_type = (content_type or "").split(";", 1)[0].strip().lower()
if normalized_type:
return normalized_type.startswith("image/")
path_suffix = Path(urlparse(image_url).path).suffix.lower()
return path_suffix in cls.IMAGE_SUFFIXES and cls._looks_like_image_content(content)
@staticmethod
def _looks_like_image_content(content: Optional[bytes]) -> bool:
"""在响应缺少 Content-Type 时用文件头兜底判断是否为常见图片。"""
if not content:
return False
head = bytes(content[:32])
if head.startswith((b"\xff\xd8\xff", b"\x89PNG\r\n\x1a\n", b"GIF87a", b"GIF89a", b"BM")):
return True
if head.startswith((b"II*\x00", b"MM\x00*", b"\x00\x00\x01\x00")):
return True
if len(head) >= 12 and head[:4] == b"RIFF" and head[8:12] == b"WEBP":
return True
if len(head) >= 12 and head[4:8] == b"ftyp" and head[8:12] in {
b"heic", b"heix", b"hevc", b"hevx", b"mif1", b"msf1", b"avif",
}:
return True
return False
@staticmethod
def _dedupe_image_urls(image_urls: List[str]) -> List[str]:
"""按出现顺序去重图片 URL避免 Agent 同一张图重复发送。"""
deduped = []
seen = set()
for image_url in image_urls:
normalized_url = (image_url or "").strip()
if not normalized_url or normalized_url in seen:
continue
seen.add(normalized_url)
deduped.append(normalized_url)
return deduped
@classmethod
def _build_markdown_section(
cls,
@@ -652,6 +752,9 @@ class Feishu:
logger.warning(f"飞书图片下载失败:{image_url}")
return None
content_type = response.headers.get("Content-Type") if response.headers else None
if not self._is_supported_remote_image_response(image_url, content_type, response.content):
logger.warning(f"飞书图片地址不是有效图片:{image_url}, content_type={content_type}")
return None
suffix = self._guess_image_suffix(image_url=image_url, content_type=content_type)
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as fp:
fp.write(response.content)
@@ -809,6 +912,9 @@ class Feishu:
"""构建支持 CardKit 流式更新的飞书卡片 JSON 2.0。"""
elements: List[dict] = []
title_content = self._escape_card_text(title).strip() if title else ""
body_content = self._escape_card_text(
self._strip_streaming_markdown_images(text)
).strip()
if title_content:
elements.append(
{
@@ -821,7 +927,7 @@ class Feishu:
{
"tag": "markdown",
"element_id": self.STREAM_CARD_BODY_ELEMENT_ID,
"content": self._escape_card_text(text).strip() or " ",
"content": body_content or " ",
}
)
return {
@@ -914,6 +1020,49 @@ class Feishu:
}
return result
def _send_agent_streaming_images(
self,
image_urls: List[str],
userid: Optional[str] = None,
chat_id: Optional[str] = None,
receive_id_type: Optional[str] = None,
sent_image_urls: Optional[List[str]] = None,
) -> List[str]:
"""将 Agent 流式回复中的图片作为独立图片卡片发送,避免污染流式文本组件。"""
sent_images = list(sent_image_urls or [])
pending_image_urls = [
image_url
for image_url in self._dedupe_image_urls(image_urls)
if image_url not in sent_images
]
for image_url in pending_image_urls:
image_key = self._upload_remote_image(image_url)
if not image_key:
continue
payload = self._build_card(
title=None,
text=None,
link=None,
buttons=None,
image_key=image_key,
)
try:
receive_id, resolved_receive_id_type = self._resolve_target(
userid=userid,
chat_id=chat_id,
receive_id_type=receive_id_type,
)
self._send_message(
receive_id,
resolved_receive_id_type,
"interactive",
payload,
)
sent_images.append(image_url)
except Exception as err:
logger.error(f"飞书 Agent 图片消息发送失败:{err}")
return sent_images
def _update_streaming_card_content(
self,
card_id: str,
@@ -1371,6 +1520,10 @@ class Feishu:
)
if is_streaming_agent_text:
try:
stream_image_urls = []
if self._is_external_image_url(message.image):
stream_image_urls.append(message.image)
stream_image_urls.extend(self._extract_markdown_image_urls(message.text))
result = self._send_streaming_card_message(
title=message.title,
text=message.text,
@@ -1386,6 +1539,15 @@ class Feishu:
return {"success": False}
result["chat_id"] = result.get("chat_id") or chat_id or self._user_chat_mapping.get(
userid or "") or self._default_chat_id
sent_image_urls = self._send_agent_streaming_images(
stream_image_urls,
userid=userid,
chat_id=result.get("chat_id") or chat_id,
receive_id_type=receive_id_type,
)
stream_meta = result.get("metadata", {}).get("feishu_streaming")
if isinstance(stream_meta, dict):
stream_meta["sent_image_urls"] = sent_image_urls
return result
image_key = self._upload_remote_image(message.image)
@@ -1426,7 +1588,8 @@ class Feishu:
return result
def edit_message(self, message_id: str, title: Optional[str] = None, text: Optional[str] = None,
buttons: Optional[List[List[dict]]] = None, metadata: Optional[dict] = None) -> bool:
buttons: Optional[List[List[dict]]] = None, metadata: Optional[dict] = None,
chat_id: Optional[str] = None) -> bool:
"""编辑已发送的飞书交互卡片消息。"""
if not self._api_client:
return False
@@ -1441,12 +1604,21 @@ class Feishu:
stream_meta["sequence"] = sequence
if card_id and element_id:
content = self._escape_card_text(
self._strip_streaming_markdown_images(text)
).strip()
if self._update_streaming_card_content(
card_id=card_id,
element_id=element_id,
content=self._escape_card_text(text).strip() or " ",
content=content or " ",
sequence=sequence,
):
stream_image_urls = self._extract_markdown_image_urls(text)
stream_meta["sent_image_urls"] = self._send_agent_streaming_images(
stream_image_urls,
chat_id=chat_id,
sent_image_urls=stream_meta.get("sent_image_urls") or [],
)
return True
logger.error("飞书流式更新失败被拦截,直接返回 False 以防止降级为普通卡片")
return False