From 48122d8d9a0c2d0011c524ccd152533a18be9934 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 27 Sep 2024 17:23:27 +0800 Subject: [PATCH 1/3] fix(#2755): handle Plex None values and exceptions in item builder --- app/modules/plex/plex.py | 96 +++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 51 deletions(-) diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index d6b120d5..27f39f07 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -392,23 +392,7 @@ class Plex: return None try: item = self.__fetch_item(itemid) - ids = self.__get_ids(item.guids) - path = None - if item.locations: - path = item.locations[0] - return schemas.MediaServerItem( - server="plex", - library=item.librarySectionID, - item_id=item.key, - item_type=item.type, - title=item.title, - original_title=item.originalTitle, - year=item.year, - tmdbid=ids['tmdb_id'], - imdbid=ids['imdb_id'], - tvdbid=ids['tvdb_id'], - path=path, - ) + return self.__build_media_server_item(item) except Exception as err: logger.error(f"获取项目详情出错:{str(err)}") return None @@ -454,6 +438,48 @@ class Plex: item_id = int(item_id) return self._plex.fetchItem(item_id) + def __build_media_server_item(self, item) -> Optional[schemas.MediaServerItem]: + """ + 构造MediaServerItem + :param item: Plex媒体项目 + :return: MediaServerItem + """ + if not item: + return None + ids = self.__get_ids(item.guids) + path = item.locations[0] if item.locations else None + playback_position = getattr(item, "viewOffset", None) or 0 + duration = getattr(item, "duration", None) or 0 + percentage = (playback_position / duration * 100) if duration > 0 else None + played = getattr(item, "isPlayed", None) or False + play_count = getattr(item, "viewCount", None) or 0 + last_played_date = getattr(item, "lastViewedAt", None) + + user_state = schemas.MediaServerItemUserState( + played=played, + resume=playback_position > 0, + last_played_date=last_played_date.isoformat() if last_played_date and hasattr(last_played_date, + 'isoformat') else None, + play_count=play_count, + percentage=percentage, + ) + + return schemas.MediaServerItem( + id=item.ratingKey, + server="plex", + library=item.librarySectionID, + item_id=item.key, + item_type=item.type, + title=item.title, + original_title=item.originalTitle, + year=item.year, + tmdbid=ids.get("tmdb_id"), + imdbid=ids.get("imdb_id"), + tvdbid=ids.get("tvdb_id"), + path=path, + user_state=user_state, + ) + def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: """ 获取媒体服务器所有媒体库列表 @@ -473,41 +499,9 @@ class Plex: try: if not item: continue - ids = self.__get_ids(item.guids) - path = None - if item.locations: - path = item.locations[0] - playback_position = item.viewOffset if hasattr(item, 'viewOffset') else 0 - duration = item.duration if hasattr(item, 'duration') else 0 - percentage = (playback_position / duration * 100) if duration > 0 else None - played = item.isPlayed if hasattr(item, 'isPlayed') else False - play_count = item.viewCount if hasattr(item, 'viewCount') else 0 - last_played_date = item.lastViewedAt if hasattr(item, 'lastViewedAt') else None - user_state = schemas.MediaServerItemUserState( - played=played, - resume=playback_position > 0, - last_played_date=last_played_date.isoformat() if last_played_date else None, - play_count=play_count, - percentage=percentage, - ) - - yield schemas.MediaServerItem( - id=item.ratingKey, - server="plex", - library=item.librarySectionID, - item_id=item.key, - item_type=item.type, - title=item.title, - original_title=item.originalTitle, - year=item.year, - tmdbid=ids['tmdb_id'], - imdbid=ids['imdb_id'], - tvdbid=ids['tvdb_id'], - path=path, - user_state=user_state, - ) + yield self.__build_media_server_item(item) except Exception as e: - logger.error(f"处理媒体项目时出错:{str(e)}, 跳过此项目。") + logger.error(f"处理媒体项目时出错:{str(e)}, 跳过此项目") continue except Exception as err: logger.error(f"获取媒体库列表出错:{str(err)}") From 1add203c0e225c17ba8e59322441fbd03f2b5fa6 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Sat, 28 Sep 2024 00:57:13 +0800 Subject: [PATCH 2/3] fix(#2755): refactor pagination and fix media sync DB issue --- app/chain/mediaserver.py | 54 ++++++++++++++++++++++++-------- app/modules/emby/__init__.py | 15 ++++++--- app/modules/emby/emby.py | 40 ++++++++++++----------- app/modules/jellyfin/__init__.py | 15 ++++++--- app/modules/jellyfin/jellyfin.py | 39 ++++++++++++----------- app/modules/plex/__init__.py | 15 ++++++--- app/modules/plex/plex.py | 29 +++++++++-------- app/schemas/mediaserver.py | 2 +- 8 files changed, 134 insertions(+), 75 deletions(-) diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index b00d1fe3..54f4da2c 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -1,6 +1,6 @@ import json import threading -from typing import List, Union, Optional +from typing import List, Union, Optional, Generator from app import schemas from app.chain import ChainBase @@ -26,19 +26,47 @@ class MediaServerChain(ChainBase): """ return self.run_module("mediaserver_librarys", server=server, username=username, hidden=hidden) - def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: int = 100) \ - -> List[schemas.MediaServerItem]: + def items(self, server: str, library_id: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有项目 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 + + 说明: + - 特别注意的是,这里使用yield from返回迭代器,避免同时使用return与yield导致Python生成器解析异常 + - 如果 `limit` 为 None 或 -1 时,表示一次性获取所有数据,分页处理将不再生效 + - 在这种情况下,内存消耗可能会较大,特别是在数据量非常大的场景下 + - 如果未来评估结果显示,不分页场景下的内存消耗远大于分页处理时的网络请求开销,可以考虑在此方法中实现自分页的处理 + - 即通过 `while` 循环在上层进行分页控制,逐步获取所有数据,避免内存爆炸,当前该逻辑由具体实例来实现不分页的处理 + - Plex 实际上已默认支持内部分页处理,Jellyfin 与 Emby 获取数据时存在内部过滤场景,如排除合集等,分页数据可能是错误的 + if limit is not None and limit != -1: + yield from self.run_module("mediaserver_items", server=server, library_id=library_id, + start_index=start_index, limit=limit) + else: + # 自分页逻辑,通过循环逐步获取所有数据 + page_size = 10 + while True: + data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id, + start_index=start_index, limit=page_size) + if not data_generator: + break + count = 0 + for item in data_generator: + if item: + count += 1 + yield item + if count < page_size: + break + start_index += page_size """ - data = [] - data_generator = self.run_module("mediaserver_items", server=server, library_id=library_id, - start_index=start_index, limit=limit) - if data_generator: - for item in data_generator: - if item: - data.append(item) - return data + yield from self.run_module("mediaserver_items", server=server, library_id=library_id, + start_index=start_index, limit=limit) def iteminfo(self, server: str, item_id: Union[str, int]) -> schemas.MediaServerItem: """ @@ -107,7 +135,7 @@ class MediaServerChain(ChainBase): continue logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...") library_count = 0 - for item in self.items(server_name, library.id): + for item in self.items(server=server_name, library_id=library.id): if not item or not item.item_id: continue logger.debug(f"正在同步 {item.title} ...") diff --git a/app/modules/emby/__init__.py b/app/modules/emby/__init__.py index 20432cf9..11d22b77 100644 --- a/app/modules/emby/__init__.py +++ b/app/modules/emby/__init__.py @@ -173,14 +173,21 @@ class EmbyModule(_ModuleBase, _MediaServerBase[Emby]): return server.get_librarys(username=username, hidden=hidden) return None - def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \ - -> Optional[Generator]: + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0, + limit: Optional[int] = -1) -> Optional[Generator]: """ - 媒体库项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ server: Emby = self.get_instance(server) if server: - return server.get_items(library_id, start_index, limit) + yield from server.get_items(library_id, start_index, limit) return None def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/emby/emby.py b/app/modules/emby/emby.py index 0c8414fa..d35a8b38 100644 --- a/app/modules/emby/emby.py +++ b/app/modules/emby/emby.py @@ -313,7 +313,7 @@ class Emby: if not self._host or not self._apikey: return None url = f"{self._host}emby/Items" - params={ + params = { "IncludeItemTypes": "Series", "Fields": "ProductionYear", "StartIndex": 0, @@ -601,7 +601,8 @@ class Emby: # 刷新根目录 return "/" - def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]: + @staticmethod + def __format_item_info(item) -> Optional[schemas.MediaServerItem]: """ 格式化item """ @@ -610,7 +611,8 @@ class Emby: if not user_data: user_state = None else: - resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0 + resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get( + "PlaybackPositionTicks") > 0 last_played_date = item.get("UserData", {}).get("LastPlayedDate") if last_played_date is not None and "." in last_played_date: last_played_date = last_played_date.split(".")[0] @@ -624,7 +626,6 @@ class Emby: ) tmdbid = item.get("ProviderIds", {}).get("Tmdb") return schemas.MediaServerItem( - id=item.get("Id"), server="emby", library=item.get("ParentId"), item_id=item.get("Id"), @@ -664,26 +665,30 @@ class Emby: logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:" + str(e)) return None - def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: + def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有媒体库列表 - :param parent: 父媒体库ID - :param start_index: 开始索引,用于分页 - :param limit: 每次请求返回的项目数量 - :return: 生成器 schemas.MediaServerItem + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - if not parent: - yield None - if not self._host or not self._apikey: - yield None + if not parent or not self._host or not self._apikey: + return None url = f"{self._host}emby/Users/{self.user}/Items" params = { "ParentId": parent, "api_key": self._apikey, - "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId", - "StartIndex": start_index, - "Limit": limit + "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId" } + if limit is not None and limit != -1: + params.update({ + "StartIndex": start_index, + "Limit": limit + }) try: res = RequestUtils().get_res(url, params) if not res or res.status_code != 200: @@ -700,7 +705,6 @@ class Emby: except Exception as e: logger.error(f"连接Users/Items出错:" + str(e)) - yield None def get_webhook_message(self, form: any, args: dict) -> Optional[schemas.WebhookEventInfo]: """ diff --git a/app/modules/jellyfin/__init__.py b/app/modules/jellyfin/__init__.py index c03bb8d3..35718fe1 100644 --- a/app/modules/jellyfin/__init__.py +++ b/app/modules/jellyfin/__init__.py @@ -171,14 +171,21 @@ class JellyfinModule(_ModuleBase, _MediaServerBase[Jellyfin]): return server.get_librarys(username=username, hidden=hidden) return None - def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \ - -> Optional[Generator]: + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0, + limit: Optional[int] = -1) -> Optional[Generator]: """ - 媒体库项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ server: Jellyfin = self.get_instance(server) if server: - return server.get_items(library_id, start_index, limit) + yield from server.get_items(library_id, start_index, limit) return None def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/jellyfin/jellyfin.py b/app/modules/jellyfin/jellyfin.py index 65848527..3b5e31b9 100644 --- a/app/modules/jellyfin/jellyfin.py +++ b/app/modules/jellyfin/jellyfin.py @@ -662,8 +662,8 @@ class Jellyfin: return eventItem - - def __format_item_info(self, item) -> Optional[schemas.MediaServerItem]: + @staticmethod + def __format_item_info(item) -> Optional[schemas.MediaServerItem]: """ 格式化item """ @@ -672,7 +672,8 @@ class Jellyfin: if not user_data: user_state = None else: - resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get("PlaybackPositionTicks") > 0 + resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get( + "PlaybackPositionTicks") > 0 last_played_date = item.get("UserData", {}).get("LastPlayedDate") if last_played_date is not None and "." in last_played_date: last_played_date = last_played_date.split(".")[0] @@ -687,7 +688,6 @@ class Jellyfin: tmdbid = item.get("ProviderIds", {}).get("Tmdb") return schemas.MediaServerItem( server="jellyfin", - id=item.get("Id"), library=item.get("ParentId"), item_id=item.get("Id"), item_type=item.get("Type"), @@ -725,26 +725,30 @@ class Jellyfin: logger.error(f"连接Users/{self.user}/Items/{itemid}:" + str(e)) return None - def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: + def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有媒体库列表 - :param parent: 父媒体库ID - :param start_index: 开始索引,用于分页 - :param limit: 每次请求返回的项目数量 - :return: 生成器 schemas.MediaServerItem + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - if not parent: - yield None - if not self._host or not self._apikey: - yield None + if not parent or not self._host or not self._apikey: + return None url = f"{self._host}Users/{self.user}/Items" params = { - "parentId": parent, + "ParentId": parent, "api_key": self._apikey, "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId", - "StartIndex": start_index, - "Limit": limit, } + if limit is not None and limit != -1: + params.update({ + "StartIndex": start_index, + "Limit": limit + }) try: res = RequestUtils().get_res(url, params) if not res or res.status_code != 200: @@ -760,7 +764,6 @@ class Jellyfin: yield self.__format_item_info(item) except Exception as e: logger.error(f"连接Users/Items出错:" + str(e)) - yield None def get_data(self, url: str) -> Optional[Response]: """ diff --git a/app/modules/plex/__init__.py b/app/modules/plex/__init__.py index afe108d7..dfe339e0 100644 --- a/app/modules/plex/__init__.py +++ b/app/modules/plex/__init__.py @@ -159,14 +159,21 @@ class PlexModule(_ModuleBase, _MediaServerBase[Plex]): return server.get_librarys(hidden) return None - def mediaserver_items(self, server: str, library_id: str, start_index: int = 0, limit: int = 100) \ - -> Optional[Generator]: + def mediaserver_items(self, server: str, library_id: Union[str, int], start_index: int = 0, + limit: Optional[int] = -1) -> Optional[Generator]: """ - 媒体库项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param server: 媒体服务器名称 + :param library_id: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ server: Plex = self.get_instance(server) if server: - return server.get_items(library_id, start_index, limit) + yield from server.get_items(library_id, start_index, limit) return None def mediaserver_iteminfo(self, server: str, item_id: str) -> Optional[schemas.MediaServerItem]: diff --git a/app/modules/plex/plex.py b/app/modules/plex/plex.py index 27f39f07..ffcde53d 100644 --- a/app/modules/plex/plex.py +++ b/app/modules/plex/plex.py @@ -465,7 +465,6 @@ class Plex: ) return schemas.MediaServerItem( - id=item.ratingKey, server="plex", library=item.librarySectionID, item_id=item.key, @@ -480,22 +479,27 @@ class Plex: user_state=user_state, ) - def get_items(self, parent: str, start_index: int = 0, limit: int = 100) -> Generator: + def get_items(self, parent: Union[str, int], start_index: int = 0, limit: Optional[int] = -1) \ + -> Optional[Generator]: """ - 获取媒体服务器所有媒体库列表 - :param parent: 父媒体库ID - :param start_index: 开始索引,用于分页 - :param limit: 每次请求返回的项目数量 - :return: 生成器 schemas.MediaServerItem + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - if not parent: - yield None - if not self._plex: - yield None + if not parent or not self._plex: + return None try: section = self._plex.library.sectionByID(int(parent)) if section: - for item in section.all(container_start=start_index, limit=limit): + if limit is None or limit == -1: + items = section.all() + else: + items = section.all(container_start=start_index, container_size=limit, maxresults=limit) + for item in items: try: if not item: continue @@ -505,7 +509,6 @@ class Plex: continue except Exception as err: logger.error(f"获取媒体库列表出错:{str(err)}") - yield None def get_webhook_message(self, form: any) -> Optional[schemas.WebhookEventInfo]: """ diff --git a/app/schemas/mediaserver.py b/app/schemas/mediaserver.py index 807f5c15..825043fc 100644 --- a/app/schemas/mediaserver.py +++ b/app/schemas/mediaserver.py @@ -72,7 +72,6 @@ class MediaServerLibrary(BaseModel): link: Optional[str] = None - class MediaServerItemUserState(BaseModel): # 已播放 played: Optional[bool] = None @@ -85,6 +84,7 @@ class MediaServerItemUserState(BaseModel): # 播放进度 percentage: Optional[float] = None + class MediaServerItem(BaseModel): """ 媒体服务器媒体信息 From 5993bfcefb305e7fef8f21acfe688ff8d7f9b5d8 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Sat, 28 Sep 2024 00:57:59 +0800 Subject: [PATCH 3/3] fix(#2755): remove yield None, handle generator termination on error --- app/modules/emby/emby.py | 2 +- app/modules/jellyfin/jellyfin.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/modules/emby/emby.py b/app/modules/emby/emby.py index d35a8b38..6e5127ff 100644 --- a/app/modules/emby/emby.py +++ b/app/modules/emby/emby.py @@ -692,7 +692,7 @@ class Emby: try: res = RequestUtils().get_res(url, params) if not res or res.status_code != 200: - yield None + return None items = res.json().get("Items") or [] for item in items: if not item: diff --git a/app/modules/jellyfin/jellyfin.py b/app/modules/jellyfin/jellyfin.py index 3b5e31b9..0916142d 100644 --- a/app/modules/jellyfin/jellyfin.py +++ b/app/modules/jellyfin/jellyfin.py @@ -752,7 +752,7 @@ class Jellyfin: try: res = RequestUtils().get_res(url, params) if not res or res.status_code != 200: - yield None + return None items = res.json().get("Items") or [] for item in items: if not item: