diff --git a/app/modules/zspace/__init__.py b/app/modules/zspace/__init__.py new file mode 100644 index 00000000..9c6e5efd --- /dev/null +++ b/app/modules/zspace/__init__.py @@ -0,0 +1,323 @@ +from typing import Any, Generator, List, Optional, Tuple, Union + +from app import schemas +from app.core.context import MediaInfo +from app.core.event import eventmanager +from app.log import logger +from app.modules import _MediaServerBase, _ModuleBase +from app.modules.zspace.zspace import ZSpace +from app.schemas import AuthCredentials, AuthInterceptCredentials +from app.schemas.types import ChainEventType, MediaServerType, MediaType, ModuleType + + +class ZSpaceModule(_ModuleBase, _MediaServerBase[ZSpace]): + + def init_module(self) -> None: + """ + 初始化模块 + """ + super().init_service(service_name=ZSpace.__name__.lower(), + service_type=lambda conf: ZSpace(**conf.config, sync_libraries=conf.sync_libraries)) + + @staticmethod + def get_name() -> str: + return "极影视" + + @staticmethod + def get_type() -> ModuleType: + """ + 获取模块类型 + """ + return ModuleType.MediaServer + + @staticmethod + def get_subtype() -> MediaServerType: + """ + 获取模块子类型 + """ + return MediaServerType.ZSpace + + @staticmethod + def get_priority() -> int: + """ + 获取模块优先级,数字越小优先级越高,只有同一接口下优先级才生效 + """ + return 6 + + def stop(self): + pass + + def test(self) -> Optional[Tuple[bool, str]]: + """ + 测试模块连接性 + """ + if not self.get_instances(): + return None + for name, server in self.get_instances().items(): + if server.is_inactive() and not server.reconnect(): + return False, f"无法连接极影视服务器:{name}" + if not server.user: + return False, f"无法连接极影视服务器:{name}" + return True, "" + + def init_setting(self) -> Tuple[str, Union[str, bool]]: + pass + + def scheduler_job(self) -> None: + """ + 定时任务,每10分钟调用一次 + """ + for name, server in self.get_instances().items(): + if server.is_inactive(): + logger.info(f"极影视服务器 {name} 连接断开,尝试重连 ...") + server.reconnect() + + def user_authenticate(self, credentials: AuthCredentials, service_name: Optional[str] = None) \ + -> Optional[AuthCredentials]: + """ + 使用极影视用户辅助完成用户认证 + :param credentials: 认证数据 + :param service_name: 指定要认证的媒体服务器名称,若为 None 则认证所有服务 + :return: 认证数据 + """ + if not credentials or credentials.grant_type != "password": + return None + if service_name: + servers = [(service_name, server)] if (server := self.get_instance(service_name)) else [] + else: + servers = self.get_instances().items() + for name, server in servers: + intercept_event = eventmanager.send_event( + etype=ChainEventType.AuthIntercept, + data=AuthInterceptCredentials(username=credentials.username, channel=self.get_name(), + service=name, status="triggered") + ) + if intercept_event and intercept_event.event_data: + intercept_data: AuthInterceptCredentials = intercept_event.event_data + if intercept_data.cancel: + continue + token = server.authenticate(credentials.username, credentials.password) + if token: + credentials.channel = self.get_name() + credentials.service = name + credentials.token = token + return credentials + return None + + def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]: + """ + 解析Webhook报文体 + :param body: 请求体 + :param form: 请求表单 + :param args: 请求参数 + :return: 字典,解析为消息时需要包含:title、text、image + """ + source = args.get("source") + if source: + server: ZSpace = self.get_instance(source) + if not server: + return None + result = server.get_webhook_message(form, args) + if result: + result.server_name = source + return result + + for server in self.get_instances().values(): + if server: + result = server.get_webhook_message(form, args) + if result: + return result + return None + + def media_exists(self, mediainfo: MediaInfo, itemid: Optional[str] = None, + server: Optional[str] = None) -> Optional[schemas.ExistMediaInfo]: + """ + 判断媒体文件是否存在 + :param mediainfo: 识别的媒体信息 + :param itemid: 媒体服务器ItemID + :param server: 媒体服务器名称 + :return: 如不存在返回None,存在时返回信息,包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}} + """ + if server: + servers = [(server, self.get_instance(server))] + else: + servers = self.get_instances().items() + for name, s in servers: + if not s: + continue + if mediainfo.type == MediaType.MOVIE: + if itemid: + movie = s.get_iteminfo(itemid) + if movie: + logger.info(f"媒体库 {name} 中找到了 {movie}") + return schemas.ExistMediaInfo( + type=MediaType.MOVIE, + server_type="zspace", + server=name, + itemid=movie.item_id + ) + movies = s.get_movies(title=mediainfo.title, + year=mediainfo.year, + tmdb_id=mediainfo.tmdb_id) + if not movies: + logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中") + continue + else: + logger.info(f"媒体库 {name} 中找到了 {movies}") + return schemas.ExistMediaInfo( + type=MediaType.MOVIE, + server_type="zspace", + server=name, + itemid=movies[0].item_id + ) + else: + itemid, tvs = s.get_tv_episodes(title=mediainfo.title, + year=mediainfo.year, + tmdb_id=mediainfo.tmdb_id, + item_id=itemid) + if not tvs: + logger.info(f"{mediainfo.title_year} 没有在媒体库 {name} 中") + continue + else: + logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}") + return schemas.ExistMediaInfo( + type=MediaType.TV, + seasons=tvs, + server_type="zspace", + server=name, + itemid=itemid + ) + return None + + def media_statistic(self, server: Optional[str] = None) -> Optional[List[schemas.Statistic]]: + """ + 媒体数量统计 + """ + if server: + server_obj: ZSpace = self.get_instance(server) + if not server_obj: + return None + servers = [server_obj] + else: + servers = self.get_instances().values() + media_statistics = [] + for s in servers: + media_statistic = s.get_medias_count() + if not media_statistic: + continue + media_statistic.user_count = s.get_user_count() + media_statistics.append(media_statistic) + return media_statistics + + def mediaserver_librarys(self, server: str, + username: Optional[str] = None, + hidden: Optional[bool] = False) -> Optional[List[schemas.MediaServerLibrary]]: + """ + 媒体库列表 + """ + server_obj: ZSpace = self.get_instance(server) + if server_obj: + return server_obj.get_librarys(username=username, hidden=hidden) + return None + + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: Optional[int] = 0, + limit: Optional[int] = -1) -> Optional[Generator]: + """ + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 + """ + server_obj: ZSpace = self.get_instance(server) + if server_obj: + return server_obj.get_items(library_id, start_index, limit) + return None + + def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: + """ + 媒体库项目详情 + """ + server_obj: ZSpace = self.get_instance(server) + if server_obj: + return server_obj.get_iteminfo(item_id) + return None + + def mediaserver_tv_episodes(self, server: str, + item_id: Union[str, int]) -> Optional[List[schemas.MediaServerSeasonInfo]]: + """ + 获取剧集信息 + """ + server_obj: ZSpace = self.get_instance(server) + if not server_obj: + return None + _, seasoninfo = server_obj.get_tv_episodes(item_id=item_id) + if not seasoninfo: + return [] + return [schemas.MediaServerSeasonInfo( + season=season, + episodes=episodes + ) for season, episodes in seasoninfo.items()] + + def mediaserver_playing(self, server: str, count: Optional[int] = 20, + username: Optional[str] = None) -> List[schemas.MediaServerPlayItem]: + """ + 获取媒体服务器正在播放信息 + """ + server_obj: ZSpace = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_resume(num=count, username=username) + + def mediaserver_play_url(self, server: str, item_id: Union[str, int]) -> Optional[str]: + """ + 获取媒体库播放地址 + """ + server_obj: ZSpace = self.get_instance(server) + if not server_obj: + return None + return server_obj.get_play_url(item_id) + + def mediaserver_latest(self, server: Optional[str] = None, count: Optional[int] = 20, + username: Optional[str] = None) -> List[schemas.MediaServerPlayItem]: + """ + 获取媒体服务器最新入库条目 + """ + server_obj: ZSpace = self.get_instance(server) + if not server_obj: + return [] + return server_obj.get_latest(num=count, username=username) + + def mediaserver_latest_images(self, + server: Optional[str] = None, + count: Optional[int] = 10, + username: Optional[str] = None, + remote: Optional[bool] = False + ) -> List[str]: + """ + 获取媒体服务器最新入库条目的图片 + + :param server: 媒体服务器名称 + :param count: 获取数量 + :param username: 用户名 + :param remote: True为外网链接, False为内网链接 + :return: 图片链接列表 + """ + server_obj: ZSpace = self.get_instance(server) + if not server_obj: + return [] + + links = [] + items: List[schemas.MediaServerPlayItem] = self.mediaserver_latest(server=server, count=count, + username=username) + for item in items: + if item.BackdropImageTags: + image_url = server_obj.get_backdrop_url(item_id=item.id, + image_tag=item.BackdropImageTags[0], + remote=remote) + if image_url: + links.append(image_url) + return links diff --git a/app/modules/zspace/zspace.py b/app/modules/zspace/zspace.py new file mode 100644 index 00000000..a7370023 --- /dev/null +++ b/app/modules/zspace/zspace.py @@ -0,0 +1,238 @@ +import json +from typing import Any, Generator, List, Optional, Tuple, Union + +from app import schemas +from app.log import logger +from app.modules.emby.emby import Emby +from app.utils.http import RequestUtils +from app.utils.url import UrlUtils + + +class ZSpace(Emby): + _password: Optional[str] = None + + def __init__(self, host: Optional[str] = None, username: Optional[str] = None, + password: Optional[str] = None, play_host: Optional[str] = None, + sync_libraries: list = None, **kwargs): + if not host or not username or not password: + logger.error("极影视服务器配置不完整!") + return + self._host = host + if self._host: + self._host = UrlUtils.standardize_base_url(self._host) + self._playhost = play_host + if self._playhost: + self._playhost = UrlUtils.standardize_base_url(self._playhost) + self._username = username + self._password = password + self._apikey = None + self.user = None + self.folders = [] + self.serverid = None + self._sync_libraries = sync_libraries or [] + if not self.reconnect(): + logger.error(f"请检查极影视服务端地址 {host}") + + def is_inactive(self) -> bool: + """ + 判断是否需要重连 + """ + if not self._host or not self._username or not self._password: + return False + if not self._apikey or not self.user: + return True + current_user = self.__get_current_user() + if not current_user: + return True + self.user = current_user.get("Id") or self.user + return False + + def reconnect(self) -> bool: + """ + 重连 + """ + token, user_id = self.__login(self._username, self._password) + if not token: + self._apikey = None + self.user = None + self.folders = [] + self.serverid = None + return False + self._apikey = token + if not user_id: + current_user = self.__get_current_user() + if not current_user: + self._apikey = None + self.user = None + self.folders = [] + self.serverid = None + return False + user_id = current_user.get("Id") + self.user = user_id + self.folders = self.get_emby_folders() + self.serverid = self.get_server_id() + return True + + def authenticate(self, username: str, password: str) -> Optional[str]: + """ + 用户认证 + :param username: 用户名 + :param password: 密码 + :return: 认证token + """ + token, _ = self.__login(username, password) + if token: + logger.info(f"用户 {username} 极影视认证成功") + return token + + def get_user(self, user_name: Optional[str] = None) -> Optional[Union[str, int]]: + """ + 获取用户ID。 + 极影视使用登录态 token 时,不一定总能枚举全部用户,失败时回退当前登录用户。 + """ + if user_name and user_name == self._username and self.user: + return self.user + user_id = super().get_user(user_name) + if user_id: + return user_id + current_user = self.__get_current_user() + if current_user: + current_user_id = current_user.get("Id") + current_user_name = current_user.get("Name") + if current_user_id: + self.user = current_user_id + if not user_name or user_name == current_user_name: + return current_user_id + return self.user + + def get_user_count(self) -> int: + """ + 获取用户数量。 + 无法枚举用户时,至少返回当前登录用户数量。 + """ + count = super().get_user_count() + if count: + return count + return 1 if self.user else 0 + + def get_librarys(self, username: Optional[str] = None, + hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]: + """ + 获取媒体服务器所有媒体库列表 + """ + libraries = super().get_librarys(username=username, hidden=hidden) + for library in libraries or []: + library.server = "zspace" + library.server_type = "zspace" + return libraries + + def get_movies(self, title: str, year: Optional[str] = None, + tmdb_id: Optional[int] = None) -> Optional[List[schemas.MediaServerItem]]: + """ + 根据标题和年份,检查电影是否在极影视中存在,存在则返回列表 + """ + movies = super().get_movies(title=title, year=year, tmdb_id=tmdb_id) + for movie in movies or []: + movie.server = "zspace" + return movies + + def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]: + """ + 获取单个项目详情 + """ + item = super().get_iteminfo(itemid) + if item: + item.server = "zspace" + return item + + def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, + limit: Optional[int] = -1) -> Generator[schemas.MediaServerItem, Any, None]: + """ + 获取媒体服务器项目列表 + """ + for item in super().get_items(parent=parent, start_index=start_index, limit=limit) or []: + if item: + item.server = "zspace" + yield item + + def get_webhook_message(self, form: Any, args: dict) -> Optional[schemas.WebhookEventInfo]: + """ + 解析极影视 Webhook 报文 + """ + event_item = super().get_webhook_message(form, args) + if event_item: + event_item.channel = "zspace" + return event_item + + def get_resume(self, num: Optional[int] = 12, + username: Optional[str] = None) -> Optional[List[schemas.MediaServerPlayItem]]: + """ + 获得继续观看 + """ + items = super().get_resume(num=num, username=username) + for item in items or []: + item.server_type = "zspace" + return items + + def get_latest(self, num: Optional[int] = 20, + username: Optional[str] = None) -> Optional[List[schemas.MediaServerPlayItem]]: + """ + 获得最近更新 + """ + items = super().get_latest(num=num, username=username) + for item in items or []: + item.server_type = "zspace" + return items + + def __login(self, username: Optional[str], password: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + """ + 使用用户名密码登录极影视,返回访问令牌和用户ID + """ + if not self._host or not username or not password: + return None, None + url = f"{self._host}emby/Users/AuthenticateByName" + try: + res = RequestUtils(headers={ + 'X-Emby-Authorization': 'MediaBrowser Client="MoviePilot", ' + 'Device="requests", ' + 'DeviceId="1", ' + 'Version="1.0.0"', + 'Content-Type': 'application/json', + 'Accept': 'application/json' + }).post_res( + url=url, + data=json.dumps({ + "Username": username, + "Pw": password + }) + ) + if res: + result = res.json() or {} + token = result.get("AccessToken") + user_id = result.get("User", {}).get("Id") + if token: + return token, user_id + else: + logger.error("Users/AuthenticateByName 未获取到返回数据") + except Exception as e: + logger.error(f"连接Users/AuthenticateByName出错:{e}") + return None, None + + def __get_current_user(self) -> Optional[dict]: + """ + 获取当前登录用户信息 + """ + if not self._host or not self._apikey: + return None + url = f"{self._host}emby/Users/Me" + params = { + "api_key": self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + return res.json() + logger.error("Users/Me 未获取到返回数据") + except Exception as e: + logger.error(f"连接Users/Me出错:{e}") + return None diff --git a/app/schemas/mediaserver.py b/app/schemas/mediaserver.py index 3c8dccae..d99a13bd 100644 --- a/app/schemas/mediaserver.py +++ b/app/schemas/mediaserver.py @@ -14,7 +14,7 @@ class ExistMediaInfo(BaseModel): type: Optional[MediaType] = None # 季 seasons: Optional[Dict[int, list]] = Field(default_factory=dict) - # 媒体服务器类型:plex、jellyfin、emby、trimemedia、ugreen + # 媒体服务器类型:plex、jellyfin、emby、zspace、trimemedia、ugreen server_type: Optional[str] = None # 媒体服务器名称 server: Optional[str] = None diff --git a/app/schemas/system.py b/app/schemas/system.py index 6b82b78a..3b575604 100644 --- a/app/schemas/system.py +++ b/app/schemas/system.py @@ -29,7 +29,7 @@ class MediaServerConf(BaseModel): # 名称 name: Optional[str] = None - # 类型 emby/jellyfin/plex/trimemedia/ugreen + # 类型 emby/zspace/jellyfin/plex/trimemedia/ugreen type: Optional[str] = None # 配置 config: Optional[dict] = Field(default_factory=dict) diff --git a/app/schemas/types.py b/app/schemas/types.py index d3057d46..a0990c3d 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -332,6 +332,8 @@ class DownloaderType(Enum): class MediaServerType(Enum): # Emby Emby = "Emby" + # 极影视 + ZSpace = "ZSpace" # Jellyfin Jellyfin = "Jellyfin" # Plex diff --git a/scripts/local_setup.py b/scripts/local_setup.py index f9b8f36b..d1db2bba 100644 --- a/scripts/local_setup.py +++ b/scripts/local_setup.py @@ -1685,6 +1685,7 @@ def _collect_media_server_config() -> Optional[dict[str, Any]]: { "skip": "跳过", "emby": "Emby", + "zspace": "极影视", "jellyfin": "Jellyfin", "plex": "Plex", }, @@ -1696,6 +1697,7 @@ def _collect_media_server_config() -> Optional[dict[str, Any]]: config_name = _prompt_text("媒体服务器名称", default=server_type) default_host = { "emby": "http://127.0.0.1:8096", + "zspace": "http://127.0.0.1:8096", "jellyfin": "http://127.0.0.1:8096", "plex": "http://127.0.0.1:32400", }[server_type] @@ -1707,6 +1709,12 @@ def _collect_media_server_config() -> Optional[dict[str, Any]]: "host": host, "token": _prompt_text("Plex Token", secret=True), } + elif server_type == "zspace": + config = { + "host": host, + "username": _prompt_text("极影视 用户名"), + "password": _prompt_text("极影视 密码", secret=True), + } else: config = { "host": host, diff --git a/tests/test_mediaserver_tv_stale_itemid.py b/tests/test_mediaserver_tv_stale_itemid.py index 7b2435e0..40a50f5f 100644 --- a/tests/test_mediaserver_tv_stale_itemid.py +++ b/tests/test_mediaserver_tv_stale_itemid.py @@ -9,6 +9,7 @@ from app.modules.jellyfin.jellyfin import Jellyfin from app.modules.plex.plex import Plex from app.modules.trimemedia.trimemedia import TrimeMedia from app.modules.ugreen.ugreen import Ugreen +from app.modules.zspace.zspace import ZSpace class _FakeResponse: @@ -152,6 +153,31 @@ class MediaServerTvStaleItemIdTest(unittest.TestCase): self.assertEqual(episodes, {1: [1]}) client._Jellyfin__get_jellyfin_series_id_by_name.assert_called_once_with("测试剧集", "2026") + def test_zspace_tv_episodes_fallback_when_cached_item_id_missing(self): + """极影视缓存ID失效时,应重新搜索剧集ID后再查询集信息。""" + client = ZSpace.__new__(ZSpace) + client._host = "http://zspace.local/" + client._apikey = "api-key" + client.user = "user-id" + client.get_iteminfo = Mock(side_effect=[None, SimpleNamespace(tmdbid=12345)]) + client._Emby__get_emby_series_id_by_name = Mock(return_value="new-series-id") + + with patch("app.modules.emby.emby.RequestUtils") as request_utils_cls: + request_utils_cls.return_value.get_res.return_value = _FakeResponse({ + "Items": [{"ParentIndexNumber": 1, "IndexNumber": 1}] + }) + + item_id, episodes = client.get_tv_episodes( + item_id="old-series-id", + title="测试剧集", + year="2026", + tmdb_id=12345, + ) + + self.assertEqual(item_id, "new-series-id") + self.assertEqual(episodes, {1: [1]}) + client._Emby__get_emby_series_id_by_name.assert_called_once_with("测试剧集", "2026") + def test_ugreen_tv_episodes_fallback_when_cached_item_id_missing(self): """绿联缓存ID失效时,应重新搜索剧集ID后再查询集信息。""" client = Ugreen.__new__(Ugreen) diff --git a/tests/test_zspace_mediaserver.py b/tests/test_zspace_mediaserver.py new file mode 100644 index 00000000..dd442257 --- /dev/null +++ b/tests/test_zspace_mediaserver.py @@ -0,0 +1,74 @@ +import unittest +from unittest.mock import Mock, patch + +from app.modules.zspace.zspace import ZSpace + + +class _FakeResponse: + def __init__(self, payload: dict | list): + self._payload = payload + + def json(self): + return self._payload + + +class ZSpaceMediaServerTest(unittest.TestCase): + def test_reconnect_uses_username_password_login(self): + login_request_utils = Mock() + login_request_utils.post_res.return_value = _FakeResponse({ + "AccessToken": "zspace-token", + "User": {"Id": "user-id"}, + }) + emby_request_utils = Mock() + emby_request_utils.get_res.side_effect = [ + _FakeResponse([]), + _FakeResponse({"Id": "server-id"}), + ] + + with patch("app.modules.zspace.zspace.RequestUtils", return_value=login_request_utils), patch( + "app.modules.emby.emby.RequestUtils", return_value=emby_request_utils + ): + client = ZSpace( + host="http://zspace.local", + username="admin", + password="secret", + ) + + self.assertEqual(client._apikey, "zspace-token") + self.assertEqual(client.user, "user-id") + self.assertEqual(client.serverid, "server-id") + + def test_get_user_falls_back_to_current_login_user(self): + client = ZSpace.__new__(ZSpace) + client._username = "admin" + client.user = "current-user-id" + client._ZSpace__get_current_user = Mock(return_value={"Id": "current-user-id", "Name": "admin"}) + + with patch("app.modules.emby.emby.Emby.get_user", return_value=None): + user_id = client.get_user("admin") + + self.assertEqual(user_id, "current-user-id") + + def test_authenticate_does_not_require_existing_api_key(self): + with patch("app.modules.zspace.zspace.RequestUtils") as request_utils_cls: + request_utils_cls.return_value.post_res.return_value = _FakeResponse({ + "AccessToken": "user-token", + "User": {"Id": "user-id"}, + }) + + client = ZSpace.__new__(ZSpace) + client._host = "http://zspace.local/" + client._apikey = None + + token = client.authenticate("user", "password") + + self.assertEqual(token, "user-token") + headers = request_utils_cls.call_args.kwargs.get("headers") or {} + self.assertEqual( + headers.get("X-Emby-Authorization"), + 'MediaBrowser Client="MoviePilot", Device="requests", DeviceId="1", Version="1.0.0"', + ) + + +if __name__ == "__main__": + unittest.main()