fix search async

This commit is contained in:
jxxghp
2025-08-01 11:38:23 +08:00
parent d657bf8ed8
commit 00779d0f10
3 changed files with 190 additions and 119 deletions

View File

@@ -1,9 +1,10 @@
from datetime import datetime
from sqlalchemy import Column, Integer, String, Sequence, JSON
from sqlalchemy import Column, Integer, String, Sequence, JSON, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from app.db import db_query, db_update, Base
from app.db import db_query, db_update, Base, async_db_query
class SiteStatistic(Base):
@@ -31,6 +32,12 @@ class SiteStatistic(Base):
def get_by_domain(cls, db: Session, domain: str):
return db.query(cls).filter(cls.domain == domain).first()
@classmethod
@async_db_query
async def async_get_by_domain(cls, db: AsyncSession, domain: str):
    """
    Asynchronously fetch the statistics row stored for a site domain.

    :param db: async database session used to run the query
    :param domain: site domain to look up
    :return: the first matching row, or None when nothing matches
    """
    result = await db.execute(select(cls).where(cls.domain == domain))
    # Take the first row, as the synchronous get_by_domain() does with
    # .first(); scalar_one_or_none() would raise MultipleResultsFound if
    # more than one row ever shares the same domain.
    return result.scalars().first()
@classmethod
@db_update
def reset(cls, db: Session):

View File

@@ -243,3 +243,62 @@ class SiteOper(DbOper):
lst_state=1,
lst_mod_date=lst_date
).create(self._db)
async def async_success(self, domain: str, seconds: Optional[int] = None):
    """
    Asynchronously record a successful site access.

    Updates the statistics row of the domain when one exists, otherwise
    creates a new row.

    :param domain: site domain
    :param seconds: elapsed seconds of this access; when None the timing
                    history and the average are left unchanged
    """
    lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sta = await SiteStatistic.async_get_by_domain(self._db, domain)
    if sta:
        avg_seconds, note = None, {}
        if seconds is not None:
            # Work on a copy instead of mutating the loaded value in place:
            # an in-place change to a plain JSON column value may not be
            # detected as a modification when it is written back.
            note = dict(sta.note or {})
            note[lst_date] = seconds or 1
            if len(note) > 10:
                # Keep only the 10 newest entries; the keys are
                # "%Y-%m-%d %H:%M:%S" strings, so sorting them as text
                # orders them by time.
                note = dict(sorted(note.items(), key=lambda x: x[0], reverse=True)[:10])
            # Average over the entries that are actually kept (the count
            # used to be taken before trimming, dividing 10 values by 11).
            avg_seconds = sum(note.values()) // len(note)
        await sta.async_update(self._db, {
            "success": sta.success + 1,
            "seconds": avg_seconds or sta.seconds,
            "lst_state": 0,
            "lst_mod_date": lst_date,
            "note": note or sta.note
        })
    else:
        # No row for this domain yet: start the history with this access
        note = {}
        if seconds is not None:
            note = {
                lst_date: seconds or 1
            }
        await SiteStatistic(
            domain=domain,
            success=1,
            fail=0,
            seconds=seconds or 1,
            lst_state=0,
            lst_mod_date=lst_date,
            note=note
        ).async_create(self._db)
async def async_fail(self, domain: str):
    """
    Asynchronously record a failed site access.

    Increments the failure counter of the domain's statistics row, or
    creates a row with a single failure when none exists.

    :param domain: site domain
    """
    now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = await SiteStatistic.async_get_by_domain(self._db, domain)
    if not record:
        # First time this domain is seen: create the row
        fresh = SiteStatistic(
            domain=domain,
            success=0,
            fail=1,
            lst_state=1,
            lst_mod_date=now_text
        )
        await fresh.async_create(self._db)
        return
    # Existing row: bump the failure count and mark the last state as failed
    changes = {
        "fail": record.fail + 1,
        "lst_state": 1,
        "lst_mod_date": now_text
    }
    await record.async_update(self._db, changes)

View File

@@ -78,6 +78,95 @@ class IndexerModule(_ModuleBase):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
    """
    Placeholder hook: performs no work and returns None.
    """
    return None
@staticmethod
def __search_check(site: dict, search_word: Optional[str] = None) -> bool:
    """
    Decide whether a search may be run against the site.

    :param site: site configuration dict
    :param search_word: keyword (or an id such as ttxxxx) about to be searched
    :return: True when the search may proceed, False when it must be skipped
    """
    # A site whose language is "en" is not searched with a Chinese keyword
    if search_word and site.get('language') == "en":
        if StringUtils.is_chinese(search_word):
            logger.warn(f"{site.get('name')} 不支持中文搜索")
            return False
    # Site flow control: a truthy state means the search is refused and
    # the accompanying message is logged
    helper = SitesHelper()
    limited, reason = helper.check(StringUtils.get_url_domain(site.get("domain")))
    if limited:
        logger.warn(reason)
        return False
    return True
