From 039558d2409a4cb548e1b9edde231cac4a2e368a Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 17 Jun 2026 11:09:16 +0800 Subject: [PATCH] feat: enhance WebAgent audio handling with format conversion and transcription support --- app/api/endpoints/agent.py | 158 ++++++++++++++++++++++++++++- tests/test_agent_llm_capability.py | 6 ++ tests/test_agent_tool_streaming.py | 2 +- tests/test_web_agent_stream.py | 111 ++++++++++++++++++++ 4 files changed, 273 insertions(+), 4 deletions(-) diff --git a/app/api/endpoints/agent.py b/app/api/endpoints/agent.py index 3c2028dd..d36a13ec 100644 --- a/app/api/endpoints/agent.py +++ b/app/api/endpoints/agent.py @@ -2,6 +2,8 @@ import asyncio import hashlib import json import mimetypes +import shutil +import subprocess import time import uuid from pathlib import Path @@ -12,6 +14,7 @@ from fastapi.responses import FileResponse, StreamingResponse from app import schemas from app.agent import MoviePilotAgent, ReplyMode, StreamingHandler +from app.agent.llm.capability import AgentCapabilityManager from app.core.config import global_vars, settings from app.db.models import User from app.db.user_oper import UserOper, get_current_active_user @@ -27,6 +30,7 @@ WEB_AGENT_FILE_TTL_SECONDS = 6 * 60 * 60 WEB_AGENT_FILE_MAX_ITEMS = 256 WEB_AGENT_UPLOAD_MAX_BYTES = 32 * 1024 * 1024 WEB_AGENT_UPLOAD_CHUNK_SIZE = 1024 * 1024 +WEB_AGENT_BROWSER_AUDIO_SUFFIXES = {".aac", ".m4a", ".mp3", ".mp4", ".wav", ".wave"} _WEB_AGENT_FILE_REGISTRY: dict[str, dict[str, Any]] = {} @@ -342,6 +346,137 @@ def _register_web_agent_file( } +def _get_web_agent_audio_mime_type(audio_path: Path) -> Optional[str]: + """ + 生成浏览器播放更友好的音频 MIME 类型。 + + :param audio_path: 音频文件路径 + :return: 可用于 FileResponse/audio 标签的 MIME 类型 + """ + suffix = audio_path.suffix.lower() + if suffix in {".wav", ".wave"}: + return "audio/wav" + if suffix == ".mp3": + return "audio/mpeg" + if suffix in {".m4a", ".mp4"}: + return "audio/mp4" + if suffix == ".aac": + return "audio/aac" + + return mimetypes.guess_type(audio_path.name)[0] + + +def _prepare_web_agent_audio_attachment_path(voice_path: str) -> Path: + """ + 将 Agent 语音回复准备成 Web 面板可稳定播放的音频文件。 + + 部分 TTS provider 会生成 Opus/Ogg,桌面 Chromium 通常可播放,但 iOS/Safari + 兼容性不稳定;WebAgent 只在浏览器内播放,因此这里单独转成 WAV。 + """ + try: + source_path = Path(voice_path).expanduser().resolve(strict=True) + except OSError: + return Path(voice_path) + if source_path.suffix.lower() in WEB_AGENT_BROWSER_AUDIO_SUFFIXES: + return source_path + if not shutil.which("ffmpeg"): + logger.warning("WebAgent 语音转 WAV 跳过:ffmpeg 不可用,path=%s", source_path) + return source_path + + voice_dir = settings.TEMP_PATH / "voice" + voice_dir.mkdir(parents=True, exist_ok=True) + output_path = voice_dir / f"{source_path.stem}_web_{uuid.uuid4().hex[:8]}.wav" + cmd = [ + "ffmpeg", + "-y", + "-i", + str(source_path), + "-ar", + "24000", + "-ac", + "1", + "-f", + "wav", + str(output_path), + ] + result = subprocess.run(cmd, capture_output=True, text=True, check=False) + if result.returncode != 0 or not output_path.exists(): + logger.warning( + "WebAgent 语音转 WAV 失败,将回退原文件: returncode=%s, stderr=%s", + result.returncode, + (result.stderr or "").strip()[:500], + ) + return source_path + return output_path + + +def _get_web_agent_registered_file(ref: str) -> Optional[dict[str, Any]]: + """ + 根据前端附件引用读取 WebAgent 临时文件登记信息。 + + :param ref: message/agent/file/{file_id} 形式的短期引用 + :return: 文件登记信息,引用无效或过期时返回 None + """ + normalized_ref = (ref or "").strip() + prefix = "message/agent/file/" + if not normalized_ref.startswith(prefix): + return None + + _cleanup_web_agent_file_registry() + file_id = normalized_ref[len(prefix):].split("/", 1)[0] + return _WEB_AGENT_FILE_REGISTRY.get(file_id) + + +def _transcribe_web_agent_audio_refs(audio_refs: list[str]) -> Optional[str]: + """ + 转写 WebAgent 上传的本地录音附件。 + + Web 面板上传后的音频已经保存在短期文件登记表里,不能再像第三方渠道那样 + 走模块下载逻辑;这里直接读取临时文件并调用当前音频输入 provider。 + """ + if not audio_refs: + return None + if not AgentCapabilityManager.is_audio_input_available(): + logger.warning("WebAgent 音频输入能力未配置或未启用,跳过语音识别") + return None + + transcripts = [] + for audio_ref in audio_refs: + file_info = _get_web_agent_registered_file(audio_ref) + if not file_info: + logger.warning("WebAgent 语音引用不存在或已过期: ref=%s", audio_ref) + continue + + file_path = Path(file_info["path"]) + try: + content = file_path.read_bytes() + except OSError as err: + logger.warning("WebAgent 语音文件读取失败: ref=%s, error=%s", audio_ref, err) + continue + + transcript = AgentCapabilityManager.transcribe_audio( + content=content, + filename=file_info.get("name") or file_path.name, + ) + if transcript: + transcripts.append(transcript) + + return "\n".join(transcripts).strip() if transcripts else None + + +def _merge_web_agent_prompt_with_transcript(prompt: str, transcript: Optional[str]) -> str: + """合并用户输入文本和语音转写文本,避免重复发送相同内容。""" + merged_parts = [] + seen_parts = set() + for item in (prompt, transcript or ""): + normalized = item.strip() + if not normalized or normalized in seen_parts: + continue + seen_parts.add(normalized) + merged_parts.append(normalized) + return "\n".join(merged_parts).strip() + + def _parse_web_agent_choice_callback(callback_data: str) -> Optional[tuple[str, int]]: """ 解析 Web Agent 按钮选择回调数据。 @@ -484,10 +619,12 @@ def _build_web_agent_notification_events( events.append({"type": "attachment", "attachment": attachment}) if notification.voice_path: + audio_path = _prepare_web_agent_audio_attachment_path(notification.voice_path) attachment = _register_web_agent_file( - notification.voice_path, - file_name=Path(notification.voice_path).name, + str(audio_path), + file_name=audio_path.name, kind="audio", + mime_type=_get_web_agent_audio_mime_type(audio_path), ) if attachment: events.append({"type": "attachment", "attachment": attachment}) @@ -679,6 +816,19 @@ async def web_agent_stream( ) prompt = payload.text.strip() + transcript = _transcribe_web_agent_audio_refs(payload.audio_refs or []) + prompt = _merge_web_agent_prompt_with_transcript(prompt, transcript) + has_audio_input = bool(transcript) + if not prompt and payload.audio_refs and not payload.images and not payload.files: + return StreamingResponse( + iter([ + _build_web_agent_sse( + "error", + {"message": "语音识别失败,请稍后重试。"}, + ) + ]), + media_type="text/event-stream", + ) if not prompt and not payload.images and not payload.files and not payload.audio_refs: return StreamingResponse( iter([ @@ -715,9 +865,11 @@ async def web_agent_stream( """ 生成前端 Agent SSE 事件。 """ + audio_ref_set = set(payload.audio_refs or []) files = [ file.model_dump(exclude_none=True) for file in (payload.files or []) + if file.ref not in audio_ref_set ] for audio_ref in payload.audio_refs or []: files.append({"ref": audio_ref, "mime_type": "audio/*"}) @@ -742,7 +894,7 @@ async def web_agent_stream( message=prompt, images=payload.images or [], files=files or None, - has_audio_input=bool(payload.audio_refs), + has_audio_input=has_audio_input, ) except Exception as err: logger.error(f"Web智能助手执行失败: {str(err)}") diff --git a/tests/test_agent_llm_capability.py b/tests/test_agent_llm_capability.py index 616eb709..3be54a36 100644 --- a/tests/test_agent_llm_capability.py +++ b/tests/test_agent_llm_capability.py @@ -182,6 +182,11 @@ class AgentCapabilityManagerTest(unittest.TestCase): self.assertTrue( AgentCapabilityManager.supports_native_voice_reply("Feishu", None) ) + self.assertTrue( + AgentCapabilityManager.supports_native_voice_reply( + MessageChannel.WebAgent.value, None + ) + ) self.assertFalse( AgentCapabilityManager.supports_native_voice_reply("Slack", None) ) @@ -219,6 +224,7 @@ class AgentCapabilityManagerTest(unittest.TestCase): MessageChannel.Telegram, MessageChannel.Feishu, MessageChannel.Wechat, + MessageChannel.WebAgent, ): self.assertTrue( ChannelCapabilityManager.supports_capability( diff --git a/tests/test_agent_tool_streaming.py b/tests/test_agent_tool_streaming.py index 0ab24a72..e2683f0a 100644 --- a/tests/test_agent_tool_streaming.py +++ b/tests/test_agent_tool_streaming.py @@ -463,7 +463,7 @@ class TestAgentToolStreaming: result = await tool.run("你好") return result, synthesize_speech, send_notification_message - for channel in (MessageChannel.Telegram, MessageChannel.Feishu): + for channel in (MessageChannel.Telegram, MessageChannel.Feishu, MessageChannel.WebAgent): result, synthesize_speech, send_notification_message = asyncio.run( _run(channel) ) diff --git a/tests/test_web_agent_stream.py b/tests/test_web_agent_stream.py index 0e40d815..34fe6ffc 100644 --- a/tests/test_web_agent_stream.py +++ b/tests/test_web_agent_stream.py @@ -1,4 +1,5 @@ import asyncio +import time from types import SimpleNamespace from unittest.mock import AsyncMock, patch @@ -6,8 +7,12 @@ from app import schemas from app.agent import ReplyMode from app.api.endpoints.agent import ( _WebAgentMoviePilotAgent, + _WEB_AGENT_FILE_REGISTRY, _build_web_agent_notification_events, _build_web_agent_session_id, + _prepare_web_agent_audio_attachment_path, + _transcribe_web_agent_audio_refs, + web_agent_stream, _resolve_web_agent_choice_payload, _split_web_agent_output, ) @@ -162,6 +167,112 @@ def test_build_web_agent_notification_events_registers_local_file(tmp_path): assert attachment["url"].startswith("message/agent/file/") +def test_build_web_agent_notification_events_registers_voice_attachment(tmp_path): + """Agent 工具发送语音时应转换为可播放的音频附件事件。""" + voice_path = tmp_path / "reply.wav" + voice_path.write_bytes(b"wav-bytes") + + events = _build_web_agent_notification_events( + schemas.Notification( + channel=MessageChannel.WebAgent, + mtype=NotificationType.Agent, + text="你好", + voice_path=str(voice_path), + ) + ) + + assert len(events) == 2 + assert events[0] == {"type": "delta", "content": "你好"} + attachment = events[1]["attachment"] + assert events[1]["type"] == "attachment" + assert attachment["kind"] == "audio" + assert attachment["name"] == "reply.wav" + assert attachment["mime_type"] == "audio/wav" + assert attachment["size"] == len(b"wav-bytes") + assert attachment["url"].startswith("message/agent/file/") + + +def test_prepare_web_agent_audio_attachment_converts_unsupported_audio(tmp_path): + """WebAgent 会把浏览器不稳定支持的语音格式转为 WAV 供面板播放。""" + source_path = tmp_path / "reply.opus" + source_path.write_bytes(b"opus-bytes") + converted_path = tmp_path / "voice" / "reply_web_abcdef12.wav" + + with patch("app.api.endpoints.agent.shutil.which", return_value="/usr/bin/ffmpeg"), patch( + "app.api.endpoints.agent.uuid.uuid4", + return_value=SimpleNamespace(hex="abcdef1234567890"), + ), patch("app.api.endpoints.agent.subprocess.run") as run: + def write_converted_file(*args, **kwargs): + converted_path.write_bytes(b"wav-bytes") + return SimpleNamespace(returncode=0, stderr="") + + run.side_effect = write_converted_file + with patch("app.api.endpoints.agent.settings", SimpleNamespace(TEMP_PATH=tmp_path)): + output_path = _prepare_web_agent_audio_attachment_path(str(source_path)) + + assert output_path == converted_path + assert output_path.read_bytes() == b"wav-bytes" + + +def test_transcribe_web_agent_audio_refs_reads_registered_upload(tmp_path): + """WebAgent 上传录音应从临时附件登记表读取并转写为文本。""" + voice_path = tmp_path / "recording.webm" + voice_path.write_bytes(b"webm-bytes") + _WEB_AGENT_FILE_REGISTRY["audio-test"] = { + "path": voice_path, + "name": "recording.webm", + "mime_type": "audio/webm", + "created_at": time.time(), + } + + try: + with patch( + "app.api.endpoints.agent.AgentCapabilityManager.is_audio_input_available", + return_value=True, + ), patch( + "app.api.endpoints.agent.AgentCapabilityManager.transcribe_audio", + return_value="帮我推荐一部电影", + ) as transcribe_audio: + transcript = _transcribe_web_agent_audio_refs(["message/agent/file/audio-test"]) + finally: + _WEB_AGENT_FILE_REGISTRY.pop("audio-test", None) + + assert transcript == "帮我推荐一部电影" + transcribe_audio.assert_called_once_with( + content=b"webm-bytes", + filename="recording.webm", + ) + + +def test_web_agent_stream_returns_error_when_voice_transcription_fails(): + """仅发送语音且转写失败时应直接返回错误事件。""" + payload = schemas.AgentWebChatRequest( + text="", + session_id="browser-session", + audio_refs=["message/agent/file/missing"], + ) + request = SimpleNamespace() + user = SimpleNamespace(id=1, name="admin") + + with patch("app.api.endpoints.agent.settings.AI_AGENT_ENABLE", True), patch( + "app.api.endpoints.agent._transcribe_web_agent_audio_refs", + return_value=None, + ): + response = asyncio.run(web_agent_stream(payload, request, user)) + body = "".join(asyncio.run(_collect_streaming_response(response))) + + assert "error" in body + assert "语音识别失败" in body + + +async def _collect_streaming_response(response): + """读取 StreamingResponse,便于断言 SSE 内容。""" + chunks = [] + async for chunk in response.body_iterator: + chunks.append(chunk.decode("utf-8") if isinstance(chunk, bytes) else chunk) + return chunks + + def test_build_web_agent_notification_events_extracts_choice_card(): """Agent 按钮通知应转换为 Web 选择卡片事件而非普通文本。""" events = _build_web_agent_notification_events(