Merge pull request #2716 from InfinityPacer/feature/plugin

This commit is contained in:
jxxghp
2024-09-14 18:16:02 +08:00
committed by GitHub
3 changed files with 335 additions and 179 deletions

View File

@@ -63,7 +63,7 @@ class Settings(BaseSettings):
# 超级管理员
SUPERUSER: str = "admin"
# 辅助认证,允许通过媒体服务器认证并创建用户
AUXILIARY_AUTH_ENABLE: bool = True
AUXILIARY_AUTH_ENABLE: bool = False
# API密钥需要更换
API_TOKEN: Optional[str] = None
# 网络代理 IP:PORT
@@ -280,46 +280,6 @@ class Settings(BaseSettings):
}
return None
@property
def PROXY_URLPARSE(self):
"""
解析地址组成
"""
if self.PROXY_HOST:
parsed_url = urlparse(self.PROXY_HOST)
protocol = parsed_url.scheme or "" # 协议
username = parsed_url.username or "" # 用户名
password = parsed_url.password or "" # 密码
host = parsed_url.hostname or "" # 主机
port = parsed_url.port or "" # 端口
path = parsed_url.path or "" # 路径
netloc = parsed_url.netloc or "" # 用户名:密码@主机:端口
query = parsed_url.query or "" # 查询参数: ?key=value
params = parsed_url.params or "" # 使用;分割的参数
fragment = parsed_url.fragment or "" # 片段: #fragment
if not port:
if protocol == "https":
port = 443
elif protocol == "http":
port = 80
elif protocol in {"socks5", "socks5h", "socks4", "socks4a"}:
port = 1080
return {
"protocol": protocol,
"username": username,
"password": password,
"host": host,
"port": port,
"path": path,
"netloc": netloc,
"query": query,
"params": params,
"fragment": fragment
}
return None
@property
def PROXY_SERVER(self):
if self.PROXY_HOST:
@@ -338,28 +298,6 @@ class Settings(BaseSettings):
}
return {}
@property
def PIP_OPTIONS(self):
"""
pip调用附加参数
"""
protocol = host = port = ""
parsed_url = self.PROXY_URLPARSE
if parsed_url:
protocol = parsed_url.get("scheme", "").lower()
host = parsed_url.get("host", "").lower()
port = parsed_url.get("port", "")
# 优先级:镜像站 > 全局 > 不代理
if settings.PIP_PROXY:
PIP_OPTIONS = f" -i {settings.PIP_PROXY} "
# 全局代理地址
elif protocol in {"http", "https", "socks4", "socks4a", "socks5", "socks5h"} and host and port:
PIP_OPTIONS = f" --proxy={settings.PROXY_HOST} "
# 不使用代理
else:
PIP_OPTIONS = ""
return PIP_OPTIONS
def REPO_GITHUB_HEADERS(self, repo: str = None):
"""
Github指定的仓库请求头

View File

@@ -2,7 +2,7 @@ import json
import shutil
import traceback
from pathlib import Path
from typing import Dict, Tuple, Optional, List
from typing import Dict, Tuple, Optional, List, Any
from cachetools import TTLCache, cached
@@ -13,6 +13,7 @@ from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
from app.utils.system import SystemUtils
from app.utils.url import UrlUtils
class PluginHelper(metaclass=Singleton):
@@ -20,12 +21,9 @@ class PluginHelper(metaclass=Singleton):
插件市场管理,下载安装插件到本地
"""
_base_url = f"{settings.GITHUB_PROXY}https://raw.githubusercontent.com/%s/%s/main/"
_install_reg = f"{settings.MP_SERVER_HOST}/plugin/install/%s"
_base_url = "https://raw.githubusercontent.com/{user}/{repo}/main/"
_install_reg = f"{settings.MP_SERVER_HOST}/plugin/install/{{pid}}"
_install_report = f"{settings.MP_SERVER_HOST}/plugin/install"
_install_statistic = f"{settings.MP_SERVER_HOST}/plugin/statistic"
def __init__(self):
@@ -35,10 +33,6 @@ class PluginHelper(metaclass=Singleton):
if self.install_report():
self.systemconfig.set(SystemConfigKey.PluginInstallReport, "1")
@property
def proxies(self):
return None if settings.GITHUB_PROXY else settings.PROXY
@cached(cache=TTLCache(maxsize=1000, ttl=1800))
def get_plugins(self, repo_url: str, version: str = None) -> Dict[str, dict]:
"""
@@ -48,27 +42,26 @@ class PluginHelper(metaclass=Singleton):
"""
if not repo_url:
return {}
user, repo = self.get_repo_info(repo_url)
if not user or not repo:
return {}
raw_url = self._base_url % (user, repo)
raw_url = self._base_url.format(user=user, repo=repo)
package_url = f"{raw_url}package.{version}.json" if version else f"{raw_url}package.json"
res = RequestUtils(proxies=self.proxies,
headers=settings.REPO_GITHUB_HEADERS(repo=f"{user}/{repo}"),
timeout=10).get_res(package_url)
res = self.__request_with_fallback(package_url, headers=settings.REPO_GITHUB_HEADERS(repo=f"{user}/{repo}"))
if res:
try:
return json.loads(res.text)
except json.JSONDecodeError:
logger.error(f"插件包数据解析失败:{res.text}")
return {}
return {}
@staticmethod
def get_repo_info(repo_url: str) -> Tuple[Optional[str], Optional[str]]:
"""
获取Github仓库信息
:param repo_url: Github仓库地址
获取GitHub仓库信息
"""
if not repo_url:
return None, None
@@ -79,7 +72,7 @@ class PluginHelper(metaclass=Singleton):
try:
user, repo = repo_url.split("/")[-4:-2]
except Exception as e:
logger.error(f"解析Github仓库地址失败{str(e)} - {traceback.format_exc()}")
logger.error(f"解析GitHub仓库地址失败{str(e)} - {traceback.format_exc()}")
return None, None
return user, repo
@@ -103,7 +96,8 @@ class PluginHelper(metaclass=Singleton):
return False
if not pid:
return False
res = RequestUtils(timeout=5).get_res(self._install_reg % pid)
install_reg_url = self._install_reg.format(pid=pid)
res = RequestUtils(timeout=5).get_res(install_reg_url)
if res and res.status_code == 200:
return True
return False
@@ -119,119 +113,321 @@ class PluginHelper(metaclass=Singleton):
return False
res = RequestUtils(content_type="application/json",
timeout=5).post(self._install_report,
json={
"plugins": [
{
"plugin_id": plugin,
} for plugin in plugins
]
})
json={"plugins": [{"plugin_id": plugin} for plugin in plugins]})
return True if res else False
def install(self, pid: str, repo_url: str) -> Tuple[bool, str]:
"""
安装插件
安装插件,包括依赖安装和文件下载,相关资源支持自动降级策略
1. 从 GitHub 获取文件列表(包括 requirements.txt
2. 删除旧的插件目录
3. 下载并预安装 requirements.txt 中的依赖(如果存在)
4. 下载并安装插件的其他文件
5. 再次尝试安装依赖(确保安装完整)
:param pid: 插件 ID
:param repo_url: 插件仓库地址
:return: (是否成功, 错误信息)
"""
if SystemUtils.is_frozen():
return False, "可执行文件模式下,只能安装本地插件"
# 从Github的repo_url获取用户和项目名
# 验证参数
if not pid or not repo_url:
return False, "参数错误"
# 从 GitHub 的 repo_url 获取用户和项目名
user, repo = self.get_repo_info(repo_url)
if not user or not repo:
return False, "不支持的插件仓库地址格式"
user_repo = f"{user}/{repo}"
def __get_filelist(_p: str) -> Tuple[Optional[list], Optional[str]]:
"""
获取插件的文件列表
"""
file_api = f"https://api.github.com/repos/{user_repo}/contents/plugins/{_p}"
r = RequestUtils(proxies=settings.PROXY,
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo),
timeout=30).get_res(file_api)
if r is None:
return None, "连接仓库失败"
elif r.status_code != 200:
return None, f"连接仓库失败:{r.status_code} - " \
f"{'超出速率限制请配置GITHUB_TOKEN环境变量或稍后重试' if r.status_code == 403 else r.reason}"
ret = r.json()
if ret and ret[0].get("message") == "Not Found":
return None, "插件在仓库中不存在"
return ret, ""
def __download_files(_p: str, _l: List[dict]) -> Tuple[bool, str]:
"""
下载插件文件
"""
if not _l:
return False, "文件列表为空"
for item in _l:
if item.get("download_url"):
download_url = f"{settings.GITHUB_PROXY}{item.get('download_url')}"
# 下载插件文件
res = RequestUtils(proxies=self.proxies,
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo),
timeout=60).get_res(download_url)
if not res:
return False, f"文件 {item.get('name')} 下载失败!"
elif res.status_code != 200:
return False, f"下载文件 {item.get('name')} 失败:{res.status_code} - " \
f"{'超出速率限制请配置GITHUB_TOKEN环境变量或稍后重试' if res.status_code == 403 else res.reason}"
# 创建插件文件夹
file_path = Path(settings.ROOT_PATH) / "app" / item.get("path")
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
f.write(res.text)
else:
# 递归下载子目录
p = f"{_p}/{item.get('name')}"
l, m = __get_filelist(p)
if not l:
return False, m
__download_files(p, l)
return True, ""
if not pid or not repo_url:
return False, "参数错误"
# 获取插件的文件列表
"""
[
{
"name": "__init__.py",
"path": "plugins/autobackup/__init__.py",
"sha": "cd10eba3f0355d61adeb35561cb26a0a36c15a6c",
"size": 12385,
"url": "https://api.github.com/repos/jxxghp/MoviePilot-Plugins/contents/plugins/autobackup/__init__.py?ref=main",
"html_url": "https://github.com/jxxghp/MoviePilot-Plugins/blob/main/plugins/autobackup/__init__.py",
"git_url": "https://api.github.com/repos/jxxghp/MoviePilot-Plugins/git/blobs/cd10eba3f0355d61adeb35561cb26a0a36c15a6c",
"download_url": "https://raw.githubusercontent.com/jxxghp/MoviePilot-Plugins/main/plugins/autobackup/__init__.py",
"type": "file",
"_links": {
"self": "https://api.github.com/repos/jxxghp/MoviePilot-Plugins/contents/plugins/autobackup/__init__.py?ref=main",
"git": "https://api.github.com/repos/jxxghp/MoviePilot-Plugins/git/blobs/cd10eba3f0355d61adeb35561cb26a0a36c15a6c",
"html": "https://github.com/jxxghp/MoviePilot-Plugins/blob/main/plugins/autobackup/__init__.py"
}
}
]
"""
# 获取第一级文件列表
file_list, msg = __get_filelist(pid.lower())
# 1. 获取插件文件列表(包括 requirements.txt
file_list, msg = self.__get_file_list(pid.lower(), user_repo)
if not file_list:
return False, msg
# 本地存在时先删除
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower()
if plugin_dir.exists():
shutil.rmtree(plugin_dir, ignore_errors=True)
# 下载所有文件
__download_files(pid.lower(), file_list)
# 插件目录下如有requirements.txt则安装依赖
requirements_file = plugin_dir / "requirements.txt"
if requirements_file.exists():
SystemUtils.execute(f"pip install -r {requirements_file} {settings.PIP_OPTIONS} > /dev/null 2>&1")
# 安装成功后统计
# 2. 删除旧的插件目录
self.__remove_old_plugin(pid.lower())
# 3. 查找并安装 requirements.txt 中的依赖,确保插件环境的依赖尽可能完整。依赖安装可能失败且不影响插件安装,目前只记录日志
requirements_file_info = next((f for f in file_list if f.get("name") == "requirements.txt"), None)
if requirements_file_info:
logger.debug(f"{pid} 发现 requirements.txt提前下载并预安装依赖")
success, message = self.__download_and_install_requirements(requirements_file_info,
pid, user_repo)
if not success:
logger.debug(f"{pid} 依赖预安装失败:{message}")
else:
logger.debug(f"{pid} 依赖预安装成功")
# 4. 下载插件的其他文件
logger.info(f"{pid} 准备开始下载插件文件")
success, message = self.__download_files(pid.lower(), file_list, user_repo, True)
if not success:
logger.error(f"{pid} 下载插件文件失败:{message}")
return False, message
else:
logger.info(f"{pid} 下载插件文件成功")
# 5. 插件文件安装成功后,再次尝试安装依赖,避免因为遗漏依赖导致的插件运行问题,目前依旧只记录日志
success, message = self.__install_dependencies_if_required(pid)
if not success:
logger.error(f"{pid} 依赖安装失败:{message}")
else:
logger.info(f"{pid} 依赖安装成功")
# 插件安装成功后,统计安装信息
self.install_reg(pid)
return True, ""
def __get_file_list(self, pid: str, user_repo: str) -> Tuple[Optional[list], Optional[str]]:
"""
获取插件的文件列表
:param pid: 插件 ID
:param user_repo: GitHub 仓库的 user/repo 路径
:return: (文件列表, 错误信息)
"""
file_api = f"https://api.github.com/repos/{user_repo}/contents/plugins/{pid}"
res = self.__request_with_fallback(file_api,
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo),
is_api=True,
timeout=30)
if res is None:
return None, "连接仓库失败"
elif res.status_code != 200:
return None, f"连接仓库失败:{res.status_code} - " \
f"{'超出速率限制请配置GITHUB_TOKEN环境变量或稍后重试' if res.status_code == 403 else res.reason}"
try:
ret = res.json()
if isinstance(ret, list) and len(ret) > 0 and "message" not in ret[0]:
return ret, ""
else:
return None, "插件在仓库中不存在或返回数据格式不正确"
except Exception as e:
logger.error(f"插件数据解析失败:{res.text}{e}")
return None, "插件数据解析失败"
def __download_files(self, pid: str, file_list: List[dict], user_repo: str, skip_requirements: bool = False) \
-> Tuple[bool, str]:
"""
下载插件文件
:param pid: 插件 ID
:param file_list: 要下载的文件列表,包含文件的元数据(包括下载链接)
:param user_repo: GitHub 仓库的 user/repo 路径
:param skip_requirements: 是否跳过 requirements.txt 文件的下载
:return: (是否成功, 错误信息)
"""
if not file_list:
return False, "文件列表为空"
# 使用栈结构来替代递归调用,避免递归深度过大问题
stack = [(pid, file_list)]
while stack:
current_pid, current_file_list = stack.pop()
for item in current_file_list:
# 跳过 requirements.txt 的下载
if skip_requirements and item.get("name") == "requirements.txt":
continue
if item.get("download_url"):
logger.debug(f"正在下载文件:{item.get('path')}")
res = self.__request_with_fallback(item.get('download_url'),
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
if not res:
return False, f"文件 {item.get('path')} 下载失败!"
elif res.status_code != 200:
return False, f"下载文件 {item.get('path')} 失败:{res.status_code}"
# 创建插件文件夹并写入文件
file_path = Path(settings.ROOT_PATH) / "app" / item.get("path")
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
f.write(res.text)
logger.debug(f"文件 {item.get('path')} 下载成功,保存路径:{file_path}")
else:
# 如果是子目录,则将子目录内容加入栈中继续处理
sub_list, msg = self.__get_file_list(f"{current_pid}/{item.get('name')}", user_repo)
if not sub_list:
return False, msg
stack.append((f"{current_pid}/{item.get('name')}", sub_list))
return True, ""
def __download_and_install_requirements(self, requirements_file_info: dict, pid: str, user_repo: str) \
-> Tuple[bool, str]:
"""
下载并安装 requirements.txt 文件中的依赖
:param requirements_file_info: requirements.txt 文件的元数据信息
:param pid: 插件 ID
:param user_repo: GitHub 仓库的 user/repo 路径
:return: (是否成功, 错误信息)
"""
# 下载 requirements.txt
res = self.__request_with_fallback(requirements_file_info.get("download_url"),
headers=settings.REPO_GITHUB_HEADERS(repo=user_repo))
if not res:
return False, "requirements.txt 文件下载失败"
elif res.status_code != 200:
return False, f"下载 requirements.txt 文件失败:{res.status_code}"
requirements_txt = res.text
if requirements_txt.strip():
# 保存并安装依赖
requirements_file_path = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower() / "requirements.txt"
requirements_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(requirements_file_path, "w", encoding="utf-8") as f:
f.write(requirements_txt)
success, message = self.__pip_install_with_fallback(requirements_file_path)
return success, message
return True, "" # 如果 requirements.txt 为空,视作成功
def __install_dependencies_if_required(self, pid: str) -> Tuple[bool, str]:
"""
安装插件依赖
:param pid: 插件 ID
:return: (是否成功, 错误信息)
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid.lower()
requirements_file = plugin_dir / "requirements.txt"
if requirements_file.exists():
logger.info(f"{pid} 存在依赖项,开始尝试安装依赖")
return self.__pip_install_with_fallback(requirements_file)
return True, ""
@staticmethod
def __remove_old_plugin(pid: str):
"""
删除旧插件
:param pid: 插件 ID
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid
if plugin_dir.exists():
shutil.rmtree(plugin_dir, ignore_errors=True)
@staticmethod
def __pip_uninstall_and_install_with_fallback(requirements_file: Path) -> Tuple[bool, str]:
"""
先卸载 requirements.txt 中的依赖,再按照自动降级策略重新安装,不使用 PIP 缓存
:param requirements_file: 依赖的 requirements.txt 文件路径
:return: (是否成功, 错误信息)
"""
# 读取 requirements.txt 文件中的依赖列表
try:
with open(requirements_file, "r", encoding="utf-8") as f:
dependencies = [line.strip() for line in f if line.strip() and not line.startswith("#")]
except Exception as e:
return False, f"无法读取 requirements.txt 文件:{str(e)}"
# 1. 先卸载所有依赖包
for dep in dependencies:
pip_uninstall_command = ["pip", "uninstall", "-y", dep]
logger.debug(f"尝试卸载依赖:{dep},命令:{' '.join(pip_uninstall_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_uninstall_command)
if success:
logger.debug(f"依赖 {dep} 卸载成功,输出:{message}")
else:
error_message = f"卸载依赖 {dep} 失败,错误信息:{message}"
logger.error(error_message)
# 2. 重新安装所有依赖,使用自动降级策略
strategies = []
# 添加策略到列表中
if settings.PIP_PROXY:
strategies.append(("镜像站",
["pip", "install", "-r", str(requirements_file),
"-i", settings.PIP_PROXY, "--no-cache-dir"]))
if settings.PROXY_HOST:
strategies.append(("代理",
["pip", "install", "-r", str(requirements_file),
"--proxy", settings.PROXY_HOST, "--no-cache-dir"]))
strategies.append(("直连", ["pip", "install", "-r", str(requirements_file), "--no-cache-dir"]))
# 遍历策略进行安装
for strategy_name, pip_command in strategies:
logger.debug(f"PIP 尝试使用策略 {strategy_name} 安装依赖,命令:{' '.join(pip_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_command)
if success:
logger.debug(f"PIP 策略 {strategy_name} 安装依赖成功,输出:{message}")
return True, message
else:
logger.error(f"PIP 策略 {strategy_name} 安装依赖失败,错误信息:{message}")
return False, "所有依赖安装方式均失败,请检查网络连接或 PIP 配置"
@staticmethod
def __pip_install_with_fallback(requirements_file: Path) -> Tuple[bool, str]:
"""
使用自动降级策略PIP 安装依赖,优先级依次为镜像站、代理、直连
:param requirements_file: 依赖的 requirements.txt 文件路径
:return: (是否成功, 错误信息)
"""
strategies = []
# 添加策略到列表中
if settings.PIP_PROXY:
strategies.append(("镜像站", ["pip", "install", "-r", str(requirements_file), "-i", settings.PIP_PROXY]))
if settings.PROXY_HOST:
strategies.append(
("代理", ["pip", "install", "-r", str(requirements_file), "--proxy", settings.PROXY_HOST]))
strategies.append(("直连", ["pip", "install", "-r", str(requirements_file)]))
# 遍历策略进行安装
for strategy_name, pip_command in strategies:
logger.debug(f"PIP 尝试使用策略 {strategy_name} 安装依赖,命令:{' '.join(pip_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_command)
if success:
logger.debug(f"PIP 策略 {strategy_name} 安装依赖成功,输出:{message}")
return True, message
else:
logger.error(f"PIP 策略 {strategy_name} 安装依赖失败,错误信息:{message}")
return False, "所有PIP依赖安装方式均失败请检查网络连接或 PIP 配置"
@staticmethod
def __request_with_fallback(url: str,
headers: Optional[dict] = None,
timeout: int = 60,
is_api: bool = False) -> Optional[Any]:
"""
使用自动降级策略,请求资源,优先级依次为镜像站、代理、直连
:param url: 目标URL
:param headers: 请求头信息
:param timeout: 请求超时时间
:param is_api: 是否为GitHub API请求API请求不走镜像站
:return: 请求成功则返回 Response失败返回 None
"""
strategies = []
# 1. 尝试使用镜像站镜像站一般不支持API请求因此API请求直接跳过镜像站
if not is_api and settings.GITHUB_PROXY:
proxy_url = f"{UrlUtils.standardize_base_url(settings.GITHUB_PROXY)}{url}"
strategies.append(("镜像站", proxy_url, {"headers": headers, "timeout": timeout}))
# 2. 尝试使用代理
if settings.PROXY_HOST:
strategies.append(("代理", url, {"headers": headers, "proxies": settings.PROXY, "timeout": timeout}))
# 3. 最后尝试直连
strategies.append(("直连", url, {"headers": headers, "timeout": timeout}))
# 遍历策略并尝试请求
for strategy_name, target_url, request_params in strategies:
logger.debug(f"GitHub 尝试使用策略 {strategy_name} 访问 {target_url}")
try:
res = RequestUtils(**request_params).get_res(url=target_url, raise_exception=True)
logger.debug(f"GitHub 策略 {strategy_name} 访问成功URL: {target_url}")
return res
except Exception as e:
logger.error(f"GitHub 策略 {strategy_name} 访问失败URL: {target_url},错误信息:{str(e)}")
logger.error(f"所有GitHub策略访问 {url} 均失败")
return None

View File

@@ -3,6 +3,7 @@ import os
import platform
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import List, Union, Tuple, Optional
@@ -27,6 +28,27 @@ class SystemUtils:
print(str(err))
return ""
@staticmethod
def execute_with_subprocess(pip_command: list) -> Tuple[bool, str]:
"""
执行命令并捕获标准输出和错误输出,记录日志。
:param pip_command: 要执行的命令,以列表形式提供
:return: (命令是否成功, 输出信息或错误信息)
"""
try:
# 使用 subprocess.run 捕获标准输出和标准错误
result = subprocess.run(pip_command, check=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 合并 stdout 和 stderr
output = result.stdout + result.stderr
return True, output
except subprocess.CalledProcessError as e:
error_message = f"命令:{' '.join(pip_command)},执行失败,错误信息:{e.stderr.strip()}"
return False, error_message
except Exception as e:
error_message = f"未知错误,命令:{' '.join(pip_command)},错误:{str(e)}"
return False, error_message
@staticmethod
def is_docker() -> bool:
"""