feat: add subtitle search functionality and related data handling

This commit is contained in:
jxxghp
2026-06-09 06:46:26 +08:00
parent 738d92445a
commit e3c5a94c52
12 changed files with 1105 additions and 36 deletions

View File

@@ -1,7 +1,7 @@
from datetime import datetime
from typing import List, Optional, Tuple, Union
from app.core.context import TorrentInfo
from app.core.context import SubtitleInfo, TorrentInfo
from app.db.site_oper import SiteOper
from app.helper.module import ModuleHelper
from app.helper.sites import SitesHelper # noqa
@@ -160,6 +160,24 @@ class IndexerModule(_ModuleBase):
site_downloader=site.get("downloader"),
**result) for result in result_array]
@staticmethod
def __parse_subtitle_result(site: dict, result_array: list, seconds: int) -> List[SubtitleInfo]:
    """
    Turn raw subtitle search rows into SubtitleInfo objects.

    :param site: site configuration dict; its id, name, cookie, ua, proxy and pri
                 values are copied onto every SubtitleInfo
    :param result_array: rows returned by the spider; each row is expanded as
                         keyword arguments of SubtitleInfo
    :param seconds: elapsed search time, only used in the log messages
    :return: one SubtitleInfo per row, or an empty list when there are no rows
    """
    # None and an empty list are both falsy, so a single truthiness test is enough.
    if not result_array:
        logger.warn(f"{site.get('name')} 未搜索到字幕,耗时 {seconds}")
        return []

    logger.info(
        f"{site.get('name')} 字幕搜索完成,耗时 {seconds} 秒,返回数据:{len(result_array)}")

    # Site-level attributes are the same for every row, so build them once.
    site_fields = {
        "site": site.get("id"),
        "site_name": site.get("name"),
        "site_cookie": site.get("cookie"),
        "site_ua": site.get("ua"),
        "site_proxy": site.get("proxy"),
        "site_order": site.get("pri"),
    }
    return [SubtitleInfo(**site_fields, **row) for row in result_array]
