mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-12 04:49:40 +08:00
feat: add ZSpace media server support with authentication and item management
This commit is contained in:
323
app/modules/zspace/__init__.py
Normal file
323
app/modules/zspace/__init__.py
Normal file
@@ -0,0 +1,323 @@
|
||||
from typing import Any, Generator, List, Optional, Tuple, Union
|
||||
|
||||
from app import schemas
|
||||
from app.core.context import MediaInfo
|
||||
from app.core.event import eventmanager
|
||||
from app.log import logger
|
||||
from app.modules import _MediaServerBase, _ModuleBase
|
||||
from app.modules.zspace.zspace import ZSpace
|
||||
from app.schemas import AuthCredentials, AuthInterceptCredentials
|
||||
from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType
|
||||
|
||||
|
||||
class ZSpaceModule(_ModuleBase, _MediaServerBase[ZSpace]):
|
||||
|
||||
def init_module(self) -> None:
|
||||
"""
|
||||
初始化模块
|
||||
"""
|
||||
super().init_service(service_name=ZSpace.__name__.lower(),
|
||||
service_type=lambda conf: ZSpace(**conf.config, sync_libraries=conf.sync_libraries))
|
||||
|
||||
@staticmethod
|
||||
def get_name() -> str:
|
||||
return "极影视"
|
||||
|
||||
@staticmethod
|
||||
def get_type() -> ModuleType:
|
||||
"""
|
||||
获取模块类型
|
||||
"""
|
||||
return ModuleType.MediaServer
|
||||
|
||||
@staticmethod
|
||||
def get_subtype() -> MediaServerType:
|
||||
"""
|
||||
获取模块子类型
|
||||
"""
|
||||
return MediaServerType.ZSpace
|
||||
|
||||
@staticmethod
|
||||
def get_priority() -> int:
|
||||
"""
|
||||
获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效
|
||||
"""
|
||||
return 6
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def test(self) -> Optional[Tuple[bool, str]]:
|
||||
"""
|
||||
测试模块连接性
|
||||
"""
|
||||
if not self.get_instances():
|
||||
return None
|
||||
for name, server in self.get_instances().items():
|
||||
if server.is_inactive() and not server.reconnect():
|
||||
return False, f"无法连接极影视服务器:{name}"
|
||||
if not server.user:
|
||||
return False, f"无法连接极影视服务器:{name}"
|
||||
return True, ""
|
||||
|
||||
def init_setting(self) -> Tuple[str, Union[str, bool]]:
|
||||
pass
|
||||
|
||||
def scheduler_job(self) -> None:
|
||||
"""
|
||||
定时任务,每10分钟调用一次
|
||||
"""
|
||||
for name, server in self.get_instances().items():
|
||||
if server.is_inactive():
|
||||
logger.info(f"极影视服务器 {name} 连接断开,尝试重连 ...")
|
||||
server.reconnect()
|
||||
|
||||
def user_authenticate(self, credentials: AuthCredentials, service_name: Optional[str] = None) \
|
||||
-> Optional[AuthCredentials]:
|
||||
"""
|
||||
使用极影视用户辅助完成用户认证
|
||||
:param credentials: 认证数据
|
||||
:param service_name: 指定要认证的媒体服务器名称,若为 None 则认证所有服务
|
||||
:return: 认证数据
|
||||
"""
|
||||
if not credentials or credentials.grant_type != "password":
|
||||
return None
|
||||
if service_name:
|
||||
servers = [(service_name, server)] if (server := self.get_instance(service_name)) else []
|
||||
else:
|
||||
servers = self.get_instances().items()
|
||||
for name, server in servers:
|
||||
intercept_event = eventmanager.send_event(
|
||||
etype=ChainEventType.AuthIntercept,
|
||||
data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(),
|
||||
service=name, status="triggered")
|
||||
)
|
||||
if intercept_event and intercept_event.event_data:
|
||||
intercept_data: AuthInterceptCredentials = intercept_event.event_data
|
||||
if intercept_data.cancel:
|
||||
continue
|
||||
token = server.authenticate(credentials.username, credentials.password)
|
||||
if token:
|
||||
credentials.channel = self.get_name()
|
||||
credentials.service = name
|
||||
credentials.token = token
|
||||
return credentials
|
||||
return None
|
||||
|
||||
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]:
|
||||
"""
|
||||
解析Webhook报文体
|
||||
:param body: 请求体
|
||||
:param form: 请求表单
|
||||
:param args: 请求参数
|
||||
:return: 字典,解析为消息时需要包含:title、text、image
|
||||
"""
|
||||
source = args.get("source")
|
||||
if source:
|
||||
server: ZSpace = self.get_instance(source)
|
||||
if not server:
|
||||
return None
|
||||
result = server.get_webhook_message(form, args)
|
||||
if result:
|
||||
result.server_name = source
|
||||
return result
|
||||
|
||||
for server in self.get_instances().values():
|
||||
if server:
|
||||
result = server.get_webhook_message(form, args)
|
||||
if result:
|
||||
return result
|
||||
return None
|
||||
|
||||
def media_exists(self, mediainfo: MediaInfo, itemid: Optional[str] = None,
|
||||
server: Optional[str] = None) -> Optional[schemas.ExistMediaInfo]:
|
||||
"""
|
||||
判断媒体文件是否存在
|
||||
:param mediainfo: 识别的媒体信息
|
||||
:param itemid: 媒体服务器ItemID
|
||||
:param server: 媒体服务器名称
|
||||
:return: 如不存在返回None,存在时返回信息,包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
|
||||
"""
|
||||
if server:
|
||||
servers = [(server, self.get_instance(server))]
|
||||
else:
|
||||
servers = self.get_instances().items()
|
||||
for name, s in servers:
|
||||
if not s:
|
||||
continue
|
||||
if mediainfo.type == MediaType.MOVIE:
|
||||
if itemid:
|
||||
movie = s.get_iteminfo(itemid)
|
||||
if movie:
|
||||
logger.info(f"媒体库 {name} 中找到了 {movie}")
|
||||
return schemas.ExistMediaInfo(
|
||||
type=MediaType.MOVIE,
|
||||
server_type="zspace",
|
||||
server=name,
|
||||
itemid=movie.item_id
|
||||
)
|
||||
movies = s.get_movies(title=mediainfo.title,
|
||||
year=mediainfo.year,
|
||||
tmdb_id=mediainfo.tmdb_id)
|
||||
if not movies:
|
||||
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中")
|
||||
continue
|
||||
else:
|
||||
logger.info(f"媒体库 {name} 中找到了 {movies}")
|
||||
return schemas.ExistMediaInfo(
|
||||
type=MediaType.MOVIE,
|
||||
server_type="zspace",
|
||||
server=name,
|
||||
itemid=movies[0].item_id
|
||||
)
|
||||
else:
|
||||
itemid, tvs = s.get_tv_episodes(title=mediainfo.title,
|
||||
year=mediainfo.year,
|
||||
tmdb_id=mediainfo.tmdb_id,
|
||||
item_id=itemid)
|
||||
if not tvs:
|
||||
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中")
|
||||
continue
|
||||
else:
|
||||
logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}")
|
||||
return schemas.ExistMediaInfo(
|
||||
type=MediaType.TV,
|
||||
seasons=tvs,
|
||||
server_type="zspace",
|
||||
server=name,
|
||||
itemid=itemid
|
||||
)
|
||||
return None
|
||||
|
||||
def media_statistic(self, server: Optional[str] = None) -> Optional[List[schemas.Statistic]]:
|
||||
"""
|
||||
媒体数量统计
|
||||
"""
|
||||
if server:
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
servers = [server_obj]
|
||||
else:
|
||||
servers = self.get_instances().values()
|
||||
media_statistics = []
|
||||
for s in servers:
|
||||
media_statistic = s.get_medias_count()
|
||||
if not media_statistic:
|
||||
continue
|
||||
media_statistic.user_count = s.get_user_count()
|
||||
media_statistics.append(media_statistic)
|
||||
return media_statistics
|
||||
|
||||
def mediaserver_librarys(self, server: str,
|
||||
username: Optional[str] = None,
|
||||
hidden: Optional[bool] = False) -> Optional[List[schemas.MediaServerLibrary]]:
|
||||
"""
|
||||
媒体库列表
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if server_obj:
|
||||
return server_obj.get_librarys(username=username, hidden=hidden)
|
||||
return None
|
||||
|
||||
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: Optional[int] = 0,
|
||||
limit: Optional[int] = -1) -> Optional[Generator]:
|
||||
"""
|
||||
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
|
||||
|
||||
:param server: 媒体服务器名称
|
||||
:param library_id: 媒体库ID,用于标识要获取的媒体库
|
||||
:param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取
|
||||
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1
|
||||
|
||||
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if server_obj:
|
||||
return server_obj.get_items(library_id, start_index, limit)
|
||||
return None
|
||||
|
||||
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
|
||||
"""
|
||||
媒体库项目详情
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if server_obj:
|
||||
return server_obj.get_iteminfo(item_id)
|
||||
return None
|
||||
|
||||
def mediaserver_tv_episodes(self, server: str,
|
||||
item_id: Union[str, int]) -> Optional[List[schemas.MediaServerSeasonInfo]]:
|
||||
"""
|
||||
获取剧集信息
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
_, seasoninfo = server_obj.get_tv_episodes(item_id=item_id)
|
||||
if not seasoninfo:
|
||||
return []
|
||||
return [schemas.MediaServerSeasonInfo(
|
||||
season=season,
|
||||
episodes=episodes
|
||||
) for season, episodes in seasoninfo.items()]
|
||||
|
||||
def mediaserver_playing(self, server: str, count: Optional[int] = 20,
|
||||
username: Optional[str] = None) -> List[schemas.MediaServerPlayItem]:
|
||||
"""
|
||||
获取媒体服务器正在播放信息
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return []
|
||||
return server_obj.get_resume(num=count, username=username)
|
||||
|
||||
def mediaserver_play_url(self, server: str, item_id: Union[str, int]) -> Optional[str]:
|
||||
"""
|
||||
获取媒体库播放地址
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return None
|
||||
return server_obj.get_play_url(item_id)
|
||||
|
||||
def mediaserver_latest(self, server: Optional[str] = None, count: Optional[int] = 20,
|
||||
username: Optional[str] = None) -> List[schemas.MediaServerPlayItem]:
|
||||
"""
|
||||
获取媒体服务器最新入库条目
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return []
|
||||
return server_obj.get_latest(num=count, username=username)
|
||||
|
||||
def mediaserver_latest_images(self,
|
||||
server: Optional[str] = None,
|
||||
count: Optional[int] = 10,
|
||||
username: Optional[str] = None,
|
||||
remote: Optional[bool] = False
|
||||
) -> List[str]:
|
||||
"""
|
||||
获取媒体服务器最新入库条目的图片
|
||||
|
||||
:param server: 媒体服务器名称
|
||||
:param count: 获取数量
|
||||
:param username: 用户名
|
||||
:param remote: True为外网链接, False为内网链接
|
||||
:return: 图片链接列表
|
||||
"""
|
||||
server_obj: ZSpace = self.get_instance(server)
|
||||
if not server_obj:
|
||||
return []
|
||||
|
||||
links = []
|
||||
items: List[schemas.MediaServerPlayItem] = self.mediaserver_latest(server=server, count=count,
|
||||
username=username)
|
||||
for item in items:
|
||||
if item.BackdropImageTags:
|
||||
image_url = server_obj.get_backdrop_url(item_id=item.id,
|
||||
image_tag=item.BackdropImageTags[0],
|
||||
remote=remote)
|
||||
if image_url:
|
||||
links.append(image_url)
|
||||
return links
|
||||
238
app/modules/zspace/zspace.py
Normal file
238
app/modules/zspace/zspace.py
Normal file
@@ -0,0 +1,238 @@
|
||||
import json
|
||||
from typing import Any, Generator, List, Optional, Tuple, Union
|
||||
|
||||
from app import schemas
|
||||
from app.log import logger
|
||||
from app.modules.emby.emby import Emby
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.url import UrlUtils
|
||||
|
||||
|
||||
class ZSpace(Emby):
|
||||
_password: Optional[str] = None
|
||||
|
||||
def __init__(self, host: Optional[str] = None, username: Optional[str] = None,
|
||||
password: Optional[str] = None, play_host: Optional[str] = None,
|
||||
sync_libraries: list = None, **kwargs):
|
||||
if not host or not username or not password:
|
||||
logger.error("极影视服务器配置不完整!")
|
||||
return
|
||||
self._host = host
|
||||
if self._host:
|
||||
self._host = UrlUtils.standardize_base_url(self._host)
|
||||
self._playhost = play_host
|
||||
if self._playhost:
|
||||
self._playhost = UrlUtils.standardize_base_url(self._playhost)
|
||||
self._username = username
|
||||
self._password = password
|
||||
self._apikey = None
|
||||
self.user = None
|
||||
self.folders = []
|
||||
self.serverid = None
|
||||
self._sync_libraries = sync_libraries or []
|
||||
if not self.reconnect():
|
||||
logger.error(f"请检查极影视服务端地址 {host}")
|
||||
|
||||
def is_inactive(self) -> bool:
|
||||
"""
|
||||
判断是否需要重连
|
||||
"""
|
||||
if not self._host or not self._username or not self._password:
|
||||
return False
|
||||
if not self._apikey or not self.user:
|
||||
return True
|
||||
current_user = self.__get_current_user()
|
||||
if not current_user:
|
||||
return True
|
||||
self.user = current_user.get("Id") or self.user
|
||||
return False
|
||||
|
||||
def reconnect(self) -> bool:
|
||||
"""
|
||||
重连
|
||||
"""
|
||||
token, user_id = self.__login(self._username, self._password)
|
||||
if not token:
|
||||
self._apikey = None
|
||||
self.user = None
|
||||
self.folders = []
|
||||
self.serverid = None
|
||||
return False
|
||||
self._apikey = token
|
||||
if not user_id:
|
||||
current_user = self.__get_current_user()
|
||||
if not current_user:
|
||||
self._apikey = None
|
||||
self.user = None
|
||||
self.folders = []
|
||||
self.serverid = None
|
||||
return False
|
||||
user_id = current_user.get("Id")
|
||||
self.user = user_id
|
||||
self.folders = self.get_emby_folders()
|
||||
self.serverid = self.get_server_id()
|
||||
return True
|
||||
|
||||
def authenticate(self, username: str, password: str) -> Optional[str]:
|
||||
"""
|
||||
用户认证
|
||||
:param username: 用户名
|
||||
:param password: 密码
|
||||
:return: 认证token
|
||||
"""
|
||||
token, _ = self.__login(username, password)
|
||||
if token:
|
||||
logger.info(f"用户 {username} 极影视认证成功")
|
||||
return token
|
||||
|
||||
def get_user(self, user_name: Optional[str] = None) -> Optional[Union[str, int]]:
|
||||
"""
|
||||
获取用户ID。
|
||||
极影视使用登录态 token 时,不一定总能枚举全部用户,失败时回退当前登录用户。
|
||||
"""
|
||||
if user_name and user_name == self._username and self.user:
|
||||
return self.user
|
||||
user_id = super().get_user(user_name)
|
||||
if user_id:
|
||||
return user_id
|
||||
current_user = self.__get_current_user()
|
||||
if current_user:
|
||||
current_user_id = current_user.get("Id")
|
||||
current_user_name = current_user.get("Name")
|
||||
if current_user_id:
|
||||
self.user = current_user_id
|
||||
if not user_name or user_name == current_user_name:
|
||||
return current_user_id
|
||||
return self.user
|
||||
|
||||
def get_user_count(self) -> int:
|
||||
"""
|
||||
获取用户数量。
|
||||
无法枚举用户时,至少返回当前登录用户数量。
|
||||
"""
|
||||
count = super().get_user_count()
|
||||
if count:
|
||||
return count
|
||||
return 1 if self.user else 0
|
||||
|
||||
def get_librarys(self, username: Optional[str] = None,
|
||||
hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]:
|
||||
"""
|
||||
获取媒体服务器所有媒体库列表
|
||||
"""
|
||||
libraries = super().get_librarys(username=username, hidden=hidden)
|
||||
for library in libraries or []:
|
||||
library.server = "zspace"
|
||||
library.server_type = "zspace"
|
||||
return libraries
|
||||
|
||||
def get_movies(self, title: str, year: Optional[str] = None,
|
||||
tmdb_id: Optional[int] = None) -> Optional[List[schemas.MediaServerItem]]:
|
||||
"""
|
||||
根据标题和年份,检查电影是否在极影视中存在,存在则返回列表
|
||||
"""
|
||||
movies = super().get_movies(title=title, year=year, tmdb_id=tmdb_id)
|
||||
for movie in movies or []:
|
||||
movie.server = "zspace"
|
||||
return movies
|
||||
|
||||
def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]:
|
||||
"""
|
||||
获取单个项目详情
|
||||
"""
|
||||
item = super().get_iteminfo(itemid)
|
||||
if item:
|
||||
item.server = "zspace"
|
||||
return item
|
||||
|
||||
def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0,
|
||||
limit: Optional[int] = -1) -> Generator[schemas.MediaServerItem, Any, None]:
|
||||
"""
|
||||
获取媒体服务器项目列表
|
||||
"""
|
||||
for item in super().get_items(parent=parent, start_index=start_index, limit=limit) or []:
|
||||
if item:
|
||||
item.server = "zspace"
|
||||
yield item
|
||||
|
||||
def get_webhook_message(self, form: Any, args: dict) -> Optional[schemas.WebhookEventInfo]:
|
||||
"""
|
||||
解析极影视 Webhook 报文
|
||||
"""
|
||||
event_item = super().get_webhook_message(form, args)
|
||||
if event_item:
|
||||
event_item.channel = "zspace"
|
||||
return event_item
|
||||
|
||||
def get_resume(self, num: Optional[int] = 12,
|
||||
username: Optional[str] = None) -> Optional[List[schemas.MediaServerPlayItem]]:
|
||||
"""
|
||||
获得继续观看
|
||||
"""
|
||||
items = super().get_resume(num=num, username=username)
|
||||
for item in items or []:
|
||||
item.server_type = "zspace"
|
||||
return items
|
||||
|
||||
def get_latest(self, num: Optional[int] = 20,
|
||||
username: Optional[str] = None) -> Optional[List[schemas.MediaServerPlayItem]]:
|
||||
"""
|
||||
获得最近更新
|
||||
"""
|
||||
items = super().get_latest(num=num, username=username)
|
||||
for item in items or []:
|
||||
item.server_type = "zspace"
|
||||
return items
|
||||
|
||||
def __login(self, username: Optional[str], password: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
使用用户名密码登录极影视,返回访问令牌和用户ID
|
||||
"""
|
||||
if not self._host or not username or not password:
|
||||
return None, None
|
||||
url = f"{self._host}emby/Users/AuthenticateByName"
|
||||
try:
|
||||
res = RequestUtils(headers={
|
||||
'X-Emby-Authorization': 'MediaBrowser Client="MoviePilot", '
|
||||
'Device="requests", '
|
||||
'DeviceId="1", '
|
||||
'Version="1.0.0"',
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json'
|
||||
}).post_res(
|
||||
url=url,
|
||||
data=json.dumps({
|
||||
"Username": username,
|
||||
"Pw": password
|
||||
})
|
||||
)
|
||||
if res:
|
||||
result = res.json() or {}
|
||||
token = result.get("AccessToken")
|
||||
user_id = result.get("User", {}).get("Id")
|
||||
if token:
|
||||
return token, user_id
|
||||
else:
|
||||
logger.error("Users/AuthenticateByName 未获取到返回数据")
|
||||
except Exception as e:
|
||||
logger.error(f"连接Users/AuthenticateByName出错:{e}")
|
||||
return None, None
|
||||
|
||||
def __get_current_user(self) -> Optional[dict]:
|
||||
"""
|
||||
获取当前登录用户信息
|
||||
"""
|
||||
if not self._host or not self._apikey:
|
||||
return None
|
||||
url = f"{self._host}emby/Users/Me"
|
||||
params = {
|
||||
"api_key": self._apikey
|
||||
}
|
||||
try:
|
||||
res = RequestUtils().get_res(url, params)
|
||||
if res:
|
||||
return res.json()
|
||||
logger.error("Users/Me 未获取到返回数据")
|
||||
except Exception as e:
|
||||
logger.error(f"连接Users/Me出错:{e}")
|
||||
return None
|
||||
@@ -14,7 +14,7 @@ class ExistMediaInfo(BaseModel):
|
||||
type: Optional[MediaType] = None
|
||||
# 季
|
||||
seasons: Optional[Dict[int, list]] = Field(default_factory=dict)
|
||||
# 媒体服务器类型:plex、jellyfin、emby、trimemedia、ugreen
|
||||
# 媒体服务器类型:plex、jellyfin、emby、zspace、trimemedia、ugreen
|
||||
server_type: Optional[str] = None
|
||||
# 媒体服务器名称
|
||||
server: Optional[str] = None
|
||||
|
||||
@@ -29,7 +29,7 @@ class MediaServerConf(BaseModel):
|
||||
|
||||
# 名称
|
||||
name: Optional[str] = None
|
||||
# 类型 emby/jellyfin/plex/trimemedia/ugreen
|
||||
# 类型 emby/zspace/jellyfin/plex/trimemedia/ugreen
|
||||
type: Optional[str] = None
|
||||
# 配置
|
||||
config: Optional[dict] = Field(default_factory=dict)
|
||||
|
||||
@@ -332,6 +332,8 @@ class DownloaderType(Enum):
|
||||
class MediaServerType(Enum):
|
||||
# Emby
|
||||
Emby = "Emby"
|
||||
# 极影视
|
||||
ZSpace = "ZSpace"
|
||||
# Jellyfin
|
||||
Jellyfin = "Jellyfin"
|
||||
# Plex
|
||||
|
||||
@@ -1685,6 +1685,7 @@ def _collect_media_server_config() -> Optional[dict[str, Any]]:
|
||||
{
|
||||
"skip": "跳过",
|
||||
"emby": "Emby",
|
||||
"zspace": "极影视",
|
||||
"jellyfin": "Jellyfin",
|
||||
"plex": "Plex",
|
||||
},
|
||||
@@ -1696,6 +1697,7 @@ def _collect_media_server_config() -> Optional[dict[str, Any]]:
|
||||
config_name = _prompt_text("媒体服务器名称", default=server_type)
|
||||
default_host = {
|
||||
"emby": "http://127.0.0.1:8096",
|
||||
"zspace": "http://127.0.0.1:8096",
|
||||
"jellyfin": "http://127.0.0.1:8096",
|
||||
"plex": "http://127.0.0.1:32400",
|
||||
}[server_type]
|
||||
@@ -1707,6 +1709,12 @@ def _collect_media_server_config() -> Optional[dict[str, Any]]:
|
||||
"host": host,
|
||||
"token": _prompt_text("Plex Token", secret=True),
|
||||
}
|
||||
elif server_type == "zspace":
|
||||
config = {
|
||||
"host": host,
|
||||
"username": _prompt_text("极影视 用户名"),
|
||||
"password": _prompt_text("极影视 密码", secret=True),
|
||||
}
|
||||
else:
|
||||
config = {
|
||||
"host": host,
|
||||
|
||||
@@ -9,6 +9,7 @@ from app.modules.jellyfin.jellyfin import Jellyfin
|
||||
from app.modules.plex.plex import Plex
|
||||
from app.modules.trimemedia.trimemedia import TrimeMedia
|
||||
from app.modules.ugreen.ugreen import Ugreen
|
||||
from app.modules.zspace.zspace import ZSpace
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
@@ -152,6 +153,31 @@ class MediaServerTvStaleItemIdTest(unittest.TestCase):
|
||||
self.assertEqual(episodes, {1: [1]})
|
||||
client._Jellyfin__get_jellyfin_series_id_by_name.assert_called_once_with("测试剧集", "2026")
|
||||
|
||||
def test_zspace_tv_episodes_fallback_when_cached_item_id_missing(self):
|
||||
"""极影视缓存ID失效时,应重新搜索剧集ID后再查询集信息。"""
|
||||
client = ZSpace.__new__(ZSpace)
|
||||
client._host = "http://zspace.local/"
|
||||
client._apikey = "api-key"
|
||||
client.user = "user-id"
|
||||
client.get_iteminfo = Mock(side_effect=[None, SimpleNamespace(tmdbid=12345)])
|
||||
client._Emby__get_emby_series_id_by_name = Mock(return_value="new-series-id")
|
||||
|
||||
with patch("app.modules.emby.emby.RequestUtils") as request_utils_cls:
|
||||
request_utils_cls.return_value.get_res.return_value = _FakeResponse({
|
||||
"Items": [{"ParentIndexNumber": 1, "IndexNumber": 1}]
|
||||
})
|
||||
|
||||
item_id, episodes = client.get_tv_episodes(
|
||||
item_id="old-series-id",
|
||||
title="测试剧集",
|
||||
year="2026",
|
||||
tmdb_id=12345,
|
||||
)
|
||||
|
||||
self.assertEqual(item_id, "new-series-id")
|
||||
self.assertEqual(episodes, {1: [1]})
|
||||
client._Emby__get_emby_series_id_by_name.assert_called_once_with("测试剧集", "2026")
|
||||
|
||||
def test_ugreen_tv_episodes_fallback_when_cached_item_id_missing(self):
|
||||
"""绿联缓存ID失效时,应重新搜索剧集ID后再查询集信息。"""
|
||||
client = Ugreen.__new__(Ugreen)
|
||||
|
||||
74
tests/test_zspace_mediaserver.py
Normal file
74
tests/test_zspace_mediaserver.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from app.modules.zspace.zspace import ZSpace
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, payload: dict | list):
|
||||
self._payload = payload
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
class ZSpaceMediaServerTest(unittest.TestCase):
|
||||
def test_reconnect_uses_username_password_login(self):
|
||||
login_request_utils = Mock()
|
||||
login_request_utils.post_res.return_value = _FakeResponse({
|
||||
"AccessToken": "zspace-token",
|
||||
"User": {"Id": "user-id"},
|
||||
})
|
||||
emby_request_utils = Mock()
|
||||
emby_request_utils.get_res.side_effect = [
|
||||
_FakeResponse([]),
|
||||
_FakeResponse({"Id": "server-id"}),
|
||||
]
|
||||
|
||||
with patch("app.modules.zspace.zspace.RequestUtils", return_value=login_request_utils), patch(
|
||||
"app.modules.emby.emby.RequestUtils", return_value=emby_request_utils
|
||||
):
|
||||
client = ZSpace(
|
||||
host="http://zspace.local",
|
||||
username="admin",
|
||||
password="secret",
|
||||
)
|
||||
|
||||
self.assertEqual(client._apikey, "zspace-token")
|
||||
self.assertEqual(client.user, "user-id")
|
||||
self.assertEqual(client.serverid, "server-id")
|
||||
|
||||
def test_get_user_falls_back_to_current_login_user(self):
|
||||
client = ZSpace.__new__(ZSpace)
|
||||
client._username = "admin"
|
||||
client.user = "current-user-id"
|
||||
client._ZSpace__get_current_user = Mock(return_value={"Id": "current-user-id", "Name": "admin"})
|
||||
|
||||
with patch("app.modules.emby.emby.Emby.get_user", return_value=None):
|
||||
user_id = client.get_user("admin")
|
||||
|
||||
self.assertEqual(user_id, "current-user-id")
|
||||
|
||||
def test_authenticate_does_not_require_existing_api_key(self):
|
||||
with patch("app.modules.zspace.zspace.RequestUtils") as request_utils_cls:
|
||||
request_utils_cls.return_value.post_res.return_value = _FakeResponse({
|
||||
"AccessToken": "user-token",
|
||||
"User": {"Id": "user-id"},
|
||||
})
|
||||
|
||||
client = ZSpace.__new__(ZSpace)
|
||||
client._host = "http://zspace.local/"
|
||||
client._apikey = None
|
||||
|
||||
token = client.authenticate("user", "password")
|
||||
|
||||
self.assertEqual(token, "user-token")
|
||||
headers = request_utils_cls.call_args.kwargs.get("headers") or {}
|
||||
self.assertEqual(
|
||||
headers.get("X-Emby-Authorization"),
|
||||
'MediaBrowser Client="MoviePilot", Device="requests", DeviceId="1", Version="1.0.0"',
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user