feat:媒体查询协程处理

This commit is contained in:
jxxghp
2025-07-31 15:24:50 +08:00
parent 983f8fcb03
commit a0e4b4a56e
9 changed files with 1509 additions and 485 deletions

View File

@@ -18,61 +18,61 @@ router = APIRouter()
@router.get("/recognize", summary="识别媒体信息(种子)", response_model=schemas.Context)
def recognize(title: str,
subtitle: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def recognize(title: str,
subtitle: Optional[str] = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据标题、副标题识别媒体信息
"""
# 识别媒体信息
metainfo = MetaInfo(title, subtitle)
mediainfo = MediaChain().recognize_by_meta(metainfo)
mediainfo = await MediaChain().async_recognize_by_meta(metainfo)
if mediainfo:
return Context(meta_info=metainfo, media_info=mediainfo).to_dict()
return schemas.Context()
@router.get("/recognize2", summary="识别种子媒体信息API_TOKEN", response_model=schemas.Context)
def recognize2(_: Annotated[str, Depends(verify_apitoken)],
title: str,
subtitle: Optional[str] = None
) -> Any:
async def recognize2(_: Annotated[str, Depends(verify_apitoken)],
title: str,
subtitle: Optional[str] = None
) -> Any:
"""
根据标题、副标题识别媒体信息 API_TOKEN认证?token=xxx
"""
# 识别媒体信息
return recognize(title, subtitle)
return await recognize(title, subtitle)
@router.get("/recognize_file", summary="识别媒体信息(文件)", response_model=schemas.Context)
def recognize_file(path: str,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def recognize_file(path: str,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据文件路径识别媒体信息
"""
# 识别媒体信息
context = MediaChain().recognize_by_path(path)
context = await MediaChain().async_recognize_by_path(path)
if context:
return context.to_dict()
return schemas.Context()
@router.get("/recognize_file2", summary="识别文件媒体信息API_TOKEN", response_model=schemas.Context)
def recognize_file2(path: str,
_: Annotated[str, Depends(verify_apitoken)]) -> Any:
async def recognize_file2(path: str,
_: Annotated[str, Depends(verify_apitoken)]) -> Any:
"""
根据文件路径识别媒体信息 API_TOKEN认证?token=xxx
"""
# 识别媒体信息
return recognize_file(path)
return await recognize_file(path)
@router.get("/search", summary="搜索媒体/人物信息", response_model=List[dict])
def search(title: str,
type: Optional[str] = "media",
page: int = 1,
count: int = 8,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def search(title: str,
type: Optional[str] = "media",
page: int = 1,
count: int = 8,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
模糊搜索媒体/人物信息列表 media媒体信息person人物信息
"""
@@ -87,13 +87,13 @@ def search(title: str,
result = []
if type == "media":
_, medias = MediaChain().search(title=title)
_, medias = await MediaChain().async_search(title=title)
if medias:
result = [media.to_dict() for media in medias]
elif type == "collection":
result = MediaChain().search_collections(name=title)
result = await MediaChain().async_search_collections(name=title)
else:
result = MediaChain().search_persons(name=title)
result = await MediaChain().async_search_persons(name=title)
if result:
# 按设置的顺序对结果进行排序
setting_order = settings.SEARCH_SOURCE.split(',') or []
@@ -123,13 +123,13 @@ def scrape(fileitem: schemas.FileItem,
if storage == "local":
if not scrape_path.exists():
return schemas.Response(success=False, message="刮削路径不存在")
# 手动刮削
# 手动刮削 (暂时使用同步版本,可以后续优化为异步)
chain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo, overwrite=True)
return schemas.Response(success=True, message=f"{fileitem.path} 刮削完成")
@router.get("/category", summary="查询自动分类配置", response_model=dict)
def category(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def category(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询自动分类配置
"""
@@ -137,37 +137,37 @@ def category(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
@router.get("/group/seasons/{episode_group}", summary="查询剧集组季信息", response_model=List[schemas.MediaSeason])
def group_seasons(episode_group: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def group_seasons(episode_group: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询剧集组季信息themoviedb
"""
return TmdbChain().tmdb_group_seasons(group_id=episode_group)
return await TmdbChain().async_tmdb_group_seasons(group_id=episode_group)
@router.get("/groups/{tmdbid}", summary="查询媒体剧集组", response_model=List[dict])
def seasons(tmdbid: int, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def groups(tmdbid: int, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询媒体剧集组列表themoviedb
"""
mediainfo = MediaChain().recognize_media(tmdbid=tmdbid, mtype=MediaType.TV)
mediainfo = await MediaChain().async_recognize_media(tmdbid=tmdbid, mtype=MediaType.TV)
if not mediainfo:
return []
return mediainfo.episode_groups
@router.get("/seasons", summary="查询媒体季信息", response_model=List[schemas.MediaSeason])
def seasons(mediaid: Optional[str] = None,
title: Optional[str] = None,
year: str = None,
season: int = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def seasons(mediaid: Optional[str] = None,
title: Optional[str] = None,
year: str = None,
season: int = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询媒体季信息
"""
if mediaid:
if mediaid.startswith("tmdb:"):
tmdbid = int(mediaid[5:])
seasons_info = TmdbChain().tmdb_seasons(tmdbid=tmdbid)
seasons_info = await TmdbChain().async_tmdb_seasons(tmdbid=tmdbid)
if seasons_info:
if season:
return [sea for sea in seasons_info if sea.season_number == season]
@@ -176,17 +176,17 @@ def seasons(mediaid: Optional[str] = None,
meta = MetaInfo(title)
if year:
meta.year = year
mediainfo = MediaChain().recognize_media(meta, mtype=MediaType.TV)
mediainfo = await MediaChain().async_recognize_media(meta, mtype=MediaType.TV)
if mediainfo:
if settings.RECOGNIZE_SOURCE == "themoviedb":
seasons_info = TmdbChain().tmdb_seasons(tmdbid=mediainfo.tmdb_id)
seasons_info = await TmdbChain().async_tmdb_seasons(tmdbid=mediainfo.tmdb_id)
if seasons_info:
if season:
return [sea for sea in seasons_info if sea.season_number == season]
return seasons_info
else:
sea = season or 1
return schemas.MediaSeason(
return [schemas.MediaSeason(
season_number=sea,
poster_path=mediainfo.poster_path,
name=f"{sea}",
@@ -194,24 +194,25 @@ def seasons(mediaid: Optional[str] = None,
overview=mediainfo.overview,
vote_average=mediainfo.vote_average,
episode_count=mediainfo.number_of_episodes
)
)]
return []
@router.get("/{mediaid}", summary="查询媒体详情", response_model=schemas.MediaInfo)
def detail(mediaid: str, type_name: str, title: Optional[str] = None, year: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
async def detail(mediaid: str, type_name: str, title: Optional[str] = None, year: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据媒体ID查询themoviedb或豆瓣媒体信息type_name: 电影/电视剧
"""
mtype = MediaType(type_name)
mediainfo = None
mediachain = MediaChain()
if mediaid.startswith("tmdb:"):
mediainfo = MediaChain().recognize_media(tmdbid=int(mediaid[5:]), mtype=mtype)
mediainfo = await mediachain.async_recognize_media(tmdbid=int(mediaid[5:]), mtype=mtype)
elif mediaid.startswith("douban:"):
mediainfo = MediaChain().recognize_media(doubanid=mediaid[7:], mtype=mtype)
mediainfo = await mediachain.async_recognize_media(doubanid=mediaid[7:], mtype=mtype)
elif mediaid.startswith("bangumi:"):
mediainfo = MediaChain().recognize_media(bangumiid=int(mediaid[8:]), mtype=mtype)
mediainfo = await mediachain.async_recognize_media(bangumiid=int(mediaid[8:]), mtype=mtype)
else:
# 广播事件解析媒体信息
event_data = MediaRecognizeConvertEventData(
@@ -224,9 +225,9 @@ def detail(mediaid: str, type_name: str, title: Optional[str] = None, year: str
event_data: MediaRecognizeConvertEventData = event.event_data
new_id = event_data.media_dict.get("id")
if event_data.convert_type == "themoviedb":
mediainfo = MediaChain().recognize_media(tmdbid=new_id, mtype=mtype)
mediainfo = await mediachain.async_recognize_media(tmdbid=new_id, mtype=mtype)
elif event_data.convert_type == "douban":
mediainfo = MediaChain().recognize_media(doubanid=new_id, mtype=mtype)
mediainfo = await mediachain.async_recognize_media(doubanid=new_id, mtype=mtype)
elif title:
# 使用名称识别兜底
meta = MetaInfo(title)
@@ -234,10 +235,10 @@ def detail(mediaid: str, type_name: str, title: Optional[str] = None, year: str
meta.year = year
if mtype:
meta.type = mtype
mediainfo = MediaChain().recognize_media(meta=meta)
mediainfo = await mediachain.async_recognize_media(meta=meta)
# 识别
if mediainfo:
MediaChain().obtain_images(mediainfo)
await mediachain.async_obtain_images(mediainfo)
return mediainfo.to_dict()
return schemas.MediaInfo()

View File

@@ -332,6 +332,39 @@ class ChainBase(metaclass=ABCMeta):
tmdbid=tmdbid, doubanid=doubanid, bangumiid=bangumiid,
episode_group=episode_group, cache=cache)
async def async_recognize_media(self, meta: MetaBase = None,
mtype: Optional[MediaType] = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
bangumiid: Optional[int] = None,
episode_group: Optional[str] = None,
cache: bool = True) -> Optional[MediaInfo]:
"""
识别媒体信息不含Fanart图片异步版本
:param meta: 识别的元数据
:param mtype: 识别的媒体类型与tmdbid配套
:param tmdbid: tmdbid
:param doubanid: 豆瓣ID
:param bangumiid: BangumiID
:param episode_group: 剧集组
:param cache: 是否使用缓存
:return: 识别的媒体信息,包括剧集信息
"""
# 识别用名中含指定信息情形
if not mtype and meta and meta.type in [MediaType.TV, MediaType.MOVIE]:
mtype = meta.type
if not tmdbid and hasattr(meta, "tmdbid"):
tmdbid = meta.tmdbid
if not doubanid and hasattr(meta, "doubanid"):
doubanid = meta.doubanid
# 有tmdbid时不使用其它ID
if tmdbid:
doubanid = None
bangumiid = None
return await self.async_run_module("async_recognize_media", meta=meta, mtype=mtype,
tmdbid=tmdbid, doubanid=doubanid, bangumiid=bangumiid,
episode_group=episode_group, cache=cache)
def match_doubaninfo(self, name: str, imdbid: Optional[str] = None,
mtype: Optional[MediaType] = None, year: Optional[str] = None, season: Optional[int] = None,
raise_exception: bool = False) -> Optional[dict]:
@@ -347,6 +380,22 @@ class ChainBase(metaclass=ABCMeta):
return self.run_module("match_doubaninfo", name=name, imdbid=imdbid,
mtype=mtype, year=year, season=season, raise_exception=raise_exception)
async def async_match_doubaninfo(self, name: str, imdbid: Optional[str] = None,
mtype: Optional[MediaType] = None, year: Optional[str] = None,
season: Optional[int] = None,
raise_exception: bool = False) -> Optional[dict]:
"""
搜索和匹配豆瓣信息(异步版本)
:param name: 标题
:param imdbid: imdbid
:param mtype: 类型
:param year: 年份
:param season: 季
:param raise_exception: 触发速率限制时是否抛出异常
"""
return await self.async_run_module("async_match_doubaninfo", name=name, imdbid=imdbid,
mtype=mtype, year=year, season=season, raise_exception=raise_exception)
def match_tmdbinfo(self, name: str, mtype: Optional[MediaType] = None,
year: Optional[str] = None, season: Optional[int] = None) -> Optional[dict]:
"""
@@ -359,6 +408,18 @@ class ChainBase(metaclass=ABCMeta):
return self.run_module("match_tmdbinfo", name=name,
mtype=mtype, year=year, season=season)
async def async_match_tmdbinfo(self, name: str, mtype: Optional[MediaType] = None,
year: Optional[str] = None, season: Optional[int] = None) -> Optional[dict]:
"""
搜索和匹配TMDB信息异步版本
:param name: 标题
:param mtype: 类型
:param year: 年份
:param season: 季
"""
return await self.async_run_module("async_match_tmdbinfo", name=name,
mtype=mtype, year=year, season=season)
def obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片
@@ -367,6 +428,14 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("obtain_images", mediainfo=mediainfo)
async def async_obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片(异步版本)
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
return await self.async_run_module("async_obtain_images", mediainfo=mediainfo)
def obtain_specific_image(self, mediaid: Union[str, int], mtype: MediaType,
image_type: MediaImageType, image_prefix: Optional[str] = None,
season: Optional[int] = None, episode: Optional[int] = None) -> Optional[str]:
@@ -403,7 +472,7 @@ class ChainBase(metaclass=ABCMeta):
:return: 豆瓣信息
:param raise_exception: 触发速率限制时是否抛出异常
"""
return await self.run_module("async_douban_info", doubanid=doubanid, mtype=mtype,
return await self.async_run_module("async_douban_info", doubanid=doubanid, mtype=mtype,
raise_exception=raise_exception)
def tvdb_info(self, tvdbid: int) -> Optional[dict]:
@@ -424,6 +493,16 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("tmdb_info", tmdbid=tmdbid, mtype=mtype, season=season)
async def async_tmdb_info(self, tmdbid: int, mtype: MediaType, season: Optional[int] = None) -> Optional[dict]:
"""
获取TMDB信息异步版本
:param tmdbid: int
:param mtype: 媒体类型
:param season: 季
:return: TVDB信息
"""
return await self.async_run_module("async_tmdb_info", tmdbid=tmdbid, mtype=mtype, season=season)
def bangumi_info(self, bangumiid: int) -> Optional[dict]:
"""
获取Bangumi信息
@@ -432,6 +511,14 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("bangumi_info", bangumiid=bangumiid)
async def async_bangumi_info(self, bangumiid: int) -> Optional[dict]:
"""
获取Bangumi信息异步版本
:param bangumiid: int
:return: Bangumi信息
"""
return await self.async_run_module("async_bangumi_info", bangumiid=bangumiid)
def message_parser(self, source: str, body: Any, form: Any,
args: Any) -> Optional[CommingMessage]:
"""
@@ -465,6 +552,14 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("search_medias", meta=meta)
async def async_search_medias(self, meta: MetaBase) -> Optional[List[MediaInfo]]:
"""
搜索媒体信息(异步版本)
:param meta: 识别的元数据
:reutrn: 媒体信息列表
"""
return await self.async_run_module("async_search_medias", meta=meta)
def search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
搜索人物信息
@@ -472,6 +567,13 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("search_persons", name=name)
async def async_search_persons(self, name: str) -> Optional[List[MediaPerson]]:
"""
搜索人物信息(异步版本)
:param name: 人物名称
"""
return await self.async_run_module("async_search_persons", name=name)
def search_collections(self, name: str) -> Optional[List[MediaInfo]]:
"""
搜索集合信息
@@ -479,6 +581,13 @@ class ChainBase(metaclass=ABCMeta):
"""
return self.run_module("search_collections", name=name)
async def async_search_collections(self, name: str) -> Optional[List[MediaInfo]]:
"""
搜索集合信息(异步版本)
:param name: 集合名称
"""
return await self.async_run_module("async_search_collections", name=name)
def search_torrents(self, site: dict,
keywords: List[str],
mtype: Optional[MediaType] = None,

View File

@@ -62,13 +62,13 @@ class BangumiChain(ChainBase):
"""
获取Bangumi每日放送异步版本
"""
return await self.run_module("async_bangumi_calendar")
return await self.async_run_module("async_bangumi_calendar")
async def async_discover(self, **kwargs) -> Optional[List[MediaInfo]]:
"""
发现Bangumi番剧异步版本
"""
return await self.run_module("async_bangumi_discover", **kwargs)
return await self.async_run_module("async_bangumi_discover", **kwargs)
async def async_bangumi_info(self, bangumiid: int) -> Optional[dict]:
"""
@@ -76,32 +76,32 @@ class BangumiChain(ChainBase):
:param bangumiid: BangumiID
:return: Bangumi信息
"""
return await self.run_module("async_bangumi_info", bangumiid=bangumiid)
return await self.async_run_module("async_bangumi_info", bangumiid=bangumiid)
async def async_bangumi_credits(self, bangumiid: int) -> List[schemas.MediaPerson]:
"""
根据BangumiID查询电影演职员表异步版本
:param bangumiid: BangumiID
"""
return await self.run_module("async_bangumi_credits", bangumiid=bangumiid)
return await self.async_run_module("async_bangumi_credits", bangumiid=bangumiid)
async def async_bangumi_recommend(self, bangumiid: int) -> Optional[List[MediaInfo]]:
"""
根据BangumiID查询推荐电影异步版本
:param bangumiid: BangumiID
"""
return await self.run_module("async_bangumi_recommend", bangumiid=bangumiid)
return await self.async_run_module("async_bangumi_recommend", bangumiid=bangumiid)
async def async_person_detail(self, person_id: int) -> Optional[schemas.MediaPerson]:
"""
根据人物ID查询Bangumi人物详情异步版本
:param person_id: 人物ID
"""
return await self.run_module("async_bangumi_person_detail", person_id=person_id)
return await self.async_run_module("async_bangumi_person_detail", person_id=person_id)
async def async_person_credits(self, person_id: int) -> Optional[List[MediaInfo]]:
"""
根据人物ID查询人物参演作品异步版本
:param person_id: 人物ID
"""
return await self.run_module("async_bangumi_person_credits", person_id=person_id)
return await self.async_run_module("async_bangumi_person_credits", person_id=person_id)

View File

@@ -117,7 +117,7 @@ class DoubanChain(ChainBase):
根据人物ID查询豆瓣人物详情异步版本
:param person_id: 人物ID
"""
return await self.run_module("async_douban_person_detail", person_id=person_id)
return await self.async_run_module("async_douban_person_detail", person_id=person_id)
async def async_person_credits(self, person_id: int, page: Optional[int] = 1) -> List[MediaInfo]:
"""
@@ -125,7 +125,7 @@ class DoubanChain(ChainBase):
:param person_id: 人物ID
:param page: 页码
"""
return await self.run_module("async_douban_person_credits", person_id=person_id, page=page)
return await self.async_run_module("async_douban_person_credits", person_id=person_id, page=page)
async def async_movie_top250(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
@@ -134,28 +134,28 @@ class DoubanChain(ChainBase):
:param page: 页码
:param count: 每页数量
"""
return await self.run_module("async_movie_top250", page=page, count=count)
return await self.async_run_module("async_movie_top250", page=page, count=count)
async def async_movie_showing(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
"""
获取正在上映的电影(异步版本)
"""
return await self.run_module("async_movie_showing", page=page, count=count)
return await self.async_run_module("async_movie_showing", page=page, count=count)
async def async_tv_weekly_chinese(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
"""
获取本周中国剧集榜(异步版本)
"""
return await self.run_module("async_tv_weekly_chinese", page=page, count=count)
return await self.async_run_module("async_tv_weekly_chinese", page=page, count=count)
async def async_tv_weekly_global(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
"""
获取本周全球剧集榜(异步版本)
"""
return await self.run_module("async_tv_weekly_global", page=page, count=count)
return await self.async_run_module("async_tv_weekly_global", page=page, count=count)
async def async_douban_discover(self, mtype: MediaType, sort: str, tags: str,
page: Optional[int] = 0, count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
@@ -168,54 +168,54 @@ class DoubanChain(ChainBase):
:param count: 数量
:return: 媒体信息列表
"""
return await self.run_module("async_douban_discover", mtype=mtype, sort=sort, tags=tags,
page=page, count=count)
return await self.async_run_module("async_douban_discover", mtype=mtype, sort=sort, tags=tags,
page=page, count=count)
async def async_tv_animation(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
"""
获取动画剧集(异步版本)
"""
return await self.run_module("async_tv_animation", page=page, count=count)
return await self.async_run_module("async_tv_animation", page=page, count=count)
async def async_movie_hot(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
"""
获取热门电影(异步版本)
"""
return await self.run_module("async_movie_hot", page=page, count=count)
return await self.async_run_module("async_movie_hot", page=page, count=count)
async def async_tv_hot(self, page: Optional[int] = 1,
count: Optional[int] = 30) -> Optional[List[MediaInfo]]:
"""
获取热门剧集(异步版本)
"""
return await self.run_module("async_tv_hot", page=page, count=count)
return await self.async_run_module("async_tv_hot", page=page, count=count)
async def async_movie_credits(self, doubanid: str) -> Optional[List[schemas.MediaPerson]]:
"""
根据TMDBID查询电影演职人员异步版本
:param doubanid: 豆瓣ID
"""
return await self.run_module("async_douban_movie_credits", doubanid=doubanid)
return await self.async_run_module("async_douban_movie_credits", doubanid=doubanid)
async def async_tv_credits(self, doubanid: str) -> Optional[List[schemas.MediaPerson]]:
"""
根据TMDBID查询电视剧演职人员异步版本
:param doubanid: 豆瓣ID
"""
return await self.run_module("async_douban_tv_credits", doubanid=doubanid)
return await self.async_run_module("async_douban_tv_credits", doubanid=doubanid)
async def async_movie_recommend(self, doubanid: str) -> List[MediaInfo]:
"""
根据豆瓣ID查询推荐电影异步版本
:param doubanid: 豆瓣ID
"""
return await self.run_module("async_douban_movie_recommend", doubanid=doubanid)
return await self.async_run_module("async_douban_movie_recommend", doubanid=doubanid)
async def async_tv_recommend(self, doubanid: str) -> List[MediaInfo]:
"""
根据豆瓣ID查询推荐电视剧异步版本
:param doubanid: 豆瓣ID
"""
return await self.run_module("async_douban_tv_recommend", doubanid=doubanid)
return await self.async_run_module("async_douban_tv_recommend", doubanid=doubanid)

View File

@@ -763,3 +763,129 @@ class MediaChain(ChainBase):
else:
logger.info(f"电视剧图片刮削已关闭,跳过:{image_name}")
logger.info(f"{filepath.name} 刮削完成")
async def async_recognize_by_meta(self, metainfo: MetaBase,
episode_group: Optional[str] = None) -> Optional[MediaInfo]:
"""
根据主副标题识别媒体信息(异步版本)
"""
title = metainfo.title
# 识别媒体信息
mediainfo: MediaInfo = await self.async_recognize_media(meta=metainfo, episode_group=episode_group)
if not mediainfo:
# 尝试使用辅助识别,如果有注册响应事件的话
if eventmanager.check(ChainEventType.NameRecognize):
logger.info(f'请求辅助识别,标题:{title} ...')
mediainfo = await self.async_recognize_help(title=title, org_meta=metainfo)
if not mediainfo:
logger.warn(f'{title} 未识别到媒体信息')
return None
# 识别成功
logger.info(f'{title} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}')
# 更新媒体图片
await self.async_obtain_images(mediainfo=mediainfo)
# 返回上下文
return mediainfo
async def async_recognize_help(self, title: str, org_meta: MetaBase) -> Optional[MediaInfo]:
"""
请求辅助识别,返回媒体信息(异步版本)
:param title: 标题
:param org_meta: 原始元数据
"""
# 发送请求事件,等待结果
result: Event = eventmanager.send_event(
ChainEventType.NameRecognize,
{
'title': title,
}
)
if not result:
return None
# 获取返回事件数据
event_data = result.event_data or {}
logger.info(f'获取到辅助识别结果:{event_data}')
# 处理数据格式
title, year, season_number, episode_number = None, None, None, None
if event_data.get("name"):
title = str(event_data["name"]).split("/")[0].strip().replace(".", " ")
if event_data.get("year"):
year = str(event_data["year"]).split("/")[0].strip()
if event_data.get("season") and str(event_data["season"]).isdigit():
season_number = int(event_data["season"])
if event_data.get("episode") and str(event_data["episode"]).isdigit():
episode_number = int(event_data["episode"])
if not title:
return None
if title == 'Unknown':
return None
if not str(year).isdigit():
year = None
# 结果赋值
if title == org_meta.name and year == org_meta.year:
logger.info(f'辅助识别与原始识别结果一致,无需重新识别媒体信息')
return None
logger.info(f'辅助识别结果与原始识别结果不一致,重新匹配媒体信息 ...')
org_meta.name = title
org_meta.year = year
org_meta.begin_season = season_number
org_meta.begin_episode = episode_number
if org_meta.begin_season or org_meta.begin_episode:
org_meta.type = MediaType.TV
# 重新识别
return await self.async_recognize_media(meta=org_meta)
async def async_recognize_by_path(self, path: str, episode_group: Optional[str] = None) -> Optional[Context]:
"""
根据文件路径识别媒体信息(异步版本)
"""
logger.info(f'开始识别媒体信息,文件:{path} ...')
file_path = Path(path)
# 元数据
file_meta = MetaInfoPath(file_path)
# 识别媒体信息
mediainfo = await self.async_recognize_media(meta=file_meta, episode_group=episode_group)
if not mediainfo:
# 尝试使用辅助识别,如果有注册响应事件的话
if eventmanager.check(ChainEventType.NameRecognize):
logger.info(f'请求辅助识别,标题:{file_path.name} ...')
mediainfo = await self.async_recognize_help(title=path, org_meta=file_meta)
if not mediainfo:
logger.warn(f'{path} 未识别到媒体信息')
return Context(meta_info=file_meta)
logger.info(f'{path} 识别到媒体信息:{mediainfo.type.value} {mediainfo.title_year}')
# 更新媒体图片
await self.async_obtain_images(mediainfo=mediainfo)
# 返回上下文
return Context(meta_info=file_meta, media_info=mediainfo)
async def async_search(self, title: str) -> Tuple[Optional[MetaBase], List[MediaInfo]]:
"""
搜索媒体/人物信息(异步版本)
:param title: 搜索内容
:return: 识别元数据,媒体信息列表
"""
# 提取要素
mtype, key_word, season_num, episode_num, year, content = StringUtils.get_keyword(title)
# 识别
meta = MetaInfo(content)
if not meta.name:
meta.cn_name = content
# 合并信息
if mtype:
meta.type = mtype
if season_num:
meta.begin_season = season_num
if episode_num:
meta.begin_episode = episode_num
if year:
meta.year = year
# 开始搜索
logger.info(f"开始搜索媒体信息:{meta.name}")
medias: Optional[List[MediaInfo]] = await self.async_search_medias(meta=meta)
if not medias:
logger.warn(f"{meta.name} 没有找到对应的媒体信息!")
return meta, []
logger.info(f"{content} 搜索到 {len(medias)} 条相关媒体信息")
# 识别的元数据,媒体信息列表
return meta, medias

View File

@@ -189,16 +189,16 @@ class TmdbChain(ChainBase):
:param page: 页码
:return: 媒体信息列表
"""
return await self.run_module("async_tmdb_discover", mtype=mtype,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
with_keywords=with_keywords,
with_watch_providers=with_watch_providers,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
page=page)
return await self.async_run_module("async_tmdb_discover", mtype=mtype,
sort_by=sort_by,
with_genres=with_genres,
with_original_language=with_original_language,
with_keywords=with_keywords,
with_watch_providers=with_watch_providers,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
page=page)
async def async_tmdb_trending(self, page: Optional[int] = 1) -> Optional[List[MediaInfo]]:
"""
@@ -206,28 +206,28 @@ class TmdbChain(ChainBase):
:param page: 第几页
:return: TMDB信息列表
"""
return await self.run_module("async_tmdb_trending", page=page)
return await self.async_run_module("async_tmdb_trending", page=page)
async def async_tmdb_collection(self, collection_id: int) -> Optional[List[MediaInfo]]:
"""
根据合集ID查询集合异步版本
:param collection_id: 合集ID
"""
return await self.run_module("async_tmdb_collection", collection_id=collection_id)
return await self.async_run_module("async_tmdb_collection", collection_id=collection_id)
async def async_tmdb_seasons(self, tmdbid: int) -> List[schemas.TmdbSeason]:
"""
根据TMDBID查询themoviedb所有季信息异步版本
:param tmdbid: TMDBID
"""
return await self.run_module("async_tmdb_seasons", tmdbid=tmdbid)
return await self.async_run_module("async_tmdb_seasons", tmdbid=tmdbid)
async def async_tmdb_group_seasons(self, group_id: str) -> List[schemas.TmdbSeason]:
"""
根据剧集组ID查询themoviedb所有季集信息异步版本
:param group_id: 剧集组ID
"""
return await self.run_module("async_tmdb_group_seasons", group_id=group_id)
return await self.async_run_module("async_tmdb_group_seasons", group_id=group_id)
async def async_tmdb_episodes(self, tmdbid: int, season: int,
episode_group: Optional[str] = None) -> List[schemas.TmdbEpisode]:
@@ -237,35 +237,36 @@ class TmdbChain(ChainBase):
:param season: 季
:param episode_group: 剧集组
"""
return await self.run_module("async_tmdb_episodes", tmdbid=tmdbid, season=season, episode_group=episode_group)
return await self.async_run_module("async_tmdb_episodes", tmdbid=tmdbid, season=season,
episode_group=episode_group)
async def async_movie_similar(self, tmdbid: int) -> Optional[List[MediaInfo]]:
"""
根据TMDBID查询类似电影异步版本
:param tmdbid: TMDBID
"""
return await self.run_module("async_tmdb_movie_similar", tmdbid=tmdbid)
return await self.async_run_module("async_tmdb_movie_similar", tmdbid=tmdbid)
async def async_tv_similar(self, tmdbid: int) -> Optional[List[MediaInfo]]:
"""
根据TMDBID查询类似电视剧异步版本
:param tmdbid: TMDBID
"""
return await self.run_module("async_tmdb_tv_similar", tmdbid=tmdbid)
return await self.async_run_module("async_tmdb_tv_similar", tmdbid=tmdbid)
async def async_movie_recommend(self, tmdbid: int) -> Optional[List[MediaInfo]]:
"""
根据TMDBID查询推荐电影异步版本
:param tmdbid: TMDBID
"""
return await self.run_module("async_tmdb_movie_recommend", tmdbid=tmdbid)
return await self.async_run_module("async_tmdb_movie_recommend", tmdbid=tmdbid)
async def async_tv_recommend(self, tmdbid: int) -> Optional[List[MediaInfo]]:
"""
根据TMDBID查询推荐电视剧异步版本
:param tmdbid: TMDBID
"""
return await self.run_module("async_tmdb_tv_recommend", tmdbid=tmdbid)
return await self.async_run_module("async_tmdb_tv_recommend", tmdbid=tmdbid)
async def async_movie_credits(self, tmdbid: int, page: Optional[int] = 1) -> Optional[List[schemas.MediaPerson]]:
"""
@@ -273,7 +274,7 @@ class TmdbChain(ChainBase):
:param tmdbid: TMDBID
:param page: 页码
"""
return await self.run_module("async_tmdb_movie_credits", tmdbid=tmdbid, page=page)
return await self.async_run_module("async_tmdb_movie_credits", tmdbid=tmdbid, page=page)
async def async_tv_credits(self, tmdbid: int, page: Optional[int] = 1) -> Optional[List[schemas.MediaPerson]]:
"""
@@ -281,14 +282,14 @@ class TmdbChain(ChainBase):
:param tmdbid: TMDBID
:param page: 页码
"""
return await self.run_module("async_tmdb_tv_credits", tmdbid=tmdbid, page=page)
return await self.async_run_module("async_tmdb_tv_credits", tmdbid=tmdbid, page=page)
async def async_person_detail(self, person_id: int) -> Optional[schemas.MediaPerson]:
"""
根据TMDBID查询演职员详情异步版本
:param person_id: 人物ID
"""
return await self.run_module("async_tmdb_person_detail", person_id=person_id)
return await self.async_run_module("async_tmdb_person_detail", person_id=person_id)
async def async_person_credits(self, person_id: int, page: Optional[int] = 1) -> Optional[List[MediaInfo]]:
"""
@@ -296,7 +297,7 @@ class TmdbChain(ChainBase):
:param person_id: 人物ID
:param page: 页码
"""
return await self.run_module("async_tmdb_person_credits", person_id=person_id, page=page)
return await self.async_run_module("async_tmdb_person_credits", person_id=person_id, page=page)
async def async_get_random_wallpager(self) -> Optional[str]:
"""

View File

@@ -71,17 +71,21 @@ class DoubanModule(_ModuleBase):
"""
return 2
def recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
doubanid: Optional[str] = None,
cache: Optional[bool] = True,
**kwargs) -> Optional[MediaInfo]:
def _recognize_media_core(self, meta: MetaBase = None,
mtype: MediaType = None,
doubanid: Optional[str] = None,
cache: Optional[bool] = True,
douban_info_func=None,
match_doubaninfo_func=None,
**kwargs) -> Optional[MediaInfo]:
"""
识别媒体信息
识别媒体信息的核心逻辑
:param meta: 识别的元数据
:param mtype: 识别的媒体类型与doubanid配套
:param doubanid: 豆瓣ID
:param cache: 是否使用缓存
:param douban_info_func: 获取豆瓣信息的函数
:param match_doubaninfo_func: 匹配豆瓣信息的函数
:return: 识别的媒体信息,包括剧集信息
"""
if not doubanid and not meta:
@@ -110,7 +114,7 @@ class DoubanModule(_ModuleBase):
# 缓存没有或者强制不使用缓存
if doubanid:
# 直接查询详情
info = self.douban_info(doubanid=doubanid, mtype=mtype or meta.type)
info = douban_info_func(doubanid=doubanid, mtype=mtype or meta.type)
elif meta:
info = {}
# 简体名称
@@ -123,13 +127,13 @@ class DoubanModule(_ModuleBase):
else:
logger.info(f"正在识别 {name} ...")
# 匹配豆瓣信息
match_info = self.match_doubaninfo(name=name,
match_info = match_doubaninfo_func(name=name,
mtype=mtype or meta.type,
year=meta.year,
season=meta.begin_season)
if match_info:
# 匹配到豆瓣信息
info = self.douban_info(
info = douban_info_func(
doubanid=match_info.get("id"),
mtype=mtype or meta.type
)
@@ -146,7 +150,7 @@ class DoubanModule(_ModuleBase):
# 使用缓存信息
if cache_info.get("title"):
logger.info(f"{meta.name} 使用豆瓣识别缓存:{cache_info.get('title')}")
info = self.douban_info(mtype=cache_info.get("type"),
info = douban_info_func(mtype=cache_info.get("type"),
doubanid=cache_info.get("id"))
else:
logger.info(f"{meta.name} 使用豆瓣识别缓存:无法识别")
@@ -168,6 +172,153 @@ class DoubanModule(_ModuleBase):
return None
async def _async_recognize_media_core(self, meta: MetaBase = None,
mtype: MediaType = None,
doubanid: Optional[str] = None,
cache: Optional[bool] = True,
async_douban_info_func=None,
async_match_doubaninfo_func=None,
**kwargs) -> Optional[MediaInfo]:
"""
识别媒体信息的核心逻辑(异步版本)
:param meta: 识别的元数据
:param mtype: 识别的媒体类型与doubanid配套
:param doubanid: 豆瓣ID
:param cache: 是否使用缓存
:param async_douban_info_func: 获取豆瓣信息的异步函数
:param async_match_doubaninfo_func: 匹配豆瓣信息的异步函数
:return: 识别的媒体信息,包括剧集信息
"""
if not doubanid and not meta:
return None
if meta and not doubanid \
and settings.RECOGNIZE_SOURCE != "douban":
return None
if not meta:
# 未提供元数据时,直接查询豆瓣信息,不使用缓存
cache_info = {}
elif not meta.name:
logger.error("识别媒体信息时未提供元数据名称")
return None
else:
# 读取缓存
if mtype:
meta.type = mtype
if doubanid:
meta.doubanid = doubanid
cache_info = self.cache.get(meta)
# 识别豆瓣信息
if not cache_info or not cache:
# 缓存没有或者强制不使用缓存
if doubanid:
# 直接查询详情
info = await async_douban_info_func(doubanid=doubanid, mtype=mtype or meta.type)
elif meta:
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
# 使用中英文名分别识别,去重去空,但要保持顺序
names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
for name in names:
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
# 匹配豆瓣信息
match_info = await async_match_doubaninfo_func(name=name,
mtype=mtype or meta.type,
year=meta.year,
season=meta.begin_season)
if match_info:
# 匹配到豆瓣信息
info = await async_douban_info_func(
doubanid=match_info.get("id"),
mtype=mtype or meta.type
)
if info:
break
else:
logger.error("识别媒体信息时未提供元数据或豆瓣ID")
return None
# 保存到缓存
if meta and cache:
self.cache.update(meta, info)
else:
# 使用缓存信息
if cache_info.get("title"):
logger.info(f"{meta.name} 使用豆瓣识别缓存:{cache_info.get('title')}")
info = await async_douban_info_func(mtype=cache_info.get("type"),
doubanid=cache_info.get("id"))
else:
logger.info(f"{meta.name} 使用豆瓣识别缓存:无法识别")
info = None
if info:
# 赋值TMDB信息并返回
mediainfo = MediaInfo(douban_info=info)
if meta:
logger.info(f"{meta.name} 豆瓣识别结果:{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.douban_id}")
else:
logger.info(f"{doubanid} 豆瓣识别结果:{mediainfo.type.value} "
f"{mediainfo.title_year}")
return mediainfo
else:
logger.info(f"{meta.name if meta else doubanid} 未匹配到豆瓣媒体信息")
return None
def recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
doubanid: Optional[str] = None,
cache: Optional[bool] = True,
**kwargs) -> Optional[MediaInfo]:
"""
识别媒体信息
:param meta: 识别的元数据
:param mtype: 识别的媒体类型与doubanid配套
:param doubanid: 豆瓣ID
:param cache: 是否使用缓存
:return: 识别的媒体信息,包括剧集信息
"""
return self._recognize_media_core(
meta=meta,
mtype=mtype,
doubanid=doubanid,
cache=cache,
douban_info_func=self.douban_info,
match_doubaninfo_func=self.match_doubaninfo,
**kwargs
)
async def async_recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
doubanid: Optional[str] = None,
cache: Optional[bool] = True,
**kwargs) -> Optional[MediaInfo]:
"""
识别媒体信息(异步版本)
:param meta: 识别的元数据
:param mtype: 识别的媒体类型与doubanid配套
:param doubanid: 豆瓣ID
:param cache: 是否使用缓存
:return: 识别的媒体信息,包括剧集信息
"""
return await self._async_recognize_media_core(
meta=meta,
mtype=mtype,
doubanid=doubanid,
cache=cache,
async_douban_info_func=self.async_douban_info,
async_match_doubaninfo_func=self.async_match_doubaninfo,
**kwargs
)
@rate_limit_exponential(source="douban_info")
def douban_info(self, doubanid: str, mtype: MediaType = None, raise_exception: bool = True) -> Optional[dict]:
"""
@@ -819,45 +970,49 @@ class DoubanModule(_ModuleBase):
}) for item in result.get('items') if name in item.get('target', {}).get('title')]
return []
@retry(Exception, 5, 3, 3, logger=logger)
@rate_limit_exponential(source="match_doubaninfo")
def match_doubaninfo(self, name: str, imdbid: str = None,
mtype: MediaType = None, year: str = None, season: int = None,
raise_exception: bool = False) -> dict:
@staticmethod
def _process_imdbid_result(result: dict, imdbid: str) -> Optional[dict]:
"""
搜索和匹配豆瓣信息
:param name: 名称
:param imdbid: IMDB ID
:param mtype: 类型
:param year: 年份
:param season: 季号
:param raise_exception: 触发速率限制时是否抛出异常
处理IMDBID查询结果
:param result: IMDBID查询返回的结果
:param imdbid: IMDB ID
:return: 处理后的结果None表示无结果
"""
if result:
doubanid = result.get("id")
if doubanid and not str(doubanid).isdigit():
doubanid = re.search(r"\d+", doubanid).group(0)
result["id"] = doubanid
logger.info(f"{imdbid} 查询到豆瓣信息:{result.get('title')}")
return result
return None
@staticmethod
def _process_search_results(result: dict, name: str, mtype: MediaType = None,
year: str = None, season: int = None) -> dict:
"""
处理搜索结果并进行匹配
:param result: 搜索返回的结果
:param name: 搜索名称
:param mtype: 媒体类型
:param year: 年份
:param season: 季号
:return: 匹配到的豆瓣信息
"""
if imdbid:
# 优先使用IMDBID查询
logger.info(f"开始使用IMDBID {imdbid} 查询豆瓣信息 ...")
result = self.doubanapi.imdbid(imdbid)
if result:
doubanid = result.get("id")
if doubanid and not str(doubanid).isdigit():
doubanid = re.search(r"\d+", doubanid).group(0)
result["id"] = doubanid
logger.info(f"{imdbid} 查询到豆瓣信息:{result.get('title')}")
return result
# 搜索
logger.info(f"开始使用名称 {name} 匹配豆瓣信息 ...")
result = self.doubanapi.search(f"{name} {year or ''}".strip())
if not result:
logger.warn(f"未找到 {name} 的豆瓣信息")
return {}
# 触发rate limit
# 触发rate limit检查
if "search_access_rate_limit" in result.values():
msg = f"触发豆瓣API速率限制错误信息{result} ..."
logger.warn(msg)
raise APIRateLimitException(msg)
if not result.get("items"):
logger.warn(f"未找到 {name} 的豆瓣信息")
return {}
for item_obj in result.get("items"):
type_name = item_obj.get("type_name")
if type_name not in [MediaType.TV.value, MediaType.MOVIE.value]:
@@ -881,6 +1036,60 @@ class DoubanModule(_ModuleBase):
return item
return {}
@retry(Exception, 5, 3, 3, logger=logger)
@rate_limit_exponential(source="match_doubaninfo")
def match_doubaninfo(self, name: str, imdbid: str = None,
mtype: MediaType = None, year: str = None, season: int = None,
raise_exception: bool = False) -> dict:
"""
搜索和匹配豆瓣信息
:param name: 名称
:param imdbid: IMDB ID
:param mtype: 类型
:param year: 年份
:param season: 季号
:param raise_exception: 触发速率限制时是否抛出异常
"""
if imdbid:
# 优先使用IMDBID查询
logger.info(f"开始使用IMDBID {imdbid} 查询豆瓣信息 ...")
result = self.doubanapi.imdbid(imdbid)
processed_result = self._process_imdbid_result(result, imdbid)
if processed_result:
return processed_result
# 搜索
logger.info(f"开始使用名称 {name} 匹配豆瓣信息 ...")
result = self.doubanapi.search(f"{name} {year or ''}".strip())
return self._process_search_results(result, name, mtype, year, season)
@retry(Exception, 5, 3, 3, logger=logger)
@rate_limit_exponential(source="match_doubaninfo")
async def async_match_doubaninfo(self, name: str, imdbid: str = None,
mtype: MediaType = None, year: str = None, season: int = None,
raise_exception: bool = False) -> dict:
"""
搜索和匹配豆瓣信息(异步版本)
:param name: 名称
:param imdbid: IMDB ID
:param mtype: 类型
:param year: 年份
:param season: 季号
:param raise_exception: 触发速率限制时是否抛出异常
"""
if imdbid:
# 优先使用IMDBID查询
logger.info(f"开始使用IMDBID {imdbid} 查询豆瓣信息 ...")
result = await self.doubanapi.async_imdbid(imdbid)
processed_result = self._process_imdbid_result(result, imdbid)
if processed_result:
return processed_result
# 搜索
logger.info(f"开始使用名称 {name} 匹配豆瓣信息 ...")
result = await self.doubanapi.async_search(f"{name} {year or ''}".strip())
return self._process_search_results(result, name, mtype, year, season)
def movie_top250(self, page: int = 1, count: int = 30) -> List[MediaInfo]:
"""
获取豆瓣电影TOP250
@@ -922,11 +1131,12 @@ class DoubanModule(_ModuleBase):
return None
return self.scraper.get_metadata_img(mediainfo=mediainfo, season=season, episode=episode)
def obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
@staticmethod
def _validate_douban_obtain_images_params(mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
验证豆瓣 obtain_images 参数
:param mediainfo: 媒体信息
:return: None 表示不处理MediaInfo 表示继续处理
"""
if settings.RECOGNIZE_SOURCE != "douban":
return None
@@ -935,22 +1145,66 @@ class DoubanModule(_ModuleBase):
if mediainfo.backdrop_path:
# 没有图片缺失
return mediainfo
# 调用图片接口
if not mediainfo.backdrop_path:
if mediainfo.type == MediaType.MOVIE:
info = self.doubanapi.movie_photos(mediainfo.douban_id)
else:
info = self.doubanapi.tv_photos(mediainfo.douban_id)
if not info:
return mediainfo
images = info.get("photos")
# 背景图
if images:
backdrop = images[0].get("image", {}).get("large") or {}
if backdrop:
mediainfo.backdrop_path = backdrop.get("url")
return None
@staticmethod
def _process_douban_images(mediainfo: MediaInfo, info: dict) -> MediaInfo:
"""
处理豆瓣图片数据
:param mediainfo: 媒体信息
:param info: 图片信息
:return: 更新后的媒体信息
"""
if not info:
return mediainfo
images = info.get("photos")
# 背景图
if images:
backdrop = images[0].get("image", {}).get("large") or {}
if backdrop:
mediainfo.backdrop_path = backdrop.get("url")
return mediainfo
def obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
# 验证参数
result = self._validate_douban_obtain_images_params(mediainfo)
if result is not None:
return result
# 调用图片接口
if mediainfo.type == MediaType.MOVIE:
info = self.doubanapi.movie_photos(mediainfo.douban_id)
else:
info = self.doubanapi.tv_photos(mediainfo.douban_id)
# 处理图片数据
return self._process_douban_images(mediainfo, info)
async def async_obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片(异步版本)
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
# 验证参数
result = self._validate_douban_obtain_images_params(mediainfo)
if result is not None:
return result
# 调用图片接口
if mediainfo.type == MediaType.MOVIE:
info = await self.doubanapi.async_movie_photos(mediainfo.douban_id)
else:
info = await self.doubanapi.async_tv_photos(mediainfo.douban_id)
# 处理图片数据
return self._process_douban_images(mediainfo, info)
def clear_cache(self):
"""
清除缓存
@@ -962,35 +1216,19 @@ class DoubanModule(_ModuleBase):
def douban_movie_credits(self, doubanid: str) -> List[schemas.MediaPerson]:
"""
根据TMDBID查询电影演职员表
根据豆瓣ID查询电影演职员表
:param doubanid: 豆瓣ID
"""
result = self.doubanapi.movie_celebrities(subject_id=doubanid)
if not result:
return []
ret_list = result.get("actors") or []
if ret_list:
# 更新豆瓣演员信息中的ID从URI中提取'douban://douban.com/celebrity/1316132?subject_id=27503705' subject_id
for doubaninfo in ret_list:
doubaninfo['id'] = doubaninfo.get('uri', '').split('?subject_id=')[-1]
return [schemas.MediaPerson(source='douban', **doubaninfo) for doubaninfo in ret_list]
return []
return self._process_celebrity_data(result)
def douban_tv_credits(self, doubanid: str) -> List[schemas.MediaPerson]:
"""
根据TMDBID查询电视剧演职员表
根据豆瓣ID查询电视剧演职员表
:param doubanid: 豆瓣ID
"""
result = self.doubanapi.tv_celebrities(subject_id=doubanid)
if not result:
return []
ret_list = result.get("actors") or []
if ret_list:
# 更新豆瓣演员信息中的ID从URI中提取'douban://douban.com/celebrity/1316132?subject_id=27503705' subject_id
for doubaninfo in ret_list:
doubaninfo['id'] = doubaninfo.get('uri', '').split('?subject_id=')[-1]
return [schemas.MediaPerson(source='douban', **doubaninfo) for doubaninfo in ret_list]
return []
return self._process_celebrity_data(result)
def douban_movie_recommend(self, doubanid: str) -> List[MediaInfo]:
"""
@@ -1056,3 +1294,102 @@ class DoubanModule(_ModuleBase):
works = collections.get("works")
return [MediaInfo(douban_info=work.get("subject")) for work in works]
return []
@staticmethod
def _process_celebrity_data(result: dict) -> List[schemas.MediaPerson]:
"""
处理演职员表数据的公共方法
:param result: API返回的演职员表数据
:return: 处理后的演员列表
"""
if not result:
return []
ret_list = result.get("actors") or []
if ret_list:
# 更新豆瓣演员信息中的ID从URI中提取'douban://douban.com/celebrity/1316132?subject_id=27503705' subject_id
for doubaninfo in ret_list:
doubaninfo['id'] = doubaninfo.get('uri', '').split('?subject_id=')[-1]
return [schemas.MediaPerson(source='douban', **doubaninfo) for doubaninfo in ret_list]
return []
async def async_douban_movie_credits(self, doubanid: str) -> List[schemas.MediaPerson]:
"""
根据豆瓣ID查询电影演职员表异步版本
:param doubanid: 豆瓣ID
"""
result = await self.doubanapi.async_movie_celebrities(subject_id=doubanid)
return self._process_celebrity_data(result)
async def async_douban_tv_credits(self, doubanid: str) -> List[schemas.MediaPerson]:
"""
根据豆瓣ID查询电视剧演职员表异步版本
:param doubanid: 豆瓣ID
"""
result = await self.doubanapi.async_tv_celebrities(subject_id=doubanid)
return self._process_celebrity_data(result)
async def async_douban_movie_recommend(self, doubanid: str) -> List[MediaInfo]:
"""
根据豆瓣ID查询推荐电影异步版本
:param doubanid: 豆瓣ID
"""
recommend = await self.doubanapi.async_movie_recommendations(subject_id=doubanid)
if recommend:
return [MediaInfo(douban_info=info) for info in recommend]
return []
async def async_douban_tv_recommend(self, doubanid: str) -> List[MediaInfo]:
"""
根据豆瓣ID查询推荐电视剧异步版本
:param doubanid: 豆瓣ID
"""
recommend = await self.doubanapi.async_tv_recommendations(subject_id=doubanid)
if recommend:
return [MediaInfo(douban_info=info) for info in recommend]
return []
async def async_douban_person_detail(self, person_id: int) -> schemas.MediaPerson:
"""
获取人物详细信息(异步版本)
:param person_id: 豆瓣人物ID
"""
detail = await self.doubanapi.async_person_detail(person_id)
if detail:
also_known_as = []
infos = detail.get("extra", {}).get("info")
if infos:
also_known_as = ["".join(info) for info in infos]
image = detail.get("cover_img", {}).get("url")
if image:
image = image.replace("/l/public/", "/s/public/")
return schemas.MediaPerson(source='douban', **{
"id": detail.get("id"),
"name": detail.get("title"),
"avatar": image,
"biography": detail.get("extra", {}).get("short_info"),
"also_known_as": also_known_as,
})
return schemas.MediaPerson(source='douban')
async def async_douban_person_credits(self, person_id: int, page: int = 1) -> List[MediaInfo]:
"""
根据豆瓣ID查询人物参演作品异步版本
:param person_id: 人物ID
:param page: 页码
"""
# 获取人物参演作品集
personinfo = await self.doubanapi.async_person_detail(person_id)
if not personinfo:
return []
collection_id = None
for module in personinfo.get("modules"):
if module.get("type") == "work_collections":
collection_id = module.get("payload", {}).get("id")
# 查询作品集内容
if collection_id:
collections = await self.doubanapi.async_person_work(subject_id=collection_id, start=(page - 1) * 20,
count=20)
if collections:
works = collections.get("works")
return [MediaInfo(douban_info=work.get("subject")) for work in works]
return []

View File

@@ -83,6 +83,279 @@ class TheMovieDbModule(_ModuleBase):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
@staticmethod
def _validate_recognize_params(meta: MetaBase, tmdbid: Optional[int]) -> bool:
"""
验证识别参数
"""
if not tmdbid and not meta:
return False
if meta and not tmdbid and settings.RECOGNIZE_SOURCE != "themoviedb":
return False
if meta and not meta.name:
logger.warn("识别媒体信息时未提供元数据名称")
return False
return True
@staticmethod
def _prepare_search_names(meta: MetaBase) -> List[str]:
"""
准备搜索名称列表
"""
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
# 使用中英文名分别识别,去重去空,但要保持顺序
return list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
def _search_by_name(self, name: str, meta: MetaBase, group_seasons: List[dict]) -> dict:
"""
根据名称搜索媒体信息
"""
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
if meta.type == MediaType.UNKNOWN and not meta.year:
return self.tmdb.match_multi(name)
else:
if meta.type == MediaType.TV:
# 确定是电视
info = self.tmdb.match(name=name,
year=meta.year,
mtype=meta.type,
season_year=meta.year,
season_number=meta.begin_season,
group_seasons=group_seasons)
if not info:
# 去掉年份再查一次
info = self.tmdb.match(name=name, mtype=meta.type)
return info
else:
# 有年份先按电影查
info = self.tmdb.match(name=name, year=meta.year, mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = self.tmdb.match(name=name, year=meta.year, mtype=MediaType.TV,
group_seasons=group_seasons)
if not info:
# 去掉年份和类型再查一次
info = self.tmdb.match_multi(name=name)
return info
async def _async_search_by_name(self, name: str, meta: MetaBase, group_seasons: List[dict]) -> dict:
"""
根据名称搜索媒体信息(异步版本)
"""
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
if meta.type == MediaType.UNKNOWN and not meta.year:
return await self.tmdb.async_match_multi(name)
else:
if meta.type == MediaType.TV:
# 确定是电视
info = await self.tmdb.async_match(name=name,
year=meta.year,
mtype=meta.type,
season_year=meta.year,
season_number=meta.begin_season,
group_seasons=group_seasons)
if not info:
# 去掉年份再查一次
info = await self.tmdb.async_match(name=name, mtype=meta.type)
return info
else:
# 有年份先按电影查
info = await self.tmdb.async_match(name=name, year=meta.year, mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = await self.tmdb.async_match(name=name, year=meta.year, mtype=MediaType.TV,
group_seasons=group_seasons)
if not info:
# 去掉年份和类型再查一次
info = await self.tmdb.async_match_multi(name=name)
return info
def _process_episode_groups(self, mediainfo: MediaInfo, episode_group: Optional[str],
group_seasons: List[dict]) -> MediaInfo:
"""
处理剧集组信息
"""
if mediainfo.type == MediaType.TV and mediainfo.episode_groups:
if group_seasons:
# 指定剧集组时
seasons = {}
season_info = []
season_years = {}
for group_season in group_seasons:
# 季
season = group_season.get("order")
# 集列表
episodes = group_season.get("episodes")
if not episodes:
continue
seasons[season] = [ep.get("episode_number") for ep in episodes]
season_info.append(group_season)
# 当前季第一季时间
first_date = episodes[0].get("air_date")
if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
# 每季集清单
if seasons:
mediainfo.seasons = seasons
mediainfo.number_of_seasons = len(seasons)
# 每季集详情
if season_info:
mediainfo.season_info = season_info
# 每季年份
if season_years:
mediainfo.season_years = season_years
# 所有剧集组
mediainfo.episode_group = episode_group
mediainfo.episode_groups = group_seasons
else:
# 每季年份
season_years = {}
for group in mediainfo.episode_groups:
if group.get('type') != 6:
# 只处理剧集部分
continue
group_episodes = self.tmdb.get_tv_group_seasons(group.get('id'))
if not group_episodes:
continue
for group_episode in group_episodes:
season = group_episode.get('order')
episodes = group_episode.get('episodes')
if not episodes:
continue
# 当前季第一季时间
first_date = episodes[0].get("air_date")
# 判断是不是日期格式
if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
if season_years:
mediainfo.season_years = season_years
return mediainfo
async def _async_process_episode_groups(self, mediainfo: MediaInfo, episode_group: Optional[str],
group_seasons: List[dict]) -> MediaInfo:
"""
处理剧集组信息(异步版本)
"""
if mediainfo.type == MediaType.TV and mediainfo.episode_groups:
if group_seasons:
# 指定剧集组时
seasons = {}
season_info = []
season_years = {}
for group_season in group_seasons:
# 季
season = group_season.get("order")
# 集列表
episodes = group_season.get("episodes")
if not episodes:
continue
seasons[season] = [ep.get("episode_number") for ep in episodes]
season_info.append(group_season)
# 当前季第一季时间
first_date = episodes[0].get("air_date")
if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
# 每季集清单
if seasons:
mediainfo.seasons = seasons
mediainfo.number_of_seasons = len(seasons)
# 每季集详情
if season_info:
mediainfo.season_info = season_info
# 每季年份
if season_years:
mediainfo.season_years = season_years
# 所有剧集组
mediainfo.episode_group = episode_group
mediainfo.episode_groups = group_seasons
else:
# 每季年份
season_years = {}
for group in mediainfo.episode_groups:
if group.get('type') != 6:
# 只处理剧集部分
continue
group_episodes = await self.tmdb.async_get_tv_group_seasons(group.get('id'))
if not group_episodes:
continue
for group_episode in group_episodes:
season = group_episode.get('order')
episodes = group_episode.get('episodes')
if not episodes:
continue
# 当前季第一季时间
first_date = episodes[0].get("air_date")
# 判断是不是日期格式
if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
if season_years:
mediainfo.season_years = season_years
return mediainfo
def _build_media_info_result(self, info: dict, meta: MetaBase, tmdbid: Optional[int],
episode_group: Optional[str], group_seasons: List[dict]) -> MediaInfo:
"""
构建MediaInfo结果
"""
# 确定二级分类
if info.get('media_type') == MediaType.TV:
cat = self.category.get_tv_category(info)
else:
cat = self.category.get_movie_category(info)
# 赋值TMDB信息并返回
mediainfo = MediaInfo(tmdb_info=info)
mediainfo.set_category(cat)
if meta:
logger.info(f"{meta.name} TMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.tmdb_id}")
else:
logger.info(f"{tmdbid} TMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year}")
# 处理剧集组信息
return self._process_episode_groups(mediainfo, episode_group, group_seasons)
async def _async_build_media_info_result(self, info: dict, meta: MetaBase, tmdbid: Optional[int],
episode_group: Optional[str], group_seasons: List[dict]) -> MediaInfo:
"""
构建MediaInfo结果异步版本
"""
# 确定二级分类
if info.get('media_type') == MediaType.TV:
cat = self.category.get_tv_category(info)
else:
cat = self.category.get_movie_category(info)
# 赋值TMDB信息并返回
mediainfo = MediaInfo(tmdb_info=info)
mediainfo.set_category(cat)
if meta:
logger.info(f"{meta.name} TMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.tmdb_id}")
else:
logger.info(f"{tmdbid} TMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year}")
# 处理剧集组信息
return await self._async_process_episode_groups(mediainfo, episode_group, group_seasons)
def recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
tmdbid: Optional[int] = None,
@@ -98,19 +371,13 @@ class TheMovieDbModule(_ModuleBase):
:param cache: 是否使用缓存
:return: 识别的媒体信息,包括剧集信息
"""
if not tmdbid and not meta:
return None
if meta and not tmdbid \
and settings.RECOGNIZE_SOURCE != "themoviedb":
# 验证参数
if not self._validate_recognize_params(meta, tmdbid):
return None
if not meta:
# 未提供元数据时直接使用tmdbid查询不使用缓存
cache_info = {}
elif not meta.name:
logger.warn("识别媒体信息时未提供元数据名称")
return None
else:
# 读取缓存
if mtype:
@@ -132,50 +399,13 @@ class TheMovieDbModule(_ModuleBase):
# 直接查询详情
info = self.tmdb.get_info(mtype=mtype, tmdbid=tmdbid)
if not info and meta:
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, "zh-hans") if meta.cn_name else None
# 使用中英文名分别识别,去重去空,但要保持顺序
names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
# 准备搜索名称
names = self._prepare_search_names(meta)
for name in names:
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
if meta.type == MediaType.UNKNOWN and not meta.year:
info = self.tmdb.match_multi(name)
else:
if meta.type == MediaType.TV:
# 确定是电视
info = self.tmdb.match(name=name,
year=meta.year,
mtype=meta.type,
season_year=meta.year,
season_number=meta.begin_season,
group_seasons=group_seasons)
if not info:
# 去掉年份再查一次
info = self.tmdb.match(name=name,
mtype=meta.type)
else:
# 有年份先按电影查
info = self.tmdb.match(name=name,
year=meta.year,
mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = self.tmdb.match(name=name,
year=meta.year,
mtype=MediaType.TV,
group_seasons=group_seasons)
if not info:
# 去掉年份和类型再查一次
info = self.tmdb.match_multi(name=name)
info = self._search_by_name(name, meta, group_seasons)
if not info:
# 从网站查询
info = self.tmdb.match_web(name=name,
mtype=meta.type)
info = self.tmdb.match_web(name=name, mtype=meta.type)
if info:
# 查到就退出
break
@@ -201,78 +431,88 @@ class TheMovieDbModule(_ModuleBase):
info = None
if info:
# 确定二级分类
if info.get('media_type') == MediaType.TV:
cat = self.category.get_tv_category(info)
else:
cat = self.category.get_movie_category(info)
# 赋值TMDB信息并返回
mediainfo = MediaInfo(tmdb_info=info)
mediainfo.set_category(cat)
if meta:
logger.info(f"{meta.name} TMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.tmdb_id}")
else:
logger.info(f"{tmdbid} TMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year}")
return self._build_media_info_result(info, meta, tmdbid, episode_group, group_seasons)
else:
logger.info(f"{meta.name if meta else tmdbid} 未匹配到TMDB媒体信息")
# 使用剧集组的集信息和年份
if mediainfo.type == MediaType.TV and mediainfo.episode_groups:
if group_seasons:
# 指定剧集组时
seasons = {}
season_info = []
season_years = {}
for group_season in group_seasons:
# 季
season = group_season.get("order")
# 集列表
episodes = group_season.get("episodes")
if not episodes:
continue
seasons[season] = [ep.get("episode_number") for ep in episodes]
season_info.append(group_season)
# 当前季第一季时间
first_date = episodes[0].get("air_date")
if re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
# 每季集清单
if seasons:
mediainfo.seasons = seasons
mediainfo.number_of_seasons = len(seasons)
# 每季集详情
if season_info:
mediainfo.season_info = season_info
# 每季年份
if season_years:
mediainfo.season_years = season_years
# 所有剧集组
mediainfo.episode_group = episode_group
mediainfo.episode_groups = group_seasons
else:
# 每季年份
season_years = {}
for group in mediainfo.episode_groups:
if group.get('type') != 6:
# 只处理剧集部分
continue
group_episodes = self.tmdb.get_tv_group_seasons(group.get('id'))
if not group_episodes:
continue
for group_episode in group_episodes:
season = group_episode.get('order')
episodes = group_episode.get('episodes')
if not episodes:
continue
# 当前季第一季时间
first_date = episodes[0].get("air_date")
# 判断是不是日期格式
if first_date and re.match(r"^\d{4}-\d{2}-\d{2}$", first_date):
season_years[season] = str(first_date).split("-")[0]
if season_years:
mediainfo.season_years = season_years
return mediainfo
return None
async def async_recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
tmdbid: Optional[int] = None,
episode_group: Optional[str] = None,
cache: Optional[bool] = True,
**kwargs) -> Optional[MediaInfo]:
"""
识别媒体信息(异步版本)
:param meta: 识别的元数据
:param mtype: 识别的媒体类型与tmdbid配套
:param tmdbid: tmdbid
:param episode_group: 剧集组
:param cache: 是否使用缓存
:return: 识别的媒体信息,包括剧集信息
"""
# 验证参数
if not self._validate_recognize_params(meta, tmdbid):
return None
if not meta:
# 未提供元数据时直接使用tmdbid查询不使用缓存
cache_info = {}
else:
# 读取缓存
if mtype:
meta.type = mtype
if tmdbid:
meta.tmdbid = tmdbid
cache_info = self.cache.get(meta)
# 查询剧集组
group_seasons = []
if episode_group:
group_seasons = await self.tmdb.async_get_tv_group_seasons(episode_group)
# 识别匹配
if not cache_info or not cache:
info = None
# 缓存没有或者强制不使用缓存
if tmdbid:
# 直接查询详情
info = await self.tmdb.async_get_info(mtype=mtype, tmdbid=tmdbid)
if not info and meta:
# 准备搜索名称
names = self._prepare_search_names(meta)
for name in names:
info = await self._async_search_by_name(name, meta, group_seasons)
if not info:
# 从网站查询
info = await self.tmdb.async_match_web(name=name, mtype=meta.type)
if info:
# 查到就退出
break
# 补充全量信息
if info and not info.get("genres"):
info = await self.tmdb.async_get_info(mtype=info.get("media_type"),
tmdbid=info.get("id"))
elif not info:
logger.error("识别媒体信息时未提供元数据或唯一且有效的tmdbid")
return None
# 保存到缓存
if meta:
self.cache.update(meta, info)
else:
# 使用缓存信息
if cache_info.get("title"):
logger.info(f"{meta.name} 使用TMDB识别缓存{cache_info.get('title')}")
info = await self.tmdb.async_get_info(mtype=cache_info.get("type"),
tmdbid=cache_info.get("id"))
else:
logger.info(f"{meta.name} 使用TMDB识别缓存无法识别")
info = None
if info:
return await self._async_build_media_info_result(info, meta, tmdbid, episode_group, group_seasons)
else:
logger.info(f"{meta.name if meta else tmdbid} 未匹配到TMDB媒体信息")
@@ -528,11 +768,12 @@ class TheMovieDbModule(_ModuleBase):
"""
self.cache.save()
def obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
@staticmethod
def _validate_obtain_images_params(mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
验证 obtain_images 参数
:param mediainfo: 媒体信息
:return: None 表示不处理MediaInfo 表示继续处理
"""
if settings.RECOGNIZE_SOURCE != "themoviedb":
return None
@@ -543,13 +784,16 @@ class TheMovieDbModule(_ModuleBase):
and mediainfo.backdrop_path:
# 没有图片缺失
return mediainfo
# 调用TMDB图片接口
if mediainfo.type == MediaType.MOVIE:
images = self.tmdb.get_movie_images(mediainfo.tmdb_id)
else:
images = self.tmdb.get_tv_images(mediainfo.tmdb_id)
if not images:
return mediainfo
return None
@staticmethod
def _process_tmdb_images(mediainfo: MediaInfo, images: dict) -> MediaInfo:
"""
处理 TMDB 图片数据
:param mediainfo: 媒体信息
:param images: 图片数据
:return: 更新后的媒体信息
"""
if isinstance(images, list):
images = images[0]
# 背景图
@@ -572,6 +816,50 @@ class TheMovieDbModule(_ModuleBase):
mediainfo.poster_path = posters[0].get("file_path")
return mediainfo
def obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
# 验证参数
result = self._validate_obtain_images_params(mediainfo)
if result is not None:
return result
# 调用TMDB图片接口
if mediainfo.type == MediaType.MOVIE:
images = self.tmdb.get_movie_images(mediainfo.tmdb_id)
else:
images = self.tmdb.get_tv_images(mediainfo.tmdb_id)
if not images:
return mediainfo
# 处理图片数据
return self._process_tmdb_images(mediainfo, images)
async def async_obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]:
"""
补充抓取媒体信息图片(异步版本)
:param mediainfo: 识别的媒体信息
:return: 更新后的媒体信息
"""
# 验证参数
result = self._validate_obtain_images_params(mediainfo)
if result is not None:
return result
# 调用TMDB图片接口
if mediainfo.type == MediaType.MOVIE:
images = await self.tmdb.async_get_movie_images(mediainfo.tmdb_id)
else:
images = await self.tmdb.async_get_tv_images(mediainfo.tmdb_id)
if not images:
return mediainfo
# 处理图片数据
return self._process_tmdb_images(mediainfo, images)
def obtain_specific_image(self, mediaid: Union[str, int], mtype: MediaType,
image_type: MediaImageType, image_prefix: Optional[str] = "w500",
season: Optional[int] = None, episode: Optional[int] = None) -> Optional[str]:

View File

@@ -11,7 +11,7 @@ from app.core.config import settings
from app.log import logger
from app.schemas import APIRateLimitException
from app.schemas.types import MediaType
from app.utils.http import RequestUtils
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.limit import rate_limit_exponential
from app.utils.string import StringUtils
from .tmdbv3api import TMDb, Search, Movie, TV, Season, Episode, Discover, Trending, Person, Collection
@@ -138,6 +138,250 @@ class TmdbApi:
return True
return False
# 公共方法
@staticmethod
def _validate_match_params(name: str, search_obj) -> bool:
"""
验证匹配方法的基本参数
"""
if not search_obj:
return False
if not name:
return False
return True
@staticmethod
def _generate_year_range(year: Optional[str]) -> List[Optional[str]]:
"""
生成年份范围用于匹配
"""
year_range = [year]
if year:
year_range.append(str(int(year) + 1))
year_range.append(str(int(year) - 1))
return year_range
@staticmethod
def _log_match_debug(mtype: MediaType, name: str, year: Optional[str] = None,
season_number: Optional[int] = None, season_year: Optional[str] = None):
"""
记录匹配调试日志
"""
if season_number and season_year:
logger.debug(f"正在识别{mtype.value}{name}, 季集={season_number}, 季集年份={season_year} ...")
else:
logger.debug(f"正在识别{mtype.value}{name}, 年份={year} ...")
@staticmethod
def _set_media_type(info: dict, mtype: MediaType) -> dict:
"""
设置媒体类型
"""
if info:
info['media_type'] = mtype
return info
@staticmethod
def _sort_multi_results(multis: List[dict]) -> List[dict]:
"""
按年份降序排列搜索结果,电影在前面
"""
return sorted(
multis,
key=lambda x: ("1"
if x.get("media_type") == "movie"
else "0") + (x.get('release_date')
or x.get('first_air_date')
or '0000-00-00'),
reverse=True
)
@staticmethod
def _convert_media_type(ret_info: dict) -> dict:
"""
转换媒体类型为MediaType枚举
"""
if (ret_info
and not isinstance(ret_info.get("media_type"), MediaType)):
ret_info['media_type'] = MediaType.MOVIE if ret_info.get("media_type") == "movie" else MediaType.TV
return ret_info
def _match_multi_item(self, name: str, multi: dict, get_info_func) -> Optional[dict]:
"""
匹配单个多媒体搜索结果项
:param name: 查询名称
:param multi: 搜索结果项
:param get_info_func: 获取详细信息的函数(同步或异步)
:return: 匹配的结果或None
"""
if multi.get("media_type") == "movie":
if self.__compare_names(name, multi.get('title')) \
or self.__compare_names(name, multi.get('original_title')):
return multi
# 匹配别名、译名
if not multi.get("names"):
multi = get_info_func(mtype=MediaType.MOVIE, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
return multi
elif multi.get("media_type") == "tv":
if self.__compare_names(name, multi.get('name')) \
or self.__compare_names(name, multi.get('original_name')):
return multi
# 匹配别名、译名
if not multi.get("names"):
multi = get_info_func(mtype=MediaType.TV, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
return multi
return None
async def _async_match_multi_item(self, name: str, multi: dict) -> Optional[dict]:
"""
匹配单个多媒体搜索结果项(异步版本)
:param name: 查询名称
:param multi: 搜索结果项
:return: 匹配的结果或None
"""
if multi.get("media_type") == "movie":
if self.__compare_names(name, multi.get('title')) \
or self.__compare_names(name, multi.get('original_title')):
return multi
# 匹配别名、译名
if not multi.get("names"):
multi = await self.async_get_info(mtype=MediaType.MOVIE, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
return multi
elif multi.get("media_type") == "tv":
if self.__compare_names(name, multi.get('name')) \
or self.__compare_names(name, multi.get('original_name')):
return multi
# 匹配别名、译名
if not multi.get("names"):
multi = await self.async_get_info(mtype=MediaType.TV, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
return multi
return None
# match_web 公共方法
@staticmethod
def _validate_web_params(name: str) -> Optional[dict]:
"""
验证网站搜索参数
:return: None表示继续dict表示直接返回结果
"""
if not name:
return None
if StringUtils.is_chinese(name):
return {}
return None # 继续执行
@staticmethod
def _build_tmdb_search_url(name: str) -> str:
"""
构建TMDB搜索URL
"""
return "https://www.themoviedb.org/search?query=%s" % quote(name)
@staticmethod
def _validate_response(res) -> Optional[dict]:
"""
验证HTTP响应
:return: None表示继续dict表示直接返回结果Exception表示抛出异常
"""
if res is None:
return None
if res.status_code == 429:
raise APIRateLimitException("触发TheDbMovie网站限流获取媒体信息失败")
if res.status_code != 200:
return {}
return None # 继续执行
@staticmethod
def _extract_tmdb_links(html_text: str, mtype: MediaType) -> List[str]:
"""
从HTML文本中提取TMDB链接
"""
if not html_text:
return []
html = None
try:
tmdb_links = []
html = etree.HTML(html_text)
if mtype == MediaType.TV:
links = html.xpath("//a[@data-id and @data-media-type='tv']/@href")
else:
links = html.xpath("//a[@data-id]/@href")
for link in links:
if not link or (not link.startswith("/tv") and not link.startswith("/movie")):
continue
if link not in tmdb_links:
tmdb_links.append(link)
return tmdb_links
except Exception as err:
logger.error(f"解析TMDB网站HTML出错{str(err)}")
return []
finally:
if html is not None:
del html
@staticmethod
def _log_web_search_result(name: str, tmdbinfo: dict):
"""
记录网站搜索结果日志
"""
if tmdbinfo.get('media_type') == MediaType.MOVIE:
logger.info("%s 从WEB识别到 电影TMDBID=%s, 名称=%s, 上映日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('title'),
tmdbinfo.get('release_date')))
else:
logger.info("%s 从WEB识别到 电视剧TMDBID=%s, 名称=%s, 首播日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('name'),
tmdbinfo.get('first_air_date')))
def _process_web_search_links(self, name: str, mtype: MediaType,
tmdb_links: List[str], get_info_func) -> Optional[dict]:
"""
处理网站搜索得到的链接
"""
if len(tmdb_links) == 1:
tmdbinfo = get_info_func(
mtype=MediaType.TV if tmdb_links[0].startswith("/tv") else MediaType.MOVIE,
tmdbid=tmdb_links[0].split("/")[-1])
if tmdbinfo:
if mtype == MediaType.TV and tmdbinfo.get('media_type') != MediaType.TV:
return {}
self._log_web_search_result(name, tmdbinfo)
return tmdbinfo
elif len(tmdb_links) > 1:
logger.info("%s TMDB网站返回数据过多%s" % (name, len(tmdb_links)))
else:
logger.info("%s TMDB网站未查询到媒体信息" % name)
return {}
async def _async_process_web_search_links(self, name: str,
mtype: MediaType, tmdb_links: List[str]) -> Optional[dict]:
"""
处理网站搜索得到的链接(异步版本)
"""
if len(tmdb_links) == 1:
tmdbinfo = await self.async_get_info(
mtype=MediaType.TV if tmdb_links[0].startswith("/tv") else MediaType.MOVIE,
tmdbid=int(tmdb_links[0].split("/")[-1]))
if tmdbinfo:
if mtype == MediaType.TV and tmdbinfo.get('media_type') != MediaType.TV:
return {}
self._log_web_search_result(name, tmdbinfo)
return tmdbinfo
elif len(tmdb_links) > 1:
logger.info("%s TMDB网站返回数据过多%s" % (name, len(tmdb_links)))
else:
logger.info("%s TMDB网站未查询到媒体信息" % name)
return {}
@staticmethod
def __get_names(tmdb_info: dict) -> List[str]:
"""
@@ -188,47 +432,36 @@ class TmdbApi:
:param group_seasons: 集数组信息
:return: TMDB的INFO同时会将mtype赋值到media_type中
"""
if not self.search:
return None
if not name:
# 基本参数验证
if not self._validate_match_params(name, self.search):
return None
# TMDB搜索
info = {}
if mtype != MediaType.TV:
year_range = [year]
if year:
year_range.append(str(int(year) + 1))
year_range.append(str(int(year) - 1))
for year in year_range:
logger.debug(
f"正在识别{mtype.value}{name}, 年份={year} ...")
info = self.__search_movie_by_name(name, year)
year_range = self._generate_year_range(year)
for search_year in year_range:
self._log_match_debug(mtype, name, search_year)
info = self.__search_movie_by_name(name, search_year)
if info:
info['media_type'] = MediaType.MOVIE
break
info = self._set_media_type(info, MediaType.MOVIE)
else:
# 有当前季和当前季集年份,使用精确匹配
if season_year and season_number:
logger.debug(
f"正在识别{mtype.value}{name}, 季集={season_number}, 季集年份={season_year} ...")
self._log_match_debug(mtype, name, season_year, season_number, season_year)
info = self.__search_tv_by_season(name,
season_year,
season_number,
group_seasons)
if not info:
year_range = [year]
if year:
year_range.append(str(int(year) + 1))
year_range.append(str(int(year) - 1))
for year in year_range:
logger.debug(
f"正在识别{mtype.value}{name}, 年份={year} ...")
info = self.__search_tv_by_name(name, year)
year_range = self._generate_year_range(year)
for search_year in year_range:
self._log_match_debug(mtype, name, search_year)
info = self.__search_tv_by_name(name, search_year)
if info:
break
if info:
info['media_type'] = MediaType.TV
# 返回
info = self._set_media_type(info, MediaType.TV)
return info
def __search_movie_by_name(self, name: str, year: str) -> Optional[dict]:
@@ -454,51 +687,24 @@ class TmdbApi:
print(traceback.format_exc())
return None
logger.debug(f"API返回{str(self.search.total_results)}")
# 返回结果
ret_info = {}
if (multis is None) or (len(multis) == 0):
logger.debug(f"{name} 未找到相关媒体息!")
return {}
else:
# 按年份降序排列,电影在前面
multis = sorted(
multis,
key=lambda x: ("1"
if x.get("media_type") == "movie"
else "0") + (x.get('release_date')
or x.get('first_air_date')
or '0000-00-00'),
reverse=True
)
for multi in multis:
if multi.get("media_type") == "movie":
if self.__compare_names(name, multi.get('title')) \
or self.__compare_names(name, multi.get('original_title')):
ret_info = multi
break
# 匹配别名、译名
if not multi.get("names"):
multi = self.get_info(mtype=MediaType.MOVIE, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
ret_info = multi
break
elif multi.get("media_type") == "tv":
if self.__compare_names(name, multi.get('name')) \
or self.__compare_names(name, multi.get('original_name')):
ret_info = multi
break
# 匹配别名、译名
if not multi.get("names"):
multi = self.get_info(mtype=MediaType.TV, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
ret_info = multi
break
# 类型变更
if (ret_info
and not isinstance(ret_info.get("media_type"), MediaType)):
ret_info['media_type'] = MediaType.MOVIE if ret_info.get("media_type") == "movie" else MediaType.TV
return ret_info
# 按年份降序排列,电影在前面
multis = self._sort_multi_results(multis)
ret_info = {}
for multi in multis:
matched = self._match_multi_item(name, multi, self.get_info)
if matched:
ret_info = matched
break
# 类型变更
return self._convert_media_type(ret_info)
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta)
@rate_limit_exponential(source="match_tmdb_web", base_wait=5, max_wait=1800, enable_logging=True)
@@ -508,66 +714,28 @@ class TmdbApi:
:param name: 名称
:param mtype: 媒体类型
"""
if not name:
return None
if StringUtils.is_chinese(name):
return {}
# 参数验证
validation_result = self._validate_web_params(name)
if validation_result is not None:
return validation_result
logger.info("正在从TheDbMovie网站查询%s ..." % name)
tmdb_url = "https://www.themoviedb.org/search?query=%s" % quote(name)
tmdb_url = self._build_tmdb_search_url(name)
res = RequestUtils(timeout=5, ua=settings.NORMAL_USER_AGENT, proxies=settings.PROXY).get_res(url=tmdb_url)
if res is None:
return None
if res.status_code == 429:
raise APIRateLimitException("触发TheDbMovie网站限流获取媒体信息失败")
if res.status_code != 200:
return {}
html = None
html_text = res.text
if not html_text:
return {}
# 响应验证
response_result = self._validate_response(res)
if response_result is not None:
return response_result
try:
tmdb_links = []
html = etree.HTML(html_text)
if mtype == MediaType.TV:
links = html.xpath("//a[@data-id and @data-media-type='tv']/@href")
else:
links = html.xpath("//a[@data-id]/@href")
for link in links:
if not link or (not link.startswith("/tv") and not link.startswith("/movie")):
continue
if link not in tmdb_links:
tmdb_links.append(link)
if len(tmdb_links) == 1:
tmdbinfo = self.get_info(
mtype=MediaType.TV if tmdb_links[0].startswith("/tv") else MediaType.MOVIE,
tmdbid=tmdb_links[0].split("/")[-1])
if tmdbinfo:
if mtype == MediaType.TV and tmdbinfo.get('media_type') != MediaType.TV:
return {}
if tmdbinfo.get('media_type') == MediaType.MOVIE:
logger.info("%s 从WEB识别到 电影TMDBID=%s, 名称=%s, 上映日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('title'),
tmdbinfo.get('release_date')))
else:
logger.info("%s 从WEB识别到 电视剧TMDBID=%s, 名称=%s, 首播日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('name'),
tmdbinfo.get('first_air_date')))
return tmdbinfo
elif len(tmdb_links) > 1:
logger.info("%s TMDB网站返回数据过多%s" % (name, len(tmdb_links)))
else:
logger.info("%s TMDB网站未查询到媒体信息" % name)
return {}
# 提取链接
tmdb_links = self._extract_tmdb_links(res.text, mtype)
# 处理结果
return self._process_web_search_links(name, mtype, tmdb_links, self.get_info)
except Exception as err:
logger.error(f"从TheDbMovie网站查询出错{str(err)}")
return {}
finally:
if html is not None:
del html
def get_info(self,
mtype: MediaType,
@@ -1648,6 +1816,38 @@ class TmdbApi:
return None
# 公共异步方法
@cached(maxsize=settings.CONF.tmdb, ttl=settings.CONF.meta)
@rate_limit_exponential(source="match_tmdb_web", base_wait=5, max_wait=1800, enable_logging=True)
async def async_match_web(self, name: str, mtype: MediaType) -> Optional[dict]:
"""
搜索TMDB网站直接抓取结果结果只有一条时才返回异步版本
:param name: 名称
:param mtype: 媒体类型
"""
# 参数验证
validation_result = self._validate_web_params(name)
if validation_result is not None:
return validation_result
logger.info("正在从TheDbMovie网站查询%s ..." % name)
tmdb_url = self._build_tmdb_search_url(name)
res = await AsyncRequestUtils(timeout=5, ua=settings.NORMAL_USER_AGENT, proxies=settings.PROXY).get_res(
url=tmdb_url)
# 响应验证
response_result = self._validate_response(res)
if response_result is not None:
return response_result
try:
# 提取链接
tmdb_links = self._extract_tmdb_links(res.text, mtype)
# 处理结果
return await self._async_process_web_search_links(name, mtype, tmdb_links)
except Exception as err:
logger.error(f"从TheDbMovie网站查询出错{str(err)}")
return {}
async def async_search_multiis(self, title: str) -> List[dict]:
"""
同时查询模糊匹配的电影、电视剧TMDB信息异步版本
@@ -1776,47 +1976,36 @@ class TmdbApi:
:param group_seasons: 集数组信息
:return: TMDB的INFO同时会将mtype赋值到media_type中
"""
if not self.search:
return None
if not name:
# 基本参数验证
if not self._validate_match_params(name, self.search):
return None
# TMDB搜索
info = {}
if mtype != MediaType.TV:
year_range = [year]
if year:
year_range.append(str(int(year) + 1))
year_range.append(str(int(year) - 1))
for year in year_range:
logger.debug(
f"正在识别{mtype.value}{name}, 年份={year} ...")
info = await self.__async_search_movie_by_name(name, year)
year_range = self._generate_year_range(year)
for search_year in year_range:
self._log_match_debug(mtype, name, search_year)
info = await self.__async_search_movie_by_name(name, search_year)
if info:
info['media_type'] = MediaType.MOVIE
break
info = self._set_media_type(info, MediaType.MOVIE)
else:
# 有当前季和当前季集年份,使用精确匹配
if season_year and season_number:
logger.debug(
f"正在识别{mtype.value}{name}, 季集={season_number}, 季集年份={season_year} ...")
self._log_match_debug(mtype, name, season_year, season_number, season_year)
info = await self.__async_search_tv_by_season(name,
season_year,
season_number,
group_seasons)
if not info:
year_range = [year]
if year:
year_range.append(str(int(year) + 1))
year_range.append(str(int(year) - 1))
for year in year_range:
logger.debug(
f"正在识别{mtype.value}{name}, 年份={year} ...")
info = await self.__async_search_tv_by_name(name, year)
year_range = self._generate_year_range(year)
for search_year in year_range:
self._log_match_debug(mtype, name, search_year)
info = await self.__async_search_tv_by_name(name, search_year)
if info:
break
if info:
info['media_type'] = MediaType.TV
# 返回
info = self._set_media_type(info, MediaType.TV)
return info
async def async_match_multi(self, name: str) -> Optional[dict]:
@@ -1835,51 +2024,24 @@ class TmdbApi:
print(traceback.format_exc())
return None
logger.debug(f"API返回{str(self.search.total_results)}")
# 返回结果
ret_info = {}
if (multis is None) or (len(multis) == 0):
logger.debug(f"{name} 未找到相关媒体息!")
return {}
else:
# 按年份降序排列,电影在前面
multis = sorted(
multis,
key=lambda x: ("1"
if x.get("media_type") == "movie"
else "0") + (x.get('release_date')
or x.get('first_air_date')
or '0000-00-00'),
reverse=True
)
for multi in multis:
if multi.get("media_type") == "movie":
if self.__compare_names(name, multi.get('title')) \
or self.__compare_names(name, multi.get('original_title')):
ret_info = multi
break
# 匹配别名、译名
if not multi.get("names"):
multi = await self.async_get_info(mtype=MediaType.MOVIE, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
ret_info = multi
break
elif multi.get("media_type") == "tv":
if self.__compare_names(name, multi.get('name')) \
or self.__compare_names(name, multi.get('original_name')):
ret_info = multi
break
# 匹配别名、译名
if not multi.get("names"):
multi = await self.async_get_info(mtype=MediaType.TV, tmdbid=multi.get("id"))
if multi and self.__compare_names(name, multi.get("names")):
ret_info = multi
break
# 类型变更
if (ret_info
and not isinstance(ret_info.get("media_type"), MediaType)):
ret_info['media_type'] = MediaType.MOVIE if ret_info.get("media_type") == "movie" else MediaType.TV
return ret_info
# 按年份降序排列,电影在前面
multis = self._sort_multi_results(multis)
ret_info = {}
for multi in multis:
matched = await self._async_match_multi_item(name, multi)
if matched:
ret_info = matched
break
# 类型变更
return self._convert_media_type(ret_info)
async def async_get_info(self,
mtype: MediaType,