From 7b096c0a0920c48ea54bd6d98067634dd4c8a58b Mon Sep 17 00:00:00 2001 From: InfinityPacer Date: Sun, 19 Apr 2026 04:19:52 +0800 Subject: [PATCH] feat(plugin): encode local repo path in source url --- app/core/plugin.py | 6 +- app/helper/plugin.py | 165 +++++++++++++++++++++++++++++++------------ 2 files changed, 124 insertions(+), 47 deletions(-) diff --git a/app/core/plugin.py b/app/core/plugin.py index cce1f10d..592f3c85 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -1245,7 +1245,11 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton): plugin = self._process_plugin_info( pid=pid, plugin_info=plugin_info, - market=PluginHelper.make_local_repo_url(pid), + market=PluginHelper.make_local_repo_url( + pid, + plugin_info.get("repo_path"), + package_version + ), installed_apps=installed_apps, add_time=0, package_version=package_version diff --git a/app/helper/plugin.py b/app/helper/plugin.py index 003391d3..7714abfc 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -9,6 +9,7 @@ import traceback import zipfile from pathlib import Path from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable +from urllib.parse import parse_qs, quote, unquote, urlsplit import aiofiles import aioshutil @@ -55,31 +56,77 @@ class PluginHelper(metaclass=WeakSingleton): @staticmethod def is_local_repo_url(repo_url: Optional[str]) -> bool: """ - 判断是否为本地插件来源标识。 + 判断是否为本地插件来源标识 """ return bool(repo_url and repo_url.startswith(LOCAL_REPO_PREFIX)) @staticmethod - def make_local_repo_url(pid: str) -> str: + def make_local_repo_url(pid: str, repo_path: Optional[Path] = None, + package_version: Optional[str] = None) -> str: """ - 生成本地插件安装来源标识。 + 生成本地插件安装来源标识 """ - return f"{LOCAL_REPO_PREFIX}{pid}" + repo_url = f"{LOCAL_REPO_PREFIX}{quote(pid, safe='')}" + params = [] + if repo_path: + params.append(f"path={quote(str(repo_path), safe='/:~')}") + if package_version: + params.append(f"version={quote(package_version, safe='')}") + if params: + repo_url = f"{repo_url}?{'&'.join(params)}" + return repo_url @staticmethod def parse_local_repo_url(repo_url: str) -> Optional[str]: """ - 从本地插件来源标识中解析插件ID。 + 从本地插件来源标识中解析插件ID """ if not PluginHelper.is_local_repo_url(repo_url): return None - pid = repo_url[len(LOCAL_REPO_PREFIX):].strip("/") + try: + parts = urlsplit(repo_url) + pid = unquote(parts.netloc or parts.path.strip("/")) + except Exception: + pid = repo_url[len(LOCAL_REPO_PREFIX):].split("?", 1)[0].strip("/") return pid or None + @staticmethod + def parse_local_repo_path(repo_url: str) -> Optional[Path]: + """ + 从本地插件来源标识中解析仓库路径 + """ + if not PluginHelper.is_local_repo_url(repo_url): + return None + try: + values = parse_qs(urlsplit(repo_url).query).get("path") + if not values: + return None + path = Path(values[0]).expanduser() + if not path.is_absolute(): + path = settings.ROOT_PATH / path + return path.resolve() + except Exception: + return None + + @staticmethod + def parse_local_repo_package_version(repo_url: str) -> Optional[str]: + """ + 从本地插件来源标识中解析 package 版本 + """ + if not PluginHelper.is_local_repo_url(repo_url): + return None + try: + values = parse_qs(urlsplit(repo_url).query).get("version") + if not values: + return None + return values[0] + except Exception: + return None + @staticmethod def get_local_repo_paths() -> List[Path]: """ - 获取本地插件仓库目录列表。 + 获取本地插件仓库目录列表 """ if not settings.PLUGIN_LOCAL_REPO_PATHS: return [] @@ -97,7 +144,7 @@ class PluginHelper(metaclass=WeakSingleton): @staticmethod def __get_local_package(repo_path: Path, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]: """ - 从本地插件仓库读取 package.json 或 package.{version}.json。 + 从本地插件仓库读取 package.json 或 package.{version}.json """ package_file = repo_path / ( f"package.{package_version}.json" if package_version else "package.json" @@ -108,10 +155,10 @@ class PluginHelper(metaclass=WeakSingleton): content = package_file.read_text(encoding="utf-8") payload = json.loads(content) except Exception as e: - logger.warning(f"读取本地插件包 {package_file} 失败:{e}") + logger.warn(f"读取本地插件包 {package_file} 失败:{e}") return None if not isinstance(payload, dict): - logger.warning(f"本地插件包 {package_file} 格式不正确") + logger.warn(f"本地插件包 {package_file} 格式不正确") return None return payload @@ -122,12 +169,12 @@ class PluginHelper(metaclass=WeakSingleton): def get_local_plugin_candidates(self) -> Dict[str, dict]: """ - 扫描本地插件仓库,按插件ID保留版本号最高的候选。 + 扫描本地插件仓库,按插件ID保留版本号最高的候选 """ candidates: Dict[str, dict] = {} - for source_order, repo_path in enumerate(self.get_local_repo_paths()): + for repo_order, repo_path in enumerate(self.get_local_repo_paths()): if not repo_path.exists() or not repo_path.is_dir(): - logger.warning(f"本地插件仓库目录不存在或不可读:{repo_path}") + logger.warn(f"本地插件仓库目录不存在或不可读:{repo_path}") continue package_candidates = [] @@ -158,7 +205,7 @@ class PluginHelper(metaclass=WeakSingleton): candidate = plugin_info.copy() candidate["id"] = pid candidate["package_version"] = package_version - candidate["source_order"] = source_order + candidate["repo_order"] = repo_order candidate["repo_path"] = repo_path candidate["path"] = plugin_dir candidate_version = str(candidate.get("version") or "0") @@ -173,7 +220,7 @@ class PluginHelper(metaclass=WeakSingleton): candidates[pid] = candidate elif ( candidate_version == existing_version - and source_order < int(existing.get("source_order", source_order)) + and repo_order < int(existing.get("repo_order", repo_order)) ): logger.info(f"本地插件 {pid} 存在同版本来源,使用靠前目录:{repo_path}") candidates[pid] = candidate @@ -184,42 +231,58 @@ class PluginHelper(metaclass=WeakSingleton): repo_path: Optional[Path] = None, strict_compat: bool = True) -> Optional[dict]: """ - 获取指定插件ID的本地插件候选。 + 获取指定插件ID的本地插件候选 """ if not pid: return None if package_version is not None or repo_path is not None: repo_paths = [repo_path.resolve()] if repo_path else self.get_local_repo_paths() - for source_order, local_repo_path in enumerate(self.get_local_repo_paths()): + package_versions = [package_version] if package_version is not None else [] + if package_version is None: + if settings.VERSION_FLAG: + package_versions.append(settings.VERSION_FLAG) + package_versions.append("") + selected_candidate = None + for repo_order, local_repo_path in enumerate(self.get_local_repo_paths()): if local_repo_path not in repo_paths: continue - local_plugins = self.__get_local_package(local_repo_path, package_version or "") - if not local_plugins: - continue - for candidate_pid, plugin_info in local_plugins.items(): - if candidate_pid.lower() != pid.lower() or not isinstance(plugin_info, dict): + for current_package_version in package_versions: + local_plugins = self.__get_local_package(local_repo_path, current_package_version or "") + if not local_plugins: continue - is_compatible = not ( - not package_version - and settings.VERSION_FLAG - and plugin_info.get(settings.VERSION_FLAG) is not True - ) - if not is_compatible and strict_compat: - return None - plugin_dir = self.__get_local_plugin_dir(local_repo_path, candidate_pid, package_version or "") - if not plugin_dir.is_dir(): - return None - candidate = plugin_info.copy() - candidate["id"] = candidate_pid - candidate["package_version"] = package_version or "" - candidate["source_order"] = source_order - candidate["repo_path"] = local_repo_path - candidate["path"] = plugin_dir - if not is_compatible: - candidate["compatible"] = False - candidate["skip_reason"] = f"package.json 未声明 {settings.VERSION_FLAG} 兼容" - return candidate - return None + for candidate_pid, plugin_info in local_plugins.items(): + if candidate_pid.lower() != pid.lower() or not isinstance(plugin_info, dict): + continue + is_compatible = not ( + not current_package_version + and settings.VERSION_FLAG + and plugin_info.get(settings.VERSION_FLAG) is not True + ) + if not is_compatible and strict_compat: + continue + plugin_dir = self.__get_local_plugin_dir(local_repo_path, candidate_pid, + current_package_version or "") + if not plugin_dir.is_dir(): + continue + candidate = plugin_info.copy() + candidate["id"] = candidate_pid + candidate["package_version"] = current_package_version or "" + candidate["repo_order"] = repo_order + candidate["repo_path"] = local_repo_path + candidate["path"] = plugin_dir + if not is_compatible: + candidate["compatible"] = False + candidate["skip_reason"] = f"package.json 未声明 {settings.VERSION_FLAG} 兼容" + if package_version is not None: + return candidate + if not selected_candidate: + selected_candidate = candidate + continue + selected_version = str(selected_candidate.get("version") or "0") + candidate_version = str(candidate.get("version") or "0") + if StringUtils.compare_version(candidate_version, ">", selected_version): + selected_candidate = candidate + return selected_candidate candidates = self.get_local_plugin_candidates() for candidate_pid, candidate in candidates.items(): @@ -454,13 +517,19 @@ class PluginHelper(metaclass=WeakSingleton): def install_local(self, pid: str, repo_url: str = "", force_install: bool = False) -> Tuple[bool, str]: """ - 从本地插件仓库目录安装插件。 + 从本地插件仓库目录安装插件 """ local_pid = self.parse_local_repo_url(repo_url) if repo_url else pid if not local_pid or local_pid.lower() != pid.lower(): return False, "本地插件来源与插件ID不匹配" - candidate = self.get_local_plugin_candidate(pid) + repo_path = self.parse_local_repo_path(repo_url) if repo_url else None + package_version = self.parse_local_repo_package_version(repo_url) if repo_url else None + candidate = self.get_local_plugin_candidate( + pid, + package_version=package_version, + repo_path=repo_path + ) if not candidate: return False, f"未找到本地插件:{pid}" @@ -489,7 +558,11 @@ class PluginHelper(metaclass=WeakSingleton): pid=pid, force_install=force_install, prepare_content=prepare_local, - repo_url=None + repo_url=repo_url or self.make_local_repo_url( + pid, + candidate.get("repo_path"), + candidate.get("package_version") + ) ) def __get_file_list(self, pid: str, user_repo: str, package_version: Optional[str] = None) -> \