diff --git a/app/db/models/sitestatistic.py b/app/db/models/sitestatistic.py index 21a384de..77c3f0f9 100644 --- a/app/db/models/sitestatistic.py +++ b/app/db/models/sitestatistic.py @@ -1,9 +1,10 @@ from datetime import datetime -from sqlalchemy import Column, Integer, String, Sequence, JSON +from sqlalchemy import Column, Integer, String, Sequence, JSON, select +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session -from app.db import db_query, db_update, Base +from app.db import db_query, db_update, Base, async_db_query class SiteStatistic(Base): @@ -31,6 +32,12 @@ class SiteStatistic(Base): def get_by_domain(cls, db: Session, domain: str): return db.query(cls).filter(cls.domain == domain).first() + @classmethod + @async_db_query + async def async_get_by_domain(cls, db: AsyncSession, domain: str): + result = await db.execute(select(cls).where(cls.domain == domain)) + return result.scalar_one_or_none() + @classmethod @db_update def reset(cls, db: Session): diff --git a/app/db/site_oper.py b/app/db/site_oper.py index da377fee..723c0789 100644 --- a/app/db/site_oper.py +++ b/app/db/site_oper.py @@ -243,3 +243,62 @@ class SiteOper(DbOper): lst_state=1, lst_mod_date=lst_date ).create(self._db) + + async def async_success(self, domain: str, seconds: Optional[int] = None): + """ + 异步站点访问成功 + """ + lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + sta = await SiteStatistic.async_get_by_domain(self._db, domain) + if sta: + avg_seconds, note = None, {} + if seconds is not None: + note: dict = sta.note or {} + note[lst_date] = seconds or 1 + avg_times = len(note.keys()) + if avg_times > 10: + note = dict(sorted(note.items(), key=lambda x: x[0], reverse=True)[:10]) + avg_seconds = sum([v for v in note.values()]) // avg_times + await sta.async_update(self._db, { + "success": sta.success + 1, + "seconds": avg_seconds or sta.seconds, + "lst_state": 0, + "lst_mod_date": lst_date, + "note": note or sta.note + }) + else: + note = {} + if seconds is not None: + note = { + lst_date: seconds or 1 + } + await SiteStatistic( + domain=domain, + success=1, + fail=0, + seconds=seconds or 1, + lst_state=0, + lst_mod_date=lst_date, + note=note + ).async_create(self._db) + + async def async_fail(self, domain: str): + """ + 异步站点访问失败 + """ + lst_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + sta = await SiteStatistic.async_get_by_domain(self._db, domain) + if sta: + await sta.async_update(self._db, { + "fail": sta.fail + 1, + "lst_state": 1, + "lst_mod_date": lst_date + }) + else: + await SiteStatistic( + domain=domain, + success=0, + fail=1, + lst_state=1, + lst_mod_date=lst_date + ).async_create(self._db) diff --git a/app/modules/indexer/__init__.py b/app/modules/indexer/__init__.py index 39dbc4c4..a25ea5da 100644 --- a/app/modules/indexer/__init__.py +++ b/app/modules/indexer/__init__.py @@ -78,6 +78,95 @@ class IndexerModule(_ModuleBase): def init_setting(self) -> Tuple[str, Union[str, bool]]: pass + @staticmethod + def __search_check(site: dict, search_word: Optional[str] = None) -> bool: + """ + 检查是否可以执行搜索 + """ + # 可能为关键字或ttxxxx + if search_word \ + and site.get('language') == "en" \ + and StringUtils.is_chinese(search_word): + # 不支持中文 + logger.warn(f"{site.get('name')} 不支持中文搜索") + return False + + # 站点流控 + state, msg = SitesHelper().check(StringUtils.get_url_domain(site.get("domain"))) + if state: + logger.warn(msg) + return False + + return True + + @staticmethod + def __clear_search_text(text: Optional[str]) -> Optional[str]: + """ + 清理搜索文本 + :param text: 需要清理的文本 + :return: 清理后的文本 + """ + if not text: + return text + # 去除特殊字符和多余空格 + return StringUtils.clear(text, replace_word=" ", allow_space=True) + + @staticmethod + def __indexer_statistic(site: dict, error_flag: bool = False, seconds: int = 0) -> None: + """ + 索引器统计 + """ + domain = StringUtils.get_url_domain(site.get("domain")) + if error_flag: + SiteOper().fail(domain) + else: + SiteOper().success(domain=domain, seconds=seconds) + + @staticmethod + async def __async_indexer_statistic(site: dict, error_flag: bool = False, seconds: int = 0) -> None: + """ + 异步索引器统计 + """ + domain = StringUtils.get_url_domain(site.get("domain")) + if error_flag: + await SiteOper().async_fail(domain) + else: + await SiteOper().async_success(domain=domain, seconds=seconds) + + @staticmethod + def __parse_result(site: dict, result_array: list, search_count: int, seconds: int) -> TorrentInfo: + """ + 解析搜索结果为 TorrentInfo 对象 + """ + + def __remove_duplicate(_torrents: List[TorrentInfo]) -> List[TorrentInfo]: + """ + 去除重复的种子 + :param _torrents: 种子列表 + :return: 去重后的种子列表 + """ + if not settings.SEARCH_MULTIPLE_NAME: + return _torrents + # 通过encosure去重 + return list({f"{t.title}_{t.description}": t for t in _torrents}.values()) + + if not result_array or len(result_array) == 0: + logger.warn(f"{site.get('name')} 未搜索到数据,共搜索 {search_count} 次,耗时 {seconds} 秒") + return [] + else: + logger.info( + f"{site.get('name')} 搜索完成,共搜索 {search_count} 次,耗时 {seconds} 秒,返回数据:{len(result_array)}") + torrents = [TorrentInfo(site=site.get("id"), + site_name=site.get("name"), + site_cookie=site.get("cookie"), + site_ua=site.get("ua"), + site_proxy=site.get("proxy"), + site_order=site.get("pri"), + site_downloader=site.get("downloader"), + **result) for result in result_array] + # 去重 + return __remove_duplicate(torrents) + def search_torrents(self, site: dict, keywords: List[str] = None, mtype: MediaType = None, @@ -93,55 +182,28 @@ class IndexerModule(_ModuleBase): :return: 资源列表 """ - def __remove_duplicate(_torrents: List[TorrentInfo]) -> List[TorrentInfo]: - """ - 去除重复的种子 - :param _torrents: 种子列表 - :return: 去重后的种子列表 - """ - if not settings.SEARCH_MULTIPLE_NAME: - return _torrents - # 通过encosure去重 - return list({f"{t.title}_{t.description}": t for t in _torrents}.values()) - - # 确认搜索的名字 - if not keywords: - # 浏览种子页 - keywords = [''] - - # 开始索引 + # 索引结果 result_array = [] - # 开始计时 start_time = datetime.now() - - # 搜索多个关键字 + # 错误标志 error_flag = False + # 搜索次数 search_count = 0 - for search_word in keywords: - # 可能为关键字或ttxxxx - if search_word \ - and site.get('language') == "en" \ - and StringUtils.is_chinese(search_word): - # 不支持中文 - logger.warn(f"{site.get('name')} 不支持中文搜索") - continue - - # 站点流控 - state, msg = SitesHelper().check(StringUtils.get_url_domain(site.get("domain"))) - if state: - logger.warn(msg) + for search_word in keywords or ['']: + # 检查是否可以执行搜索 + if not self.__search_check(site, search_word): continue + # 强制休眠 1-10 秒 if search_count > 0: - # 强制休眠 1-10 秒 logger.info(f"站点 {site.get('name')} 已搜索 {search_count} 次,强制休眠 1-10 秒 ...") time.sleep(random.randint(1, 10)) # 去除搜索关键字中的特殊字符 - if search_word: - search_word = StringUtils.clear(search_word, replace_word=" ", allow_space=True) + search_word = self.__clear_search_text(search_word) + # 开始搜索 try: if site.get('parser') == "TNodeSpider": error_flag, result = TNodeSpider(site).search( @@ -204,30 +266,15 @@ class IndexerModule(_ModuleBase): seconds = (datetime.now() - start_time).seconds # 统计索引情况 - domain = StringUtils.get_url_domain(site.get("domain")) - if error_flag: - SiteOper().fail(domain) - else: - SiteOper().success(domain=domain, seconds=seconds) + self.__indexer_statistic(site=site, error_flag=error_flag, seconds=seconds) # 返回结果 - if not result_array or len(result_array) == 0: - logger.warn(f"{site.get('name')} 未搜索到数据,共搜索 {search_count} 次,耗时 {seconds} 秒") - return [] - else: - logger.info( - f"{site.get('name')} 搜索完成,共搜索 {search_count} 次,耗时 {seconds} 秒,返回数据:{len(result_array)}") - # TorrentInfo - torrents = [TorrentInfo(site=site.get("id"), - site_name=site.get("name"), - site_cookie=site.get("cookie"), - site_ua=site.get("ua"), - site_proxy=site.get("proxy"), - site_order=site.get("pri"), - site_downloader=site.get("downloader"), - **result) for result in result_array] - # 去重 - return __remove_duplicate(torrents) + return self.__parse_result( + site=site, + result_array=result_array, + search_count=search_count, + seconds=seconds + ) async def async_search_torrents(self, site: dict, keywords: List[str] = None, @@ -244,58 +291,31 @@ class IndexerModule(_ModuleBase): :return: 资源列表 """ - def __remove_duplicate(_torrents: List[TorrentInfo]) -> List[TorrentInfo]: - """ - 去除重复的种子 - :param _torrents: 种子列表 - :return: 去重后的种子列表 - """ - if not settings.SEARCH_MULTIPLE_NAME: - return _torrents - # 通过encosure去重 - return list({f"{t.title}_{t.description}": t for t in _torrents}.values()) - - # 确认搜索的名字 - if not keywords: - # 浏览种子页 - keywords = [''] - - # 开始索引 + # 索引结果 result_array = [] - # 开始计时 start_time = datetime.now() - - # 搜索多个关键字 + # 错误标志 error_flag = False + # 搜索次数 search_count = 0 - for search_word in keywords: - # 可能为关键字或ttxxxx - if search_word \ - and site.get('language') == "en" \ - and StringUtils.is_chinese(search_word): - # 不支持中文 - logger.warn(f"{site.get('name')} 不支持中文搜索") - continue - - # 站点流控 - state, msg = SitesHelper().check(StringUtils.get_url_domain(site.get("domain"))) - if state: - logger.warn(msg) + # 遍历搜索关键字 + for search_word in keywords or ['']: + # 检查是否可以执行搜索 + if not self.__search_check(site, search_word): continue + # 强制休眠 1-10 秒 if search_count > 0: - # 强制休眠 1-10 秒 logger.info(f"站点 {site.get('name')} 已搜索 {search_count} 次,强制休眠 1-10 秒 ...") await asyncio.sleep(random.randint(1, 10)) # 去除搜索关键字中的特殊字符 - if search_word: - search_word = StringUtils.clear(search_word, replace_word=" ", allow_space=True) + search_word = self.__clear_search_text(search_word) + # 开始搜索 try: if site.get('parser') == "TNodeSpider": - # 目前这些特殊爬虫还没有异步版本,暂时使用同步版本 error_flag, result = await TNodeSpider(site).async_search( keyword=search_word, page=page @@ -358,30 +378,15 @@ class IndexerModule(_ModuleBase): seconds = (datetime.now() - start_time).seconds # 统计索引情况 - domain = StringUtils.get_url_domain(site.get("domain")) - if error_flag: - SiteOper().fail(domain) - else: - SiteOper().success(domain=domain, seconds=seconds) + await self.__async_indexer_statistic(site=site, error_flag=error_flag, seconds=seconds) # 返回结果 - if not result_array or len(result_array) == 0: - logger.warn(f"{site.get('name')} 未搜索到数据,共搜索 {search_count} 次,耗时 {seconds} 秒") - return [] - else: - logger.info( - f"{site.get('name')} 搜索完成,共搜索 {search_count} 次,耗时 {seconds} 秒,返回数据:{len(result_array)}") - # TorrentInfo - torrents = [TorrentInfo(site=site.get("id"), - site_name=site.get("name"), - site_cookie=site.get("cookie"), - site_ua=site.get("ua"), - site_proxy=site.get("proxy"), - site_order=site.get("pri"), - site_downloader=site.get("downloader"), - **result) for result in result_array] - # 去重 - return __remove_duplicate(torrents) + return self.__parse_result( + site=site, + result_array=result_array, + search_count=search_count, + seconds=seconds + ) @staticmethod def __spider_search(indexer: dict,