@staticmethod
def __clear_search_text(text: Optional[str]) -> Optional[str]:
    """
    Clean up a search text.

    :param text: text to clean
    :return: the cleaned text; an empty or None input is returned unchanged
    """
    if text:
        # Strip special characters and redundant whitespace
        return StringUtils.clear(text, replace_word=" ", allow_space=True)
    return text
@staticmethod
def __indexer_statistic(site: dict, error_flag: bool = False, seconds: int = 0) -> None:
    """
    Record the outcome of an indexer search in the site statistics.

    :param site: site configuration dict; its "domain" entry identifies the site
    :param error_flag: True records a failure, False records a success
    :param seconds: elapsed seconds, passed on when recording a success
    """
    site_domain = StringUtils.get_url_domain(site.get("domain"))
    if not error_flag:
        SiteOper().success(domain=site_domain, seconds=seconds)
        return
    SiteOper().fail(site_domain)
@staticmethod
async def __async_indexer_statistic(site: dict, error_flag: bool = False, seconds: int = 0) -> None:
    """
    Asynchronously record the outcome of an indexer search in the site statistics.

    :param site: site configuration dict; its "domain" entry identifies the site
    :param error_flag: True records a failure, False records a success
    :param seconds: elapsed seconds, passed on when recording a success
    """
    site_domain = StringUtils.get_url_domain(site.get("domain"))
    if not error_flag:
        await SiteOper().async_success(domain=site_domain, seconds=seconds)
        return
    await SiteOper().async_fail(site_domain)
@staticmethod
def __parse_result(site: dict, result_array: list, search_count: int, seconds: int) -> List[TorrentInfo]:
    """
    Convert raw search results into TorrentInfo objects.

    :param site: site configuration dict; its id, name, cookie, ua, proxy,
                 pri and downloader entries are copied onto every torrent
    :param result_array: raw results, each one a mapping of TorrentInfo fields
    :param search_count: number of searches performed (only used for logging)
    :param seconds: elapsed seconds (only used for logging)
    :return: list of TorrentInfo objects, empty when there are no results
    """

    def __remove_duplicate(_torrents: List[TorrentInfo]) -> List[TorrentInfo]:
        """
        Drop duplicated torrents.

        :param _torrents: torrent list
        :return: de-duplicated torrent list
        """
        if not settings.SEARCH_MULTIPLE_NAME:
            return _torrents
        # De-duplicate on title + description; for a repeated key the
        # later torrent replaces the earlier one
        return list({f"{t.title}_{t.description}": t for t in _torrents}.values())

    if not result_array:
        logger.warn(f"{site.get('name')} 未搜索到数据,共搜索 {search_count} 次,耗时 {seconds}")
        return []

    logger.info(
        f"{site.get('name')} 搜索完成,共搜索 {search_count} 次,耗时 {seconds} 秒,返回数据:{len(result_array)}")
    # Attach the site information to every raw result
    torrents = [TorrentInfo(site=site.get("id"),
                            site_name=site.get("name"),
                            site_cookie=site.get("cookie"),
                            site_ua=site.get("ua"),
                            site_proxy=site.get("proxy"),
                            site_order=site.get("pri"),
                            site_downloader=site.get("downloader"),
                            **result) for result in result_array]
    # De-duplicate
    return __remove_duplicate(torrents)
