fix mediaservers

This commit is contained in:
jxxghp
2024-07-02 10:03:56 +08:00
parent 9a07d88d41
commit dfc5872087
7 changed files with 408 additions and 267 deletions

View File

@@ -1,7 +1,8 @@
from typing import Optional, Tuple, Union, Any, List, Generator
from typing import Optional, Tuple, Union, Any, List, Generator, Dict
from app import schemas
from app.core.context import MediaInfo
from app.helper.mediaserver import MediaServerHelper
from app.log import logger
from app.modules import _ModuleBase
from app.modules.emby.emby import Emby
@@ -9,10 +10,27 @@ from app.schemas.types import MediaType
class EmbyModule(_ModuleBase):
emby: Emby = None
_servers: Dict[str, Emby] = {}
def init_module(self) -> None:
self.emby = Emby()
"""
初始化模块
"""
# 读取媒体服务器配置
self._servers = {}
mediaservers = MediaServerHelper().get_mediaservers()
if not mediaservers:
return
# 读取Emby配置
for server in mediaservers:
if server.type == "emby":
self._servers[server.name] = Emby(**server.config)
def get_server(self, name: str) -> Optional[Emby]:
"""
获取Emby服务器
"""
return self._servers.get(name)
@staticmethod
def get_name() -> str:
@@ -25,22 +43,27 @@ class EmbyModule(_ModuleBase):
"""
测试模块连接性
"""
if self.emby.is_inactive():
self.emby.reconnect()
if not self.emby.get_user():
return False, "无法连接Emby请检查参数配置"
if not self._servers:
return False, "未配置Emby服务器"
for name, server in self._servers.items():
if server.is_inactive():
server.reconnect()
if not server.get_user():
return False, f"无法连接Emby服务器{name}"
return True, ""
def init_setting(self) -> Tuple[str, Union[str, bool]]:
return "MEDIASERVER", "emby"
pass
def scheduler_job(self) -> None:
"""
定时任务每10分钟调用一次
"""
# 定时重连
if self.emby.is_inactive():
self.emby.reconnect()
for name, server in self._servers.items():
if server.is_inactive():
logger.info(f"Emby服务器 {name} 连接断开,尝试重连 ...")
server.reconnect()
def user_authenticate(self, name: str, password: str) -> Optional[str]:
"""
@@ -50,7 +73,11 @@ class EmbyModule(_ModuleBase):
:return: token or None
"""
# Emby认证
return self.emby.authenticate(name, password)
for server in self._servers.values():
result = server.authenticate(name, password)
if result:
return result
return None
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]:
"""
@@ -60,7 +87,11 @@ class EmbyModule(_ModuleBase):
:param args: 请求参数
:return: 字典解析为消息时需要包含title、text、image
"""
return self.emby.get_webhook_message(form, args)
for server in self._servers.values():
result = server.get_webhook_message(form, args)
if result:
return result
return None
def media_exists(self, mediainfo: MediaInfo, itemid: str = None) -> Optional[schemas.ExistMediaInfo]:
"""
@@ -69,86 +100,97 @@ class EmbyModule(_ModuleBase):
:param itemid: 媒体服务器ItemID
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = self.emby.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库中已存在:{movie}")
for name, server in self._servers.values():
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = server.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库 {name} 中找到了 {movie}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server=name,
itemid=movie.item_id
)
movies = server.get_movies(title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id)
if not movies:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"媒体库 {name} 中找到了 {movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server="emby",
itemid=movie.item_id
server=name,
itemid=movies[0].item_id
)
movies = self.emby.get_movies(title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id)
if not movies:
logger.info(f"{mediainfo.title_year} 在媒体库中不存在")
return None
else:
logger.info(f"媒体库中已存在:{movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server="emby",
itemid=movies[0].item_id
)
else:
itemid, tvs = self.emby.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid)
if not tvs:
logger.info(f"{mediainfo.title_year} 在媒体库中不存在")
return None
else:
logger.info(f"{mediainfo.title_year} 媒体库中已存在:{tvs}")
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server="emby",
itemid=itemid
)
itemid, tvs = server.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid)
if not tvs:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}")
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server=name,
itemid=itemid
)
return None
def media_statistic(self) -> List[schemas.Statistic]:
"""
媒体数量统计
"""
media_statistic = self.emby.get_medias_count()
media_statistic.user_count = self.emby.get_user_count()
return [media_statistic]
media_statistics = []
for server in self._servers.values():
media_statistic = server.get_medias_count()
media_statistic.user_count = server.get_user_count()
if media_statistic:
media_statistics.append(media_statistic)
return media_statistics
def mediaserver_librarys(self, server: str = None, username: str = None) -> Optional[List[schemas.MediaServerLibrary]]:
def mediaserver_librarys(self, server: str,
username: str = None) -> Optional[List[schemas.MediaServerLibrary]]:
"""
媒体库列表
"""
if server and server != "emby":
return None
return self.emby.get_librarys(username)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_librarys(username)
return None
def mediaserver_items(self, server: str, library_id: str) -> Optional[Generator]:
"""
媒体库项目列表
"""
if server != "emby":
return None
return self.emby.get_items(library_id)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_items(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情
"""
if server != "emby":
return None
return self.emby.get_iteminfo(item_id)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_iteminfo(item_id)
return None
def mediaserver_tv_episodes(self, server: str,
item_id: Union[str, int]) -> Optional[List[schemas.MediaServerSeasonInfo]]:
"""
获取剧集信息
"""
if server != "emby":
server_obj = self.get_server(server)
if not server_obj:
return None
_, seasoninfo = self.emby.get_tv_episodes(item_id=item_id)
_, seasoninfo = server_obj.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [schemas.MediaServerSeasonInfo(
@@ -156,28 +198,31 @@ class EmbyModule(_ModuleBase):
episodes=episodes
) for season, episodes in seasoninfo.items()]
def mediaserver_playing(self, count: int = 20,
server: str = None, username: str = None) -> List[schemas.MediaServerPlayItem]:
def mediaserver_playing(self, server: str,
count: int = 20, username: str = None) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
if server and server != "emby":
server_obj = self.get_server(server)
if not server_obj:
return []
return self.emby.get_resume(num=count, username=username)
return server_obj.get_resume(num=count, username=username)
def mediaserver_play_url(self, server: str, item_id: Union[str, int]) -> Optional[str]:
"""
获取媒体库播放地址
"""
if server != "emby":
server_obj = self.get_server(server)
if not server_obj:
return None
return self.emby.get_play_url(item_id)
return server_obj.get_play_url(item_id)
def mediaserver_latest(self, count: int = 20,
server: str = None, username: str = None) -> List[schemas.MediaServerPlayItem]:
def mediaserver_latest(self, server: str,
count: int = 20, username: str = None) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
if server and server != "emby":
server_obj = self.get_server(server)
if not server_obj:
return []
return self.emby.get_latest(num=count, username=username)
return server_obj.get_latest(num=count, username=username)

