From e48d51fe6ee541792104ff99452d0e2429fe1911 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Tue, 3 Jun 2025 11:45:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=86=85=E5=AD=98=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=92=8C=E5=9E=83=E5=9C=BE=E5=9B=9E=E6=94=B6=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/torrents.py | 61 ++++++++++++++-- app/helper/browser.py | 68 +++++++++++++----- app/helper/memory.py | 158 ++++++++++++++++++++++++++++++++++++++++++ app/helper/rss.py | 147 +++++++++++++++++++++++---------------- 4 files changed, 354 insertions(+), 80 deletions(-) create mode 100644 app/helper/memory.py diff --git a/app/chain/torrents.py b/app/chain/torrents.py index 07a71d86..5508a899 100644 --- a/app/chain/torrents.py +++ b/app/chain/torrents.py @@ -1,6 +1,7 @@ import re import traceback from typing import Dict, List, Union, Optional +import gc from cachetools import cached, TTLCache @@ -14,6 +15,7 @@ from app.db.systemconfig_oper import SystemConfigOper from app.helper.rss import RssHelper from app.helper.sites import SitesHelper from app.helper.torrent import TorrentHelper +from app.helper.memory import MemoryManager, memory_optimized, clear_large_objects from app.log import logger from app.schemas import Notification from app.schemas.types import SystemConfigKey, MessageChannel, NotificationType, MediaType @@ -37,6 +39,21 @@ class TorrentsChain(ChainBase, metaclass=Singleton): self.systemconfig = SystemConfigOper() self.mediachain = MediaChain() self.torrenthelper = TorrentHelper() + # 初始化内存管理器 + self.memory_manager = MemoryManager() + # 启动内存监控(如果需要) + if settings.BIG_MEMORY_MODE: + self.memory_manager.set_threshold(85) # 大内存模式下提高阈值 + else: + self.memory_manager.set_threshold(75) # 普通模式下较低阈值 + self.memory_manager.start_monitoring() + + def __del__(self): + """ + 析构函数,停止内存监控 + """ + if hasattr(self, 'memory_manager'): + self.memory_manager.stop_monitoring() @property def cache_file(self) -> str: @@ -79,13 +96,16 @@ class TorrentsChain(ChainBase, metaclass=Singleton): logger.info(f'开始清理种子缓存数据 ...') self.remove_cache(self._spider_file) self.remove_cache(self._rss_file) + # 强制垃圾回收 + self.memory_manager.force_gc() logger.info(f'种子缓存数据清理完成') - @cached(cache=TTLCache(maxsize=128, ttl=595)) + @cached(cache=TTLCache(maxsize=64, ttl=300)) + @memory_optimized(force_gc_after=True, log_memory=True) def browse(self, domain: str, keyword: Optional[str] = None, cat: Optional[str] = None, page: Optional[int] = 0) -> List[TorrentInfo]: """ - 浏览站点首页内容,返回种子清单,TTL缓存10分钟 + 浏览站点首页内容,返回种子清单,TTL缓存5分钟 :param domain: 站点域名 :param keyword: 搜索标题 :param cat: 搜索分类 @@ -98,10 +118,11 @@ class TorrentsChain(ChainBase, metaclass=Singleton): return [] return self.refresh_torrents(site=site, keyword=keyword, cat=cat, page=page) - @cached(cache=TTLCache(maxsize=128, ttl=295)) + @cached(cache=TTLCache(maxsize=64, ttl=180)) + @memory_optimized(force_gc_after=True, log_memory=True) def rss(self, domain: str) -> List[TorrentInfo]: """ - 获取站点RSS内容,返回种子清单,TTL缓存5分钟 + 获取站点RSS内容,返回种子清单,TTL缓存3分钟 :param domain: 站点域名 """ logger.info(f'开始获取站点 {domain} RSS ...') @@ -144,6 +165,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): return ret_torrents + @memory_optimized(force_gc_after=True, log_memory=True) def refresh(self, stype: Optional[str] = None, sites: List[int] = None) -> Dict[str, List[Context]]: """ 刷新站点最新资源,识别并缓存起来 @@ -170,6 +192,9 @@ class TorrentsChain(ChainBase, metaclass=Singleton): indexers = self.siteshelper.get_indexers() # 需要刷新的站点domain domains = [] + # 处理计数器,用于定期内存检查 + processed_count = 0 + # 遍历站点缓存资源 for indexer in indexers: if global_vars.is_system_stopped: @@ -218,7 +243,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton): logger.warn(f'{torrent.title} 未识别到媒体信息') # 存储空的媒体信息 mediainfo = MediaInfo() - # 清理多余数据 + # 清理多余数据,减少内存占用 mediainfo.clear() # 上下文 context = Context(meta_info=meta, media_info=mediainfo, torrent_info=torrent) @@ -229,9 +254,24 @@ class TorrentsChain(ChainBase, metaclass=Singleton): torrents_cache[domain].append(context) # 如果超过了限制条数则移除掉前面的 if len(torrents_cache[domain]) > settings.CACHE_CONF["torrents"]: + # 优化:直接删除旧数据,无需重复清理(数据进缓存前已经clear过) + old_contexts = torrents_cache[domain][:-settings.CACHE_CONF["torrents"]] torrents_cache[domain] = torrents_cache[domain][-settings.CACHE_CONF["torrents"]:] + # 清理旧对象 + clear_large_objects(*old_contexts) + + # 优化:清理不再需要的临时变量 + del meta, mediainfo, context + + # 每处理一定数量的种子后检查内存 + processed_count += 1 + if processed_count % 10 == 0: + self.memory_manager.check_memory_and_cleanup() + # 回收资源 del torrents + # 定期执行垃圾回收 + gc.collect() else: logger.info(f'{indexer.get("name")} 没有获取到种子') @@ -243,7 +283,18 @@ class TorrentsChain(ChainBase, metaclass=Singleton): # 去除不在站点范围内的缓存种子 if sites and torrents_cache: + old_cache = torrents_cache torrents_cache = {k: v for k, v in torrents_cache.items() if k in domains} + # 清理不再使用的缓存数据(数据进缓存前已经clear过,无需重复清理) + removed_contexts = [] + for domain, contexts in old_cache.items(): + if domain not in domains: + removed_contexts.extend(contexts) + # 批量清理 + if removed_contexts: + clear_large_objects(*removed_contexts) + del old_cache + return torrents_cache def __renew_rss_url(self, domain: str, site: dict): diff --git a/app/helper/browser.py b/app/helper/browser.py index 79e1dc80..5f12f189 100644 --- a/app/helper/browser.py +++ b/app/helper/browser.py @@ -1,4 +1,5 @@ from typing import Callable, Any, Optional +import gc from playwright.sync_api import sync_playwright, Page from cf_clearance import sync_cf_retry, sync_stealth @@ -35,26 +36,43 @@ class PlaywrightHelper: :param headless: 是否无头模式 :param timeout: 超时时间 """ + result = None try: with sync_playwright() as playwright: - browser = playwright[self.browser_type].launch(headless=headless) - context = browser.new_context(user_agent=ua, proxy=proxies) - page = context.new_page() - if cookies: - page.set_extra_http_headers({"cookie": cookies}) + browser = None + context = None + page = None try: + browser = playwright[self.browser_type].launch(headless=headless) + context = browser.new_context(user_agent=ua, proxy=proxies) + page = context.new_page() + + if cookies: + page.set_extra_http_headers({"cookie": cookies}) + if not self.__pass_cloudflare(url, page): logger.warn("cloudflare challenge fail!") page.wait_for_load_state("networkidle", timeout=timeout * 1000) + # 回调函数 - return callback(page) + result = callback(page) + except Exception as e: logger.error(f"网页操作失败: {str(e)}") finally: - browser.close() + # 确保资源被正确清理 + if page: + page.close() + if context: + context.close() + if browser: + browser.close() + # 强制垃圾回收 + gc.collect() except Exception as e: - logger.error(f"网页操作失败: {str(e)}") - return None + logger.error(f"Playwright初始化失败: {str(e)}") + + return result def get_page_source(self, url: str, cookies: Optional[str] = None, @@ -71,26 +89,42 @@ class PlaywrightHelper: :param headless: 是否无头模式 :param timeout: 超时时间 """ - source = "" + source = None try: with sync_playwright() as playwright: - browser = playwright[self.browser_type].launch(headless=headless) - context = browser.new_context(user_agent=ua, proxy=proxies) - page = context.new_page() - if cookies: - page.set_extra_http_headers({"cookie": cookies}) + browser = None + context = None + page = None try: + browser = playwright[self.browser_type].launch(headless=headless) + context = browser.new_context(user_agent=ua, proxy=proxies) + page = context.new_page() + + if cookies: + page.set_extra_http_headers({"cookie": cookies}) + if not self.__pass_cloudflare(url, page): logger.warn("cloudflare challenge fail!") page.wait_for_load_state("networkidle", timeout=timeout * 1000) + source = page.content() + except Exception as e: logger.error(f"获取网页源码失败: {str(e)}") source = None finally: - browser.close() + # 确保资源被正确清理 + if page: + page.close() + if context: + context.close() + if browser: + browser.close() + # 强制垃圾回收 + gc.collect() except Exception as e: - logger.error(f"获取网页源码失败: {str(e)}") + logger.error(f"Playwright初始化失败: {str(e)}") + return source diff --git a/app/helper/memory.py b/app/helper/memory.py new file mode 100644 index 00000000..ad391ff9 --- /dev/null +++ b/app/helper/memory.py @@ -0,0 +1,158 @@ +import gc +import psutil +import threading +import time +from typing import Optional, Callable, Any +from functools import wraps +from app.log import logger +from app.utils.singleton import Singleton + + +class MemoryManager(metaclass=Singleton): + """ + 内存管理工具类,用于监控和优化内存使用 + """ + + def __init__(self): + self._memory_threshold = 80 # 内存使用率阈值(%) + self._check_interval = 300 # 检查间隔(秒) + self._monitoring = False + self._monitor_thread: Optional[threading.Thread] = None + + def get_memory_usage(self) -> dict: + """ + 获取当前内存使用情况 + """ + process = psutil.Process() + memory_info = process.memory_info() + system_memory = psutil.virtual_memory() + + return { + 'rss': memory_info.rss / 1024 / 1024, # MB + 'vms': memory_info.vms / 1024 / 1024, # MB + 'percent': process.memory_percent(), + 'system_percent': system_memory.percent, + 'system_available': system_memory.available / 1024 / 1024 / 1024 # GB + } + + def force_gc(self, generation: Optional[int] = None) -> int: + """ + 强制执行垃圾回收 + :param generation: 垃圾回收代数,None表示所有代数 + :return: 回收的对象数量 + """ + before_memory = self.get_memory_usage() + + if generation is not None: + collected = gc.collect(generation) + else: + collected = gc.collect() + + after_memory = self.get_memory_usage() + memory_freed = before_memory['rss'] - after_memory['rss'] + + if memory_freed > 1: # 释放超过1MB才记录 + logger.info(f"垃圾回收完成: 回收对象 {collected} 个, 释放内存 {memory_freed:.2f}MB") + + return collected + + def check_memory_and_cleanup(self) -> bool: + """ + 检查内存使用率,如果过高则执行清理 + :return: 是否执行了清理 + """ + memory_info = self.get_memory_usage() + + if memory_info['percent'] > self._memory_threshold: + logger.warning(f"内存使用率过高: {memory_info['percent']:.1f}%, 开始清理...") + self.force_gc() + return True + return False + + def start_monitoring(self): + """ + 开始内存监控 + """ + if self._monitoring: + return + + self._monitoring = True + self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) + self._monitor_thread.start() + logger.info("内存监控已启动") + + def stop_monitoring(self): + """ + 停止内存监控 + """ + self._monitoring = False + if self._monitor_thread: + self._monitor_thread.join(timeout=5) + logger.info("内存监控已停止") + + def _monitor_loop(self): + """ + 内存监控循环 + """ + while self._monitoring: + try: + self.check_memory_and_cleanup() + time.sleep(self._check_interval) + except Exception as e: + logger.error(f"内存监控出错: {e}") + time.sleep(60) # 出错后等待1分钟再继续 + + def set_threshold(self, threshold: int): + """ + 设置内存使用率阈值 + """ + self._memory_threshold = max(50, min(95, threshold)) + + def set_check_interval(self, interval: int): + """ + 设置检查间隔 + """ + self._check_interval = max(60, interval) + + +def memory_optimized(force_gc_after: bool = False, log_memory: bool = False): + """ + 内存优化装饰器 + :param force_gc_after: 函数执行后是否强制垃圾回收 + :param log_memory: 是否记录内存使用情况 + """ + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs) -> Any: + memory_manager = MemoryManager() + + if log_memory: + before_memory = memory_manager.get_memory_usage() + logger.debug(f"{func.__name__} 执行前内存: {before_memory['rss']:.1f}MB") + + try: + result = func(*args, **kwargs) + return result + finally: + if force_gc_after: + memory_manager.force_gc() + + if log_memory: + after_memory = memory_manager.get_memory_usage() + logger.debug(f"{func.__name__} 执行后内存: {after_memory['rss']:.1f}MB") + + return wrapper + return decorator + + +def clear_large_objects(*objects): + """ + 清理大型对象的辅助函数 + """ + for obj in objects: + if hasattr(obj, 'clear') and callable(obj.clear): + obj.clear() + elif hasattr(obj, '__dict__'): + obj.__dict__.clear() + del obj + gc.collect() \ No newline at end of file diff --git a/app/helper/rss.py b/app/helper/rss.py index cebcb137..14a69346 100644 --- a/app/helper/rss.py +++ b/app/helper/rss.py @@ -3,6 +3,7 @@ import traceback import xml.dom.minidom from typing import List, Tuple, Union, Optional from urllib.parse import urljoin +import gc import chardet from lxml import etree @@ -19,6 +20,11 @@ class RssHelper: """ RSS帮助类,解析RSS报文、获取RSS地址等 """ + + # RSS解析限制配置 + MAX_RSS_SIZE = 50 * 1024 * 1024 # 50MB最大RSS文件大小 + MAX_RSS_ITEMS = 1000 # 最大解析条目数 + # 各站点RSS链接获取配置 rss_link_conf = { "default": { @@ -224,8 +230,7 @@ class RssHelper: }, } - @staticmethod - def parse(url, proxy: bool = False, timeout: Optional[int] = 15, headers: dict = None) -> Union[List[dict], None, bool]: + def parse(self, url, proxy: bool = False, timeout: Optional[int] = 15, headers: dict = None) -> Union[List[dict], None, bool]: """ 解析RSS订阅URL,获取RSS中的种子信息 :param url: RSS地址 @@ -238,6 +243,7 @@ class RssHelper: ret_array: list = [] if not url: return False + try: ret = RequestUtils(proxies=settings.PROXY if proxy else None, timeout=timeout, headers=headers).get_res(url) @@ -246,11 +252,15 @@ class RssHelper: except Exception as err: logger.error(f"获取RSS失败:{str(err)} - {traceback.format_exc()}") return False + if ret: - ret_xml = "" try: - # 使用chardet检测字符编码 + # 检查响应大小,避免处理过大的RSS文件 raw_data = ret.content + if raw_data and len(raw_data) > self.MAX_RSS_SIZE: + logger.warning(f"RSS文件过大: {len(raw_data)/1024/1024:.1f}MB,跳过解析") + return False + if raw_data: try: result = chardet.detect(raw_data) @@ -269,65 +279,84 @@ class RssHelper: ret.encoding = ret.apparent_encoding if not ret_xml: ret_xml = ret.text - # 解析XML - dom_tree = xml.dom.minidom.parseString(ret_xml) - rootNode = dom_tree.documentElement - items = rootNode.getElementsByTagName("item") - for item in items: - try: - # 标题 - title = DomUtils.tag_value(item, "title", default="") - if not title: + + # 解析XML - 使用try-finally确保DOM树被清理 + dom_tree = None + try: + dom_tree = xml.dom.minidom.parseString(ret_xml) + rootNode = dom_tree.documentElement + items = rootNode.getElementsByTagName("item") + + # 限制处理的条目数量 + items_count = min(len(items), self.MAX_RSS_ITEMS) + if len(items) > self.MAX_RSS_ITEMS: + logger.warning(f"RSS条目过多: {len(items)},仅处理前{self.MAX_RSS_ITEMS}个") + + for i, item in enumerate(items[:items_count]): + try: + # 定期执行垃圾回收 + if i > 0 and i % 100 == 0: + gc.collect() + + # 标题 + title = DomUtils.tag_value(item, "title", default="") + if not title: + continue + # 描述 + description = DomUtils.tag_value(item, "description", default="") + # 种子页面 + link = DomUtils.tag_value(item, "link", default="") + # 种子链接 + enclosure = DomUtils.tag_value(item, "enclosure", "url", default="") + if not enclosure and not link: + continue + # 部分RSS只有link没有enclosure + if not enclosure and link: + enclosure = link + # 大小 + size = DomUtils.tag_value(item, "enclosure", "length", default=0) + if size and str(size).isdigit(): + size = int(size) + else: + size = 0 + # 发布日期 + pubdate = DomUtils.tag_value(item, "pubDate", default="") + if pubdate: + # 转换为时间 + pubdate = StringUtils.get_time(pubdate) + # 获取豆瓣昵称 + nickname = DomUtils.tag_value(item, "dc:createor", default="") + # 返回对象 + tmp_dict = {'title': title, + 'enclosure': enclosure, + 'size': size, + 'description': description, + 'link': link, + 'pubdate': pubdate} + # 如果豆瓣昵称不为空,返回数据增加豆瓣昵称,供doubansync插件获取 + if nickname: + tmp_dict['nickname'] = nickname + ret_array.append(tmp_dict) + except Exception as e1: + logger.debug(f"解析RSS条目失败:{str(e1)} - {traceback.format_exc()}") continue - # 描述 - description = DomUtils.tag_value(item, "description", default="") - # 种子页面 - link = DomUtils.tag_value(item, "link", default="") - # 种子链接 - enclosure = DomUtils.tag_value(item, "enclosure", "url", default="") - if not enclosure and not link: - continue - # 部分RSS只有link没有enclosure - if not enclosure and link: - enclosure = link - # 大小 - size = DomUtils.tag_value(item, "enclosure", "length", default=0) - if size and str(size).isdigit(): - size = int(size) - else: - size = 0 - # 发布日期 - pubdate = DomUtils.tag_value(item, "pubDate", default="") - if pubdate: - # 转换为时间 - pubdate = StringUtils.get_time(pubdate) - # 获取豆瓣昵称 - nickname = DomUtils.tag_value(item, "dc:createor", default="") - # 返回对象 - tmp_dict = {'title': title, - 'enclosure': enclosure, - 'size': size, - 'description': description, - 'link': link, - 'pubdate': pubdate} - # 如果豆瓣昵称不为空,返回数据增加豆瓣昵称,供doubansync插件获取 - if nickname: - tmp_dict['nickname'] = nickname - ret_array.append(tmp_dict) - except Exception as e1: - logger.debug(f"解析RSS失败:{str(e1)} - {traceback.format_exc()}") - continue + finally: + # DOM树必须显式清理 - 这是xml.dom.minidom的特殊要求 + if dom_tree: + dom_tree.unlink() + except Exception as e2: logger.error(f"解析RSS失败:{str(e2)} - {traceback.format_exc()}") - # RSS过期 观众RSS 链接已过期,您需要获得一个新的! pthome RSS Link has expired, You need to get a new one! + # RSS过期检查 _rss_expired_msg = [ "RSS 链接已过期, 您需要获得一个新的!", "RSS Link has expired, You need to get a new one!", "RSS Link has expired, You need to get new!" ] - if ret_xml in _rss_expired_msg: + if 'ret_xml' in locals() and ret_xml and ret_xml in _rss_expired_msg: return None return False + return ret_array def get_rss_link(self, url: str, cookie: str, ua: str, proxy: bool = False) -> Tuple[str, str]: @@ -369,12 +398,14 @@ class RssHelper: return "", f"获取 {url} RSS链接失败,错误码:{res.status_code},错误原因:{res.reason}" else: return "", f"获取RSS链接失败:无法连接 {url} " + # 解析HTML - html = etree.HTML(html_text) - if StringUtils.is_valid_html_element(html): - rss_link = html.xpath(site_conf.get("xpath")) - if rss_link: - return str(rss_link[-1]), "" + if html_text: + html = etree.HTML(html_text) + if StringUtils.is_valid_html_element(html): + rss_link = html.xpath(site_conf.get("xpath")) + if rss_link: + return str(rss_link[-1]), "" return "", f"获取RSS链接失败:{url}" except Exception as e: return "", f"获取 {url} RSS链接失败:{str(e)}"