feat(plugin): encode local repo path in source url

This commit is contained in:
InfinityPacer
2026-04-19 04:19:52 +08:00
committed by jxxghp
parent 3a93efb082
commit 7b096c0a09
2 changed files with 124 additions and 47 deletions

View File

@@ -1245,7 +1245,11 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
plugin = self._process_plugin_info(
pid=pid,
plugin_info=plugin_info,
market=PluginHelper.make_local_repo_url(pid),
market=PluginHelper.make_local_repo_url(
pid,
plugin_info.get("repo_path"),
package_version
),
installed_apps=installed_apps,
add_time=0,
package_version=package_version

View File

@@ -9,6 +9,7 @@ import traceback
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set, Callable, Awaitable
from urllib.parse import parse_qs, quote, unquote, urlsplit
import aiofiles
import aioshutil
@@ -55,31 +56,77 @@ class PluginHelper(metaclass=WeakSingleton):
@staticmethod
def is_local_repo_url(repo_url: Optional[str]) -> bool:
"""
判断是否为本地插件来源标识
判断是否为本地插件来源标识
"""
return bool(repo_url and repo_url.startswith(LOCAL_REPO_PREFIX))
@staticmethod
def make_local_repo_url(pid: str) -> str:
def make_local_repo_url(pid: str, repo_path: Optional[Path] = None,
package_version: Optional[str] = None) -> str:
"""
生成本地插件安装来源标识
生成本地插件安装来源标识
"""
return f"{LOCAL_REPO_PREFIX}{pid}"
repo_url = f"{LOCAL_REPO_PREFIX}{quote(pid, safe='')}"
params = []
if repo_path:
params.append(f"path={quote(str(repo_path), safe='/:~')}")
if package_version:
params.append(f"version={quote(package_version, safe='')}")
if params:
repo_url = f"{repo_url}?{'&'.join(params)}"
return repo_url
@staticmethod
def parse_local_repo_url(repo_url: str) -> Optional[str]:
"""
从本地插件来源标识中解析插件ID
从本地插件来源标识中解析插件ID
"""
if not PluginHelper.is_local_repo_url(repo_url):
return None
pid = repo_url[len(LOCAL_REPO_PREFIX):].strip("/")
try:
parts = urlsplit(repo_url)
pid = unquote(parts.netloc or parts.path.strip("/"))
except Exception:
pid = repo_url[len(LOCAL_REPO_PREFIX):].split("?", 1)[0].strip("/")
return pid or None
@staticmethod
def parse_local_repo_path(repo_url: str) -> Optional[Path]:
"""
从本地插件来源标识中解析仓库路径
"""
if not PluginHelper.is_local_repo_url(repo_url):
return None
try:
values = parse_qs(urlsplit(repo_url).query).get("path")
if not values:
return None
path = Path(values[0]).expanduser()
if not path.is_absolute():
path = settings.ROOT_PATH / path
return path.resolve()
except Exception:
return None
@staticmethod
def parse_local_repo_package_version(repo_url: str) -> Optional[str]:
"""
从本地插件来源标识中解析 package 版本
"""
if not PluginHelper.is_local_repo_url(repo_url):
return None
try:
values = parse_qs(urlsplit(repo_url).query).get("version")
if not values:
return None
return values[0]
except Exception:
return None
@staticmethod
def get_local_repo_paths() -> List[Path]:
"""
获取本地插件仓库目录列表
获取本地插件仓库目录列表
"""
if not settings.PLUGIN_LOCAL_REPO_PATHS:
return []
@@ -97,7 +144,7 @@ class PluginHelper(metaclass=WeakSingleton):
@staticmethod
def __get_local_package(repo_path: Path, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
从本地插件仓库读取 package.json 或 package.{version}.json
从本地插件仓库读取 package.json 或 package.{version}.json
"""
package_file = repo_path / (
f"package.{package_version}.json" if package_version else "package.json"
@@ -108,10 +155,10 @@ class PluginHelper(metaclass=WeakSingleton):
content = package_file.read_text(encoding="utf-8")
payload = json.loads(content)
except Exception as e:
logger.warning(f"读取本地插件包 {package_file} 失败:{e}")
logger.warn(f"读取本地插件包 {package_file} 失败:{e}")
return None
if not isinstance(payload, dict):
logger.warning(f"本地插件包 {package_file} 格式不正确")
logger.warn(f"本地插件包 {package_file} 格式不正确")
return None
return payload
@@ -122,12 +169,12 @@ class PluginHelper(metaclass=WeakSingleton):
def get_local_plugin_candidates(self) -> Dict[str, dict]:
"""
扫描本地插件仓库按插件ID保留版本号最高的候选
扫描本地插件仓库按插件ID保留版本号最高的候选
"""
candidates: Dict[str, dict] = {}
for source_order, repo_path in enumerate(self.get_local_repo_paths()):
for repo_order, repo_path in enumerate(self.get_local_repo_paths()):
if not repo_path.exists() or not repo_path.is_dir():
logger.warning(f"本地插件仓库目录不存在或不可读:{repo_path}")
logger.warn(f"本地插件仓库目录不存在或不可读:{repo_path}")
continue
package_candidates = []
@@ -158,7 +205,7 @@ class PluginHelper(metaclass=WeakSingleton):
candidate = plugin_info.copy()
candidate["id"] = pid
candidate["package_version"] = package_version
candidate["source_order"] = source_order
candidate["repo_order"] = repo_order
candidate["repo_path"] = repo_path
candidate["path"] = plugin_dir
candidate_version = str(candidate.get("version") or "0")
@@ -173,7 +220,7 @@ class PluginHelper(metaclass=WeakSingleton):
candidates[pid] = candidate
elif (
candidate_version == existing_version
and source_order < int(existing.get("source_order", source_order))
and repo_order < int(existing.get("repo_order", repo_order))
):
logger.info(f"本地插件 {pid} 存在同版本来源,使用靠前目录:{repo_path}")
candidates[pid] = candidate
@@ -184,42 +231,58 @@ class PluginHelper(metaclass=WeakSingleton):
repo_path: Optional[Path] = None,
strict_compat: bool = True) -> Optional[dict]:
"""
获取指定插件ID的本地插件候选
获取指定插件ID的本地插件候选
"""
if not pid:
return None
if package_version is not None or repo_path is not None:
repo_paths = [repo_path.resolve()] if repo_path else self.get_local_repo_paths()
for source_order, local_repo_path in enumerate(self.get_local_repo_paths()):
package_versions = [package_version] if package_version is not None else []
if package_version is None:
if settings.VERSION_FLAG:
package_versions.append(settings.VERSION_FLAG)
package_versions.append("")
selected_candidate = None
for repo_order, local_repo_path in enumerate(self.get_local_repo_paths()):
if local_repo_path not in repo_paths:
continue
local_plugins = self.__get_local_package(local_repo_path, package_version or "")
if not local_plugins:
continue
for candidate_pid, plugin_info in local_plugins.items():
if candidate_pid.lower() != pid.lower() or not isinstance(plugin_info, dict):
for current_package_version in package_versions:
local_plugins = self.__get_local_package(local_repo_path, current_package_version or "")
if not local_plugins:
continue
is_compatible = not (
not package_version
and settings.VERSION_FLAG
and plugin_info.get(settings.VERSION_FLAG) is not True
)
if not is_compatible and strict_compat:
return None
plugin_dir = self.__get_local_plugin_dir(local_repo_path, candidate_pid, package_version or "")
if not plugin_dir.is_dir():
return None
candidate = plugin_info.copy()
candidate["id"] = candidate_pid
candidate["package_version"] = package_version or ""
candidate["source_order"] = source_order
candidate["repo_path"] = local_repo_path
candidate["path"] = plugin_dir
if not is_compatible:
candidate["compatible"] = False
candidate["skip_reason"] = f"package.json 未声明 {settings.VERSION_FLAG} 兼容"
return candidate
return None
for candidate_pid, plugin_info in local_plugins.items():
if candidate_pid.lower() != pid.lower() or not isinstance(plugin_info, dict):
continue
is_compatible = not (
not current_package_version
and settings.VERSION_FLAG
and plugin_info.get(settings.VERSION_FLAG) is not True
)
if not is_compatible and strict_compat:
continue
plugin_dir = self.__get_local_plugin_dir(local_repo_path, candidate_pid,
current_package_version or "")
if not plugin_dir.is_dir():
continue
candidate = plugin_info.copy()
candidate["id"] = candidate_pid
candidate["package_version"] = current_package_version or ""
candidate["repo_order"] = repo_order
candidate["repo_path"] = local_repo_path
candidate["path"] = plugin_dir
if not is_compatible:
candidate["compatible"] = False
candidate["skip_reason"] = f"package.json 未声明 {settings.VERSION_FLAG} 兼容"
if package_version is not None:
return candidate
if not selected_candidate:
selected_candidate = candidate
continue
selected_version = str(selected_candidate.get("version") or "0")
candidate_version = str(candidate.get("version") or "0")
if StringUtils.compare_version(candidate_version, ">", selected_version):
selected_candidate = candidate
return selected_candidate
candidates = self.get_local_plugin_candidates()
for candidate_pid, candidate in candidates.items():
@@ -454,13 +517,19 @@ class PluginHelper(metaclass=WeakSingleton):
def install_local(self, pid: str, repo_url: str = "", force_install: bool = False) -> Tuple[bool, str]:
"""
从本地插件仓库目录安装插件
从本地插件仓库目录安装插件
"""
local_pid = self.parse_local_repo_url(repo_url) if repo_url else pid
if not local_pid or local_pid.lower() != pid.lower():
return False, "本地插件来源与插件ID不匹配"
candidate = self.get_local_plugin_candidate(pid)
repo_path = self.parse_local_repo_path(repo_url) if repo_url else None
package_version = self.parse_local_repo_package_version(repo_url) if repo_url else None
candidate = self.get_local_plugin_candidate(
pid,
package_version=package_version,
repo_path=repo_path
)
if not candidate:
return False, f"未找到本地插件:{pid}"
@@ -489,7 +558,11 @@ class PluginHelper(metaclass=WeakSingleton):
pid=pid,
force_install=force_install,
prepare_content=prepare_local,
repo_url=None
repo_url=repo_url or self.make_local_repo_url(
pid,
candidate.get("repo_path"),
candidate.get("package_version")
)
)
def __get_file_list(self, pid: str, user_repo: str, package_version: Optional[str] = None) -> \