View File

@@ -1,7 +1,8 @@
from typing import Optional, Tuple, Union, Any, List, Generator
from typing import Optional, Tuple, Union, Any, List, Generator, Dict
from app import schemas
from app.core.context import MediaInfo
from app.helper.mediaserver import MediaServerHelper
from app.log import logger
from app.modules import _ModuleBase
from app.modules.jellyfin.jellyfin import Jellyfin
@@ -9,25 +10,44 @@ from app.schemas.types import MediaType
class JellyfinModule(_ModuleBase):
jellyfin: Jellyfin = None
_servers: Dict[str, Jellyfin] = {}
def init_module(self) -> None:
self.jellyfin = Jellyfin()
"""
初始化模块
"""
# 读取媒体服务器配置
self._servers = {}
mediaservers = MediaServerHelper().get_mediaservers()
if not mediaservers:
return
# 读取Jelly配置
for server in mediaservers:
if server.type == "jellyfin":
self._servers[server.name] = Jellyfin(**server.config)
def get_server(self, name: str) -> Optional[Jellyfin]:
"""
获取Jellyfin服务器
"""
return self._servers.get(name)
@staticmethod
def get_name() -> str:
return "Jellyfin"
def init_setting(self) -> Tuple[str, Union[str, bool]]:
return "MEDIASERVER", "jellyfin"
pass
def scheduler_job(self) -> None:
"""
定时任务每10分钟调用一次
"""
# 定时重连
if self.jellyfin.is_inactive():
self.jellyfin.reconnect()
for name, server in self._servers.items():
if server.is_inactive():
logger.info(f"Jellyfin {name} 服务器连接断开,尝试重连 ...")
server.reconnect()
def stop(self):
pass
@@ -36,10 +56,13 @@ class JellyfinModule(_ModuleBase):
"""
测试模块连接性
"""
if self.jellyfin.is_inactive():
self.jellyfin.reconnect()
if not self.jellyfin.get_user():
return False, "无法连接Jellyfin请检查参数配置"
if not self._servers:
return False, "未配置Jellyfin服务器"
for name, server in self._servers.items():
if server.is_inactive():
server.reconnect()
if not server.get_user():
return False, f"无法连接Jellyfin服务器{name}"
return True, ""
def user_authenticate(self, name: str, password: str) -> Optional[str]:
@@ -50,7 +73,11 @@ class JellyfinModule(_ModuleBase):
:return: Token or None
"""
# Jellyfin认证
return self.jellyfin.authenticate(name, password)
for server in self._servers.values():
result = server.authenticate(name, password)
if result:
return result
return None
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]:
"""
@@ -60,7 +87,11 @@ class JellyfinModule(_ModuleBase):
:param args: 请求参数
:return: 字典解析为消息时需要包含title、text、image
"""
return self.jellyfin.get_webhook_message(body)
for server in self._servers.values():
result = server.get_webhook_message(body)
if result:
return result
return None
def media_exists(self, mediainfo: MediaInfo, itemid: str = None) -> Optional[schemas.ExistMediaInfo]:
"""
@@ -69,84 +100,95 @@ class JellyfinModule(_ModuleBase):
:param itemid: 媒体服务器ItemID
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = self.jellyfin.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库中已存在:{movie}")
for name, server in self._servers.items():
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = server.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库 {name} 中找到了 {movie}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server=name,
itemid=movie.item_id
)
movies = server.get_movies(title=mediainfo.title, year=mediainfo.year, tmdb_id=mediainfo.tmdb_id)
if not movies:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"媒体库 {name} 中找到了 {movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server="jellyfin",
itemid=movie.item_id
server=name,
itemid=movies[0].item_id
)
movies = self.jellyfin.get_movies(title=mediainfo.title, year=mediainfo.year, tmdb_id=mediainfo.tmdb_id)
if not movies:
logger.info(f"{mediainfo.title_year} 在媒体库中不存在")
return None
else:
logger.info(f"媒体库中已存在:{movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server="jellyfin",
itemid=movies[0].item_id
)
else:
itemid, tvs = self.jellyfin.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid)
if not tvs:
logger.info(f"{mediainfo.title_year} 在媒体库中不存在")
return None
else:
logger.info(f"{mediainfo.title_year} 媒体库中已存在:{tvs}")
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server="jellyfin",
itemid=itemid
)
itemid, tvs = server.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid)
if not tvs:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}")
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server=name,
itemid=itemid
)
return None
def media_statistic(self) -> List[schemas.Statistic]:
"""
媒体数量统计
"""
media_statistic = self.jellyfin.get_medias_count()
media_statistic.user_count = self.jellyfin.get_user_count()
return [media_statistic]
media_statistics = []
for server in self._servers.values():
media_statistic = server.get_medias_count()
media_statistic.user_count = server.get_user_count()
if media_statistic:
media_statistics.append(media_statistic)
return media_statistics
def mediaserver_librarys(self, server: str = None, username: str = None) -> Optional[List[schemas.MediaServerLibrary]]:
def mediaserver_librarys(self, server: str = None,
username: str = None) -> Optional[List[schemas.MediaServerLibrary]]:
"""
媒体库列表
"""
if server and server != "jellyfin":
return None
return self.jellyfin.get_librarys(username)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_librarys(username)
return None
def mediaserver_items(self, server: str, library_id: str) -> Optional[Generator]:
"""
媒体库项目列表
"""
if server != "jellyfin":
return None
return self.jellyfin.get_items(library_id)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_items(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情
"""
if server != "jellyfin":
return None
return self.jellyfin.get_iteminfo(item_id)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_iteminfo(item_id)
return None
def mediaserver_tv_episodes(self, server: str,
item_id: Union[str, int]) -> Optional[List[schemas.MediaServerSeasonInfo]]:
"""
获取剧集信息
"""
if server != "jellyfin":
server_obj = self.get_server(server)
if not server_obj:
return None
_, seasoninfo = self.jellyfin.get_tv_episodes(item_id=item_id)
_, seasoninfo = server_obj.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [schemas.MediaServerSeasonInfo(
@@ -154,28 +196,31 @@ class JellyfinModule(_ModuleBase):
episodes=episodes
) for season, episodes in seasoninfo.items()]
def mediaserver_playing(self, count: int = 20,
server: str = None, username: str = None) -> List[schemas.MediaServerPlayItem]:
def mediaserver_playing(self, server: str,
count: int = 20, username: str = None) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
if server and server != "jellyfin":
server_obj = self.get_server(server)
if not server_obj:
return []
return self.jellyfin.get_resume(num=count, username=username)
return server_obj.get_resume(num=count, username=username)
def mediaserver_play_url(self, server: str, item_id: Union[str, int]) -> Optional[str]:
"""
获取媒体库播放地址
"""
if server != "jellyfin":
server_obj = self.get_server(server)
if not server_obj:
return None
return self.jellyfin.get_play_url(item_id)
return server_obj.get_play_url(item_id)
def mediaserver_latest(self, count: int = 20,
server: str = None, username: str = None) -> List[schemas.MediaServerPlayItem]:
def mediaserver_latest(self, server: str,
count: int = 20, username: str = None) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
if server and server != "jellyfin":
server_obj = self.get_server(server)
if not server_obj:
return []
return self.jellyfin.get_latest(num=count, username=username)
return server_obj.get_latest(num=count, username=username)

View File

@@ -1,7 +1,8 @@
from typing import Optional, Tuple, Union, Any, List, Generator
from typing import Optional, Tuple, Union, Any, List, Generator, Dict
from app import schemas
from app.core.context import MediaInfo
from app.helper.mediaserver import MediaServerHelper
from app.log import logger
from app.modules import _ModuleBase
from app.modules.plex.plex import Plex
@@ -9,15 +10,32 @@ from app.schemas.types import MediaType
class PlexModule(_ModuleBase):
plex: Plex = None
_servers: Dict[str, Plex] = {}
def init_module(self) -> None:
self.plex = Plex()
"""
初始化模块
"""
# 读取媒体服务器配置
self._servers = {}
mediaservers = MediaServerHelper().get_mediaservers()
if not mediaservers:
return
# 读取Emby配置
for server in mediaservers:
if server.type == "plex":
self._servers[server.name] = Plex(**server.config)
@staticmethod
def get_name() -> str:
return "Plex"
def get_server(self, name: str) -> Optional[Plex]:
"""
获取Plex服务器
"""
return self._servers.get(name)
def stop(self):
pass
@@ -25,22 +43,27 @@ class PlexModule(_ModuleBase):
"""
测试模块连接性
"""
if self.plex.is_inactive():
self.plex.reconnect()
if not self.plex.get_librarys():
return False, "无法连接Plex请检查参数配置"
if not self._servers:
return False, "未配置Plex服务器"
for name, server in self._servers.items():
if server.is_inactive():
server.reconnect()
if not server.get_librarys():
return False, f"无法连接Plex服务器{name}"
return True, ""
def init_setting(self) -> Tuple[str, Union[str, bool]]:
return "MEDIASERVER", "plex"
pass
def scheduler_job(self) -> None:
"""
定时任务每10分钟调用一次
"""
# 定时重连
if self.plex.is_inactive():
self.plex.reconnect()
for name, server in self._servers.items():
if server.is_inactive():
logger.info(f"Plex {name} 服务器连接断开,尝试重连 ...")
server.reconnect()
def webhook_parser(self, body: Any, form: Any, args: Any) -> Optional[schemas.WebhookEventInfo]:
"""
@@ -50,7 +73,11 @@ class PlexModule(_ModuleBase):
:param args: 请求参数
:return: 字典解析为消息时需要包含title、text、image
"""
return self.plex.get_webhook_message(form)
for server in self._servers.values():
result = server.get_webhook_message(body)
if result:
return result
return None
def media_exists(self, mediainfo: MediaInfo, itemid: str = None) -> Optional[schemas.ExistMediaInfo]:
"""
@@ -59,88 +86,98 @@ class PlexModule(_ModuleBase):
:param itemid: 媒体服务器ItemID
:return: 如不存在返回None存在时返回信息包括每季已存在所有集{type: movie/tv, seasons: {season: [episodes]}}
"""
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = self.plex.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库中已存在:{movie}")
for name, server in self._servers.items():
if mediainfo.type == MediaType.MOVIE:
if itemid:
movie = server.get_iteminfo(itemid)
if movie:
logger.info(f"媒体库 {name} 中找到了 {movie}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server=name,
itemid=movie.item_id
)
movies = server.get_movies(title=mediainfo.title,
original_title=mediainfo.original_title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id)
if not movies:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"媒体库 {name} 中找到了 {movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server="plex",
itemid=movie.item_id
server=name,
itemid=movies[0].item_id
)
movies = self.plex.get_movies(title=mediainfo.title,
original_title=mediainfo.original_title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id)
if not movies:
logger.info(f"{mediainfo.title_year} 在媒体库中不存在")
return None
else:
logger.info(f"媒体库中已存在:{movies}")
return schemas.ExistMediaInfo(
type=MediaType.MOVIE,
server="plex",
itemid=movies[0].item_id
)
else:
item_id, tvs = self.plex.get_tv_episodes(title=mediainfo.title,
original_title=mediainfo.original_title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid)
if not tvs:
logger.info(f"{mediainfo.title_year} 在媒体库中不存在")
return None
else:
logger.info(f"{mediainfo.title_year} 媒体库中已存在:{tvs}")
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server="plex",
itemid=item_id
)
item_id, tvs = server.get_tv_episodes(title=mediainfo.title,
original_title=mediainfo.original_title,
year=mediainfo.year,
tmdb_id=mediainfo.tmdb_id,
item_id=itemid)
if not tvs:
logger.info(f"{mediainfo.title_year} 没有在媒体库 {name}")
continue
else:
logger.info(f"{mediainfo.title_year} 在媒体库 {name} 中找到了这些季集:{tvs}")
return schemas.ExistMediaInfo(
type=MediaType.TV,
seasons=tvs,
server=name,
itemid=item_id
)
return None
def media_statistic(self) -> List[schemas.Statistic]:
"""
媒体数量统计
"""
media_statistic = self.plex.get_medias_count()
media_statistic.user_count = 1
return [media_statistic]
media_statistics = []
for server in self._servers.values():
media_statistic = server.get_medias_count()
media_statistic.user_count = 1
if media_statistic:
media_statistics.append(media_statistic)
return media_statistics
def mediaserver_librarys(self, server: str = None, **kwargs) -> Optional[List[schemas.MediaServerLibrary]]:
"""
媒体库列表
"""
if server and server != "plex":
return None
return self.plex.get_librarys()
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_librarys()
return None
def mediaserver_items(self, server: str, library_id: str) -> Optional[Generator]:
"""
媒体库项目列表
"""
if server != "plex":
return None
return self.plex.get_items(library_id)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_items(library_id)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:
"""
媒体库项目详情
"""
if server != "plex":
return None
return self.plex.get_iteminfo(item_id)
server_obj = self.get_server(server)
if server_obj:
return server_obj.get_iteminfo(item_id)
return None
def mediaserver_tv_episodes(self, server: str,
item_id: Union[str, int]) -> Optional[List[schemas.MediaServerSeasonInfo]]:
"""
获取剧集信息
"""
if server != "plex":
server_obj = self.get_server(server)
if not server_obj:
return None
_, seasoninfo = self.plex.get_tv_episodes(item_id=item_id)
_, seasoninfo = server_obj.get_tv_episodes(item_id=item_id)
if not seasoninfo:
return []
return [schemas.MediaServerSeasonInfo(
@@ -148,26 +185,29 @@ class PlexModule(_ModuleBase):
episodes=episodes
) for season, episodes in seasoninfo.items()]
def mediaserver_playing(self, count: int = 20, server: str = None, **kwargs) -> List[schemas.MediaServerPlayItem]:
def mediaserver_playing(self, server: str, count: int = 20, **kwargs) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器正在播放信息
"""
if server and server != "plex":
server_obj = self.get_server(server)
if not server_obj:
return []
return self.plex.get_resume(count)
return server_obj.get_resume(num=count)
def mediaserver_latest(self, count: int = 20, server: str = None, **kwargs) -> List[schemas.MediaServerPlayItem]:
def mediaserver_latest(self, server: str, count: int = 20, **kwargs) -> List[schemas.MediaServerPlayItem]:
"""
获取媒体服务器最新入库条目
"""
if server and server != "plex":
server_obj = self.get_server(server)
if not server_obj:
return []
return self.plex.get_latest(count)
return server_obj.get_latest(num=count)
def mediaserver_play_url(self, server: str, item_id: Union[str, int]) -> Optional[str]:
"""
获取媒体库播放地址
"""
if server != "plex":
server_obj = self.get_server(server)
if not server_obj:
return None
return self.plex.get_play_url(item_id)
return server_obj.get_play_url(item_id)