fix: avoid blocking event loop during plugin install

This commit is contained in:
jxxghp
2026-05-21 09:16:42 +08:00
parent 2109d323ae
commit ee9eced2f1
4 changed files with 102 additions and 4 deletions

View File

@@ -1,5 +1,6 @@
"""插件 Agent 工具共享辅助方法"""
import asyncio
import json
import shutil
from typing import Any, Optional
@@ -248,7 +249,7 @@ async def install_plugin_runtime(
SystemConfigKey.UserInstalledPlugins, install_plugins
)
reload_plugin_runtime(plugin_id)
await asyncio.to_thread(reload_plugin_runtime, plugin_id)
return True, message or "插件安装成功", refreshed_only

View File

@@ -2117,10 +2117,23 @@ class PluginHelper(metaclass=WeakSingleton):
async with aiofiles.open(requirements_file_path, "w", encoding="utf-8") as f:
await f.write(requirements_txt)
return self.pip_install_with_fallback(Path(requirements_file_path))
return await self.__async_pip_install_with_fallback(Path(requirements_file_path))
return True, "" # 如果 requirements.txt 为空,视作成功
async def __async_pip_install_with_fallback(
self,
requirements_file: Path,
find_links_dirs: Optional[List[Path]] = None) -> Tuple[bool, str]:
"""
在线程池中执行插件依赖安装,避免同步 pip 子进程阻塞事件循环。
"""
return await asyncio.to_thread(
self.pip_install_with_fallback,
requirements_file,
find_links_dirs
)
async def __async_backup_plugin(self, pid: str) -> str:
"""
异步备份旧插件目录
@@ -2204,7 +2217,7 @@ class PluginHelper(metaclass=WeakSingleton):
# 检查是否存在 requirements.txt 文件
if await requirements_file.exists():
logger.info(f"{pid} 存在依赖,开始尝试安装依赖")
success, error_message = self.pip_install_with_fallback(Path(requirements_file))
success, error_message = await self.__async_pip_install_with_fallback(Path(requirements_file))
if success:
return True, True, ""
else:
@@ -2234,7 +2247,7 @@ class PluginHelper(metaclass=WeakSingleton):
try:
# 使用自动降级策略安装依赖
wheels_dirs = self.__collect_plugin_wheels_dirs()
return self.pip_install_with_fallback(Path(requirements_temp_file), wheels_dirs)
return await self.__async_pip_install_with_fallback(Path(requirements_temp_file), wheels_dirs)
finally:
# 删除临时文件
await requirements_temp_file.unlink()

View File

@@ -5,6 +5,7 @@ from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
from app.agent.tools.impl.install_plugin import InstallPluginTool
from app.agent.tools.impl._plugin_tool_utils import install_plugin_runtime
from app.agent.tools.impl.query_installed_plugins import QueryInstalledPluginsTool
from app.agent.tools.impl.query_market_plugins import QueryMarketPluginsTool
from app.agent.tools.impl.query_plugin_config import QueryPluginConfigTool
@@ -170,6 +171,54 @@ class TestAgentPluginTools(unittest.TestCase):
"DemoPlugin", "https://example.com/market", force=False
)
def test_install_plugin_runtime_reloads_in_threadpool(self):
plugin_manager = MagicMock()
plugin_manager.get_plugin_ids.return_value = ["DemoPlugin"]
plugin_helper = MagicMock()
plugin_helper.async_install_reg = AsyncMock(return_value=True)
config_oper = MagicMock()
config_oper.get.return_value = ["DemoPlugin"]
calls = []
async def fake_to_thread(func, *args, **kwargs):
calls.append((func, args, kwargs))
return None
with patch(
"app.agent.tools.impl._plugin_tool_utils.SystemConfigOper",
return_value=config_oper,
), patch(
"app.agent.tools.impl._plugin_tool_utils.PluginManager",
return_value=plugin_manager,
), patch(
"app.agent.tools.impl._plugin_tool_utils.PluginHelper",
return_value=plugin_helper,
), patch(
"app.agent.tools.impl._plugin_tool_utils.reload_plugin_runtime",
) as reload_runtime, patch(
"app.agent.tools.impl._plugin_tool_utils.asyncio.to_thread",
side_effect=fake_to_thread,
):
success, message, refreshed_only = asyncio.run(
install_plugin_runtime(
"DemoPlugin",
"https://example.com/market",
force=False,
)
)
self.assertTrue(success)
self.assertEqual("插件已存在,已刷新加载", message)
self.assertTrue(refreshed_only)
plugin_helper.async_install_reg.assert_awaited_once_with(
pid="DemoPlugin",
repo_url="https://example.com/market",
)
self.assertEqual(1, len(calls))
self.assertEqual(reload_runtime, calls[0][0])
self.assertEqual(("DemoPlugin",), calls[0][1])
self.assertEqual({}, calls[0][2])
def test_uninstall_plugin_uninstalls_installed_candidate(self):
tool = UninstallPluginTool(session_id="session-1", user_id="10001")
installed_plugin = self._market_plugin(

View File

@@ -1,3 +1,4 @@
import asyncio
import sys
import tempfile
import threading
@@ -360,3 +361,37 @@ class PluginHelperTest(TestCase):
self.assertIn("已自动恢复主程序依赖", message)
self.assertEqual(1, len(repair_commands))
self.assertIn("runtime-constraints-", repair_commands[0][-1])
def test_async_pip_install_runs_in_threadpool(self):
"""
验证异步安装路径会把同步 pip 安装派发到线程池,避免阻塞事件循环。
"""
try:
from app.helper.plugin import PluginHelper
except ModuleNotFoundError as exc:
self.skipTest(f"missing dependency: {exc}")
helper = PluginHelper()
requirements_file = Path("/tmp/demo-requirements.txt")
find_links_dirs = [Path("/tmp/demo-wheels")]
calls = []
async def run_install():
return await helper._PluginHelper__async_pip_install_with_fallback(
requirements_file,
find_links_dirs
)
async def fake_to_thread(func, *args, **kwargs):
calls.append((func, args, kwargs))
return True, "ok"
with patch("app.helper.plugin.asyncio.to_thread", side_effect=fake_to_thread):
success, message = asyncio.run(run_install())
self.assertTrue(success)
self.assertEqual("ok", message)
self.assertEqual(1, len(calls))
self.assertEqual(helper.pip_install_with_fallback, calls[0][0])
self.assertEqual((requirements_file, find_links_dirs), calls[0][1])
self.assertEqual({}, calls[0][2])