Merge pull request #2783 from InfinityPacer/dev

This commit is contained in:
jxxghp
2024-09-28 06:51:44 +08:00
committed by GitHub
8 changed files with 180 additions and 127 deletions

View File

@@ -1,6 +1,6 @@
import json
import threading
from typing import List, Union, Optional
from typing import List, Union, Optional, Generator
from app import schemas
from app.chain import ChainBase
@@ -26,19 +26,47 @@ class MediaServerChain(ChainBase):
"""
return self.run_module("mediaserver_librarys", server=server, username=username, hidden=hidden)
def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: int = 100) \
-> List[schemas.MediaServerItem]:
def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有项目
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
说明:
- 特别注意的是这里使用yield from返回迭代器避免同时使用return与yield导致Python生成器解析异常
- 如果 `limit` 为 None 或 -1 时,表示一次性获取所有数据,分页处理将不再生效
- 在这种情况下,内存消耗可能会较大,特别是在数据量非常大的场景下
- 如果未来评估结果显示,不分页场景下的内存消耗远大于分页处理时的网络请求开销,可以考虑在此方法中实现自分页的处理
- 即通过 `while` 循环在上层进行分页控制,逐步获取所有数据,避免内存爆炸,当前该逻辑由具体实例来实现不分页的处理
- Plex 实际上已默认支持内部分页处理Jellyfin 与 Emby 获取数据时存在内部过滤场景,如排除合集等,分页数据可能是错误的
if limit is not None and limit != -1:
yield from self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)
else:
# 自分页逻辑,通过循环逐步获取所有数据
page_size = 10
while True:
data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=page_size)
if not data_generator:
break
count = 0
for item in data_generator:
if item:
count += 1
yield item
if count < page_size:
break
start_index += page_size
"""
data = []
data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)
if data_generator:
for item in data_generator:
if item:
data.append(item)
return data
yield from self.run_module("mediaserver_items", server=server, library_id=library_id,
start_index=start_index, limit=limit)
def iteminfo(self, server: str, item_id: Union[str, int]) -> schemas.MediaServerItem:
"""
@@ -107,7 +135,7 @@ class MediaServerChain(ChainBase):
continue
logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...")
library_count = 0
for item in self.items(server_name, library.id):
for item in self.items(server=server_name, library_id=library.id):
if not item or not item.item_id:
continue
logger.debug(f"正在同步 {item.title} ...")

View File

@@ -173,14 +173,21 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]):
return server.get_librarys(username=username, hidden=hidden)
return None
def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \
-> Optional[Generator]:
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Optional[Generator]:
"""
媒体库项目列表
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server: Emby = self.get_instance(server)
if server:
return server.get_items(library_id, start_index, limit)
yield from server.get_items(library_id, start_index, limit)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:

View File

@@ -313,7 +313,7 @@ class Emby:
if not self._host or not self._apikey:
return None
url = f"{self._host}emby/Items"
params={
params = {
"IncludeItemTypes": "Series",
"Fields": "ProductionYear",
"StartIndex": 0,
@@ -601,7 +601,8 @@ class Emby:
# 刷新根目录
return "/"
def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
@staticmethod
def __format_item_info(item) -> Optional[schemas.MediaServerItem]:
"""
格式化item
"""
@@ -610,7 +611,8 @@ class Emby:
if not user_data:
user_state = None
else:
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get(
"PlaybackPositionTicks") > 0
last_played_date = item.get("UserData", {}).get("LastPlayedDate")
if last_played_date is not None and "." in last_played_date:
last_played_date = last_played_date.split(".")[0]
@@ -624,7 +626,6 @@ class Emby:
)
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
id=item.get("Id"),
server="emby",
library=item.get("ParentId"),
item_id=item.get("Id"),
@@ -664,30 +665,34 @@ class Emby:
logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e))
return None
def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator:
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有媒体库列表
:param parent: 父媒体库ID
:param start_index: 开始索引,用于分页
:param limit: 每次请求返回的项目数量
:return: 生成器 schemas.MediaServerItem
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not parent:
yield None
if not self._host or not self._apikey:
yield None
if not parent or not self._host or not self._apikey:
return None
url = f"{self._host}emby/Users/{self.user}/Items"
params = {
"ParentId": parent,
"api_key": self._apikey,
"Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId",
"StartIndex": start_index,
"Limit": limit
"Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId"
}
if limit is not None and limit != -1:
params.update({
"StartIndex": start_index,
"Limit": limit
})
try:
res = RequestUtils().get_res(url, params)
if not res or res.status_code != 200:
yield None
return None
items = res.json().get("Items") or []
for item in items:
if not item:
@@ -700,7 +705,6 @@ class Emby:
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
yield None
def get_webhook_message(self, form: any, args: dict) -> Optional[schemas.WebhookEventInfo]:
"""

View File

@@ -171,14 +171,21 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]):
return server.get_librarys(username=username, hidden=hidden)
return None
def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \
-> Optional[Generator]:
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Optional[Generator]:
"""
媒体库项目列表
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server: Jellyfin = self.get_instance(server)
if server:
return server.get_items(library_id, start_index, limit)
yield from server.get_items(library_id, start_index, limit)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:

