Merge remote-tracking branch 'origin/v2' into v2

This commit is contained in:
jxxghp
2024-10-24 11:14:45 +08:00
10 changed files with 326 additions and 19 deletions

View File

@@ -202,9 +202,9 @@ class UserChain(ChainBase, metaclass=Singleton):
# 触发认证通过的拦截事件
intercept_event = self.eventmanager.send_event(
etype=ChainEventType.AuthIntercept,
data=AuthInterceptCredentials(username=username, channel=channel, service=service, token=token)
data=AuthInterceptCredentials(username=username, channel=channel, service=service,
token=token, status="completed")
)
if intercept_event and intercept_event.event_data:
intercept_data: AuthInterceptCredentials = intercept_event.event_data
if intercept_data.cancel:

View File

@@ -331,6 +331,25 @@ class PluginManager(metaclass=Singleton):
)
return sync_plugins
def install_plugin_missing_dependencies(self) -> List[str]:
"""
安装插件中缺失或不兼容的依赖项
"""
# 第一步:获取需要安装的依赖项列表
missing_dependencies = self.pluginhelper.find_missing_dependencies()
if not missing_dependencies:
return missing_dependencies
logger.info(f"开始安装缺失的依赖项,共 {len(missing_dependencies)} 个...")
# 第二步:安装依赖项并返回结果
total_start_time = time.time()
success, message = self.pluginhelper.install_dependencies(missing_dependencies)
total_elapsed_time = time.time() - total_start_time
if success:
logger.info(f"已完成 {len(missing_dependencies)} 个依赖项安装,总耗时:{total_elapsed_time:.2f}")
else:
logger.warning(f"存在缺失依赖项安装失败,请尝试手动安装,总耗时:{total_elapsed_time:.2f}")
return missing_dependencies
def get_plugin_config(self, pid: str) -> dict:
"""
获取插件配置

View File

@@ -2,9 +2,13 @@ import json
import shutil
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Set
import pkg_resources
from cachetools import TTLCache, cached
from packaging.specifiers import SpecifierSet, InvalidSpecifier
from packaging.version import Version, InvalidVersion
from pkg_resources import Requirement
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
@@ -15,6 +19,8 @@ from app.utils.singleton import Singleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
PLUGIN_DIR = Path(settings.ROOT_PATH) / "app" / "plugins"
class PluginHelper(metaclass=Singleton):
"""
@@ -359,7 +365,7 @@ class PluginHelper(metaclass=Singleton):
requirements_txt = res.text
if requirements_txt.strip():
# 保存并安装依赖
requirements_file_path = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower() / "requirements.txt"
requirements_file_path = PLUGIN_DIR / pid.lower() / "requirements.txt"
requirements_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(requirements_file_path, "w", encoding="utf-8") as f:
f.write(requirements_txt)
@@ -376,7 +382,7 @@ class PluginHelper(metaclass=Singleton):
:return: (是否存在依赖,安装是否成功, 错误信息)
"""
# 定位插件目录和依赖文件
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower()
plugin_dir = PLUGIN_DIR / pid.lower()
requirements_file = plugin_dir / "requirements.txt"
# 检查是否存在 requirements.txt 文件
@@ -397,7 +403,7 @@ class PluginHelper(metaclass=Singleton):
:param pid: 插件 ID
:return: 备份目录路径
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid
plugin_dir = PLUGIN_DIR / pid
backup_dir = Path(settings.TEMP_PATH) / "plugins_backup" / pid
if plugin_dir.exists():
@@ -418,7 +424,7 @@ class PluginHelper(metaclass=Singleton):
:param pid: 插件 ID
:param backup_dir: 备份目录路径
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid
plugin_dir = PLUGIN_DIR / pid
if plugin_dir.exists():
shutil.rmtree(plugin_dir, ignore_errors=True)
logger.debug(f"{pid} 已清理插件目录 {plugin_dir}")
@@ -435,7 +441,7 @@ class PluginHelper(metaclass=Singleton):
删除旧插件
:param pid: 插件 ID
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid
plugin_dir = PLUGIN_DIR / pid
if plugin_dir.exists():
shutil.rmtree(plugin_dir, ignore_errors=True)
@@ -560,3 +566,185 @@ class PluginHelper(metaclass=Singleton):
logger.error(f"[GitHub] 所有策略均请求失败URL: {url},请检查网络连接或 GitHub 配置")
return None
def find_missing_dependencies(self) -> List[str]:
"""
收集所有需要安装或更新的依赖项
1. 收集所有插件的依赖项,合并版本约束
2. 获取已安装的包及其版本
3. 比较已安装的包与所需的依赖项,找出需要安装或升级的包
:return: 需要安装或更新的依赖项列表,例如 ["package1>=1.0.0", "package2"]
"""
try:
# 收集所有插件的依赖项
missing_dependencies = self.__find_plugin_dependencies() # 返回格式为 {package_name: version_specifier}
# 获取已安装的包及其版本
installed_packages = self.__get_installed_packages() # 返回格式为 {package_name: Version}
# 需要安装或更新的依赖项列表
dependencies_to_install = []
for pkg_name, version_specifier in missing_dependencies.items():
spec_set = SpecifierSet(version_specifier)
installed_version = installed_packages.get(pkg_name)
if installed_version is None:
# 包未安装,需要安装
if version_specifier:
dependencies_to_install.append(f"{pkg_name}{version_specifier}")
else:
dependencies_to_install.append(pkg_name)
elif not spec_set.contains(installed_version, prereleases=True):
# 已安装的版本不满足版本约束,需要升级或降级
if version_specifier:
dependencies_to_install.append(f"{pkg_name}{version_specifier}")
else:
dependencies_to_install.append(pkg_name)
# 已安装的版本满足要求,无需操作
return dependencies_to_install
except Exception as e:
logger.error(f"收集所有需要安装或更新的依赖项时发生错误:{e}")
return []
def install_dependencies(self, dependencies: List[str]) -> Tuple[bool, str]:
"""
安装指定的依赖项列表
:param dependencies: 需要安装或更新的依赖项列表
:return: (success, message)
"""
if not dependencies:
return False, "没有传入需要安装的依赖项"
try:
logger.debug(f"需要安装或更新的依赖项:{dependencies}")
# 创建临时的 requirements.txt 文件用于批量安装
requirements_temp_file = Path(settings.TEMP_PATH) / "plugin_dependencies" / "requirements.txt"
requirements_temp_file.parent.mkdir(parents=True, exist_ok=True)
with open(requirements_temp_file, "w", encoding="utf-8") as f:
for dep in dependencies:
f.write(dep + '\n')
# 使用自动降级策略安装依赖
success, message = self.__pip_install_with_fallback(requirements_temp_file)
# 删除临时文件
requirements_temp_file.unlink()
return success, message
except Exception as e:
logger.error(f"安装依赖项时发生错误:{e}")
return False, f"安装依赖项时发生错误:{e}"
def __get_installed_packages(self) -> Dict[str, Version]:
"""
获取已安装的包及其版本
使用 pkg_resources 获取当前环境中已安装的包,标准化包名并转换版本信息
对于无法解析的版本,记录警告日志并跳过
:return: 已安装包的字典,格式为 {package_name: Version}
"""
installed_packages = {}
try:
for dist in pkg_resources.working_set:
pkg_name = self.__standardize_pkg_name(dist.project_name)
try:
installed_packages[pkg_name] = Version(dist.version)
except InvalidVersion:
logger.debug(f"无法解析已安装包 '{pkg_name}' 的版本:{dist.version}")
continue
return installed_packages
except Exception as e:
logger.error(f"获取已安装的包时发生错误:{e}")
return {}
def __find_plugin_dependencies(self) -> Dict[str, str]:
"""
收集所有插件的依赖项
遍历 plugins 目录下的所有插件,查找存在 requirements.txt 的插件目录
,并解析其中的依赖项,同时将所有插件的依赖项合并到字典中,方便后续统一处理
:return: 依赖项字典,格式为 {package_name: set(version_specifiers)}
"""
dependencies = {}
try:
for plugin_dir in PLUGIN_DIR.iterdir():
if plugin_dir.is_dir():
requirements_file = plugin_dir / "requirements.txt"
if requirements_file.exists():
# 解析当前插件的 requirements.txt获取依赖项
plugin_deps = self.__parse_requirements(requirements_file)
for pkg_name, version_specifiers in plugin_deps.items():
logger.debug(f"当前处理的包:{pkg_name}, 版本约束:{version_specifiers}")
if pkg_name in dependencies:
# 更新已存在的包的版本约束集合
dependencies[pkg_name].update(version_specifiers)
else:
# 添加新的包及其版本约束
dependencies[pkg_name] = set(version_specifiers)
return self.__merge_dependencies(dependencies)
except Exception as e:
logger.error(f"收集插件依赖项时发生错误:{e}")
return {}
def __parse_requirements(self, requirements_file: Path) -> Dict[str, List[str]]:
"""
解析 requirements.txt 文件,返回依赖项字典
使用 packaging 库解析每一行依赖项,提取包名和版本约束
对于无法解析的行,记录警告日志,便于后续检查
:param requirements_file: requirements.txt 文件的路径
:return: 依赖项字典,格式为 {package_name: [version_specifier]}
"""
dependencies = {}
try:
with open(requirements_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
# 使用 packaging 库解析依赖项
try:
req = Requirement(line)
pkg_name = self.__standardize_pkg_name(req.name)
version_specifier = str(req.specifier)
logger.debug(f"解析到依赖项:包名={pkg_name}, 版本约束={version_specifier}")
if pkg_name in dependencies:
dependencies[pkg_name].append(version_specifier)
else:
dependencies[pkg_name] = [version_specifier]
except Exception as e:
logger.debug(f"无法解析依赖项 '{line}'{e}")
return dependencies
except Exception as e:
logger.error(f"解析 requirements.txt 时发生错误:{e}")
return {}
@staticmethod
def __merge_dependencies(dependencies: Dict[str, Set[str]]) -> Dict[str, str]:
"""
合并依赖项,选择每个包的最高版本要求
对于多个插件依赖同一包的情况,合并其版本约束,取交集以满足所有插件的要求
如果交集为空,表示存在版本冲突,需要根据策略进行处理
:param dependencies: 依赖项字典,格式为 {package_name: set(version_specifiers)}
:return: 合并后的依赖项字典,格式为 {package_name: version_specifiers}
"""
try:
merged_dependencies = {}
for pkg_name, version_specifiers in dependencies.items():
logger.debug(f"合并包:{pkg_name} 的版本约束:{version_specifiers}")
# 合并版本约束
spec_set = SpecifierSet()
for specifier in version_specifiers:
try:
if specifier:
spec_set &= SpecifierSet(specifier)
except InvalidSpecifier as e:
logger.error(f"发生版本约束冲突:{e}")
# 将合并后的版本约束添加到结果字典
merged_dependencies[pkg_name] = str(spec_set) if spec_set else ''
return merged_dependencies
except Exception as e:
logger.error(f"合并依赖项时发生错误:{e}")
return {}
@staticmethod
def __standardize_pkg_name(name: str) -> str:
"""
标准化包名,将包名转换为小写并将连字符替换为下划线
:param name: 原始包名
:return: 标准化后的包名
"""
return name.lower().replace("-", "_") if name else name

View File

@@ -2,11 +2,12 @@ from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.emby.emby import Emby
from app.schemas.event import AuthCredentials
from app.schemas.types import MediaType, ModuleType
from app.schemas.event import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import MediaType, ModuleType, ChainEventType
class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
@@ -75,6 +76,16 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
if not credentials or credentials.grant_type != "password":
return None
for name, server in self.get_instances().items():
# 触发认证拦截事件
intercept_event = eventmanager.send_event(
etype=ChainEventType.AuthIntercept,
data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(),
service=name, status="triggered")
)
if intercept_event and intercept_event.event_data:
intercept_data: AuthInterceptCredentials = intercept_event.event_data
if intercept_data.cancel:
continue
token = server.authenticate(credentials.username, credentials.password)
if token:
credentials.channel = self.get_name()

View File

@@ -2,11 +2,12 @@ from typing import Any, Generator, List, Optional, Tuple, Union
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.log import logger
from app.modules import _MediaServerBase, _ModuleBase
from app.modules.jellyfin.jellyfin import Jellyfin
from app.schemas.event import AuthCredentials
from app.schemas.types import MediaType, ModuleType
from app.schemas.event import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import MediaType, ModuleType, ChainEventType
class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
@@ -75,6 +76,16 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
if not credentials or credentials.grant_type != "password":
return None
for name, server in self.get_instances().items():
# 触发认证拦截事件
intercept_event = eventmanager.send_event(
etype=ChainEventType.AuthIntercept,
data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(),
service=name, status="triggered")
)
if intercept_event and intercept_event.event_data:
intercept_data: AuthInterceptCredentials = intercept_event.event_data
if intercept_data.cancel:
continue
token = server.authenticate(credentials.username, credentials.password)
if token:
credentials.channel = self.get_name()

View File

@@ -2,10 +2,12 @@ from typing import Optional, Tuple, Union, Any, List, Generator
from app import schemas
from app.core.context import MediaInfo
from app.core.event import eventmanager
from app.log import logger
from app.modules import _ModuleBase, _MediaServerBase
from app.modules.plex.plex import Plex
from app.schemas.types import MediaType, ModuleType
from app.schemas.event import AuthCredentials, AuthInterceptCredentials
from app.schemas.types import MediaType, ModuleType, ChainEventType
class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
@@ -64,6 +66,37 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
logger.info(f"Plex {name} 服务器连接断开,尝试重连 ...")
server.reconnect()
def user_authenticate(self, credentials: AuthCredentials) -> Optional[AuthCredentials]:
"""
使用Plex用户辅助完成用户认证
:param credentials: 认证数据
:return: 认证数据
"""
# Plex认证
if not credentials or credentials.grant_type != "password":
return None
for name, server in self.get_instances().items():
# 触发认证拦截事件
intercept_event = eventmanager.send_event(
etype=ChainEventType.AuthIntercept,
data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(),
service=name, status="triggered")
)
if intercept_event and intercept_event.event_data:
intercept_data: AuthInterceptCredentials = intercept_event.event_data
if intercept_data.cancel:
continue
auth_result = server.authenticate(credentials.username, credentials.password)
if auth_result:
token, username = auth_result
credentials.channel = self.get_name()
credentials.service = name
credentials.token = token
# Plex 传入可能为邮箱,这里调整为用户名返回
credentials.username = username
return credentials
return None
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]:
"""
解析Webhook报文体

View File

@@ -5,6 +5,7 @@ from urllib.parse import quote_plus
from cachetools import TTLCache, cached
from plexapi import media
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from requests import Response, Session
@@ -61,6 +62,27 @@ class Plex:
self._plex = None
logger.error(f"Plex服务器连接失败{str(e)}")
def authenticate(self, username: str, password: str) -> Optional[Tuple[str, str]]:
"""
用户认证
:param username: 用户名
:param password: 密码
:return: 认证成功返回 (token, 用户名),否则返回 None
"""
if not username or not password:
return None
try:
account = MyPlexAccount(username=username, password=password, remember=False)
if account:
plex = PlexServer(self._host, account.authToken)
if not plex:
return None
return account.authToken, account.username
except Exception as e:
# 处理认证失败或网络错误等情况
logger.error(f"Authentication failed: {e}")
return None
@cached(cache=TTLCache(maxsize=100, ttl=86400))
def __get_library_images(self, library_key: str, mtype: int) -> Optional[List[str]]:
"""

View File

@@ -74,15 +74,17 @@ class AuthInterceptCredentials(ChainEventData):
channel (str): 认证渠道
service (str): 服务名称
token (str): 认证令牌
status (str): 认证状态,"triggered""completed" 两个状态
# 输出参数
source (str): 拦截源,默认值为 "未知拦截源"
cancel (bool): 是否取消认证,默认值为 False
"""
# 输入参数
username: str = Field(..., description="用户名")
username: Optional[str] = Field(..., description="用户名")
channel: str = Field(..., description="认证渠道")
service: str = Field(..., description="服务名称")
status: str = Field(..., description="认证状态, 包含 'triggered' 表示认证触发,'completed' 表示认证成功")
token: Optional[str] = Field(None, description="认证令牌")
# 输出参数

View File

@@ -15,11 +15,18 @@ async def init_plugins_async():
plugin_manager = PluginManager()
scheduler = Scheduler()
command = CommandChain()
sync_plugins = await loop.run_in_executor(None, plugin_manager.sync)
if not sync_plugins:
sync_result = await execute_task(loop, plugin_manager.sync, "插件同步到本地")
resolved_dependencies = await execute_task(loop, plugin_manager.install_plugin_missing_dependencies,
"缺失依赖项安装")
# 判断是否需要进行插件初始化
if not sync_result and not resolved_dependencies:
logger.debug("没有新的插件同步到本地或缺失依赖项需要安装,跳过插件初始化")
return
# 继续执行后续的插件初始化步骤
logger.info("正在初始化所有插件")
# 为避免初始化插件异常,这里所有插件都进行初始化
logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件")
# 安装完成后重新初始化插件
plugin_manager.init_config()
# 插件启动后注册后台任务
@@ -33,6 +40,19 @@ async def init_plugins_async():
logger.error(f"插件初始化过程中出现异常: {e}")
async def execute_task(loop, task_func, task_name):
try:
result = await loop.run_in_executor(None, task_func)
if isinstance(result, list) and result:
logger.info(f"{task_name} 已完成,共处理 {len(result)} 个项目")
else:
logger.debug(f"没有新的 {task_name} 需要处理")
return result
except Exception as e:
logger.error(f"{task_name} 时发生错误:{e}", exc_info=True)
return []
def register_plugin_api():
"""
插件启动后注册插件API

5
update
View File

@@ -66,7 +66,7 @@ function install_backend_and_download_resources() {
fi
INFO "安装依赖成功"
# 从后端文件中读取前端版本号
frontend_version=$(sed -n "s/^FRONTEND_VERSION\s*=\s*'\([^']*\)'/\1/p" /tmp/app/version.py)
frontend_version=$(sed -n "s/^FRONTEND_VERSION\s*=\s*'\([^']*\)'/\1/p" /tmp/App/version.py)
if [[ "${frontend_version}" != *v* ]]; then
WARN "前端最新版本号获取失败,继续启动..."
return 1
@@ -102,7 +102,7 @@ function install_backend_and_download_resources() {
INFO "程序部分更新成功,前端版本:${frontend_version},后端版本:${1}"
INFO "开始更新插件..."
if ! download_and_unzip "${GITHUB_PROXY}https://github.com/jxxghp/MoviePilot-Plugins/archive/refs/heads/main.zip" "Plugins"; then
cp -a /plugins/* /app/app/plugins/
cp -a /plugins.v2/* /app/app/plugins/
rm -rf /plugins
WARN "插件下载失败,继续使用旧的插件来启动..."
return 1
@@ -112,6 +112,7 @@ function install_backend_and_download_resources() {
cp -a /plugins/* /app/app/plugins/
# 插件仓库
rsync -av --remove-source-files /tmp/Plugins/plugins/* /app/app/plugins/ > /dev/null
rsync -av --remove-source-files /tmp/Plugins/plugins.v2/* /app/app/plugins/ > /dev/null
# 提前安装插件依赖
find /app/app/plugins -name requirements.txt -exec pip install --root-user-action=ignore ${PIP_OPTIONS} -r {} \; > /dev/null
# 清理临时目录