@staticmethod
def get_search_page_size(site: dict, keyword: Optional[str] = None) -> Optional[int]:
"""
@@ -270,6 +288,47 @@ class IndexerModule(_ModuleBase):
seconds=seconds
)
def search_subtitles(self, site: dict,
                     keyword: Optional[str] = None,
                     page: Optional[int] = 0) -> List[SubtitleInfo]:
    """
    Search a single site for subtitles.

    :param site: site configuration; a falsy "subtitles" entry disables the search
    :param keyword: search keyword
    :param page: page number
    :return: list of SubtitleInfo; empty when the site has no subtitle
             configuration, the pre-search check fails, or nothing was found
    """
    start_time = datetime.now()

    if not site.get("subtitles"):
        return []
    if not self.__search_check(site, keyword):
        return []

    result = []
    error_flag = False
    search_word = self.__clear_search_text(keyword)

    try:
        error_flag, result = self.__spider_search(
            search_word=search_word,
            indexer=site,
            page=page,
            search_type="subtitles"
        )
    except Exception as err:
        # A raised exception is a failed search: flag it so the statistic call
        # below does not receive it as a success.
        error_flag = True
        result = []
        logger.error(f"{site.get('name')} 字幕搜索出错:{str(err)}")

    # total_seconds() covers the whole interval; timedelta.seconds is only the
    # seconds component and wraps around for negative or multi-day deltas.
    seconds = int((datetime.now() - start_time).total_seconds())

    self.__indexer_statistic(site=site, error_flag=error_flag, seconds=seconds)

    return self.__parse_subtitle_result(
        site=site,
        result_array=result,
        seconds=seconds
    )
async def async_search_torrents(self, site: dict,
keyword: str = None,
mtype: MediaType = None,
@@ -365,12 +424,54 @@ class IndexerModule(_ModuleBase):
seconds=seconds
)
async def async_search_subtitles(self, site: dict,
                                 keyword: Optional[str] = None,
                                 page: Optional[int] = 0) -> List[SubtitleInfo]:
    """
    Asynchronously search a single site for subtitles.

    :param site: site configuration; a falsy "subtitles" entry disables the search
    :param keyword: search keyword
    :param page: page number
    :return: list of SubtitleInfo; empty when the site has no subtitle
             configuration, the pre-search check fails, or nothing was found
    """
    start_time = datetime.now()

    if not site.get("subtitles"):
        return []
    if not self.__search_check(site, keyword):
        return []

    result = []
    error_flag = False
    search_word = self.__clear_search_text(keyword)

    try:
        error_flag, result = await self.__async_spider_search(
            search_word=search_word,
            indexer=site,
            page=page,
            search_type="subtitles"
        )
    except Exception as err:
        # A raised exception is a failed search: flag it so the statistic call
        # below does not receive it as a success.
        error_flag = True
        result = []
        logger.error(f"{site.get('name')} 字幕搜索出错:{str(err)}")

    # total_seconds() covers the whole interval; timedelta.seconds is only the
    # seconds component and wraps around for negative or multi-day deltas.
    seconds = int((datetime.now() - start_time).total_seconds())

    await self.__async_indexer_statistic(site=site, error_flag=error_flag, seconds=seconds)

    return self.__parse_subtitle_result(
        site=site,
        result_array=result,
        seconds=seconds
    )
@staticmethod
def __spider_search(indexer: dict,
search_word: Optional[str] = None,
mtype: MediaType = None,
cat: Optional[str] = None,
page: Optional[int] = 0) -> Tuple[bool, List[dict]]:
page: Optional[int] = 0,
search_type: Optional[str] = "torrents") -> Tuple[bool, List[dict]]:
"""
根据关键字搜索单个站点
:param: indexer: 站点配置
@@ -385,7 +486,8 @@ class IndexerModule(_ModuleBase):
keyword=search_word,
mtype=mtype,
cat=cat,
page=page)
page=page,
search_type=search_type)
try:
return _spider.is_error, _spider.get_torrents()
@@ -397,7 +499,8 @@ class IndexerModule(_ModuleBase):
search_word: Optional[str] = None,
mtype: MediaType = None,
cat: Optional[str] = None,
page: Optional[int] = 0) -> Tuple[bool, List[dict]]:
page: Optional[int] = 0,
search_type: Optional[str] = "torrents") -> Tuple[bool, List[dict]]:
"""
异步根据关键字搜索单个站点
:param: indexer: 站点配置
@@ -412,7 +515,8 @@ class IndexerModule(_ModuleBase):
keyword=search_word,
mtype=mtype,
cat=cat,
page=page)
page=page,
search_type=search_type)
try:
result = await _spider.async_get_torrents()

View File

@@ -43,7 +43,8 @@ class SiteSpider:
mtype: MediaType = None,
cat: Optional[str] = None,
page: Optional[int] = 0,
referer: Optional[str] = None):
referer: Optional[str] = None,
search_type: Optional[str] = "torrents"):
"""
设置查询参数
:param indexer: 索引器
@@ -58,20 +59,32 @@ class SiteSpider:
self.keyword = keyword
self.cat = cat
self.mtype = mtype
self.search_type = search_type or "torrents"
self.indexerid = indexer.get('id')
self.indexername = indexer.get('name')
self.search = indexer.get('search')
self.batch = indexer.get('batch')
self.browse = indexer.get('browse')
self.category = indexer.get('category')
self.list = indexer.get('torrents').get('list', {})
self.fields = indexer.get('torrents').get('fields')
if not keyword and self.browse:
self.list = self.browse.get('list') or self.list
self.fields = self.browse.get('fields') or self.fields
if self.search_type == "subtitles":
subtitle_conf = indexer.get('subtitles') or {}
self.search = subtitle_conf.get('search')
self.batch = subtitle_conf.get('batch')
self.browse = subtitle_conf.get('browse')
self.category = subtitle_conf.get('category')
self.list = subtitle_conf.get('list') or {}
self.fields = subtitle_conf.get('fields') or {}
result_num = subtitle_conf.get('result_num') or indexer.get('result_num')
else:
self.search = indexer.get('search')
self.batch = indexer.get('batch')
self.browse = indexer.get('browse')
self.category = indexer.get('category')
self.list = (indexer.get('torrents') or {}).get('list', {})
self.fields = (indexer.get('torrents') or {}).get('fields') or {}
if not keyword and self.browse:
self.list = self.browse.get('list') or self.list
self.fields = self.browse.get('fields') or self.fields
result_num = indexer.get('result_num')
self._field_templates = self.__build_field_templates()
self.domain = indexer.get('domain')
self.result_num = int(indexer.get('result_num') or self.default_result_num())
self.result_num = int(result_num or self.default_result_num())
self._timeout = int(indexer.get('timeout') or 15)
self.page = page
if self.domain and not str(self.domain).endswith("/"):
@@ -399,6 +412,30 @@ class SiteSpider:
else:
self.torrents_info['enclosure'] = download_link
def __get_report_url(self, torrent: Any):
    """
    Extract the subtitle report-page link.

    Does nothing when the field configuration has no 'report' entry. Otherwise
    the configured selector is queried, its filters are applied, and a
    non-empty result is normalized and stored in torrents_info['report_url'].

    :param torrent: parsed row element handed to the selector query
    """
    if 'report' in self.fields:
        selector = self.fields.get('report', {})
        raw_link = self.__filter_text(
            self._safe_query(torrent, selector),
            selector.get('filters'),
        )
        if raw_link:
            self.torrents_info['report_url'] = self.__normalize_link(raw_link)
def __get_language_icon(self, torrent: Any):
    """
    Extract the subtitle language icon link.

    Does nothing when the field configuration has no 'language_icon' entry.
    Otherwise the configured selector is queried, its filters are applied, and
    a non-empty result is normalized and stored in
    torrents_info['language_icon'].

    :param torrent: parsed row element handed to the selector query
    """
    if 'language_icon' in self.fields:
        selector = self.fields.get('language_icon', {})
        raw_link = self.__filter_text(
            self._safe_query(torrent, selector),
            selector.get('filters'),
        )
        if raw_link:
            self.torrents_info['language_icon'] = self.__normalize_link(raw_link)
def __get_imdbid(self, torrent: Any):
# imdbid
if "imdbid" not in self.fields:
@@ -600,6 +637,49 @@ class SiteSpider:
else:
self.torrents_info['category'] = MediaType.UNKNOWN.value
def __get_subtitle_field(self, torrent: Any, field_name: str):
    """
    Read one configured subtitle field into torrents_info.

    Fields without a (truthy) selector configuration are skipped. The value is
    stored under the same key as the field name whenever the filtered text is
    not None.

    :param torrent: parsed row element handed to the selector query
    :param field_name: key looked up in the field configuration and written
                       to torrents_info
    """
    selector = self.fields.get(field_name, {})
    if selector:
        extracted = self.__filter_text(
            self._safe_query(torrent, selector),
            selector.get('filters'),
        )
        if extracted is not None:
            self.torrents_info[field_name] = extracted
def __fill_subtitle_ids(self):
    """
    Fill in the torrent ID and subtitle ID from the subtitle download link.

    The query string of torrents_info['enclosure'] is parsed; an ID is only
    written when torrents_info does not already hold a truthy value for it.
    """
    download_link = self.torrents_info.get("enclosure")
    if not download_link:
        return

    params = parse_qs(urlparse(download_link).query)

    # target key -> query parameter names, tried in this order
    lookups = (
        ("torrent_id", ("torrentid", "torrent_id")),
        ("subtitle_id", ("subid", "subtitle")),
    )
    for target, candidates in lookups:
        if self.torrents_info.get(target):
            continue
        values = next((params[name] for name in candidates if params.get(name)), None)
        if values:
            # parse_qs yields a list per parameter; the first occurrence wins.
            self.torrents_info[target] = values[0]
def __normalize_link(self, link: Optional[str]) -> Optional[str]:
    """
    Convert a site-relative link into an absolute one.

    :param link: link text extracted from the page
    :return: None for an empty link; the link unchanged when it starts with
             "http"; otherwise the link joined onto self.domain
    """
    if not link:
        return None
    if link.startswith("http"):
        return link
    if link.startswith("//"):
        # Protocol-relative link: reuse the scheme part of the site domain.
        scheme = self.domain.split(":")[0]
        return scheme + ":" + link
    # Drop a single leading slash before appending to the domain.
    relative = link[1:] if link.startswith("/") else link
    return self.domain + relative
def _safe_query(self, torrent: Any, selector_config: Optional[dict]) -> Optional[str]:
"""
安全地执行PyQuery查询并自动清理资源
@@ -672,6 +752,34 @@ class SiteSpider:
finally:
self.torrents_info.clear()
def get_subtitle_info(self, subtitle: Any) -> dict:
    """
    Parse a single subtitle row.

    :param subtitle: parsed row element passed to every field extractor
    :return: a copy of the collected fields, or an empty dict when any
             extractor raises (the error is logged)
    """
    self.torrents_info = {}
    try:
        # Common extractors, run in this fixed order.
        extractors = (
            self.__get_title,
            self.__get_description,
            self.__get_detail,
            self.__get_download,
            self.__get_size,
            self.__get_pubdate,
            self.__get_date_elapsed,
            self.__get_grabs,
            self.__get_language_icon,
            self.__get_report_url,
        )
        for extract in extractors:
            extract(subtitle)

        # Plain subtitle fields that are read straight from their selectors.
        plain_fields = ("language", "uploader", "torrent_id", "subtitle_id", "file_name")
        for field_name in plain_fields:
            self.__get_subtitle_field(subtitle, field_name)

        # Runs after the download link is extracted, since it reads 'enclosure'.
        self.__fill_subtitle_ids()

        # Return a copy: the shared dict is cleared in the finally clause.
        return dict(self.torrents_info)
    except Exception as err:
        logger.error("%s 字幕搜索出现错误:%s" % (self.indexername, str(err)))
        return {}
    finally:
        self.torrents_info.clear()
@staticmethod
def __filter_text(text: Optional[str], filters: Optional[List[dict]]) -> str:
"""
@@ -758,16 +866,17 @@ class SiteSpider:
self.is_error = True
return []
rust_torrents = rust_accel.parse_indexer_torrents(
html_text=html_text,
domain=self.domain,
list_config=self.list,
fields=self.fields,
category=self.category,
result_num=self.result_num
)
if rust_torrents is not None:
return rust_torrents
if self.search_type != "subtitles":
rust_torrents = rust_accel.parse_indexer_torrents(
html_text=html_text,
domain=self.domain,
list_config=self.list,
fields=self.fields,
category=self.category,
result_num=self.result_num
)
if rust_torrents is not None:
return rust_torrents
# 清空旧结果
self.torrents_info_array = []
@@ -785,7 +894,10 @@ class SiteSpider:
torrent_query = PyQuery(torn)
try:
# 直接获取种子信息,避免深拷贝
torrent_info = self.get_info(torrent_query)
if self.search_type == "subtitles":
torrent_info = self.get_subtitle_info(torrent_query)
else:
torrent_info = self.get_info(torrent_query)
if torrent_info:
# 浅拷贝即可,减少内存使用
self.torrents_info_array.append(torrent_info)