View File

@@ -662,8 +662,8 @@ class Jellyfin:
return eventItem
def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]:
@staticmethod
def __format_item_info(item) -> Optional[schemas.MediaServerItem]:
"""
格式化item
"""
@@ -672,7 +672,8 @@ class Jellyfin:
if not user_data:
user_state = None
else:
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0
resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get(
"PlaybackPositionTicks") > 0
last_played_date = item.get("UserData", {}).get("LastPlayedDate")
if last_played_date is not None and "." in last_played_date:
last_played_date = last_played_date.split(".")[0]
@@ -687,7 +688,6 @@ class Jellyfin:
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
server="jellyfin",
id=item.get("Id"),
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
@@ -725,30 +725,34 @@ class Jellyfin:
logger.error(f"连接Users/{self.user}/Items/{itemid}" + str(e))
return None
def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator:
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器所有媒体库列表
:param parent: 父媒体库ID
:param start_index: 开始索引,用于分页
:param limit: 每次请求返回的项目数量
:return: 生成器 schemas.MediaServerItem
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not parent:
yield None
if not self._host or not self._apikey:
yield None
if not parent or not self._host or not self._apikey:
return None
url = f"{self._host}Users/{self.user}/Items"
params = {
"parentId": parent,
"ParentId": parent,
"api_key": self._apikey,
"Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId",
"StartIndex": start_index,
"Limit": limit,
}
if limit is not None and limit != -1:
params.update({
"StartIndex": start_index,
"Limit": limit
})
try:
res = RequestUtils().get_res(url, params)
if not res or res.status_code != 200:
yield None
return None
items = res.json().get("Items") or []
for item in items:
if not item:
@@ -760,7 +764,6 @@ class Jellyfin:
yield self.__format_item_info(item)
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
yield None
def get_data(self, url: str) -> Optional[Response]:
"""

View File

@@ -159,14 +159,21 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]):
return server.get_librarys(hidden)
return None
def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \
-> Optional[Generator]:
def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0,
limit: Optional[int] = -1) -> Optional[Generator]:
"""
媒体库项目列表
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param server: 媒体服务器名称
:param library_id: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
server: Plex = self.get_instance(server)
if server:
return server.get_items(library_id, start_index, limit)
yield from server.get_items(library_id, start_index, limit)
return None
def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]:

View File

