diff --git a/app/chain/user.py b/app/chain/user.py index 5a2650b8..6a36aef2 100644 --- a/app/chain/user.py +++ b/app/chain/user.py @@ -202,9 +202,9 @@ class UserChain(ChainBase, metaclass=Singleton): # 触发认证通过的拦截事件 intercept_event = self.eventmanager.send_event( etype=ChainEventType.AuthIntercept, - data=AuthInterceptCredentials(username=username, channel=channel, service=service, token=token) + data=AuthInterceptCredentials(username=username, channel=channel, service=service, + token=token, status="completed") ) - if intercept_event and intercept_event.event_data: intercept_data: AuthInterceptCredentials = intercept_event.event_data if intercept_data.cancel: diff --git a/app/core/plugin.py b/app/core/plugin.py index 068cdd0d..b1fd0a5b 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -331,6 +331,25 @@ class PluginManager(metaclass=Singleton): ) return sync_plugins + def install_plugin_missing_dependencies(self) -> List[str]: + """ + 安装插件中缺失或不兼容的依赖项 + """ + # 第一步:获取需要安装的依赖项列表 + missing_dependencies = self.pluginhelper.find_missing_dependencies() + if not missing_dependencies: + return missing_dependencies + logger.info(f"开始安装缺失的依赖项,共 {len(missing_dependencies)} 个...") + # 第二步:安装依赖项并返回结果 + total_start_time = time.time() + success, message = self.pluginhelper.install_dependencies(missing_dependencies) + total_elapsed_time = time.time() - total_start_time + if success: + logger.info(f"已完成 {len(missing_dependencies)} 个依赖项安装,总耗时:{total_elapsed_time:.2f} 秒") + else: + logger.warning(f"存在缺失依赖项安装失败,请尝试手动安装,总耗时:{total_elapsed_time:.2f} 秒") + return missing_dependencies + def get_plugin_config(self, pid: str) -> dict: """ 获取插件配置 diff --git a/app/helper/plugin.py b/app/helper/plugin.py index a9a2e7c5..0d92173c 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -2,9 +2,13 @@ import json import shutil import traceback from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Set +import pkg_resources from cachetools import TTLCache, cached +from packaging.specifiers import SpecifierSet, InvalidSpecifier +from packaging.version import Version, InvalidVersion +from pkg_resources import Requirement from app.core.config import settings from app.db.systemconfig_oper import SystemConfigOper @@ -15,6 +19,8 @@ from app.utils.singleton import Singleton from app.utils.system import SystemUtils from app.utils.url import UrlUtils +PLUGIN_DIR = Path(settings.ROOT_PATH) / "app" / "plugins" + class PluginHelper(metaclass=Singleton): """ @@ -359,7 +365,7 @@ class PluginHelper(metaclass=Singleton): requirements_txt = res.text if requirements_txt.strip(): # 保存并安装依赖 - requirements_file_path = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower() / "requirements.txt" + requirements_file_path = PLUGIN_DIR / pid.lower() / "requirements.txt" requirements_file_path.parent.mkdir(parents=True, exist_ok=True) with open(requirements_file_path, "w", encoding="utf-8") as f: f.write(requirements_txt) @@ -376,7 +382,7 @@ class PluginHelper(metaclass=Singleton): :return: (是否存在依赖,安装是否成功, 错误信息) """ # 定位插件目录和依赖文件 - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower() + plugin_dir = PLUGIN_DIR / pid.lower() requirements_file = plugin_dir / "requirements.txt" # 检查是否存在 requirements.txt 文件 @@ -397,7 +403,7 @@ class PluginHelper(metaclass=Singleton): :param pid: 插件 ID :return: 备份目录路径 """ - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + plugin_dir = PLUGIN_DIR / pid backup_dir = Path(settings.TEMP_PATH) / "plugins_backup" / pid if plugin_dir.exists(): @@ -418,7 +424,7 @@ class PluginHelper(metaclass=Singleton): :param pid: 插件 ID :param backup_dir: 备份目录路径 """ - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + plugin_dir = PLUGIN_DIR / pid if plugin_dir.exists(): shutil.rmtree(plugin_dir, ignore_errors=True) logger.debug(f"{pid} 已清理插件目录 {plugin_dir}") @@ -435,7 +441,7 @@ class PluginHelper(metaclass=Singleton): 删除旧插件 :param pid: 插件 ID """ - plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + plugin_dir = PLUGIN_DIR / pid if plugin_dir.exists(): shutil.rmtree(plugin_dir, ignore_errors=True) @@ -560,3 +566,185 @@ class PluginHelper(metaclass=Singleton): logger.error(f"[GitHub] 所有策略均请求失败,URL: {url},请检查网络连接或 GitHub 配置") return None + + def find_missing_dependencies(self) -> List[str]: + """ + 收集所有需要安装或更新的依赖项 + 1. 收集所有插件的依赖项,合并版本约束 + 2. 获取已安装的包及其版本 + 3. 比较已安装的包与所需的依赖项,找出需要安装或升级的包 + :return: 需要安装或更新的依赖项列表,例如 ["package1>=1.0.0", "package2"] + """ + try: + # 收集所有插件的依赖项 + missing_dependencies = self.__find_plugin_dependencies() # 返回格式为 {package_name: version_specifier} + # 获取已安装的包及其版本 + installed_packages = self.__get_installed_packages() # 返回格式为 {package_name: Version} + # 需要安装或更新的依赖项列表 + dependencies_to_install = [] + for pkg_name, version_specifier in missing_dependencies.items(): + spec_set = SpecifierSet(version_specifier) + installed_version = installed_packages.get(pkg_name) + if installed_version is None: + # 包未安装,需要安装 + if version_specifier: + dependencies_to_install.append(f"{pkg_name}{version_specifier}") + else: + dependencies_to_install.append(pkg_name) + elif not spec_set.contains(installed_version, prereleases=True): + # 已安装的版本不满足版本约束,需要升级或降级 + if version_specifier: + dependencies_to_install.append(f"{pkg_name}{version_specifier}") + else: + dependencies_to_install.append(pkg_name) + # 已安装的版本满足要求,无需操作 + return dependencies_to_install + except Exception as e: + logger.error(f"收集所有需要安装或更新的依赖项时发生错误:{e}") + return [] + + def install_dependencies(self, dependencies: List[str]) -> Tuple[bool, str]: + """ + 安装指定的依赖项列表 + + :param dependencies: 需要安装或更新的依赖项列表 + :return: (success, message) + """ + if not dependencies: + return False, "没有传入需要安装的依赖项" + + try: + logger.debug(f"需要安装或更新的依赖项:{dependencies}") + # 创建临时的 requirements.txt 文件用于批量安装 + requirements_temp_file = Path(settings.TEMP_PATH) / "plugin_dependencies" / "requirements.txt" + requirements_temp_file.parent.mkdir(parents=True, exist_ok=True) + with open(requirements_temp_file, "w", encoding="utf-8") as f: + for dep in dependencies: + f.write(dep + '\n') + + # 使用自动降级策略安装依赖 + success, message = self.__pip_install_with_fallback(requirements_temp_file) + # 删除临时文件 + requirements_temp_file.unlink() + return success, message + except Exception as e: + logger.error(f"安装依赖项时发生错误:{e}") + return False, f"安装依赖项时发生错误:{e}" + + def __get_installed_packages(self) -> Dict[str, Version]: + """ + 获取已安装的包及其版本 + 使用 pkg_resources 获取当前环境中已安装的包,标准化包名并转换版本信息 + 对于无法解析的版本,记录警告日志并跳过 + :return: 已安装包的字典,格式为 {package_name: Version} + """ + installed_packages = {} + try: + for dist in pkg_resources.working_set: + pkg_name = self.__standardize_pkg_name(dist.project_name) + try: + installed_packages[pkg_name] = Version(dist.version) + except InvalidVersion: + logger.debug(f"无法解析已安装包 '{pkg_name}' 的版本:{dist.version}") + continue + return installed_packages + except Exception as e: + logger.error(f"获取已安装的包时发生错误:{e}") + return {} + + def __find_plugin_dependencies(self) -> Dict[str, str]: + """ + 收集所有插件的依赖项 + 遍历 plugins 目录下的所有插件,查找存在 requirements.txt 的插件目录 + ,并解析其中的依赖项,同时将所有插件的依赖项合并到字典中,方便后续统一处理 + :return: 依赖项字典,格式为 {package_name: set(version_specifiers)} + """ + dependencies = {} + try: + for plugin_dir in PLUGIN_DIR.iterdir(): + if plugin_dir.is_dir(): + requirements_file = plugin_dir / "requirements.txt" + if requirements_file.exists(): + # 解析当前插件的 requirements.txt,获取依赖项 + plugin_deps = self.__parse_requirements(requirements_file) + for pkg_name, version_specifiers in plugin_deps.items(): + logger.debug(f"当前处理的包:{pkg_name}, 版本约束:{version_specifiers}") + if pkg_name in dependencies: + # 更新已存在的包的版本约束集合 + dependencies[pkg_name].update(version_specifiers) + else: + # 添加新的包及其版本约束 + dependencies[pkg_name] = set(version_specifiers) + return self.__merge_dependencies(dependencies) + except Exception as e: + logger.error(f"收集插件依赖项时发生错误:{e}") + return {} + + def __parse_requirements(self, requirements_file: Path) -> Dict[str, List[str]]: + """ + 解析 requirements.txt 文件,返回依赖项字典 + 使用 packaging 库解析每一行依赖项,提取包名和版本约束 + 对于无法解析的行,记录警告日志,便于后续检查 + :param requirements_file: requirements.txt 文件的路径 + :return: 依赖项字典,格式为 {package_name: [version_specifier]} + """ + dependencies = {} + try: + with open(requirements_file, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + # 使用 packaging 库解析依赖项 + try: + req = Requirement(line) + pkg_name = self.__standardize_pkg_name(req.name) + version_specifier = str(req.specifier) + logger.debug(f"解析到依赖项:包名={pkg_name}, 版本约束={version_specifier}") + if pkg_name in dependencies: + dependencies[pkg_name].append(version_specifier) + else: + dependencies[pkg_name] = [version_specifier] + except Exception as e: + logger.debug(f"无法解析依赖项 '{line}':{e}") + return dependencies + except Exception as e: + logger.error(f"解析 requirements.txt 时发生错误:{e}") + return {} + + @staticmethod + def __merge_dependencies(dependencies: Dict[str, Set[str]]) -> Dict[str, str]: + """ + 合并依赖项,选择每个包的最高版本要求 + 对于多个插件依赖同一包的情况,合并其版本约束,取交集以满足所有插件的要求 + 如果交集为空,表示存在版本冲突,需要根据策略进行处理 + :param dependencies: 依赖项字典,格式为 {package_name: set(version_specifiers)} + :return: 合并后的依赖项字典,格式为 {package_name: version_specifiers} + """ + try: + merged_dependencies = {} + for pkg_name, version_specifiers in dependencies.items(): + logger.debug(f"合并包:{pkg_name} 的版本约束:{version_specifiers}") + # 合并版本约束 + spec_set = SpecifierSet() + for specifier in version_specifiers: + try: + if specifier: + spec_set &= SpecifierSet(specifier) + except InvalidSpecifier as e: + logger.error(f"发生版本约束冲突:{e}") + # 将合并后的版本约束添加到结果字典 + merged_dependencies[pkg_name] = str(spec_set) if spec_set else '' + return merged_dependencies + except Exception as e: + logger.error(f"合并依赖项时发生错误:{e}") + return {} + + @staticmethod + def __standardize_pkg_name(name: str) -> str: + """ + 标准化包名,将包名转换为小写并将连字符替换为下划线 + + :param name: 原始包名 + :return: 标准化后的包名 + """ + return name.lower().replace("-", "_") if name else name diff --git a/app/modules/emby/__init__.py b/app/modules/emby/__init__.py index 11f8dfa6..5e7e9ed3 100644 --- a/app/modules/emby/__init__.py +++ b/app/modules/emby/__init__.py @@ -2,11 +2,12 @@ from typing import Any, Generator, List, Optional, Tuple, Union from app import schemas from app.core.context import MediaInfo +from app.core.event import eventmanager from app.log import logger from app.modules import _MediaServerBase, _ModuleBase from app.modules.emby.emby import Emby -from app.schemas.event import AuthCredentials -from app.schemas.types import MediaType, ModuleType +from app.schemas.event import AuthCredentials, AuthInterceptCredentials +from app.schemas.types import MediaType, ModuleType, ChainEventType class EmbyModule(_ModuleBase, _MediaServerBase[Emby]): @@ -75,6 +76,16 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]): if not credentials or credentials.grant_type != "password": return None for name, server in self.get_instances().items(): + # 触发认证拦截事件 + intercept_event = eventmanager.send_event( + etype=ChainEventType.AuthIntercept, + data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(), + service=name, status="triggered") + ) + if intercept_event and intercept_event.event_data: + intercept_data: AuthInterceptCredentials = intercept_event.event_data + if intercept_data.cancel: + continue token = server.authenticate(credentials.username, credentials.password) if token: credentials.channel = self.get_name() diff --git a/app/modules/jellyfin/__init__.py b/app/modules/jellyfin/__init__.py index 595e6e75..8afe61e8 100644 --- a/app/modules/jellyfin/__init__.py +++ b/app/modules/jellyfin/__init__.py @@ -2,11 +2,12 @@ from typing import Any, Generator, List, Optional, Tuple, Union from app import schemas from app.core.context import MediaInfo +from app.core.event import eventmanager from app.log import logger from app.modules import _MediaServerBase, _ModuleBase from app.modules.jellyfin.jellyfin import Jellyfin -from app.schemas.event import AuthCredentials -from app.schemas.types import MediaType, ModuleType +from app.schemas.event import AuthCredentials, AuthInterceptCredentials +from app.schemas.types import MediaType, ModuleType, ChainEventType class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]): @@ -75,6 +76,16 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]): if not credentials or credentials.grant_type != "password": return None for name, server in self.get_instances().items(): + # 触发认证拦截事件 + intercept_event = eventmanager.send_event( + etype=ChainEventType.AuthIntercept, + data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(), + service=name, status="triggered") + ) + if intercept_event and intercept_event.event_data: + intercept_data: AuthInterceptCredentials = intercept_event.event_data + if intercept_data.cancel: + continue token = server.authenticate(credentials.username, credentials.password) if token: credentials.channel = self.get_name() diff --git a/app/modules/plex/__init__.py b/app/modules/plex/__init__.py index 7c739f54..29a44f0c 100644 --- a/app/modules/plex/__init__.py +++ b/app/modules/plex/__init__.py @@ -2,10 +2,12 @@ from typing import Optional, Tuple, Union, Any, List, Generator from app import schemas from app.core.context import MediaInfo +from app.core.event import eventmanager from app.log import logger from app.modules import _ModuleBase, _MediaServerBase from app.modules.plex.plex import Plex -from app.schemas.types import MediaType, ModuleType +from app.schemas.event import AuthCredentials, AuthInterceptCredentials +from app.schemas.types import MediaType, ModuleType, ChainEventType class PlexModule(_ModuleBase, _MediaServerBase[Plex]): @@ -64,6 +66,37 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]): logger.info(f"Plex {name} 服务器连接断开,尝试重连 ...") server.reconnect() + def user_authenticate(self, credentials: AuthCredentials) -> Optional[AuthCredentials]: + """ + 使用Plex用户辅助完成用户认证 + :param credentials: 认证数据 + :return: 认证数据 + """ + # Plex认证 + if not credentials or credentials.grant_type != "password": + return None + for name, server in self.get_instances().items(): + # 触发认证拦截事件 + intercept_event = eventmanager.send_event( + etype=ChainEventType.AuthIntercept, + data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(), + service=name, status="triggered") + ) + if intercept_event and intercept_event.event_data: + intercept_data: AuthInterceptCredentials = intercept_event.event_data + if intercept_data.cancel: + continue + auth_result = server.authenticate(credentials.username, credentials.password) + if auth_result: + token, username = auth_result + credentials.channel = self.get_name() + credentials.service = name + credentials.token = token + # Plex 传入可能为邮箱,这里调整为用户名返回 + credentials.username = username + return credentials + return None + def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]: """ 解析Webhook报文体 diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index ee9c5adf..f64304d1 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -5,6 +5,7 @@ from urllib.parse import quote_plus from cachetools import TTLCache, cached from plexapi import media +from plexapi.myplex import MyPlexAccount from plexapi.server import PlexServer from requests import Response, Session @@ -61,6 +62,27 @@ class Plex: self._plex = None logger.error(f"Plex服务器连接失败:{str(e)}") + def authenticate(self, username: str, password: str) -> Optional[Tuple[str, str]]: + """ + 用户认证 + :param username: 用户名 + :param password: 密码 + :return: 认证成功返回 (token, 用户名),否则返回 None + """ + if not username or not password: + return None + try: + account = MyPlexAccount(username=username, password=password, remember=False) + if account: + plex = PlexServer(self._host, account.authToken) + if not plex: + return None + return account.authToken, account.username + except Exception as e: + # 处理认证失败或网络错误等情况 + logger.error(f"Authentication failed: {e}") + return None + @cached(cache=TTLCache(maxsize=100, ttl=86400)) def __get_library_images(self, library_key: str, mtype: int) -> Optional[List[str]]: """ diff --git a/app/schemas/event.py b/app/schemas/event.py index 1a5251c5..3490ab7a 100644 --- a/app/schemas/event.py +++ b/app/schemas/event.py @@ -74,15 +74,17 @@ class AuthInterceptCredentials(ChainEventData): channel (str): 认证渠道 service (str): 服务名称 token (str): 认证令牌 + status (str): 认证状态,"triggered" 和 "completed" 两个状态 # 输出参数 source (str): 拦截源,默认值为 "未知拦截源" cancel (bool): 是否取消认证,默认值为 False """ # 输入参数 - username: str = Field(..., description="用户名") + username: Optional[str] = Field(..., description="用户名") channel: str = Field(..., description="认证渠道") service: str = Field(..., description="服务名称") + status: str = Field(..., description="认证状态, 包含 'triggered' 表示认证触发,'completed' 表示认证成功") token: Optional[str] = Field(None, description="认证令牌") # 输出参数 diff --git a/app/startup/plugins_initializer.py b/app/startup/plugins_initializer.py index 23494732..e1b270ee 100644 --- a/app/startup/plugins_initializer.py +++ b/app/startup/plugins_initializer.py @@ -15,11 +15,18 @@ async def init_plugins_async(): plugin_manager = PluginManager() scheduler = Scheduler() command = CommandChain() - sync_plugins = await loop.run_in_executor(None, plugin_manager.sync) - if not sync_plugins: + + sync_result = await execute_task(loop, plugin_manager.sync, "插件同步到本地") + resolved_dependencies = await execute_task(loop, plugin_manager.install_plugin_missing_dependencies, + "缺失依赖项安装") + # 判断是否需要进行插件初始化 + if not sync_result and not resolved_dependencies: + logger.debug("没有新的插件同步到本地或缺失依赖项需要安装,跳过插件初始化") return + + # 继续执行后续的插件初始化步骤 + logger.info("正在初始化所有插件") # 为避免初始化插件异常,这里所有插件都进行初始化 - logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件") # 安装完成后重新初始化插件 plugin_manager.init_config() # 插件启动后注册后台任务 @@ -33,6 +40,19 @@ async def init_plugins_async(): logger.error(f"插件初始化过程中出现异常: {e}") +async def execute_task(loop, task_func, task_name): + try: + result = await loop.run_in_executor(None, task_func) + if isinstance(result, list) and result: + logger.info(f"{task_name} 已完成,共处理 {len(result)} 个项目") + else: + logger.debug(f"没有新的 {task_name} 需要处理") + return result + except Exception as e: + logger.error(f"{task_name} 时发生错误:{e}", exc_info=True) + return [] + + def register_plugin_api(): """ 插件启动后注册插件API diff --git a/update b/update index 46b0aff7..2a3f7f3f 100644 --- a/update +++ b/update @@ -66,7 +66,7 @@ function install_backend_and_download_resources() { fi INFO "安装依赖成功" # 从后端文件中读取前端版本号 - frontend_version=$(sed -n "s/^FRONTEND_VERSION\s*=\s*'\([^']*\)'/\1/p" /tmp/app/version.py) + frontend_version=$(sed -n "s/^FRONTEND_VERSION\s*=\s*'\([^']*\)'/\1/p" /tmp/App/version.py) if [[ "${frontend_version}" != *v* ]]; then WARN "前端最新版本号获取失败,继续启动..." return 1 @@ -102,7 +102,7 @@ function install_backend_and_download_resources() { INFO "程序部分更新成功,前端版本:${frontend_version},后端版本:${1}" INFO "开始更新插件..." if ! download_and_unzip "${GITHUB_PROXY}https://github.com/jxxghp/MoviePilot-Plugins/archive/refs/heads/main.zip" "Plugins"; then - cp -a /plugins/* /app/app/plugins/ + cp -a /plugins.v2/* /app/app/plugins/ rm -rf /plugins WARN "插件下载失败,继续使用旧的插件来启动..." return 1 @@ -112,6 +112,7 @@ function install_backend_and_download_resources() { cp -a /plugins/* /app/app/plugins/ # 插件仓库 rsync -av --remove-source-files /tmp/Plugins/plugins/* /app/app/plugins/ > /dev/null + rsync -av --remove-source-files /tmp/Plugins/plugins.v2/* /app/app/plugins/ > /dev/null # 提前安装插件依赖 find /app/app/plugins -name requirements.txt -exec pip install --root-user-action=ignore ${PIP_OPTIONS} -r {} \; > /dev/null # 清理临时目录