fix: stop torrent search paging after short pages

This commit is contained in:
jxxghp
2026-05-19 17:54:39 +08:00
parent cdddd8e080
commit 7866aee1de
12 changed files with 497 additions and 82 deletions

View File

@@ -922,6 +922,18 @@ class ChainBase(metaclass=ABCMeta):
"""
return await self.async_run_module("async_search_collections", name=name)
def get_search_page_size(
self,
site: dict,
keyword: Optional[str] = None,
) -> Optional[int]:
"""
获取站点搜索单页容量;返回 None 表示当前搜索入口不支持可靠翻页。
"""
return self.run_module(
"get_search_page_size", site=site, keyword=keyword
)
def search_torrents(
self,
site: dict,

View File

@@ -4,7 +4,7 @@ import json
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, as_completed, wait
from datetime import datetime
from typing import AsyncIterator, Any, Dict, Tuple
from typing import List, Optional
@@ -68,6 +68,14 @@ class SearchChain(ChainBase):
start_page = max(start_page, 0)
return list(range(start_page, start_page + cls._get_search_resource_pages()))
def _should_continue_search_pages(self, site: dict, page_results: Optional[List[Any]],
keyword: Optional[str] = None) -> bool:
"""
判断是否继续抓取下一页;少于站点单页容量时视为当前站点已到末页。
"""
page_size = self.get_search_page_size(site=site, keyword=keyword)
return page_size is not None and len(page_results or []) >= page_size
@property
def is_ai_recommend_enabled(self) -> bool:
"""
@@ -1283,35 +1291,62 @@ class SearchChain(ChainBase):
text=f"开始搜索,共 {len(indexer_sites)} 个站点,{len(search_pages)} 页 ...")
# 结果集
results = []
# 多页搜索会放大请求数,线程池仍按系统线程池配置做上限,避免瞬时打满站点
max_workers = min(total_num, settings.CONF.threadpool or total_num)
# 同一站点按页顺序抓取,避免空页后仍继续请求该站点的后续页
max_workers = min(len(indexer_sites), settings.CONF.threadpool or len(indexer_sites))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
all_task = []
pending_tasks = {}
def submit_site_page(site: dict, page_index: int):
"""
提交单个站点页搜索任务,并记录该任务对应的站点和页码位置。
"""
search_page = search_pages[page_index]
search_keyword = mediainfo.imdb_id if area == "imdbid" and mediainfo else keyword
if area == "imdbid":
# 搜索IMDBID
task = executor.submit(self.search_torrents, site=site,
keyword=search_keyword,
mtype=mediainfo.type if mediainfo else None,
page=search_page)
else:
# 搜索标题
task = executor.submit(self.search_torrents, site=site,
keyword=search_keyword,
mtype=mediainfo.type if mediainfo else None,
page=search_page)
pending_tasks[task] = (site, page_index, search_page, search_keyword)
for site in indexer_sites:
for search_page in search_pages:
if area == "imdbid":
# 搜索IMDBID
task = executor.submit(self.search_torrents, site=site,
keyword=mediainfo.imdb_id if mediainfo else None,
mtype=mediainfo.type if mediainfo else None,
page=search_page)
else:
# 搜索标题
task = executor.submit(self.search_torrents, site=site,
keyword=keyword,
mtype=mediainfo.type if mediainfo else None,
page=search_page)
all_task.append(task)
for future in as_completed(all_task):
if global_vars.is_system_stopped:
break
finish_count += 1
result = future.result()
if result:
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个请求 ...")
submit_site_page(site=site, page_index=0)
try:
while pending_tasks:
if global_vars.is_system_stopped:
break
done_tasks, _ = wait(pending_tasks, return_when=FIRST_COMPLETED)
for future in done_tasks:
site, page_index, search_page, search_keyword = pending_tasks.pop(future)
finish_count += 1
result = future.result()
if result:
results.extend(result)
if (
self._should_continue_search_pages(
site=site, page_results=result, keyword=search_keyword
)
and page_index + 1 < len(search_pages)
):
submit_site_page(site=site, page_index=page_index + 1)
else:
logger.debug(
f"{site.get('name')}{search_page} 页返回 {len(result or [])} 条,停止继续翻页"
)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个请求 ...")
finally:
for task in pending_tasks:
task.cancel()
# 计算耗时
end_time = datetime.now()
# 更新进度
@@ -1372,7 +1407,7 @@ class SearchChain(ChainBase):
async def search_site_page(site: dict, search_page: int) -> List[TorrentInfo]:
"""
控制单次站点页请求的并发量,避免多页搜索把所有请求一次性打出去
控制单次站点页请求的并发量,并返回该页的资源列表
"""
async with semaphore:
if area == "imdbid":
@@ -1387,23 +1422,54 @@ class SearchChain(ChainBase):
mtype=mediainfo.type if mediainfo else None,
page=search_page)
# 创建异步任务列表
tasks = []
for site in indexer_sites:
for search_page in search_pages:
tasks.append(search_site_page(site, search_page))
pending_tasks = {}
# 使用asyncio.as_completed来处理并发任务
for future in asyncio.as_completed(tasks):
if global_vars.is_system_stopped:
break
finish_count += 1
result = await future
if result:
results.extend(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个请求 ...")
def submit_site_page(site: dict, page_index: int):
"""
提交异步站点页搜索任务,并记录该任务对应的站点和页码位置。
"""
search_page = search_pages[page_index]
search_keyword = mediainfo.imdb_id if area == "imdbid" and mediainfo else keyword
task = asyncio.create_task(search_site_page(site=site, search_page=search_page))
pending_tasks[task] = (site, page_index, search_page, search_keyword)
for site in indexer_sites:
submit_site_page(site=site, page_index=0)
try:
while pending_tasks:
if global_vars.is_system_stopped:
break
done_tasks, _ = await asyncio.wait(
pending_tasks.keys(),
return_when=asyncio.FIRST_COMPLETED,
)
for future in done_tasks:
site, page_index, search_page, search_keyword = pending_tasks.pop(future)
finish_count += 1
result = await future
if result:
results.extend(result)
if (
self._should_continue_search_pages(
site=site, page_results=result, keyword=search_keyword
)
and page_index + 1 < len(search_pages)
):
submit_site_page(site=site, page_index=page_index + 1)
else:
logger.debug(
f"{site.get('name')}{search_page} 页返回 {len(result or [])} 条,停止继续翻页"
)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress.update(value=finish_count / total_num * 100,
text=f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个请求 ...")
finally:
for task in pending_tasks:
if not task.done():
task.cancel()
if pending_tasks:
await asyncio.gather(*pending_tasks.keys(), return_exceptions=True)
# 计算耗时
end_time = datetime.now()
@@ -1471,7 +1537,10 @@ class SearchChain(ChainBase):
semaphore = asyncio.Semaphore(settings.CONF.threadpool or total_num)
async def search_site(site: dict, search_page: int) -> Tuple[dict, int, List[TorrentInfo]]:
async def search_site(site: dict, search_page: int) -> List[TorrentInfo]:
"""
搜索单个站点页,用于渐进式返回入口。
"""
async with semaphore:
if area == "imdbid":
site_result = await self.async_search_torrents(site=site,
@@ -1483,42 +1552,70 @@ class SearchChain(ChainBase):
keyword=keyword,
mtype=mediainfo.type if mediainfo else None,
page=search_page)
return site, search_page, site_result or []
return site_result or []
tasks = {}
def submit_site_page(site: dict, page_index: int):
"""
提交渐进式站点页搜索任务,并保留站点和页码上下文。
"""
search_page = search_pages[page_index]
search_keyword = mediainfo.imdb_id if area == "imdbid" and mediainfo else keyword
task = asyncio.create_task(search_site(site=site, search_page=search_page))
tasks[task] = (site, page_index, search_page, search_keyword)
for site in indexer_sites:
submit_site_page(site=site, page_index=0)
tasks = [
asyncio.create_task(search_site(site, search_page))
for site in indexer_sites
for search_page in search_pages
]
results_count = 0
try:
for future in asyncio.as_completed(tasks):
while tasks:
if global_vars.is_system_stopped:
break
finish_count += 1
site, search_page, result = await future
results_count += len(result)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress_value = finish_count / total_num * 100
progress_text = f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个请求 ..."
progress.update(value=progress_value, text=progress_text)
yield {
"type": "append",
"stage": "searching",
"value": progress_value,
"text": progress_text,
"items": result,
"site": site.get("name"),
"site_id": site.get("id"),
"page": search_page,
"finished": finish_count,
"total": total_num,
"total_items": results_count
}
done_tasks, _ = await asyncio.wait(
tasks.keys(),
return_when=asyncio.FIRST_COMPLETED,
)
for future in done_tasks:
site, page_index, search_page, search_keyword = tasks.pop(future)
finish_count += 1
result = await future
results_count += len(result)
if (
self._should_continue_search_pages(
site=site, page_results=result, keyword=search_keyword
)
and page_index + 1 < len(search_pages)
):
submit_site_page(site=site, page_index=page_index + 1)
else:
logger.debug(
f"{site.get('name')}{search_page} 页返回 {len(result)} 条,停止继续翻页"
)
logger.info(f"站点搜索进度:{finish_count} / {total_num}")
progress_value = finish_count / total_num * 100
progress_text = f"正在搜索{keyword or ''},已完成 {finish_count} / {total_num} 个请求 ..."
progress.update(value=progress_value, text=progress_text)
yield {
"type": "append",
"stage": "searching",
"value": progress_value,
"text": progress_text,
"items": result,
"site": site.get("name"),
"site_id": site.get("id"),
"page": search_page,
"finished": finish_count,
"total": total_num,
"total_items": results_count
}
finally:
for task in tasks:
if not task.done():
task.cancel()
if tasks:
await asyncio.gather(*tasks.keys(), return_exceptions=True)
end_time = datetime.now()
progress.update(value=100,

View File

@@ -149,6 +149,30 @@ class IndexerModule(_ModuleBase):
site_downloader=site.get("downloader"),
**result) for result in result_array]
@staticmethod
def get_search_page_size(site: dict, keyword: Optional[str] = None) -> Optional[int]:
"""
获取站点搜索单页容量None 表示当前搜索入口不支持可靠翻页。
"""
site = site or {}
parser = site.get("parser")
parser_classes = {
"TNodeSpider": TNodeSpider,
"TorrentLeech": TorrentLeech,
"mTorrent": MTorrentSpider,
"Yema": YemaSpider,
"Haidan": HaiDanSpider,
"HDDolby": HddolbySpider,
"RousiPro": RousiSpider,
}
if parser in parser_classes:
return parser_classes[parser].get_search_page_size(keyword=keyword)
try:
page_size = int(site.get("result_num") or SiteSpider.default_result_num())
except (TypeError, ValueError):
page_size = SiteSpider.default_result_num()
return page_size if page_size > 0 else SiteSpider.default_result_num()
def search_torrents(self, site: dict,
keyword: str = None,
mtype: MediaType = None,

View File

@@ -22,6 +22,8 @@ class SiteSpider:
站点爬虫
"""
_default_result_num = 100
@property
def __class__(self):
return object
@@ -67,7 +69,7 @@ class SiteSpider:
self.list = self.browse.get('list') or self.list
self.fields = self.browse.get('fields') or self.fields
self.domain = indexer.get('domain')
self.result_num = int(indexer.get('result_num') or 100)
self.result_num = int(indexer.get('result_num') or self.default_result_num())
self._timeout = int(indexer.get('timeout') or 15)
self.page = page
if self.domain and not str(self.domain).endswith("/"):
@@ -82,6 +84,13 @@ class SiteSpider:
self.torrents_info = {}
self.torrents_info_array = []
@classmethod
def default_result_num(cls) -> int:
"""
获取普通配置站点的默认单页数量。
"""
return cls._default_result_num
def __get_search_url(self):
"""
获取搜索URL

View File

@@ -49,6 +49,13 @@ class HaiDanSpider:
"7": 1
}
@classmethod
def get_search_page_size(cls, keyword: str = None) -> None:
"""
海胆搜索入口当前没有接入页码参数,不参与自动翻页。
"""
return None
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer:

View File

@@ -20,7 +20,7 @@ class HddolbySpider:
_cookie = None
_ua = None
_apikey = None
_size = 40
_size = 100
_pageurl = None
_timeout = 15
_searchurl = None
@@ -57,6 +57,13 @@ class HddolbySpider:
"hfr": "高帧率",
}
@classmethod
def get_search_page_size(cls, keyword: Optional[str] = None) -> Optional[int]:
"""
获取搜索接口单页容量。
"""
return cls._size
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer:
@@ -88,7 +95,7 @@ class HddolbySpider:
return {
"keyword": keyword,
"page_number": page,
"page_size": 100,
"page_size": self._size,
"categories": categories,
"visible": 1,
}

View File

@@ -53,6 +53,13 @@ class MTorrentSpider:
"7": "DIY 国配 中字"
}
@classmethod
def get_search_page_size(cls, keyword: Optional[str] = None) -> Optional[int]:
"""
获取搜索接口单页容量。
"""
return cls._size
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer:

View File

@@ -39,6 +39,13 @@ class RousiSpider:
# API KEY
_apikey = None
@classmethod
def get_search_page_size(cls, keyword: Optional[str] = None) -> Optional[int]:
"""
获取搜索接口单页容量。
"""
return cls._size
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer:

View File

@@ -17,6 +17,13 @@ class TNodeSpider(metaclass=SingletonClass):
_downloadurl = "%sapi/torrent/download/%s"
_pageurl = "%storrent/info/%s"
@classmethod
def get_search_page_size(cls, keyword: Optional[str] = None) -> Optional[int]:
"""
获取搜索接口单页容量。
"""
return cls._size
def __init__(self, indexer: dict):
if indexer:
self._indexerid = indexer.get('id')

View File

@@ -17,6 +17,13 @@ class TorrentLeech:
_pageurl = "%storrent/%s"
_timeout = 15
@classmethod
def get_search_page_size(cls, keyword: Optional[str] = None) -> Optional[int]:
"""
获取搜索接口单页容量;关键词搜索 URL 当前没有可靠页码入口。
"""
return None if keyword else cls._size
def __init__(self, indexer: dict):
self._indexer = indexer
if indexer.get('proxy'):

View File

@@ -44,6 +44,13 @@ class YemaSpider:
"12": "完结",
}
@classmethod
def get_search_page_size(cls, keyword: Optional[str] = None) -> Optional[int]:
"""
获取搜索接口单页容量。
"""
return cls._size
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer: