chore: update text processing dependencies

This commit is contained in:
jxxghp
2026-05-23 11:51:57 +08:00
parent 5f0ae3a75e
commit 00fc8b2f53
12 changed files with 87 additions and 215 deletions

View File

@@ -2,11 +2,9 @@
import json
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Annotated, Any, Literal, Union, NotRequired
from typing import Annotated, Any, NotRequired
from langchain.agents.middleware.types import (
AgentMiddleware,
AgentState,
ContextT,
ModelRequest,
@@ -16,78 +14,18 @@ from langchain.agents.middleware.types import (
from langchain.agents.middleware.types import (
PrivateStateAttr, # noqa
)
from langchain.agents.middleware.tool_selection import (
DEFAULT_SYSTEM_PROMPT,
LLMToolSelectorMiddleware,
)
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import BaseTool
from langgraph.runtime import Runtime
from pydantic import Field, TypeAdapter
from typing_extensions import TypedDict # noqa
from app.log import logger
DEFAULT_SYSTEM_PROMPT = (
"Your goal is to select the most relevant tools for answering the user's query."
)
@dataclass
class _SelectionRequest:
"""Prepared inputs for tool selection."""
available_tools: list[BaseTool]
system_message: str
last_user_message: HumanMessage
model: BaseChatModel
valid_tool_names: list[str]
def _create_tool_selection_response(tools: list[BaseTool]) -> TypeAdapter[Any]:
"""Create a structured output schema for tool selection.
Args:
tools: Available tools to include in the schema.
Returns:
`TypeAdapter` for a schema where each tool name is a `Literal` with its
description.
Raises:
AssertionError: If `tools` is empty.
"""
if not tools:
msg = "Invalid usage: tools must be non-empty"
raise AssertionError(msg)
# Create a Union of Annotated Literal types for each tool name with description
# For instance: Union[Annotated[Literal["tool1"], Field(description="...")], ...]
literals = [
Annotated[Literal[tool.name], Field(description=tool.description)]
for tool in tools # noqa
]
selected_tool_type = Union[tuple(literals)] # type: ignore[valid-type] # noqa: UP007
description = "Tools to use. Place the most relevant tools first."
class ToolSelectionResponse(TypedDict):
"""Use to select relevant tools."""
tools: Annotated[list[selected_tool_type], Field(description=description)] # type: ignore[valid-type]
return TypeAdapter(ToolSelectionResponse)
def _render_tool_list(tools: list[BaseTool]) -> str:
"""Format tools as markdown list.
Args:
tools: Tools to format.
Returns:
Markdown string with each tool on a new line.
"""
return "\n".join(f"- {tool.name}: {tool.description}" for tool in tools)
class ToolSelectionState(AgentState):
"""工具筛选中间件私有状态。"""
@@ -102,9 +40,7 @@ class ToolSelectionStateUpdate(TypedDict):
selected_tool_names: list[str] | None
class ToolSelectorMiddleware(
AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]
):
class ToolSelectorMiddleware(LLMToolSelectorMiddleware):
"""
为 DeepSeek 兼容端点提供更稳妥的工具筛选实现。
@@ -129,94 +65,19 @@ class ToolSelectorMiddleware(
def __init__(
self,
model: BaseChatModel,
model: BaseChatModel | str | None = None,
system_prompt: str = DEFAULT_SYSTEM_PROMPT,
selection_tools: list[Any] | None = None,
max_tools: int | None = None,
always_include: list[str] | None = None,
) -> None:
super().__init__()
self.model = model
self.system_prompt = system_prompt
self.max_tools = max_tools
self.always_include = always_include or []
self.selection_tools = selection_tools or []
def _prepare_selection_request(
self, request: ModelRequest[ContextT]
) -> _SelectionRequest | None:
"""Prepare inputs for tool selection.
Args:
request: the model request.
Returns:
`SelectionRequest` with prepared inputs, or `None` if no selection is
needed.
Raises:
ValueError: If tools in `always_include` are not found in the request.
AssertionError: If no user message is found in the request messages.
"""
# If no tools available, return None
if not request.tools or len(request.tools) == 0:
return None
# Filter to only BaseTool instances (exclude provider-specific tool dicts)
base_tools = [tool for tool in request.tools if not isinstance(tool, dict)]
# Validate that always_include tools exist
if self.always_include:
available_tool_names = {tool.name for tool in base_tools}
missing_tools = [
name for name in self.always_include if name not in available_tool_names
]
if missing_tools:
msg = (
f"Tools in always_include not found in request: {missing_tools}. "
f"Available tools: {sorted(available_tool_names)}"
)
raise ValueError(msg)
# Separate tools that are always included from those available for selection
available_tools = [
tool for tool in base_tools if tool.name not in self.always_include
]
# If no tools available for selection, return None
if not available_tools:
return None
system_message = self.system_prompt
# If there's a max_tools limit, append instructions to the system prompt
if self.max_tools is not None:
system_message += (
f"\nIMPORTANT: List the tool names in order of relevance, "
f"with the most relevant first. "
f"If you exceed the maximum number of tools, "
f"only the first {self.max_tools} will be used."
)
# Get the last user message from the conversation history
last_user_message: HumanMessage
for message in reversed(request.messages):
if isinstance(message, HumanMessage):
last_user_message = message
break
else:
msg = "No user message found in request messages"
raise AssertionError(msg)
model = self.model or request.model
valid_tool_names = [tool.name for tool in available_tools]
return _SelectionRequest(
available_tools=available_tools,
system_message=system_message,
last_user_message=last_user_message,
super().__init__(
model=model,
valid_tool_names=valid_tool_names,
system_prompt=system_prompt,
max_tools=max_tools,
always_include=always_include,
)
self.selection_tools = selection_tools or []
def _process_selection_response(
self,
@@ -225,46 +86,29 @@ class ToolSelectorMiddleware(
valid_tool_names: list[str],
request: ModelRequest[ContextT],
) -> ModelRequest[ContextT]:
"""Process the selection response and return filtered `ModelRequest`."""
selected_tool_names: list[str] = []
invalid_tool_selections = []
for tool_name in response["tools"]:
if tool_name not in valid_tool_names:
invalid_tool_selections.append(tool_name)
continue
# Only add if not already selected and within max_tools limit
if tool_name not in selected_tool_names and (
self.max_tools is None or len(selected_tool_names) < self.max_tools
):
selected_tool_names.append(tool_name)
if invalid_tool_selections:
msg = f"Model selected invalid tools: {invalid_tool_selections}"
raise ValueError(msg)
# Filter tools based on selection and append always-included tools
if selected_tool_names:
selected_tools: list[BaseTool] = [
tool for tool in available_tools if tool.name in selected_tool_names
]
else:
# 如果模型筛选结果为空,则不对工具进行裁剪,使用所有可用工具
"""
处理工具筛选响应,并保留空结果回退所有工具的 MoviePilot 策略。
"""
if response.get("tools") == []:
logger.warning("工具筛选结果为空,将恢复使用所有工具。")
selected_tools = available_tools
always_included_tools: list[BaseTool] = [
tool
for tool in request.tools
if not isinstance(tool, dict) and tool.name in self.always_include
]
selected_tools.extend(always_included_tools)
always_included_tools: list[BaseTool] = [
tool
for tool in request.tools
if not isinstance(tool, dict) and tool.name in self.always_include
]
provider_tools = [tool for tool in request.tools if isinstance(tool, dict)]
# Also preserve any provider-specific tool dicts from the original request
provider_tools = [tool for tool in request.tools if isinstance(tool, dict)]
return request.override(
tools=[*available_tools, *always_included_tools, *provider_tools]
)
return request.override(tools=[*selected_tools, *provider_tools])
return super()._process_selection_response(
response,
available_tools,
valid_tool_names,
request,
)
@staticmethod
def _is_deepseek_compatible_model(model: BaseChatModel) -> bool: