Optimize memory management and garbage collection

jxxghp
2025-06-03 11:45:17 +08:00
parent 9d436ec7ed
commit e48d51fe6e
4 changed files with 354 additions and 80 deletions


@@ -1,6 +1,7 @@
import re
import traceback
from typing import Dict, List, Union, Optional
import gc
from cachetools import cached, TTLCache
@@ -14,6 +15,7 @@ from app.db.systemconfig_oper import SystemConfigOper
from app.helper.rss import RssHelper
from app.helper.sites import SitesHelper
from app.helper.torrent import TorrentHelper
from app.helper.memory import MemoryManager, memory_optimized, clear_large_objects
from app.log import logger
from app.schemas import Notification
from app.schemas.types import SystemConfigKey, MessageChannel, NotificationType, MediaType
@@ -37,6 +39,21 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
self.systemconfig = SystemConfigOper()
self.mediachain = MediaChain()
self.torrenthelper = TorrentHelper()
# 初始化内存管理器
self.memory_manager = MemoryManager()
# 启动内存监控(如果需要)
if settings.BIG_MEMORY_MODE:
self.memory_manager.set_threshold(85) # 大内存模式下提高阈值
else:
self.memory_manager.set_threshold(75) # 普通模式下较低阈值
self.memory_manager.start_monitoring()
def __del__(self):
"""
析构函数,停止内存监控
"""
if hasattr(self, 'memory_manager'):
self.memory_manager.stop_monitoring()
@property
def cache_file(self) -> str:
@@ -79,13 +96,16 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
logger.info(f'开始清理种子缓存数据 ...')
self.remove_cache(self._spider_file)
self.remove_cache(self._rss_file)
# 强制垃圾回收
self.memory_manager.force_gc()
logger.info(f'种子缓存数据清理完成')
@cached(cache=TTLCache(maxsize=128, ttl=595))
@cached(cache=TTLCache(maxsize=64, ttl=300))
@memory_optimized(force_gc_after=True, log_memory=True)
def browse(self, domain: str, keyword: Optional[str] = None, cat: Optional[str] = None,
page: Optional[int] = 0) -> List[TorrentInfo]:
"""
浏览站点首页内容,返回种子清单,TTL缓存10分钟
浏览站点首页内容,返回种子清单,TTL缓存5分钟
:param domain: 站点域名
:param keyword: 搜索标题
:param cat: 搜索分类
@@ -98,10 +118,11 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
return []
return self.refresh_torrents(site=site, keyword=keyword, cat=cat, page=page)
@cached(cache=TTLCache(maxsize=128, ttl=295))
@cached(cache=TTLCache(maxsize=64, ttl=180))
@memory_optimized(force_gc_after=True, log_memory=True)
def rss(self, domain: str) -> List[TorrentInfo]:
"""
获取站点RSS内容,返回种子清单,TTL缓存5分钟
获取站点RSS内容,返回种子清单,TTL缓存3分钟
:param domain: 站点域名
"""
logger.info(f'开始获取站点 {domain} RSS ...')
@@ -144,6 +165,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
return ret_torrents
@memory_optimized(force_gc_after=True, log_memory=True)
def refresh(self, stype: Optional[str] = None, sites: List[int] = None) -> Dict[str, List[Context]]:
"""
刷新站点最新资源,识别并缓存起来
@@ -170,6 +192,9 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
indexers = self.siteshelper.get_indexers()
# 需要刷新的站点domain
domains = []
# 处理计数器,用于定期内存检查
processed_count = 0
# 遍历站点缓存资源
for indexer in indexers:
if global_vars.is_system_stopped:
@@ -218,7 +243,7 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
logger.warn(f'{torrent.title} 未识别到媒体信息')
# 存储空的媒体信息
mediainfo = MediaInfo()
# 清理多余数据
# 清理多余数据,减少内存占用
mediainfo.clear()
# 上下文
context = Context(meta_info=meta, media_info=mediainfo, torrent_info=torrent)
@@ -229,9 +254,24 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
torrents_cache[domain].append(context)
# 如果超过了限制条数则移除掉前面的
if len(torrents_cache[domain]) > settings.CACHE_CONF["torrents"]:
# 优化:直接删除旧数据,无需重复清理(数据进缓存前已经clear过)
old_contexts = torrents_cache[domain][:-settings.CACHE_CONF["torrents"]]
torrents_cache[domain] = torrents_cache[domain][-settings.CACHE_CONF["torrents"]:]
# 清理旧对象
clear_large_objects(*old_contexts)
# 优化:清理不再需要的临时变量
del meta, mediainfo, context
# 每处理一定数量的种子后检查内存
processed_count += 1
if processed_count % 10 == 0:
self.memory_manager.check_memory_and_cleanup()
# 回收资源
del torrents
# 定期执行垃圾回收
gc.collect()
else:
logger.info(f'{indexer.get("name")} 没有获取到种子')
@@ -243,7 +283,18 @@ class TorrentsChain(ChainBase, metaclass=Singleton):
# 去除不在站点范围内的缓存种子
if sites and torrents_cache:
old_cache = torrents_cache
torrents_cache = {k: v for k, v in torrents_cache.items() if k in domains}
# 清理不再使用的缓存数据(数据进缓存前已经clear过,无需重复清理)
removed_contexts = []
for domain, contexts in old_cache.items():
if domain not in domains:
removed_contexts.extend(contexts)
# 批量清理
if removed_contexts:
clear_large_objects(*removed_contexts)
del old_cache
return torrents_cache
def __renew_rss_url(self, domain: str, site: dict):
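
The hunk above trims torrents_cache[domain] to the newest settings.CACHE_CONF["torrents"] entries, hands the evicted Context objects to clear_large_objects, and relies on a gc.collect() pass to reclaim them. The following is a minimal standalone sketch of that trimming pattern, not the project's code: CACHE_LIMIT stands in for settings.CACHE_CONF["torrents"], and the in-loop clear() stands in for app.helper.memory.clear_large_objects.

import gc

CACHE_LIMIT = 100  # stand-in for settings.CACHE_CONF["torrents"]

def trim_cache(cache: dict, domain: str, limit: int = CACHE_LIMIT) -> None:
    contexts = cache.get(domain) or []
    if len(contexts) <= limit:
        return
    # Keep only the newest `limit` entries; the slice copy holds the evicted ones
    evicted, cache[domain] = contexts[:-limit], contexts[-limit:]
    # Drop references held by the evicted objects, then force a collection pass
    for obj in evicted:
        if hasattr(obj, "clear") and callable(obj.clear):
            obj.clear()
    del evicted
    gc.collect()

Usage would be trim_cache(torrents_cache, "example.org") after appending new contexts; the point of the pattern is that the evicted objects are emptied before the reference is dropped, so a single collection pass can release them.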


@@ -1,4 +1,5 @@
from typing import Callable, Any, Optional
import gc
from playwright.sync_api import sync_playwright, Page
from cf_clearance import sync_cf_retry, sync_stealth
@@ -35,26 +36,43 @@ class PlaywrightHelper:
:param headless: 是否无头模式
:param timeout: 超时时间
"""
result = None
try:
with sync_playwright() as playwright:
browser = playwright[self.browser_type].launch(headless=headless)
context = browser.new_context(user_agent=ua, proxy=proxies)
page = context.new_page()
if cookies:
page.set_extra_http_headers({"cookie": cookies})
browser = None
context = None
page = None
try:
browser = playwright[self.browser_type].launch(headless=headless)
context = browser.new_context(user_agent=ua, proxy=proxies)
page = context.new_page()
if cookies:
page.set_extra_http_headers({"cookie": cookies})
if not self.__pass_cloudflare(url, page):
logger.warn("cloudflare challenge fail")
page.wait_for_load_state("networkidle", timeout=timeout * 1000)
# 回调函数
return callback(page)
result = callback(page)
except Exception as e:
logger.error(f"网页操作失败: {str(e)}")
finally:
browser.close()
# 确保资源被正确清理
if page:
page.close()
if context:
context.close()
if browser:
browser.close()
# 强制垃圾回收
gc.collect()
except Exception as e:
logger.error(f"网页操作失败: {str(e)}")
return None
logger.error(f"Playwright初始化失败: {str(e)}")
return result
def get_page_source(self, url: str,
cookies: Optional[str] = None,
@@ -71,26 +89,42 @@ class PlaywrightHelper:
:param headless: 是否无头模式
:param timeout: 超时时间
"""
source = ""
source = None
try:
with sync_playwright() as playwright:
browser = playwright[self.browser_type].launch(headless=headless)
context = browser.new_context(user_agent=ua, proxy=proxies)
page = context.new_page()
if cookies:
page.set_extra_http_headers({"cookie": cookies})
browser = None
context = None
page = None
try:
browser = playwright[self.browser_type].launch(headless=headless)
context = browser.new_context(user_agent=ua, proxy=proxies)
page = context.new_page()
if cookies:
page.set_extra_http_headers({"cookie": cookies})
if not self.__pass_cloudflare(url, page):
logger.warn("cloudflare challenge fail")
page.wait_for_load_state("networkidle", timeout=timeout * 1000)
source = page.content()
except Exception as e:
logger.error(f"获取网页源码失败: {str(e)}")
source = None
finally:
browser.close()
# 确保资源被正确清理
if page:
page.close()
if context:
context.close()
if browser:
browser.close()
# 强制垃圾回收
gc.collect()
except Exception as e:
logger.error(f"获取网页源码失败: {str(e)}")
logger.error(f"Playwright初始化失败: {str(e)}")
return source
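
Both PlaywrightHelper methods now follow the same shape: initialize browser/context/page to None, do the work in an inner try, and close page, context and browser in a finally block before forcing a GC pass. Below is a minimal sketch of that cleanup pattern under assumed names (fetch_html, Chromium, print instead of the project logger); it is an illustration, not the helper itself.

import gc
from playwright.sync_api import sync_playwright

def fetch_html(url: str, timeout: int = 20, headless: bool = True):
    """Sketch: close page/context/browser in finally, then collect."""
    html = None
    try:
        with sync_playwright() as p:
            browser = context = page = None
            try:
                browser = p.chromium.launch(headless=headless)
                context = browser.new_context()
                page = context.new_page()
                page.goto(url, timeout=timeout * 1000)
                page.wait_for_load_state("networkidle", timeout=timeout * 1000)
                html = page.content()
            except Exception as e:
                print(f"page operation failed: {e}")
            finally:
                # Close innermost-to-outermost so nothing is left dangling
                if page:
                    page.close()
                if context:
                    context.close()
                if browser:
                    browser.close()
                gc.collect()
    except Exception as e:
        print(f"playwright bootstrap failed: {e}")
    return html

The split into two except blocks mirrors the diff: failures inside the page work are logged separately from failures to start Playwright itself, and the function still returns whatever result was captured.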

app/helper/memory.py (new file)

@@ -0,0 +1,158 @@
import gc
import psutil
import threading
import time
from typing import Optional, Callable, Any
from functools import wraps
from app.log import logger
from app.utils.singleton import Singleton
class MemoryManager(metaclass=Singleton):
"""
内存管理工具类,用于监控和优化内存使用
"""
def __init__(self):
self._memory_threshold = 80 # 内存使用率阈值(%)
self._check_interval = 300 # 检查间隔(秒)
self._monitoring = False
self._monitor_thread: Optional[threading.Thread] = None
def get_memory_usage(self) -> dict:
"""
获取当前内存使用情况
"""
process = psutil.Process()
memory_info = process.memory_info()
system_memory = psutil.virtual_memory()
return {
'rss': memory_info.rss / 1024 / 1024, # MB
'vms': memory_info.vms / 1024 / 1024, # MB
'percent': process.memory_percent(),
'system_percent': system_memory.percent,
'system_available': system_memory.available / 1024 / 1024 / 1024 # GB
}
def force_gc(self, generation: Optional[int] = None) -> int:
"""
强制执行垃圾回收
:param generation: 垃圾回收代数None表示所有代数
:return: 回收的对象数量
"""
before_memory = self.get_memory_usage()
if generation is not None:
collected = gc.collect(generation)
else:
collected = gc.collect()
after_memory = self.get_memory_usage()
memory_freed = before_memory['rss'] - after_memory['rss']
if memory_freed > 1: # 释放超过1MB才记录
logger.info(f"垃圾回收完成: 回收对象 {collected} 个, 释放内存 {memory_freed:.2f}MB")
return collected
def check_memory_and_cleanup(self) -> bool:
"""
检查内存使用率,如果过高则执行清理
:return: 是否执行了清理
"""
memory_info = self.get_memory_usage()
if memory_info['percent'] > self._memory_threshold:
logger.warning(f"内存使用率过高: {memory_info['percent']:.1f}%, 开始清理...")
self.force_gc()
return True
return False
def start_monitoring(self):
"""
开始内存监控
"""
if self._monitoring:
return
self._monitoring = True
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
logger.info("内存监控已启动")
def stop_monitoring(self):
"""
停止内存监控
"""
self._monitoring = False
if self._monitor_thread:
self._monitor_thread.join(timeout=5)
logger.info("内存监控已停止")
def _monitor_loop(self):
"""
内存监控循环
"""
while self._monitoring:
try:
self.check_memory_and_cleanup()
time.sleep(self._check_interval)
except Exception as e:
logger.error(f"内存监控出错: {e}")
time.sleep(60) # 出错后等待1分钟再继续
def set_threshold(self, threshold: int):
"""
设置内存使用率阈值
"""
self._memory_threshold = max(50, min(95, threshold))
def set_check_interval(self, interval: int):
"""
设置检查间隔
"""
self._check_interval = max(60, interval)
def memory_optimized(force_gc_after: bool = False, log_memory: bool = False):
"""
内存优化装饰器
:param force_gc_after: 函数执行后是否强制垃圾回收
:param log_memory: 是否记录内存使用情况
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
memory_manager = MemoryManager()
if log_memory:
before_memory = memory_manager.get_memory_usage()
logger.debug(f"{func.__name__} 执行前内存: {before_memory['rss']:.1f}MB")
try:
result = func(*args, **kwargs)
return result
finally:
if force_gc_after:
memory_manager.force_gc()
if log_memory:
after_memory = memory_manager.get_memory_usage()
logger.debug(f"{func.__name__} 执行后内存: {after_memory['rss']:.1f}MB")
return wrapper
return decorator
def clear_large_objects(*objects):
"""
清理大型对象的辅助函数
"""
for obj in objects:
if hasattr(obj, 'clear') and callable(obj.clear):
obj.clear()
elif hasattr(obj, '__dict__'):
obj.__dict__.clear()
del obj
gc.collect()
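
A usage sketch for the helper introduced above, assuming it is importable as app.helper.memory and that psutil is installed. The threshold, interval, and the build_report function are illustrative only and not part of this commit.

from app.helper.memory import MemoryManager, memory_optimized, clear_large_objects

manager = MemoryManager()          # Singleton metaclass: always the same instance
manager.set_threshold(80)          # clean up when process memory share exceeds 80%
manager.set_check_interval(300)    # background check every 5 minutes
manager.start_monitoring()         # daemon thread running _monitor_loop

@memory_optimized(force_gc_after=True, log_memory=True)
def build_report(rows: list) -> int:
    # Large temporaries created here are followed by a forced GC pass on return
    buffers = [bytearray(1024) for _ in rows]
    total = sum(len(b) for b in buffers)
    clear_large_objects(buffers)   # explicitly empty the large temporaries
    return total

print(build_report(list(range(1000))))
print(manager.get_memory_usage())  # rss/vms in MB, process and system percent
manager.stop_monitoring()

Note that clear_large_objects only empties objects that expose clear() or a __dict__; it cannot release objects still referenced elsewhere, so it is paired with dropping the caller's own references, as in the TorrentsChain changes above.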


@@ -3,6 +3,7 @@ import traceback
import xml.dom.minidom
from typing import List, Tuple, Union, Optional
from urllib.parse import urljoin
import gc
import chardet
from lxml import etree
@@ -19,6 +20,11 @@ class RssHelper:
"""
RSS帮助类解析RSS报文、获取RSS地址等
"""
# RSS解析限制配置
MAX_RSS_SIZE = 50 * 1024 * 1024 # 50MB最大RSS文件大小
MAX_RSS_ITEMS = 1000 # 最大解析条目数
# 各站点RSS链接获取配置
rss_link_conf = {
"default": {
@@ -224,8 +230,7 @@ class RssHelper:
},
}
@staticmethod
def parse(url, proxy: bool = False, timeout: Optional[int] = 15, headers: dict = None) -> Union[List[dict], None, bool]:
def parse(self, url, proxy: bool = False, timeout: Optional[int] = 15, headers: dict = None) -> Union[List[dict], None, bool]:
"""
解析RSS订阅URL获取RSS中的种子信息
:param url: RSS地址
@@ -238,6 +243,7 @@ class RssHelper:
ret_array: list = []
if not url:
return False
try:
ret = RequestUtils(proxies=settings.PROXY if proxy else None,
timeout=timeout, headers=headers).get_res(url)
@@ -246,11 +252,15 @@ class RssHelper:
except Exception as err:
logger.error(f"获取RSS失败{str(err)} - {traceback.format_exc()}")
return False
if ret:
ret_xml = ""
try:
# 使用chardet检测字符编码
# 检查响应大小避免处理过大的RSS文件
raw_data = ret.content
if raw_data and len(raw_data) > self.MAX_RSS_SIZE:
logger.warning(f"RSS文件过大: {len(raw_data)/1024/1024:.1f}MB跳过解析")
return False
if raw_data:
try:
result = chardet.detect(raw_data)
@@ -269,65 +279,84 @@ class RssHelper:
ret.encoding = ret.apparent_encoding
if not ret_xml:
ret_xml = ret.text
# 解析XML
dom_tree = xml.dom.minidom.parseString(ret_xml)
rootNode = dom_tree.documentElement
items = rootNode.getElementsByTagName("item")
for item in items:
try:
# 标题
title = DomUtils.tag_value(item, "title", default="")
if not title:
# 解析XML - 使用try-finally确保DOM树被清理
dom_tree = None
try:
dom_tree = xml.dom.minidom.parseString(ret_xml)
rootNode = dom_tree.documentElement
items = rootNode.getElementsByTagName("item")
# 限制处理的条目数量
items_count = min(len(items), self.MAX_RSS_ITEMS)
if len(items) > self.MAX_RSS_ITEMS:
logger.warning(f"RSS条目过多: {len(items)},仅处理前{self.MAX_RSS_ITEMS}")
for i, item in enumerate(items[:items_count]):
try:
# 定期执行垃圾回收
if i > 0 and i % 100 == 0:
gc.collect()
# 标题
title = DomUtils.tag_value(item, "title", default="")
if not title:
continue
# 描述
description = DomUtils.tag_value(item, "description", default="")
# 种子页面
link = DomUtils.tag_value(item, "link", default="")
# 种子链接
enclosure = DomUtils.tag_value(item, "enclosure", "url", default="")
if not enclosure and not link:
continue
# 部分RSS只有link没有enclosure
if not enclosure and link:
enclosure = link
# 大小
size = DomUtils.tag_value(item, "enclosure", "length", default=0)
if size and str(size).isdigit():
size = int(size)
else:
size = 0
# 发布日期
pubdate = DomUtils.tag_value(item, "pubDate", default="")
if pubdate:
# 转换为时间
pubdate = StringUtils.get_time(pubdate)
# 获取豆瓣昵称
nickname = DomUtils.tag_value(item, "dc:createor", default="")
# 返回对象
tmp_dict = {'title': title,
'enclosure': enclosure,
'size': size,
'description': description,
'link': link,
'pubdate': pubdate}
# 如果豆瓣昵称不为空返回数据增加豆瓣昵称供doubansync插件获取
if nickname:
tmp_dict['nickname'] = nickname
ret_array.append(tmp_dict)
except Exception as e1:
logger.debug(f"解析RSS条目失败{str(e1)} - {traceback.format_exc()}")
continue
# 描述
description = DomUtils.tag_value(item, "description", default="")
# 种子页面
link = DomUtils.tag_value(item, "link", default="")
# 种子链接
enclosure = DomUtils.tag_value(item, "enclosure", "url", default="")
if not enclosure and not link:
continue
# 部分RSS只有link没有enclosure
if not enclosure and link:
enclosure = link
# 大小
size = DomUtils.tag_value(item, "enclosure", "length", default=0)
if size and str(size).isdigit():
size = int(size)
else:
size = 0
# 发布日期
pubdate = DomUtils.tag_value(item, "pubDate", default="")
if pubdate:
# 转换为时间
pubdate = StringUtils.get_time(pubdate)
# 获取豆瓣昵称
nickname = DomUtils.tag_value(item, "dc:createor", default="")
# 返回对象
tmp_dict = {'title': title,
'enclosure': enclosure,
'size': size,
'description': description,
'link': link,
'pubdate': pubdate}
# 如果豆瓣昵称不为空返回数据增加豆瓣昵称供doubansync插件获取
if nickname:
tmp_dict['nickname'] = nickname
ret_array.append(tmp_dict)
except Exception as e1:
logger.debug(f"解析RSS失败{str(e1)} - {traceback.format_exc()}")
continue
finally:
# DOM树必须显式清理 - 这是xml.dom.minidom的特殊要求
if dom_tree:
dom_tree.unlink()
except Exception as e2:
logger.error(f"解析RSS失败{str(e2)} - {traceback.format_exc()}")
# RSS过期 观众RSS 链接已过期,您需要获得一个新的! pthome RSS Link has expired, You need to get a new one!
# RSS过期检查
_rss_expired_msg = [
"RSS 链接已过期, 您需要获得一个新的!",
"RSS Link has expired, You need to get a new one!",
"RSS Link has expired, You need to get new!"
]
if ret_xml in _rss_expired_msg:
if 'ret_xml' in locals() and ret_xml and ret_xml in _rss_expired_msg:
return None
return False
return ret_array
def get_rss_link(self, url: str, cookie: str, ua: str, proxy: bool = False) -> Tuple[str, str]:
@@ -369,12 +398,14 @@ class RssHelper:
return "", f"获取 {url} RSS链接失败错误码{res.status_code},错误原因:{res.reason}"
else:
return "", f"获取RSS链接失败无法连接 {url} "
# 解析HTML
html = etree.HTML(html_text)
if StringUtils.is_valid_html_element(html):
rss_link = html.xpath(site_conf.get("xpath"))
if rss_link:
return str(rss_link[-1]), ""
if html_text:
html = etree.HTML(html_text)
if StringUtils.is_valid_html_element(html):
rss_link = html.xpath(site_conf.get("xpath"))
if rss_link:
return str(rss_link[-1]), ""
return "", f"获取RSS链接失败{url}"
except Exception as e:
return "", f"获取 {url} RSS链接失败{str(e)}"