def search_torrents(self, site: dict,
keywords: List[str] = None,
mtype: MediaType = None,
@@ -93,55 +182,28 @@ class IndexerModule(_ModuleBase):
:return: 资源列表
"""
def __remove_duplicate(_torrents: List[TorrentInfo]) -> List[TorrentInfo]:
"""
去除重复的种子
:param _torrents: 种子列表
:return: 去重后的种子列表
"""
if not settings.SEARCH_MULTIPLE_NAME:
return _torrents
# 通过encosure去重
return list({f"{t.title}_{t.description}": t for t in _torrents}.values())
# 确认搜索的名字
if not keywords:
# 浏览种子页
keywords = ['']
# 开始索引
# 索引结果
result_array = []
# 开始计时
start_time = datetime.now()
# 搜索多个关键字
# 错误标志
error_flag = False
# 搜索次数
search_count = 0
for search_word in keywords:
# 可能为关键字或ttxxxx
if search_word \
and site.get('language') == "en" \
and StringUtils.is_chinese(search_word):
# 不支持中文
logger.warn(f"{site.get('name')} 不支持中文搜索")
continue
# 站点流控
state, msg = SitesHelper().check(StringUtils.get_url_domain(site.get("domain")))
if state:
logger.warn(msg)
for search_word in keywords or ['']:
# 检查是否可以执行搜索
if not self.__search_check(site, search_word):
continue
# 强制休眠 1-10 秒
if search_count > 0:
# 强制休眠 1-10 秒
logger.info(f"站点 {site.get('name')} 已搜索 {search_count} 次,强制休眠 1-10 秒 ...")
time.sleep(random.randint(1, 10))
# 去除搜索关键字中的特殊字符
if search_word:
search_word = StringUtils.clear(search_word, replace_word=" ", allow_space=True)
search_word = self.__clear_search_text(search_word)
# 开始搜索
try:
if site.get('parser') == "TNodeSpider":
error_flag, result = TNodeSpider(site).search(
@@ -204,30 +266,15 @@ class IndexerModule(_ModuleBase):
seconds = (datetime.now() - start_time).seconds
# 统计索引情况
domain = StringUtils.get_url_domain(site.get("domain"))
if error_flag:
SiteOper().fail(domain)
else:
SiteOper().success(domain=domain, seconds=seconds)
self.__indexer_statistic(site=site, error_flag=error_flag, seconds=seconds)
# 返回结果
if not result_array or len(result_array) == 0:
logger.warn(f"{site.get('name')} 未搜索到数据,共搜索 {search_count} 次,耗时 {seconds}")
return []
else:
logger.info(
f"{site.get('name')} 搜索完成,共搜索 {search_count} 次,耗时 {seconds} 秒,返回数据:{len(result_array)}")
# TorrentInfo
torrents = [TorrentInfo(site=site.get("id"),
site_name=site.get("name"),
site_cookie=site.get("cookie"),
site_ua=site.get("ua"),
site_proxy=site.get("proxy"),
site_order=site.get("pri"),
site_downloader=site.get("downloader"),
**result) for result in result_array]
# 去重
return __remove_duplicate(torrents)
return self.__parse_result(
site=site,
result_array=result_array,
search_count=search_count,
seconds=seconds
)
async def async_search_torrents(self, site: dict,
keywords: List[str] = None,
@@ -244,58 +291,31 @@ class IndexerModule(_ModuleBase):
:return: 资源列表
"""
def __remove_duplicate(_torrents: List[TorrentInfo]) -> List[TorrentInfo]:
"""
去除重复的种子
:param _torrents: 种子列表
:return: 去重后的种子列表
"""
if not settings.SEARCH_MULTIPLE_NAME:
return _torrents
# 通过encosure去重
return list({f"{t.title}_{t.description}": t for t in _torrents}.values())
# 确认搜索的名字
if not keywords:
# 浏览种子页
keywords = ['']
# 开始索引
# 索引结果
result_array = []
# 开始计时
start_time = datetime.now()
# 搜索多个关键字
# 错误标志
error_flag = False
# 搜索次数
search_count = 0
for search_word in keywords:
# 可能为关键字或ttxxxx
if search_word \
and site.get('language') == "en" \
and StringUtils.is_chinese(search_word):
# 不支持中文
logger.warn(f"{site.get('name')} 不支持中文搜索")
continue
# 站点流控
state, msg = SitesHelper().check(StringUtils.get_url_domain(site.get("domain")))
if state:
logger.warn(msg)
# 遍历搜索关键字
for search_word in keywords or ['']:
# 检查是否可以执行搜索
if not self.__search_check(site, search_word):
continue
# 强制休眠 1-10 秒
if search_count > 0:
# 强制休眠 1-10 秒
logger.info(f"站点 {site.get('name')} 已搜索 {search_count} 次,强制休眠 1-10 秒 ...")
await asyncio.sleep(random.randint(1, 10))
# 去除搜索关键字中的特殊字符
if search_word:
search_word = StringUtils.clear(search_word, replace_word=" ", allow_space=True)
search_word = self.__clear_search_text(search_word)
# 开始搜索
try:
if site.get('parser') == "TNodeSpider":
# 目前这些特殊爬虫还没有异步版本,暂时使用同步版本
error_flag, result = await TNodeSpider(site).async_search(
keyword=search_word,
page=page
@@ -358,30 +378,15 @@ class IndexerModule(_ModuleBase):
seconds = (datetime.now() - start_time).seconds
# 统计索引情况
domain = StringUtils.get_url_domain(site.get("domain"))
if error_flag:
SiteOper().fail(domain)
else:
SiteOper().success(domain=domain, seconds=seconds)
await self.__async_indexer_statistic(site=site, error_flag=error_flag, seconds=seconds)
# 返回结果
if not result_array or len(result_array) == 0:
logger.warn(f"{site.get('name')} 未搜索到数据,共搜索 {search_count} 次,耗时 {seconds}")
return []
else:
logger.info(
f"{site.get('name')} 搜索完成,共搜索 {search_count} 次,耗时 {seconds} 秒,返回数据:{len(result_array)}")
# TorrentInfo
torrents = [TorrentInfo(site=site.get("id"),
site_name=site.get("name"),
site_cookie=site.get("cookie"),
site_ua=site.get("ua"),
site_proxy=site.get("proxy"),
site_order=site.get("pri"),
site_downloader=site.get("downloader"),
**result) for result in result_array]
# 去重
return __remove_duplicate(torrents)
return self.__parse_result(
site=site,
result_array=result_array,
search_count=search_count,
seconds=seconds
)
@staticmethod
def __spider_search(indexer: dict,