feat(plugin): support local plugin sources

This commit is contained in:
InfinityPacer
2026-04-18 03:01:08 +08:00
committed by jxxghp
parent 115fb40772
commit 6fa48afa34
4 changed files with 422 additions and 24 deletions

View File

@@ -155,9 +155,13 @@ async def all_plugins(_: User = Depends(get_current_active_superuser_async),
# 未安装的本地插件
not_installed_plugins = [plugin for plugin in local_plugins if not plugin.installed]
# 本地插件来源目录中的插件
local_source_plugins = plugin_manager.get_local_source_plugins()
# 在线插件
online_plugins = await plugin_manager.async_get_online_plugins(force)
if not online_plugins:
candidate_plugins = plugin_manager._process_plugins_list(online_plugins + local_source_plugins, []) \
if online_plugins or local_source_plugins else []
if not candidate_plugins:
# 没有获取在线插件
if state == "market":
# 返回未安装的本地插件
@@ -169,7 +173,7 @@ async def all_plugins(_: User = Depends(get_current_active_superuser_async),
# 已安装插件IDS
_installed_ids = [plugin.id for plugin in installed_plugins]
# 未安装的线上插件或者有更新的插件
for plugin in online_plugins:
for plugin in candidate_plugins:
if plugin.id not in _installed_ids:
market_plugins.append(plugin)
elif plugin.has_update:
@@ -228,11 +232,21 @@ async def install(plugin_id: str,
install_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
# 首先检查插件是否已经存在,并且是否强制安装,否则只进行安装统计
plugin_helper = PluginHelper()
if not force and plugin_id in PluginManager().get_plugin_ids():
is_local_install = plugin_helper.is_local_repo_url(repo_url)
if not force and plugin_id in PluginManager().get_plugin_ids() and not is_local_install:
await plugin_helper.async_install_reg(pid=plugin_id)
else:
# 插件不存在或需要强制安装,下载安装并注册插件
if repo_url:
if is_local_install:
state, msg = await run_in_threadpool(
plugin_helper.install_local,
plugin_id,
repo_url,
force
)
if not state:
return schemas.Response(success=False, message=msg)
elif repo_url:
state, msg = await plugin_helper.async_install(pid=plugin_id, repo_url=repo_url)
# 安装失败则直接响应
if not state:

View File

@@ -417,6 +417,8 @@ class ConfigModel(BaseModel):
PLUGIN_STATISTIC_SHARE: bool = True
# 是否开启插件热加载
PLUGIN_AUTO_RELOAD: bool = False
# 本地插件仓库目录,多个地址使用,分隔
PLUGIN_LOCAL_PATHS: Optional[str] = None
# ==================== Github & PIP ====================
# Github token提高请求api限流阈值 ghp_****

View File

@@ -6,6 +6,7 @@ import importlib.util
import inspect
import os
import posixpath
import shutil
import sys
import threading
import time
@@ -51,6 +52,8 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
self._monitor_thread: Optional[threading.Thread] = None
# 监控停止事件
self._stop_monitor_event = threading.Event()
# 本地插件同步写入运行目录后的短时忽略窗口
self._recent_local_sync: Dict[str, float] = {}
# 开发者模式监测插件修改
if settings.DEV or settings.PLUGIN_AUTO_RELOAD:
self.__start_monitor()
@@ -308,11 +311,14 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
运行 watchfiles 监视器的主循环。
"""
# 监视插件目录
plugins_path = str(settings.ROOT_PATH / "app" / "plugins")
plugin_paths = [str(settings.ROOT_PATH / "app" / "plugins")]
for local_path in PluginHelper.get_local_source_paths():
if local_path.exists() and local_path.is_dir():
plugin_paths.append(str(local_path))
logger.info(">>> 监控线程已启动准备进入watch循环...")
# 使用 watchfiles 监视目录变化,并响应变化事件
# Todo: yield_on_timeout = True 时,每秒检查停止事件,会返回空集合;后续可以考虑用来做心跳之类的功能?
for changes in watch(plugins_path, stop_event=self._stop_monitor_event, rust_timeout=1000,
for changes in watch(*plugin_paths, stop_event=self._stop_monitor_event, rust_timeout=1000,
yield_on_timeout=True):
# 如果收到停止事件,退出循环
if not changes:
@@ -320,18 +326,56 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
# 处理变化事件
plugins_to_reload = set()
local_plugins_to_sync = {}
for _change_type, path_str in changes:
event_path = Path(path_str)
# 跳过非 .py 文件以及 pycache 目录中的文件
if not event_path.name.endswith(".py") or "__pycache__" in event_path.parts:
# 跳过 pycache 目录中的文件
if "__pycache__" in event_path.parts:
continue
if event_path.name == "requirements.txt":
candidate = self._get_local_plugin_candidate_from_path(event_path)
if candidate:
if candidate.get("compatible") is False:
logger.info(
f"检测到本地插件 {candidate.get('id')} 依赖文件变化,"
f"但跳过处理:{candidate.get('skip_reason')}"
)
continue
logger.warning(f"检测到本地插件 {candidate.get('id')} 依赖文件变化,请重新安装本地插件以安装依赖")
continue
# 跳过非 .py 文件
if not event_path.name.endswith(".py"):
continue
# 解析插件ID
pid = self._get_plugin_id_from_path(event_path)
# 跳过无效插件文件
if pid:
# 收集需要重载的插件ID自动去重避免重复重载
runtime_pid = self._get_plugin_id_from_path(event_path)
local_candidate = self._get_local_plugin_candidate_from_path(event_path) if not runtime_pid else None
if runtime_pid:
last_sync_time = self._recent_local_sync.get(runtime_pid)
if last_sync_time and time.time() - last_sync_time < 2:
logger.debug(f"忽略本地插件同步产生的运行目录变化:{runtime_pid}")
continue
# 运行目录变化只重载,不能反向触发本地同步。
plugins_to_reload.add(runtime_pid)
elif local_candidate:
if local_candidate.get("compatible") is False:
package_version = local_candidate.get("package_version")
source_root = f"plugins.{package_version}" if package_version else "plugins"
logger.info(
f"检测到本地插件 {local_candidate.get('id')} 文件变化,来源:{source_root}"
f"文件:{event_path},但跳过同步:{local_candidate.get('skip_reason')}"
)
continue
local_plugins_to_sync[local_candidate.get("id")] = (local_candidate, event_path)
for pid, (candidate, event_path) in local_plugins_to_sync.items():
package_version = candidate.get("package_version")
source_root = f"plugins.{package_version}" if package_version else "plugins"
logger.info(f"检测到本地插件 {pid} 文件变化,来源:{source_root},文件:{event_path}")
if self._sync_local_plugin_if_installed(pid, candidate):
plugins_to_reload.add(pid)
# 触发重载
@@ -351,6 +395,7 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
:return: 插件ID字符串如果不是有效插件文件则返回 None。
"""
try:
event_path = event_path.resolve()
plugins_root = settings.ROOT_PATH / "app" / "plugins"
# 确保修改的文件在 plugins 目录下
if not event_path.is_relative_to(plugins_root):
@@ -389,6 +434,78 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
logger.error(f"从路径解析插件ID时出错: {e}")
return None
@staticmethod
def _get_local_plugin_candidate_from_path(event_path: Path) -> Optional[dict]:
"""
根据本地插件来源路径解析具体插件候选,保留 plugins/plugins.v2 来源差异。
"""
try:
event_path = event_path.resolve()
for local_path in PluginHelper.get_local_source_paths():
if not local_path.exists() or not local_path.is_dir():
continue
if not event_path.is_relative_to(local_path):
continue
try:
relative_parts = event_path.relative_to(local_path).parts
except (ValueError, IndexError):
continue
if len(relative_parts) < 2:
continue
if relative_parts[0] == "plugins":
package_version = ""
elif relative_parts[0].startswith("plugins."):
package_version = relative_parts[0].split(".", 1)[1]
else:
continue
plugin_dir_name = relative_parts[1]
candidate = PluginHelper().get_local_plugin_candidate(
pid=plugin_dir_name,
package_version=package_version,
source_path=local_path,
strict_compat=False
)
if candidate:
return candidate
return None
except Exception as e:
logger.error(f"从本地来源路径解析插件候选时出错: {e}")
return None
@staticmethod
def _sync_local_plugin_if_installed(pid: str, candidate: Optional[dict] = None) -> bool:
"""
已安装本地插件源码变化时,同步到运行目录。
"""
installed_plugins = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
if pid not in installed_plugins:
logger.info(f"本地插件 {pid} 尚未安装,跳过自动同步和热重载")
return False
candidate = candidate or PluginHelper().get_local_plugin_candidate(pid)
if not candidate:
return False
source_dir = Path(candidate.get("path"))
dest_dir = settings.ROOT_PATH / "app" / "plugins" / pid.lower()
try:
if source_dir.resolve() == dest_dir.resolve():
return True
if dest_dir.exists():
shutil.rmtree(dest_dir, ignore_errors=True)
shutil.copytree(
source_dir,
dest_dir,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".DS_Store")
)
PluginManager()._recent_local_sync[pid] = time.time()
logger.info(f"已同步本地插件 {pid}{source_dir} -> {dest_dir}")
return True
except Exception as e:
logger.error(f"同步本地插件 {pid} 失败:{e}")
return False
@staticmethod
def __stop_plugin(plugin: Any):
"""
@@ -1116,6 +1233,31 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0)
return plugins
def get_local_source_plugins(self) -> List[schemas.Plugin]:
"""
获取本地插件来源目录中的插件信息。
"""
plugins = []
installed_apps = SystemConfigOper().get(SystemConfigKey.UserInstalledPlugins) or []
local_candidates = PluginHelper().get_local_plugin_candidates()
for pid, plugin_info in local_candidates.items():
package_version = plugin_info.get("package_version")
plugin = self._process_plugin_info(
pid=pid,
plugin_info=plugin_info,
market=PluginHelper.make_local_repo_url(pid),
installed_apps=installed_apps,
add_time=0,
package_version=package_version
)
if not plugin:
continue
plugin.is_local = True
plugins.append(plugin)
plugins.sort(key=lambda x: x.plugin_order if hasattr(x, "plugin_order") else 0)
return plugins
@staticmethod
def is_plugin_exists(pid: str, version: str = None) -> bool:
"""
@@ -1194,18 +1336,41 @@ class PluginManager(ConfigReloadMixin, metaclass=Singleton):
# 将未出现在高版本插件列表中的 v1 插件加入 all_plugins
higher_plugin_ids = {f"{p.id}{p.plugin_version}" for p in higher_version_plugins}
all_plugins.extend([p for p in base_version_plugins if f"{p.id}{p.plugin_version}" not in higher_plugin_ids])
# 去重
all_plugins = list({f"{p.id}{p.plugin_version}": p for p in all_plugins}.values())
# 所有插件按 repo 在设置中的顺序排序
all_plugins.sort(
key=lambda x: settings.PLUGIN_MARKET.split(",").index(x.repo_url) if x.repo_url else 0
)
# 相同 ID 的插件保留版本号最大的版本
max_versions = {}
for p in all_plugins:
if p.id not in max_versions or StringUtils.compare_version(p.plugin_version, ">", max_versions[p.id]):
max_versions[p.id] = p.plugin_version
result = [p for p in all_plugins if p.plugin_version == max_versions[p.id]]
markets = [item for item in settings.PLUGIN_MARKET.split(",") if item]
def repo_order(plugin: schemas.Plugin) -> int:
if PluginHelper.is_local_repo_url(plugin.repo_url):
return len(markets) + 1
if plugin.repo_url in markets:
return markets.index(plugin.repo_url)
return len(markets)
# 去重:同 ID + 版本优先保留市场来源,其次按来源顺序稳定保留。
dedup_plugins = {}
for plugin in sorted(all_plugins, key=repo_order):
key = f"{plugin.id}{plugin.plugin_version}"
exists = dedup_plugins.get(key)
if not exists:
dedup_plugins[key] = plugin
continue
if PluginHelper.is_local_repo_url(exists.repo_url) and not PluginHelper.is_local_repo_url(plugin.repo_url):
dedup_plugins[key] = plugin
# 相同 ID 的插件保留版本号最大的版本;同版本市场来源优先。
result_by_id = {}
for plugin in sorted(dedup_plugins.values(), key=repo_order):
exists = result_by_id.get(plugin.id)
if not exists:
result_by_id[plugin.id] = plugin
continue
if StringUtils.compare_version(plugin.plugin_version, ">", exists.plugin_version):
result_by_id[plugin.id] = plugin
elif plugin.plugin_version == exists.plugin_version \
and PluginHelper.is_local_repo_url(exists.repo_url) \
and not PluginHelper.is_local_repo_url(plugin.repo_url):
result_by_id[plugin.id] = plugin
result = list(result_by_id.values())
logger.info(f"共获取到 {len(result)} 个线上插件")
return result

View File

@@ -26,10 +26,12 @@ from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
PLUGIN_DIR = Path(settings.ROOT_PATH) / "app" / "plugins"
LOCAL_REPO_PREFIX = "local://"
class PluginHelper(metaclass=WeakSingleton):
@@ -49,6 +51,181 @@ class PluginHelper(metaclass=WeakSingleton):
if self.install_report():
self.systemconfig.set(SystemConfigKey.PluginInstallReport, "1")
@staticmethod
def is_local_repo_url(repo_url: Optional[str]) -> bool:
"""
判断是否为本地插件来源标识。
"""
return bool(repo_url and repo_url.startswith(LOCAL_REPO_PREFIX))
@staticmethod
def make_local_repo_url(pid: str) -> str:
"""
生成本地插件安装来源标识。
"""
return f"{LOCAL_REPO_PREFIX}{pid}"
@staticmethod
def parse_local_repo_url(repo_url: str) -> Optional[str]:
"""
从本地插件来源标识中解析插件ID。
"""
if not PluginHelper.is_local_repo_url(repo_url):
return None
pid = repo_url[len(LOCAL_REPO_PREFIX):].strip("/")
return pid or None
@staticmethod
def get_local_source_paths() -> List[Path]:
"""
获取本地插件来源目录列表。
"""
if not settings.PLUGIN_LOCAL_PATHS:
return []
paths = []
for item in settings.PLUGIN_LOCAL_PATHS.split(","):
local_path = item.strip()
if not local_path:
continue
path = Path(local_path).expanduser()
if not path.is_absolute():
path = settings.ROOT_PATH / path
paths.append(path.resolve())
return paths
@staticmethod
def __get_local_package(source_path: Path, package_version: Optional[str] = None) -> Optional[Dict[str, dict]]:
"""
从本地插件仓库读取 package.json 或 package.{version}.json。
"""
package_file = source_path / (
f"package.{package_version}.json" if package_version else "package.json"
)
if not package_file.exists():
return {}
try:
content = package_file.read_text(encoding="utf-8")
payload = json.loads(content)
except Exception as e:
logger.warning(f"读取本地插件包 {package_file} 失败:{e}")
return None
if not isinstance(payload, dict):
logger.warning(f"本地插件包 {package_file} 格式不正确")
return None
return payload
@staticmethod
def __get_local_plugin_dir(source_path: Path, pid: str, package_version: Optional[str]) -> Path:
plugin_root = f"plugins.{package_version}" if package_version else "plugins"
return source_path / plugin_root / pid.lower()
def get_local_plugin_candidates(self) -> Dict[str, dict]:
"""
扫描本地插件仓库按插件ID保留版本号最高的候选。
"""
candidates: Dict[str, dict] = {}
for source_order, source_path in enumerate(self.get_local_source_paths()):
if not source_path.exists() or not source_path.is_dir():
logger.warning(f"本地插件来源目录不存在或不可读:{source_path}")
continue
package_candidates = []
if settings.VERSION_FLAG:
package_candidates.append((settings.VERSION_FLAG, self.__get_local_package(source_path,
settings.VERSION_FLAG)))
package_candidates.append(("", self.__get_local_package(source_path)))
for package_version, local_plugins in package_candidates:
if local_plugins is None:
continue
for pid, plugin_info in local_plugins.items():
if not isinstance(plugin_info, dict):
continue
# package.json 中的旧结构需要声明兼容当前版本。
if (
not package_version
and settings.VERSION_FLAG
and plugin_info.get(settings.VERSION_FLAG) is not True
):
continue
plugin_dir = self.__get_local_plugin_dir(source_path, pid, package_version)
if not plugin_dir.is_dir():
logger.debug(f"跳过本地插件 {pid}:插件目录不存在 {plugin_dir}")
continue
candidate = plugin_info.copy()
candidate["id"] = pid
candidate["package_version"] = package_version
candidate["source_order"] = source_order
candidate["source_path"] = source_path
candidate["path"] = plugin_dir
candidate_version = str(candidate.get("version") or "0")
existing = candidates.get(pid)
if not existing:
candidates[pid] = candidate
continue
existing_version = str(existing.get("version") or "0")
if StringUtils.compare_version(candidate_version, ">", existing_version):
candidates[pid] = candidate
elif (
candidate_version == existing_version
and source_order < int(existing.get("source_order", source_order))
):
logger.info(f"本地插件 {pid} 存在同版本来源,使用靠前目录:{source_path}")
candidates[pid] = candidate
return candidates
def get_local_plugin_candidate(self, pid: str, package_version: Optional[str] = None,
source_path: Optional[Path] = None,
strict_compat: bool = True) -> Optional[dict]:
"""
获取指定插件ID的本地插件候选。
"""
if not pid:
return None
if package_version is not None or source_path is not None:
source_paths = [source_path.resolve()] if source_path else self.get_local_source_paths()
for source_order, local_source_path in enumerate(self.get_local_source_paths()):
if local_source_path not in source_paths:
continue
local_plugins = self.__get_local_package(local_source_path, package_version or "")
if not local_plugins:
continue
for candidate_pid, plugin_info in local_plugins.items():
if candidate_pid.lower() != pid.lower() or not isinstance(plugin_info, dict):
continue
is_compatible = not (
not package_version
and settings.VERSION_FLAG
and plugin_info.get(settings.VERSION_FLAG) is not True
)
if not is_compatible and strict_compat:
return None
plugin_dir = self.__get_local_plugin_dir(local_source_path, candidate_pid, package_version or "")
if not plugin_dir.is_dir():
return None
candidate = plugin_info.copy()
candidate["id"] = candidate_pid
candidate["package_version"] = package_version or ""
candidate["source_order"] = source_order
candidate["source_path"] = local_source_path
candidate["path"] = plugin_dir
if not is_compatible:
candidate["compatible"] = False
candidate["skip_reason"] = f"package.json 未声明 {settings.VERSION_FLAG} 兼容"
return candidate
return None
candidates = self.get_local_plugin_candidates()
for candidate_pid, candidate in candidates.items():
if candidate_pid.lower() == pid.lower():
return candidate
return None
@staticmethod
def __parse_plugin_index_response(content: str) -> Optional[Dict[str, dict]]:
"""
@@ -271,6 +448,46 @@ class PluginHelper(metaclass=WeakSingleton):
return self.__install_flow_sync(pid, force_install, prepare_filelist, repo_url)
def install_local(self, pid: str, repo_url: str = "", force_install: bool = False) -> Tuple[bool, str]:
"""
从本地插件来源目录安装插件。
"""
local_pid = self.parse_local_repo_url(repo_url) if repo_url else pid
if not local_pid or local_pid.lower() != pid.lower():
return False, "本地插件来源与插件ID不匹配"
candidate = self.get_local_plugin_candidate(pid)
if not candidate:
return False, f"未找到本地插件:{pid}"
source_dir = Path(candidate.get("path"))
dest_dir = PLUGIN_DIR / pid.lower()
try:
if source_dir.resolve() == dest_dir.resolve():
return False, "本地插件来源不能与运行目录相同"
except Exception:
return False, "本地插件来源路径无效"
def prepare_local() -> Tuple[bool, str]:
try:
shutil.copytree(
source_dir,
dest_dir,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".DS_Store")
)
return True, ""
except Exception as e:
logger.error(f"复制本地插件 {pid} 失败:{e}")
return False, f"复制本地插件失败:{e}"
return self.__install_flow_sync(
pid=pid,
force_install=force_install,
prepare_content=prepare_local,
repo_url=None
)
def __get_file_list(self, pid: str, user_repo: str, package_version: Optional[str] = None) -> \
Tuple[Optional[list], Optional[str]]:
"""