mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-06 20:42:43 +08:00
fix(jellyfin): resolve URL string interpolation failure and enhance RBAC fallback resilience
Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import json
|
||||
import posixpath
|
||||
from datetime import datetime
|
||||
from typing import List, Union, Optional, Dict, Generator, Tuple, Any
|
||||
|
||||
@@ -123,7 +124,12 @@ class Jellyfin:
|
||||
user = self.get_user(username)
|
||||
else:
|
||||
user = self.user
|
||||
url = f"{self._host}Users/{user}/Views"
|
||||
if not user:
|
||||
return []
|
||||
# 使用标准库路径拼接结合统一 URL 规整,避免 host 尾部斜杠缺失导致的寻址偏移。
|
||||
url = UrlUtils.combine_url(self._host, posixpath.join("Users", str(user), "Views"))
|
||||
if not url:
|
||||
return []
|
||||
params = {"api_key": self._apikey}
|
||||
try:
|
||||
res = RequestUtils().get_res(url, params)
|
||||
@@ -213,10 +219,31 @@ class Jellyfin:
|
||||
for user in users:
|
||||
if user.get("Name") == user_name:
|
||||
return user.get("Id")
|
||||
# 查询管理员
|
||||
if user_name == settings.SUPERUSER:
|
||||
logger.warning(
|
||||
"MoviePilot 当前配置的超级管理员用户名为 {},请确保Jellyfin中存在同名管理员账号,否则可能无法正常使用部分功能!".format(settings.SUPERUSER)
|
||||
)
|
||||
# 查询管理员,优先选择同时具备全库访问能力的账号,再回退到普通管理员。
|
||||
# 获取总媒体库数量
|
||||
total_library_count = len(self.get_jellyfin_folders())
|
||||
best_admin_id = None
|
||||
best_admin_name = None
|
||||
best_admin_library_count = -1
|
||||
for user in users:
|
||||
if user.get("Policy", {}).get("IsAdministrator"):
|
||||
policy = user.get("Policy") or {}
|
||||
if not policy.get("IsAdministrator"):
|
||||
continue
|
||||
if policy.get("EnableAllFolders"):
|
||||
return user.get("Id")
|
||||
elif not policy.get("EnableAllFolders"):
|
||||
logger.warning(f"管理员账号 {user.get('Name')} 仅可访问{len(policy.get("EnabledFolders") or [])}/{total_library_count}个媒体库,可能导致媒体库数据不完整!")
|
||||
# 更新最佳管理员
|
||||
if best_admin_id is None or len(policy.get("EnabledFolders") or []) > best_admin_library_count:
|
||||
best_admin_id = user.get("Id")
|
||||
best_admin_name = user.get("Name")
|
||||
best_admin_library_count = len(policy.get("EnabledFolders") or [])
|
||||
logger.warning(f"未找到具备全库访问权限的管理员账号,回退使用仅可访问{best_admin_library_count}/{total_library_count}个媒体库的管理员账号{best_admin_name}!")
|
||||
return best_admin_id
|
||||
else:
|
||||
logger.error(f"Users 未获取到返回数据")
|
||||
except Exception as e:
|
||||
|
||||
213
tests/test_jellyfin.py
Normal file
213
tests/test_jellyfin.py
Normal file
@@ -0,0 +1,213 @@
|
||||
import importlib.util
|
||||
import sys
|
||||
import types
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
def _load_jellyfin_module():
|
||||
module_name = "_test_jellyfin_module"
|
||||
if module_name in sys.modules:
|
||||
return sys.modules[module_name]
|
||||
|
||||
if "app.log" not in sys.modules:
|
||||
log_module = types.ModuleType("app.log")
|
||||
|
||||
class _Logger:
|
||||
def info(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def warning(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def error(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
def debug(self, *_args, **_kwargs):
|
||||
pass
|
||||
|
||||
log_module.logger = _Logger()
|
||||
sys.modules["app.log"] = log_module
|
||||
|
||||
if "app.core.config" not in sys.modules:
|
||||
config_module = types.ModuleType("app.core.config")
|
||||
config_module.settings = types.SimpleNamespace(SUPERUSER="admin", USER_AGENT="MoviePilot")
|
||||
sys.modules["app.core.config"] = config_module
|
||||
|
||||
if "app.schemas" not in sys.modules:
|
||||
schemas_module = types.ModuleType("app.schemas")
|
||||
schemas_module.MediaType = types.SimpleNamespace(MOVIE=types.SimpleNamespace(value="movie"))
|
||||
schemas_module.MediaServerItem = object
|
||||
schemas_module.MediaServerLibrary = object
|
||||
schemas_module.Statistic = object
|
||||
schemas_module.WebhookEventInfo = object
|
||||
schemas_module.MediaServerItemUserState = object
|
||||
schemas_module.MediaServerPlayItem = object
|
||||
sys.modules["app.schemas"] = schemas_module
|
||||
|
||||
if "app.utils.http" not in sys.modules:
|
||||
http_module = types.ModuleType("app.utils.http")
|
||||
|
||||
class _RequestUtils:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def get_res(self, *args, **kwargs):
|
||||
return None
|
||||
|
||||
http_module.RequestUtils = _RequestUtils
|
||||
sys.modules["app.utils.http"] = http_module
|
||||
|
||||
if "app.utils.url" not in sys.modules:
|
||||
url_module = types.ModuleType("app.utils.url")
|
||||
|
||||
class _UrlUtils:
|
||||
@staticmethod
|
||||
def standardize_base_url(host):
|
||||
if not host:
|
||||
return host
|
||||
if not host.endswith("/"):
|
||||
host += "/"
|
||||
if not host.startswith("http://") and not host.startswith("https://"):
|
||||
host = "http://" + host
|
||||
return host
|
||||
|
||||
@staticmethod
|
||||
def combine_url(host, path=None, query=None):
|
||||
from urllib.parse import urljoin
|
||||
|
||||
if path is None:
|
||||
path = "/"
|
||||
host = _UrlUtils.standardize_base_url(host)
|
||||
return urljoin(host, path)
|
||||
|
||||
url_module.UrlUtils = _UrlUtils
|
||||
sys.modules["app.utils.url"] = url_module
|
||||
|
||||
jellyfin_path = Path(__file__).resolve().parents[1] / "app" / "modules" / "jellyfin" / "jellyfin.py"
|
||||
spec = importlib.util.spec_from_file_location(module_name, jellyfin_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
assert spec and spec.loader
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
jellyfin_module = _load_jellyfin_module()
|
||||
Jellyfin = jellyfin_module.Jellyfin
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, payload: dict):
|
||||
self._payload = payload
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
class JellyfinUserResolutionTest(unittest.TestCase):
|
||||
def _build_client(self) -> Jellyfin:
|
||||
client = Jellyfin.__new__(Jellyfin)
|
||||
client._host = "http://jellyfin.local:8096"
|
||||
client._apikey = "api-key"
|
||||
client._playhost = None
|
||||
client._sync_libraries = []
|
||||
client.user = "fallback-user"
|
||||
return client
|
||||
|
||||
def test_get_user_prefers_exact_username_without_warning(self):
|
||||
client = self._build_client()
|
||||
payload = [
|
||||
{"Id": "admin-id", "Name": "admin", "Policy": {"IsAdministrator": True}},
|
||||
{"Id": "alice-id", "Name": "alice", "Policy": {"IsAdministrator": False}},
|
||||
]
|
||||
|
||||
with patch.object(jellyfin_module, "RequestUtils") as request_utils_cls, patch.object(
|
||||
jellyfin_module.logger, "warning"
|
||||
) as warning_mock:
|
||||
request_utils_cls.return_value.get_res.return_value = _FakeResponse(payload)
|
||||
|
||||
user_id = client.get_user("alice")
|
||||
|
||||
self.assertEqual(user_id, "alice-id")
|
||||
warning_mock.assert_not_called()
|
||||
|
||||
def test_get_user_prefers_enable_all_folders_admin(self):
|
||||
client = self._build_client()
|
||||
payload = [
|
||||
{
|
||||
"Id": "visible-admin-id",
|
||||
"Name": "visible",
|
||||
"Policy": {"IsAdministrator": True, "EnabledFolders": ["lib-1", "lib-2", "lib-3"]},
|
||||
},
|
||||
{
|
||||
"Id": "full-admin-id",
|
||||
"Name": "full",
|
||||
"Policy": {"IsAdministrator": True, "EnableAllFolders": True},
|
||||
},
|
||||
]
|
||||
|
||||
with patch.object(jellyfin_module, "RequestUtils") as request_utils_cls:
|
||||
request_utils_cls.return_value.get_res.return_value = _FakeResponse(payload)
|
||||
|
||||
user_id = client.get_user()
|
||||
|
||||
self.assertEqual(user_id, "full-admin-id")
|
||||
|
||||
def test_get_user_warns_and_prefers_larger_visible_scope_admin(self):
|
||||
client = self._build_client()
|
||||
payload = [
|
||||
{
|
||||
"Id": "small-admin-id",
|
||||
"Name": "small",
|
||||
"Policy": {"IsAdministrator": True, "EnabledFolders": ["lib-1"]},
|
||||
},
|
||||
{
|
||||
"Id": "large-admin-id",
|
||||
"Name": "large",
|
||||
"Policy": {"IsAdministrator": True, "EnabledFolders": ["lib-1", "lib-2", "lib-3"]},
|
||||
},
|
||||
{"Id": "user-id", "Name": "normal", "Policy": {"IsAdministrator": False}},
|
||||
]
|
||||
|
||||
with patch.object(jellyfin_module, "RequestUtils") as request_utils_cls, patch.object(
|
||||
jellyfin_module.logger, "warning"
|
||||
) as warning_mock:
|
||||
request_utils_cls.return_value.get_res.return_value = _FakeResponse(payload)
|
||||
|
||||
user_id = client.get_user("admin")
|
||||
|
||||
self.assertEqual(user_id, "large-admin-id")
|
||||
warning_mock.assert_called_once()
|
||||
self.assertIn("默认超级管理员状态流失", warning_mock.call_args.args[0])
|
||||
|
||||
def test_get_jellyfin_librarys_returns_empty_when_user_missing(self):
|
||||
client = self._build_client()
|
||||
client.user = None
|
||||
|
||||
with patch.object(jellyfin_module, "RequestUtils") as request_utils_cls:
|
||||
libraries = client._Jellyfin__get_jellyfin_librarys()
|
||||
|
||||
self.assertEqual(libraries, [])
|
||||
request_utils_cls.assert_not_called()
|
||||
|
||||
def test_get_jellyfin_librarys_uses_normalized_views_url(self):
|
||||
client = self._build_client()
|
||||
client._host = "http://jellyfin.local:8096"
|
||||
client.user = "user-id"
|
||||
|
||||
with patch.object(jellyfin_module, "RequestUtils") as request_utils_cls:
|
||||
request_utils_cls.return_value.get_res.return_value = _FakeResponse({"Items": []})
|
||||
|
||||
libraries = client._Jellyfin__get_jellyfin_librarys()
|
||||
|
||||
self.assertEqual(libraries, [])
|
||||
request_utils_cls.return_value.get_res.assert_called_once_with(
|
||||
"http://jellyfin.local:8096/Users/user-id/Views",
|
||||
{"api_key": "api-key"},
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user