@@ -392,23 +392,7 @@ class Plex:
return None
try:
item = self.__fetch_item(itemid)
ids = self.__get_ids(item.guids)
path = None
if item.locations:
path = item.locations[0]
return schemas.MediaServerItem(
server="plex",
library=item.librarySectionID,
item_id=item.key,
item_type=item.type,
title=item.title,
original_title=item.originalTitle,
year=item.year,
tmdbid=ids['tmdb_id'],
imdbid=ids['imdb_id'],
tvdbid=ids['tvdb_id'],
path=path,
)
return self.__build_media_server_item(item)
except Exception as err:
logger.error(f"获取项目详情出错:{str(err)}")
return None
@@ -454,64 +438,77 @@ class Plex:
item_id = int(item_id)
return self._plex.fetchItem(item_id)
def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator:
def __build_media_server_item(self, item) -> Optional[schemas.MediaServerItem]:
"""
获取媒体服务器所有媒体库列表
:param parent: 父媒体库ID
:param start_index: 开始索引,用于分页
:param limit: 每次请求返回的项目数量
:return: 生成器 schemas.MediaServerItem
构造MediaServerItem
:param item: Plex媒体项目
:return: MediaServerItem
"""
if not parent:
yield None
if not self._plex:
yield None
if not item:
return None
ids = self.__get_ids(item.guids)
path = item.locations[0] if item.locations else None
playback_position = getattr(item, "viewOffset", None) or 0
duration = getattr(item, "duration", None) or 0
percentage = (playback_position / duration * 100) if duration > 0 else None
played = getattr(item, "isPlayed", None) or False
play_count = getattr(item, "viewCount", None) or 0
last_played_date = getattr(item, "lastViewedAt", None)
user_state = schemas.MediaServerItemUserState(
played=played,
resume=playback_position > 0,
last_played_date=last_played_date.isoformat() if last_played_date and hasattr(last_played_date,
'isoformat') else None,
play_count=play_count,
percentage=percentage,
)
return schemas.MediaServerItem(
server="plex",
library=item.librarySectionID,
item_id=item.key,
item_type=item.type,
title=item.title,
original_title=item.originalTitle,
year=item.year,
tmdbid=ids.get("tmdb_id"),
imdbid=ids.get("imdb_id"),
tvdbid=ids.get("tvdb_id"),
path=path,
user_state=user_state,
)
def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \
-> Optional[Generator]:
"""
获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据
:param parent: 媒体库ID用于标识要获取的媒体库
:param start_index: 起始索引,用于分页获取数据。默认为 0即从第一个项目开始获取
:param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1则表示一次性获取所有数据默认为 -1
:return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目
"""
if not parent or not self._plex:
return None
try:
section = self._plex.library.sectionByID(int(parent))
if section:
for item in section.all(container_start=start_index, limit=limit):
if limit is None or limit == -1:
items = section.all()
else:
items = section.all(container_start=start_index, container_size=limit, maxresults=limit)
for item in items:
try:
if not item:
continue
ids = self.__get_ids(item.guids)
path = None
if item.locations:
path = item.locations[0]
playback_position = item.viewOffset if hasattr(item, 'viewOffset') else 0
duration = item.duration if hasattr(item, 'duration') else 0
percentage = (playback_position / duration * 100) if duration > 0 else None
played = item.isPlayed if hasattr(item, 'isPlayed') else False
play_count = item.viewCount if hasattr(item, 'viewCount') else 0
last_played_date = item.lastViewedAt if hasattr(item, 'lastViewedAt') else None
user_state = schemas.MediaServerItemUserState(
played=played,
resume=playback_position > 0,
last_played_date=last_played_date.isoformat() if last_played_date else None,
play_count=play_count,
percentage=percentage,
)
yield schemas.MediaServerItem(
id=item.ratingKey,
server="plex",
library=item.librarySectionID,
item_id=item.key,
item_type=item.type,
title=item.title,
original_title=item.originalTitle,
year=item.year,
tmdbid=ids['tmdb_id'],
imdbid=ids['imdb_id'],
tvdbid=ids['tvdb_id'],
path=path,
user_state=user_state,
)
yield self.__build_media_server_item(item)
except Exception as e:
logger.error(f"处理媒体项目时出错:{str(e)}, 跳过此项目")
logger.error(f"处理媒体项目时出错:{str(e)}, 跳过此项目")
continue
except Exception as err:
logger.error(f"获取媒体库列表出错:{str(err)}")
yield None
def get_webhook_message(self, form: any) -> Optional[schemas.WebhookEventInfo]:
"""

View File

@@ -72,7 +72,6 @@ class MediaServerLibrary(BaseModel):
link: Optional[str] = None
class MediaServerItemUserState(BaseModel):
# 已播放
played: Optional[bool] = None
@@ -85,6 +84,7 @@ class MediaServerItemUserState(BaseModel):
# 播放进度
percentage: Optional[float] = None
class MediaServerItem(BaseModel):
"""
媒体服务器媒体信息