From 1add203c0e225c17ba8e59322441fbd03f2b5fa6 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Sat, 28 Sep 2024 00:57:13 +0800 Subject: [PATCH] fix(#2755): refactor pagination and fix media sync DB issue --- app/chain/mediaserver.py | 54 ++++++++++++++++++++++++-------- app/modules/emby/__init__.py | 15 ++++++--- app/modules/emby/emby.py | 40 ++++++++++++----------- app/modules/jellyfin/__init__.py | 15 ++++++--- app/modules/jellyfin/jellyfin.py | 39 ++++++++++++----------- app/modules/plex/__init__.py | 15 ++++++--- app/modules/plex/plex.py | 29 +++++++++-------- app/schemas/mediaserver.py | 2 +- 8 files changed, 134 insertions(+), 75 deletions(-) diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index b00d1fe3..54f4da2c 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -1,6 +1,6 @@ import json import threading -from typing import List, Union, Optional +from typing import List, Union, Optional, Generator from app import schemas from app.chain import ChainBase @@ -26,19 +26,47 @@ class MediaServerChain(ChainBase): """ return self.run_module("mediaserver_librarys", server=server, username=username, hidden=hidden) - def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: int = 100) \ - -> List[schemas.MediaServerItem]: + def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有项目 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 + + 说明: + - 特别注意的是,这里使用yield from返回迭代器,避免同时使用return与yield导致Python生成器解析异常 + - 如果 `limit` 为 None 或 -1 时,表示一次性获取所有数据,分页处理将不再生效 + - 在这种情况下,内存消耗可能会较大,特别是在数据量非常大的场景下 + - 如果未来评估结果显示,不分页场景下的内存消耗远大于分页处理时的网络请求开销,可以考虑在此方法中实现自分页的处理 + - 即通过 `while` 循环在上层进行分页控制,逐步获取所有数据,避免内存爆炸,当前该逻辑由具体实例来实现不分页的处理 + - Plex 实际上已默认支持内部分页处理,Jellyfin 与 Emby 获取数据时存在内部过滤场景,如排除合集等,分页数据可能是错误的 + if limit is not None and limit != -1: + yield from self.run_module("mediaserver_items", server=server, library_id=library_id, + start_index=start_index, limit=limit) + else: + # 自分页逻辑,通过循环逐步获取所有数据 + page_size = 10 + while True: + data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id, + start_index=start_index, limit=page_size) + if not data_generator: + break + count = 0 + for item in data_generator: + if item: + count += 1 + yield item + if count < page_size: + break + start_index += page_size """ - data = [] - data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id, - start_index=start_index, limit=limit) - if data_generator: - for item in data_generator: - if item: - data.append(item) - return data + yield from self.run_module("mediaserver_items", server=server, library_id=library_id, + start_index=start_index, limit=limit) def iteminfo(self, server: str, item_id: Union[str, int]) -> schemas.MediaServerItem: """ @@ -107,7 +135,7 @@ class MediaServerChain(ChainBase): continue logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...") library_count = 0 - for item in self.items(server_name, library.id): + for item in self.items(server=server_name, library_id=library.id): if not item or not item.item_id: continue logger.debug(f"正在同步 {item.title} ...") diff --git a/app/modules/emby/__init__.py b/app/modules/emby/__init__.py index 20432cf9..11d22b77 100644 --- a/app/modules/emby/__init__.py +++ b/app/modules/emby/__init__.py @@ -173,14 +173,21 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]): return server.get_librarys(username=username, hidden=hidden) return None - def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \ - -> Optional[Generator]: + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0, + limit: Optional[int] = -1) -> Optional[Generator]: """ - 媒体库项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ server: Emby = self.get_instance(server) if server: - return server.get_items(library_id, start_index, limit) + yield from server.get_items(library_id, start_index, limit) return None def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/emby/emby.py b/app/modules/emby/emby.py index 0c8414fa..d35a8b38 100644 --- a/app/modules/emby/emby.py +++ b/app/modules/emby/emby.py @@ -313,7 +313,7 @@ class Emby: if not self._host or not self._apikey: return None url = f"{self._host}emby/Items" - params={ + params = { "IncludeItemTypes": "Series", "Fields": "ProductionYear", "StartIndex": 0, @@ -601,7 +601,8 @@ class Emby: # 刷新根目录 return "/" - def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]: + @staticmethod + def __format_item_info(item) -> Optional[schemas.MediaServerItem]: """ 格式化item """ @@ -610,7 +611,8 @@ class Emby: if not user_data: user_state = None else: - resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0 + resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get( + "PlaybackPositionTicks") > 0 last_played_date = item.get("UserData", {}).get("LastPlayedDate") if last_played_date is not None and "." in last_played_date: last_played_date = last_played_date.split(".")[0] @@ -624,7 +626,6 @@ class Emby: ) tmdbid = item.get("ProviderIds", {}).get("Tmdb") return schemas.MediaServerItem( - id=item.get("Id"), server="emby", library=item.get("ParentId"), item_id=item.get("Id"), @@ -664,26 +665,30 @@ class Emby: logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e)) return None - def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: + def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有媒体库列表 - :param parent: 父媒体库ID - :param start_index: 开始索引,用于分页 - :param limit: 每次请求返回的项目数量 - :return: 生成器 schemas.MediaServerItem + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - if not parent: - yield None - if not self._host or not self._apikey: - yield None + if not parent or not self._host or not self._apikey: + return None url = f"{self._host}emby/Users/{self.user}/Items" params = { "ParentId": parent, "api_key": self._apikey, - "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId", - "StartIndex": start_index, - "Limit": limit + "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId" } + if limit is not None and limit != -1: + params.update({ + "StartIndex": start_index, + "Limit": limit + }) try: res = RequestUtils().get_res(url, params) if not res or res.status_code != 200: @@ -700,7 +705,6 @@ class Emby: except Exception as e: logger.error(f"连接Users/Items出错:" + str(e)) - yield None def get_webhook_message(self, form: any, args: dict) -> Optional[schemas.WebhookEventInfo]: """ diff --git a/app/modules/jellyfin/__init__.py b/app/modules/jellyfin/__init__.py index c03bb8d3..35718fe1 100644 --- a/app/modules/jellyfin/__init__.py +++ b/app/modules/jellyfin/__init__.py @@ -171,14 +171,21 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]): return server.get_librarys(username=username, hidden=hidden) return None - def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \ - -> Optional[Generator]: + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0, + limit: Optional[int] = -1) -> Optional[Generator]: """ - 媒体库项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ server: Jellyfin = self.get_instance(server) if server: - return server.get_items(library_id, start_index, limit) + yield from server.get_items(library_id, start_index, limit) return None def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/jellyfin/jellyfin.py b/app/modules/jellyfin/jellyfin.py index 65848527..3b5e31b9 100644 --- a/app/modules/jellyfin/jellyfin.py +++ b/app/modules/jellyfin/jellyfin.py @@ -662,8 +662,8 @@ class Jellyfin: return eventItem - - def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]: + @staticmethod + def __format_item_info(item) -> Optional[schemas.MediaServerItem]: """ 格式化item """ @@ -672,7 +672,8 @@ class Jellyfin: if not user_data: user_state = None else: - resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0 + resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get( + "PlaybackPositionTicks") > 0 last_played_date = item.get("UserData", {}).get("LastPlayedDate") if last_played_date is not None and "." in last_played_date: last_played_date = last_played_date.split(".")[0] @@ -687,7 +688,6 @@ class Jellyfin: tmdbid = item.get("ProviderIds", {}).get("Tmdb") return schemas.MediaServerItem( server="jellyfin", - id=item.get("Id"), library=item.get("ParentId"), item_id=item.get("Id"), item_type=item.get("Type"), @@ -725,26 +725,30 @@ class Jellyfin: logger.error(f"连接Users/{self.user}/Items/{itemid}:" + str(e)) return None - def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: + def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有媒体库列表 - :param parent: 父媒体库ID - :param start_index: 开始索引,用于分页 - :param limit: 每次请求返回的项目数量 - :return: 生成器 schemas.MediaServerItem + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - if not parent: - yield None - if not self._host or not self._apikey: - yield None + if not parent or not self._host or not self._apikey: + return None url = f"{self._host}Users/{self.user}/Items" params = { - "parentId": parent, + "ParentId": parent, "api_key": self._apikey, "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId", - "StartIndex": start_index, - "Limit": limit, } + if limit is not None and limit != -1: + params.update({ + "StartIndex": start_index, + "Limit": limit + }) try: res = RequestUtils().get_res(url, params) if not res or res.status_code != 200: @@ -760,7 +764,6 @@ class Jellyfin: yield self.__format_item_info(item) except Exception as e: logger.error(f"连接Users/Items出错:" + str(e)) - yield None def get_data(self, url: str) -> Optional[Response]: """ diff --git a/app/modules/plex/__init__.py b/app/modules/plex/__init__.py index afe108d7..dfe339e0 100644 --- a/app/modules/plex/__init__.py +++ b/app/modules/plex/__init__.py @@ -159,14 +159,21 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]): return server.get_librarys(hidden) return None - def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \ - -> Optional[Generator]: + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0, + limit: Optional[int] = -1) -> Optional[Generator]: """ - 媒体库项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ server: Plex = self.get_instance(server) if server: - return server.get_items(library_id, start_index, limit) + yield from server.get_items(library_id, start_index, limit) return None def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index 27f39f07..ffcde53d 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -465,7 +465,6 @@ class Plex: ) return schemas.MediaServerItem( - id=item.ratingKey, server="plex", library=item.librarySectionID, item_id=item.key, @@ -480,22 +479,27 @@ class Plex: user_state=user_state, ) - def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: + def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有媒体库列表 - :param parent: 父媒体库ID - :param start_index: 开始索引,用于分页 - :param limit: 每次请求返回的项目数量 - :return: 生成器 schemas.MediaServerItem + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - if not parent: - yield None - if not self._plex: - yield None + if not parent or not self._plex: + return None try: section = self._plex.library.sectionByID(int(parent)) if section: - for item in section.all(container_start=start_index, limit=limit): + if limit is None or limit == -1: + items = section.all() + else: + items = section.all(container_start=start_index, container_size=limit, maxresults=limit) + for item in items: try: if not item: continue @@ -505,7 +509,6 @@ class Plex: continue except Exception as err: logger.error(f"获取媒体库列表出错:{str(err)}") - yield None def get_webhook_message(self, form: any) -> Optional[schemas.WebhookEventInfo]: """ diff --git a/app/schemas/mediaserver.py b/app/schemas/mediaserver.py index 807f5c15..825043fc 100644 --- a/app/schemas/mediaserver.py +++ b/app/schemas/mediaserver.py @@ -72,7 +72,6 @@ class MediaServerLibrary(BaseModel): link: Optional[str] = None - class MediaServerItemUserState(BaseModel): # 已播放 played: Optional[bool] = None @@ -85,6 +84,7 @@ class MediaServerItemUserState(BaseModel): # 播放进度 percentage: Optional[float] = None + class MediaServerItem(BaseModel): """ 媒体服务器媒体信息