feat(agent): 优化Agent流式输出与工具消息发送逻辑

- 新增 _should_stream() 方法,根据运行环境决定是否启用流式输出:
  后台模式不启用;渠道支持编辑启用;啰嗦模式开启时也启用
- 非流式模式下使用非流式LLM + ainvoke,避免不必要的流式开销
- 非啰嗦模式下工具调用时不发送任何中间消息(agent文字和工具提示),直接清掉缓冲区
This commit is contained in:
jxxghp
2026-04-09 22:12:20 +08:00
parent abc4154e2c
commit 770065d9ed
2 changed files with 100 additions and 57 deletions

View File

@@ -30,6 +30,8 @@ from app.core.config import settings
from app.helper.llm import LLMHelper
from app.log import logger
from app.schemas import Notification, NotificationType
from app.schemas.message import ChannelCapabilityManager, ChannelCapability
from app.schemas.types import MessageChannel
class AgentChain(ChainBase):
@@ -138,12 +140,37 @@ class MoviePilotAgent:
"""
return not self.channel and not self.source
def _should_stream(self) -> bool:
"""
判断是否应启用流式输出:
- 后台模式不启用流式输出
- 渠道支持消息编辑:启用流式输出(实时推送 token
- 渠道不支持消息编辑但开启了啰嗦模式:也需要启用流式输出,
以便在工具调用前捕获 Agent 的中间文字并随工具消息一起发送
- 其他情况不启用流式输出
"""
if self.is_background:
return False
if not self.channel:
return False
# 啰嗦模式下始终需要流式输出来捕获工具调用前的 Agent 文字
if settings.AI_AGENT_VERBOSE:
return True
try:
channel_enum = MessageChannel(self.channel)
return ChannelCapabilityManager.supports_capability(
channel_enum, ChannelCapability.MESSAGE_EDITING
)
except (ValueError, KeyError):
return False
@staticmethod
def _initialize_llm():
def _initialize_llm(streaming: bool = False):
"""
初始化 LLM(带流式回调)
初始化 LLM
:param streaming: 是否启用流式输出
"""
return LLMHelper.get_llm(streaming=True)
return LLMHelper.get_llm(streaming=streaming)
@staticmethod
def _extract_text_content(content) -> str:
@@ -191,16 +218,17 @@ class MoviePilotAgent:
stream_handler=self.stream_handler,
)
def _create_agent(self):
def _create_agent(self, streaming: bool = False):
"""
创建 LangGraph Agent使用 create_agent + SummarizationMiddleware
:param streaming: 是否启用流式输出
"""
try:
# 系统提示词
system_prompt = prompt_manager.get_agent_prompt(channel=self.channel)
# LLM 模型(用于 agent 执行)
llm = self._initialize_llm()
llm = self._initialize_llm(streaming=streaming)
# 工具列表
tools = self._initialize_tools()
@@ -344,9 +372,11 @@ class MoviePilotAgent:
async def _execute_agent(self, messages: List[BaseMessage]):
"""
调用 LangGraph Agent,通过 astream 流式获取 token
支持流式输出:在支持消息编辑的渠道上实时推送 token。
后台任务模式(无渠道信息):不进行流式输出,仅广播最终结果
调用 LangGraph Agent 执行推理
根据运行环境选择不同的执行模式:
- 后台任务模式(无渠道信息):非流式 LLM + ainvoke,仅广播最终结果
- 渠道不支持消息编辑:非流式 LLM + ainvoke完成后发送最终回复
- 渠道支持消息编辑:流式 LLM + astream实时推送 token
"""
try:
# Agent运行配置
@@ -356,39 +386,14 @@ class MoviePilotAgent:
}
}
# 创建智能体
agent = self._create_agent()
# 判断是否启用流式输出
use_streaming = self._should_stream()
if self.is_background:
# 后台任务模式非流式执行等待完成后只取最后一条AI回复
await agent.ainvoke(
{"messages": messages},
config=agent_config,
)
# 创建智能体(根据是否流式传入不同 LLM
agent = self._create_agent(streaming=use_streaming)
# 从最终状态中提取最后一条AI回复内容
final_messages = agent.get_state(agent_config).values.get(
"messages", []
)
final_text = ""
for msg in reversed(final_messages):
if hasattr(msg, "type") and msg.type == "ai" and msg.content:
# 过滤掉思考/推理内容,只提取纯文本
text = self._extract_text_content(msg.content)
if text:
# 过滤掉包含在 <think> 标签中的内容
text = re.sub(
r"<think>.*?(?:</think>|$)", "", text, flags=re.DOTALL
)
final_text = text.strip()
break
# 后台任务仅广播最终回复,带标题
if final_text:
await self.send_agent_message(final_text, title="MoviePilot助手")
else:
# 正常渠道模式:启动流式输出
if use_streaming:
# 流式模式:渠道支持消息编辑,启动流式输出实时推送 token
await self.stream_handler.start_streaming(
channel=self.channel,
source=self.source,
@@ -411,7 +416,7 @@ class MoviePilotAgent:
) = await self.stream_handler.stop_streaming()
if not all_sent_via_stream:
# 流式输出未能发送全部内容(渠道不支持编辑,或发送失败)
# 流式输出未能发送全部内容(发送失败
# 通过常规方式发送剩余内容
remaining_text = await self.stream_handler.take()
if remaining_text:
@@ -420,6 +425,40 @@ class MoviePilotAgent:
# 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录
await self._save_agent_message_to_db(streamed_text)
else:
# 非流式模式:后台任务或渠道不支持消息编辑
await agent.ainvoke(
{"messages": messages},
config=agent_config,
)
# 从最终状态中提取最后一条AI回复内容
final_messages = agent.get_state(agent_config).values.get(
"messages", []
)
final_text = ""
for msg in reversed(final_messages):
if hasattr(msg, "type") and msg.type == "ai" and msg.content:
# 过滤掉思考/推理内容,只提取纯文本
text = self._extract_text_content(msg.content)
if text:
# 过滤掉包含在 <think> 标签中的内容
text = re.sub(
r"<think>.*?(?:</think>|$)", "", text, flags=re.DOTALL
)
final_text = text.strip()
break
if final_text:
if self.is_background:
# 后台任务仅广播最终回复,带标题
await self.send_agent_message(
final_text, title="MoviePilot助手"
)
else:
# 非流式渠道:发送最终回复
await self.send_agent_message(final_text)
# 保存消息
memory_manager.save_agent_messages(
session_id=self.session_id,
@@ -435,8 +474,7 @@ class MoviePilotAgent:
return str(e), {}
finally:
# 确保停止流式输出
if not self.is_background:
await self.stream_handler.stop_streaming()
await self.stream_handler.stop_streaming()
async def send_agent_message(self, message: str, title: str = ""):
"""

View File

@@ -72,22 +72,27 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
# 非VERBOSE重置缓冲区从头更新保持消息编辑能力
self._stream_handler.reset()
else:
# 后台模式(无渠道信息)不发送工具调用消息
# 非流式模式(后台任务或渠道不支持消息编辑)
if self._channel:
# 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送
agent_message = (
await self._stream_handler.take() if self._stream_handler else ""
)
messages = []
if agent_message:
messages.append(agent_message)
if tool_message:
messages.append(f"⚙️ => {tool_message}")
if messages:
merged_message = "\n\n".join(messages)
await self.send_tool_message(merged_message)
if settings.AI_AGENT_VERBOSE:
# 啰嗦模式:取出 Agent 文字 + 工具消息合并发送
agent_message = (
await self._stream_handler.take()
if self._stream_handler
else ""
)
messages = []
if agent_message:
messages.append(agent_message)
if tool_message:
messages.append(f"⚙️ => {tool_message}")
if messages:
merged_message = "\n\n".join(messages)
await self.send_tool_message(merged_message)
else:
# 非啰嗦模式:不发送中间消息,清掉缓冲区
if self._stream_handler:
await self._stream_handler.take()
logger.debug(f"Executing tool {self.name} with args: {kwargs}")