From 6fa48afa34b51705d4a35d0423c5b43cdce023e3 Mon Sep 17 00:00:00 2001 From: InfinityPacer Date: Sat, 18 Apr 2026 03:01:08 +0800 Subject: [PATCH] feat(plugin): support local plugin sources --- app/api/endpoints/plugin.py | 22 +++- app/core/config.py | 2 + app/core/plugin.py | 205 ++++++++++++++++++++++++++++++---- app/helper/plugin.py | 217 ++++++++++++++++++++++++++++++++++++ 4 files changed, 422 insertions(+), 24 deletions(-) diff --git a/app/api/endpoints/plugin.py b/app/api/endpoints/plugin.py index cbb7da32..b2e5679f 100644 --- a/app/api/endpoints/plugin.py +++ b/app/api/endpoints/plugin.py @@ -155,9 +155,13 @@ async def all_plugins(_: User = Depends(get_current_active_superuser_async), # 未安装的本地插件 not_installed_plugins = [plugin for plugin in local_plugins if not plugin.installed] + # 本地插件来源目录中的插件 + local_source_plugins = plugin_manager.get_local_source_plugins() # 在线插件 online_plugins = await plugin_manager.async_get_online_plugins(force) - if not online_plugins: + candidate_plugins = plugin_manager._process_plugins_list(online_plugins + local_source_plugins, []) \ + if online_plugins or local_source_plugins else [] + if not candidate_plugins: # 没有获取在线插件 if state == "market": # 返回未安装的本地插件 @@ -169,7 +173,7 @@ async def all_plugins(_: User = Depends(get_current_active_superuser_async), # 已安装插件IDS _installed_ids = [plugin.id for plugin in installed_plugins] # 未安装的线上插件或者有更新的插件 - for plugin in online_plugins: + for plugin in candidate_plugins: if plugin.id not in _installed_ids: market_plugins.append(plugin) elif plugin.has_update: @@ -228,11 +232,21 @@ async def install(plugin_id: str, install_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or [] # 首先检查插件是否已经存在,并且是否强制安装,否则只进行安装统计 plugin_helper = PluginHelper() - if not force and plugin_id in PluginManager().get_plugin_ids(): + is_local_install = plugin_helper.is_local_repo_url(repo_url) + if not force and plugin_id in PluginManager().get_plugin_ids() and not is_local_install: await plugin_helper.async_install_reg(pid=plugin_id) else: # 插件不存在或需要强制安装,下载安装并注册插件 - if repo_url: + if is_local_install: + state, msg = await run_in_threadpool( + plugin_helper.install_local, + plugin_id, + repo_url, + force + ) + if not state: + return schemas.Response(success=False, message=msg) + elif repo_url: state, msg = await plugin_helper.async_install(pid=plugin_id, repo_url=repo_url) # 安装失败则直接响应 if not state: diff --git a/app/core/config.py b/app/core/config.py index 88d3756a..9785b6f5 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -417,6 +417,8 @@ class ConfigModel(BaseModel): PLUGIN_STATISTIC_SHARE: bool = True # 是否开启插件热加载 PLUGIN_AUTO_RELOAD: bool = False + # 本地插件仓库目录,多个地址使用,分隔 + PLUGIN_LOCAL_PATHS: Optional[str] = None # ==================== Github & PIP ==================== # Github token,提高请求api限流阈值 ghp_**** diff --git a/app/core/plugin.py b/app/core/plugin.py index d733e2b1..14c7998b 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -6,6 +6,7 @@ import importlib.util import inspect import os import posixpath +import shutil import sys import threading import time @@ -51,6 +52,8 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): self._monitor_thread: Optional[threading.Thread] = None # 监控停止事件 self._stop_monitor_event = threading.Event() + # 本地插件同步写入运行目录后的短时忽略窗口 + self._recent_local_sync: Dict[str, float] = {} # 开发者模式监测插件修改 if settings.DEV or settings.PLUGIN_AUTO_RELOAD: self.__start_monitor() @@ -308,11 +311,14 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): 运行 watchfiles 监视器的主循环。 """ # 监视插件目录 - plugins_path = str(settings.ROOT_PATH / "app" / "plugins") + plugin_paths = [str(settings.ROOT_PATH / "app" / "plugins")] + for local_path in PluginHelper.get_local_source_paths(): + if local_path.exists() and local_path.is_dir(): + plugin_paths.append(str(local_path)) logger.info(">>> 监控线程已启动,准备进入watch循环...") # 使用 watchfiles 监视目录变化,并响应变化事件 # Todo: yield_on_timeout = True 时,每秒检查停止事件,会返回空集合;后续可以考虑用来做心跳之类的功能? - for changes in watch(plugins_path, stop_event=self._stop_monitor_event, rust_timeout=1000, + for changes in watch(*plugin_paths, stop_event=self._stop_monitor_event, rust_timeout=1000, yield_on_timeout=True): # 如果收到停止事件,退出循环 if not changes: @@ -320,18 +326,56 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): # 处理变化事件 plugins_to_reload = set() + local_plugins_to_sync = {} for _change_type, path_str in changes: event_path = Path(path_str) - # 跳过非 .py 文件以及 pycache 目录中的文件 - if not event_path.name.endswith(".py") or "__pycache__" in event_path.parts: + # 跳过 pycache 目录中的文件 + if "__pycache__" in event_path.parts: + continue + + if event_path.name == "requirements.txt": + candidate = self._get_local_plugin_candidate_from_path(event_path) + if candidate: + if candidate.get("compatible") is False: + logger.info( + f"检测到本地插件 {candidate.get('id')} 依赖文件变化," + f"但跳过处理:{candidate.get('skip_reason')}" + ) + continue + logger.warning(f"检测到本地插件 {candidate.get('id')} 依赖文件变化,请重新安装本地插件以安装依赖") + continue + + # 跳过非 .py 文件 + if not event_path.name.endswith(".py"): continue # 解析插件ID - pid = self._get_plugin_id_from_path(event_path) - # 跳过无效插件文件 - if pid: - # 收集需要重载的插件ID,自动去重,避免重复重载 + runtime_pid = self._get_plugin_id_from_path(event_path) + local_candidate = self._get_local_plugin_candidate_from_path(event_path) if not runtime_pid else None + if runtime_pid: + last_sync_time = self._recent_local_sync.get(runtime_pid) + if last_sync_time and time.time() - last_sync_time < 2: + logger.debug(f"忽略本地插件同步产生的运行目录变化:{runtime_pid}") + continue + # 运行目录变化只重载,不能反向触发本地同步。 + plugins_to_reload.add(runtime_pid) + elif local_candidate: + if local_candidate.get("compatible") is False: + package_version = local_candidate.get("package_version") + source_root = f"plugins.{package_version}" if package_version else "plugins" + logger.info( + f"检测到本地插件 {local_candidate.get('id')} 文件变化,来源:{source_root}," + f"文件:{event_path},但跳过同步:{local_candidate.get('skip_reason')}" + ) + continue + local_plugins_to_sync[local_candidate.get("id")] = (local_candidate, event_path) + + for pid, (candidate, event_path) in local_plugins_to_sync.items(): + package_version = candidate.get("package_version") + source_root = f"plugins.{package_version}" if package_version else "plugins" + logger.info(f"检测到本地插件 {pid} 文件变化,来源:{source_root},文件:{event_path}") + if self._sync_local_plugin_if_installed(pid, candidate): plugins_to_reload.add(pid) # 触发重载 @@ -351,6 +395,7 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): :return: 插件ID字符串,如果不是有效插件文件则返回 None。 """ try: + event_path = event_path.resolve() plugins_root = settings.ROOT_PATH / "app" / "plugins" # 确保修改的文件在 plugins 目录下 if not event_path.is_relative_to(plugins_root): @@ -389,6 +434,78 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): logger.error(f"从路径解析插件ID时出错: {e}") return None + @staticmethod + def _get_local_plugin_candidate_from_path(event_path: Path) -> Optional[dict]: + """ + 根据本地插件来源路径解析具体插件候选,保留 plugins/plugins.v2 来源差异。 + """ + try: + event_path = event_path.resolve() + for local_path in PluginHelper.get_local_source_paths(): + if not local_path.exists() or not local_path.is_dir(): + continue + if not event_path.is_relative_to(local_path): + continue + try: + relative_parts = event_path.relative_to(local_path).parts + except (ValueError, IndexError): + continue + if len(relative_parts) < 2: + continue + if relative_parts[0] == "plugins": + package_version = "" + elif relative_parts[0].startswith("plugins."): + package_version = relative_parts[0].split(".", 1)[1] + else: + continue + plugin_dir_name = relative_parts[1] + candidate = PluginHelper().get_local_plugin_candidate( + pid=plugin_dir_name, + package_version=package_version, + source_path=local_path, + strict_compat=False + ) + if candidate: + return candidate + return None + except Exception as e: + logger.error(f"从本地来源路径解析插件候选时出错: {e}") + return None + + @staticmethod + def _sync_local_plugin_if_installed(pid: str, candidate: Optional[dict] = None) -> bool: + """ + 已安装本地插件源码变化时,同步到运行目录。 + """ + installed_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or [] + if pid not in installed_plugins: + logger.info(f"本地插件 {pid} 尚未安装,跳过自动同步和热重载") + return False + + candidate = candidate or PluginHelper().get_local_plugin_candidate(pid) + if not candidate: + return False + + source_dir = Path(candidate.get("path")) + dest_dir = settings.ROOT_PATH / "app" / "plugins" / pid.lower() + try: + if source_dir.resolve() == dest_dir.resolve(): + return True + if dest_dir.exists(): + shutil.rmtree(dest_dir, ignore_errors=True) + shutil.copytree( + source_dir, + dest_dir, + dirs_exist_ok=True, + ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".DS_Store") + ) + PluginManager()._recent_local_sync[pid] = time.time() + logger.info(f"已同步本地插件 {pid}:{source_dir} -> {dest_dir}") + return True + except Exception as e: + logger.error(f"同步本地插件 {pid} 失败:{e}") + return False + @staticmethod def __stop_plugin(plugin: Any): """ @@ -1116,6 +1233,31 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0) return plugins + def get_local_source_plugins(self) -> List[schemas.Plugin]: + """ + 获取本地插件来源目录中的插件信息。 + """ + plugins = [] + installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or [] + local_candidates = PluginHelper().get_local_plugin_candidates() + for pid, plugin_info in local_candidates.items(): + package_version = plugin_info.get("package_version") + plugin = self._process_plugin_info( + pid=pid, + plugin_info=plugin_info, + market=PluginHelper.make_local_repo_url(pid), + installed_apps=installed_apps, + add_time=0, + package_version=package_version + ) + if not plugin: + continue + plugin.is_local = True + plugins.append(plugin) + + plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0) + return plugins + @staticmethod def is_plugin_exists(pid: str, version: str = None) -> bool: """ @@ -1194,18 +1336,41 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): # 将未出现在高版本插件列表中的 v1 插件加入 all_plugins higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins} all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids]) - # 去重 - all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values()) - # 所有插件按 repo 在设置中的顺序排序 - all_plugins.sort( - key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0 - ) - # 相同 ID 的插件保留版本号最大的版本 - max_versions = {} - for p in all_plugins: - if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]): - max_versions[p.id] = p.plugin_version - result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]] + markets = [item for item in settings.PLUGIN_MARKET.split(",") if item] + + def repo_order(plugin: schemas.Plugin) -> int: + if PluginHelper.is_local_repo_url(plugin.repo_url): + return len(markets) + 1 + if plugin.repo_url in markets: + return markets.index(plugin.repo_url) + return len(markets) + + # 去重:同 ID + 版本优先保留市场来源,其次按来源顺序稳定保留。 + dedup_plugins = {} + for plugin in sorted(all_plugins, key=repo_order): + key = f"{plugin.id}{plugin.plugin_version}" + exists = dedup_plugins.get(key) + if not exists: + dedup_plugins[key] = plugin + continue + if PluginHelper.is_local_repo_url(exists.repo_url) and not PluginHelper.is_local_repo_url(plugin.repo_url): + dedup_plugins[key] = plugin + + # 相同 ID 的插件保留版本号最大的版本;同版本市场来源优先。 + result_by_id = {} + for plugin in sorted(dedup_plugins.values(), key=repo_order): + exists = result_by_id.get(plugin.id) + if not exists: + result_by_id[plugin.id] = plugin + continue + if StringUtils.compare_version(plugin.plugin_version, ">", exists.plugin_version): + result_by_id[plugin.id] = plugin + elif plugin.plugin_version == exists.plugin_version \ + and PluginHelper.is_local_repo_url(exists.repo_url) \ + and not PluginHelper.is_local_repo_url(plugin.repo_url): + result_by_id[plugin.id] = plugin + + result = list(result_by_id.values()) logger.info(f"共获取到 {len(result)} 个线上插件") return result diff --git a/app/helper/plugin.py b/app/helper/plugin.py index eeaa4d5a..649b78b4 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -26,10 +26,12 @@ from app.log import logger from app.schemas.types import SystemConfigKey from app.utils.http import RequestUtils, AsyncRequestUtils from app.utils.singleton import WeakSingleton +from app.utils.string import StringUtils from app.utils.system import SystemUtils from app.utils.url import UrlUtils PLUGIN_DIR = Path(settings.ROOT_PATH) / "app" / "plugins" +LOCAL_REPO_PREFIX = "local://" class PluginHelper(metaclass=WeakSingleton): @@ -49,6 +51,181 @@ class PluginHelper(metaclass=WeakSingleton): if self.install_report(): self.systemconfig.set(SystemConfigKey.PluginInstallReport, "1") + @staticmethod + def is_local_repo_url(repo_url: Optional[str]) -> bool: + """ + 判断是否为本地插件来源标识。 + """ + return bool(repo_url and repo_url.startswith(LOCAL_REPO_PREFIX)) + + @staticmethod + def make_local_repo_url(pid: str) -> str: + """ + 生成本地插件安装来源标识。 + """ + return f"{LOCAL_REPO_PREFIX}{pid}" + + @staticmethod + def parse_local_repo_url(repo_url: str) -> Optional[str]: + """ + 从本地插件来源标识中解析插件ID。 + """ + if not PluginHelper.is_local_repo_url(repo_url): + return None + pid = repo_url[len(LOCAL_REPO_PREFIX):].strip("/") + return pid or None + + @staticmethod + def get_local_source_paths() -> List[Path]: + """ + 获取本地插件来源目录列表。 + """ + if not settings.PLUGIN_LOCAL_PATHS: + return [] + paths = [] + for item in settings.PLUGIN_LOCAL_PATHS.split(","): + local_path = item.strip() + if not local_path: + continue + path = Path(local_path).expanduser() + if not path.is_absolute(): + path = settings.ROOT_PATH / path + paths.append(path.resolve()) + return paths + + @staticmethod + def __get_local_package(source_path: Path, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]: + """ + 从本地插件仓库读取 package.json 或 package.{version}.json。 + """ + package_file = source_path / ( + f"package.{package_version}.json" if package_version else "package.json" + ) + if not package_file.exists(): + return {} + try: + content = package_file.read_text(encoding="utf-8") + payload = json.loads(content) + except Exception as e: + logger.warning(f"读取本地插件包 {package_file} 失败:{e}") + return None + if not isinstance(payload, dict): + logger.warning(f"本地插件包 {package_file} 格式不正确") + return None + return payload + + @staticmethod + def __get_local_plugin_dir(source_path: Path, pid: str, package_version: Optional[str]) -> Path: + plugin_root = f"plugins.{package_version}" if package_version else "plugins" + return source_path / plugin_root / pid.lower() + + def get_local_plugin_candidates(self) -> Dict[str, dict]: + """ + 扫描本地插件仓库,按插件ID保留版本号最高的候选。 + """ + candidates: Dict[str, dict] = {} + for source_order, source_path in enumerate(self.get_local_source_paths()): + if not source_path.exists() or not source_path.is_dir(): + logger.warning(f"本地插件来源目录不存在或不可读:{source_path}") + continue + + package_candidates = [] + if settings.VERSION_FLAG: + package_candidates.append((settings.VERSION_FLAG, self.__get_local_package(source_path, + settings.VERSION_FLAG))) + package_candidates.append(("", self.__get_local_package(source_path))) + + for package_version, local_plugins in package_candidates: + if local_plugins is None: + continue + for pid, plugin_info in local_plugins.items(): + if not isinstance(plugin_info, dict): + continue + # package.json 中的旧结构需要声明兼容当前版本。 + if ( + not package_version + and settings.VERSION_FLAG + and plugin_info.get(settings.VERSION_FLAG) is not True + ): + continue + + plugin_dir = self.__get_local_plugin_dir(source_path, pid, package_version) + if not plugin_dir.is_dir(): + logger.debug(f"跳过本地插件 {pid}:插件目录不存在 {plugin_dir}") + continue + + candidate = plugin_info.copy() + candidate["id"] = pid + candidate["package_version"] = package_version + candidate["source_order"] = source_order + candidate["source_path"] = source_path + candidate["path"] = plugin_dir + candidate_version = str(candidate.get("version") or "0") + + existing = candidates.get(pid) + if not existing: + candidates[pid] = candidate + continue + + existing_version = str(existing.get("version") or "0") + if StringUtils.compare_version(candidate_version, ">", existing_version): + candidates[pid] = candidate + elif ( + candidate_version == existing_version + and source_order < int(existing.get("source_order", source_order)) + ): + logger.info(f"本地插件 {pid} 存在同版本来源,使用靠前目录:{source_path}") + candidates[pid] = candidate + + return candidates + + def get_local_plugin_candidate(self, pid: str, package_version: Optional[str] = None, + source_path: Optional[Path] = None, + strict_compat: bool = True) -> Optional[dict]: + """ + 获取指定插件ID的本地插件候选。 + """ + if not pid: + return None + if package_version is not None or source_path is not None: + source_paths = [source_path.resolve()] if source_path else self.get_local_source_paths() + for source_order, local_source_path in enumerate(self.get_local_source_paths()): + if local_source_path not in source_paths: + continue + local_plugins = self.__get_local_package(local_source_path, package_version or "") + if not local_plugins: + continue + for candidate_pid, plugin_info in local_plugins.items(): + if candidate_pid.lower() != pid.lower() or not isinstance(plugin_info, dict): + continue + is_compatible = not ( + not package_version + and settings.VERSION_FLAG + and plugin_info.get(settings.VERSION_FLAG) is not True + ) + if not is_compatible and strict_compat: + return None + plugin_dir = self.__get_local_plugin_dir(local_source_path, candidate_pid, package_version or "") + if not plugin_dir.is_dir(): + return None + candidate = plugin_info.copy() + candidate["id"] = candidate_pid + candidate["package_version"] = package_version or "" + candidate["source_order"] = source_order + candidate["source_path"] = local_source_path + candidate["path"] = plugin_dir + if not is_compatible: + candidate["compatible"] = False + candidate["skip_reason"] = f"package.json 未声明 {settings.VERSION_FLAG} 兼容" + return candidate + return None + + candidates = self.get_local_plugin_candidates() + for candidate_pid, candidate in candidates.items(): + if candidate_pid.lower() == pid.lower(): + return candidate + return None + @staticmethod def __parse_plugin_index_response(content: str) -> Optional[Dict[str, dict]]: """ @@ -271,6 +448,46 @@ class PluginHelper(metaclass=WeakSingleton): return self.__install_flow_sync(pid, force_install, prepare_filelist, repo_url) + def install_local(self, pid: str, repo_url: str = "", force_install: bool = False) -> Tuple[bool, str]: + """ + 从本地插件来源目录安装插件。 + """ + local_pid = self.parse_local_repo_url(repo_url) if repo_url else pid + if not local_pid or local_pid.lower() != pid.lower(): + return False, "本地插件来源与插件ID不匹配" + + candidate = self.get_local_plugin_candidate(pid) + if not candidate: + return False, f"未找到本地插件:{pid}" + + source_dir = Path(candidate.get("path")) + dest_dir = PLUGIN_DIR / pid.lower() + try: + if source_dir.resolve() == dest_dir.resolve(): + return False, "本地插件来源不能与运行目录相同" + except Exception: + return False, "本地插件来源路径无效" + + def prepare_local() -> Tuple[bool, str]: + try: + shutil.copytree( + source_dir, + dest_dir, + dirs_exist_ok=True, + ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".DS_Store") + ) + return True, "" + except Exception as e: + logger.error(f"复制本地插件 {pid} 失败:{e}") + return False, f"复制本地插件失败:{e}" + + return self.__install_flow_sync( + pid=pid, + force_install=force_install, + prepare_content=prepare_local, + repo_url=None + ) + def __get_file_list(self, pid: str, user_repo: str, package_version: Optional[str] = None) -> \ Tuple[Optional[list], Optional[str]]: """