remove memory helper

This commit is contained in:
jxxghp
2025-07-09 19:11:37 +08:00
parent 1a86d96bfa
commit 036cb6f3b0
4 changed files with 0 additions and 966 deletions

View File

@@ -274,12 +274,6 @@ class ConfigModel(BaseModel):
REPO_GITHUB_TOKEN: Optional[str] = None
# 大内存模式
BIG_MEMORY_MODE: bool = False
# 是否启用内存监控
MEMORY_ANALYSIS: bool = False
# 内存快照间隔(分钟)
MEMORY_SNAPSHOT_INTERVAL: int = 30
# 保留的内存快照文件数量
MEMORY_SNAPSHOT_KEEP_COUNT: int = 20
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 是否启用编码探测的性能模式

View File

@@ -1,940 +0,0 @@
import gc
import sys
import threading
import time
import os
import tracemalloc
from datetime import datetime
from typing import Optional, Dict, List, Tuple
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
import psutil
from pympler import muppy, summary, asizeof
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import Singleton
class TimeoutException(Exception):
"""超时异常"""
pass
class MemoryHelper(metaclass=Singleton):
"""
内存管理工具类,用于监控和优化内存使用
"""
def __init__(self):
# 检查间隔(秒) - 从配置获取默认5分钟
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
self._monitoring = False
self._monitor_thread: Optional[threading.Thread] = None
# 内存快照保存目录
self._memory_snapshot_dir = settings.LOG_PATH / "memory_snapshots"
# 保留的快照文件数量
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
# 性能优化参数
self._max_objects_to_analyze = 50000 # 限制分析对象数量
self._analysis_timeout = 30 # 分析超时时间(秒)
self._large_object_threshold = 1024 * 1024 # 大对象阈值1MB
# 线程池用于超时控制
self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MemoryAnalysis")
# 启用tracemalloc以获得更详细的内存信息
if not tracemalloc.is_tracing():
tracemalloc.start(25) # 保留25个帧
def _run_with_timeout(self, func, *args, **kwargs):
"""
在超时限制下运行函数使用ThreadPoolExecutor实现跨平台超时控制
"""
try:
future = self._executor.submit(func, *args, **kwargs)
result = future.result(timeout=self._analysis_timeout)
return result
except FutureTimeoutError:
logger.warning(f"内存分析函数 {func.__name__} 超时 ({self._analysis_timeout}秒)")
return None
except Exception as e:
logger.error(f"内存分析函数 {func.__name__} 出错: {e}")
return None
def __del__(self):
"""析构函数,确保线程池正确关闭"""
try:
if hasattr(self, '_executor'):
self._executor.shutdown(wait=False)
except:
pass
@eventmanager.register(EventType.ConfigChanged)
def handle_config_changed(self, event: Event):
"""
处理配置变更事件,更新内存监控设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['MEMORY_ANALYSIS', 'MEMORY_SNAPSHOT_INTERVAL', 'MEMORY_SNAPSHOT_KEEP_COUNT']:
return
# 更新配置
if event_data.key == 'MEMORY_SNAPSHOT_INTERVAL':
self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60
elif event_data.key == 'MEMORY_SNAPSHOT_KEEP_COUNT':
self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT
self.stop_monitoring()
self.start_monitoring()
def start_monitoring(self):
"""
开始内存监控
"""
if not settings.MEMORY_ANALYSIS:
return
if self._monitoring:
return
# 创建内存快照目录
self._memory_snapshot_dir.mkdir(parents=True, exist_ok=True)
# 初始化内存分析器
self._monitoring = True
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
logger.info("内存监控已启动")
def stop_monitoring(self):
"""
停止内存监控
"""
self._monitoring = False
if self._monitor_thread:
self._monitor_thread.join(timeout=5)
logger.info("内存监控已停止")
def _monitor_loop(self):
"""
内存监控循环
"""
logger.info("内存监控循环开始")
while self._monitoring:
try:
# 生成内存快照
self._create_memory_snapshot()
time.sleep(self._check_interval)
except Exception as e:
logger.error(f"内存监控出错: {e}")
# 出错后等待1分钟再继续
time.sleep(60)
logger.info("内存监控循环结束")
def _create_memory_snapshot(self):
"""
创建内存快照并保存到文件
"""
try:
# 获取当前时间戳
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot_file = self._memory_snapshot_dir / f"memory_snapshot_{timestamp}.txt"
# 获取系统内存使用情况
memory_usage = psutil.Process().memory_info().rss
logger.info(f"开始创建内存快照: {snapshot_file}")
# 第一步:写入基本信息和系统内存统计
self._write_system_memory_info(snapshot_file, memory_usage)
# 第二步写入Python对象类型统计优化版本
self._write_python_objects_info_optimized(snapshot_file)
# 第三步:分析并写入类实例内存使用情况(简化版本)
self._append_class_analysis_simple(snapshot_file)
# 第四步:分析并写入大内存变量详情(优化版本)
self._append_variable_analysis_optimized(snapshot_file)
# 第五步:分析内存泄漏和增长趋势
self._append_memory_leak_analysis(snapshot_file)
logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB")
# 清理过期的快照文件
self._cleanup_old_snapshots()
except Exception as e:
logger.error(f"创建内存快照失败: {e}")
def _write_system_memory_info(self, snapshot_file, memory_usage):
"""
写入系统内存信息
"""
process = psutil.Process()
memory_info = process.memory_info()
memory_percent = process.memory_percent()
# 获取系统总内存信息
system_memory = psutil.virtual_memory()
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 80 + "\n")
f.write("系统内存使用情况:\n")
f.write("-" * 80 + "\n")
f.write(f"当前进程内存使用: {memory_usage / 1024 / 1024:.2f} MB\n")
f.write(f"进程内存使用率: {memory_percent:.2f}%\n")
f.write(f"系统总内存: {system_memory.total / 1024 / 1024 / 1024:.2f} GB\n")
f.write(f"系统可用内存: {system_memory.available / 1024 / 1024 / 1024:.2f} GB\n")
f.write(f"系统内存使用率: {system_memory.percent:.2f}%\n")
f.write(f"进程RSS内存: {memory_info.rss / 1024 / 1024:.2f} MB\n")
f.write(f"进程VMS内存: {memory_info.vms / 1024 / 1024:.2f} MB\n")
f.write(f"进程共享内存: {memory_info.shared / 1024 / 1024:.2f} MB\n")
f.write(f"进程文本段: {memory_info.text / 1024 / 1024:.2f} MB\n")
f.write(f"进程数据段: {memory_info.data / 1024 / 1024:.2f} MB\n")
f.flush()
def _write_python_objects_info_optimized(self, snapshot_file):
"""
优化的Python对象类型统计信息
"""
try:
# 获取当前tracemalloc统计
current, peak = tracemalloc.get_traced_memory()
# 限制分析的对象数量
all_objects = muppy.get_objects()
if len(all_objects) > self._max_objects_to_analyze:
# 随机采样,避免总是分析相同的对象
import random
all_objects = random.sample(all_objects, self._max_objects_to_analyze)
logger.info(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象")
# 使用超时机制
def analyze_objects():
sum1 = summary.summarize(all_objects)
return sum1
sum1 = self._run_with_timeout(analyze_objects)
if sum1 is None:
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("Python内存使用情况:\n")
f.write("-" * 80 + "\n")
f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n")
f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n")
f.write("对象分析超时,跳过详细统计\n")
f.flush()
return
# 计算Python对象总内存
python_total_mb = 0
for line in summary.format_(sum1):
if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'):
parts = line.split('|')
if len(parts) >= 3:
try:
size_str = parts[2].strip()
if 'MB' in size_str:
size_mb = float(size_str.replace('MB', '').strip())
python_total_mb += size_mb
except:
pass
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("Python内存使用情况:\n")
f.write("-" * 80 + "\n")
f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n")
f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n")
f.write(f"Python对象总内存: {python_total_mb:.2f} MB\n")
f.write(f"分析对象数量: {len(all_objects):,}\n")
f.write("\n对象类型统计 (前20个):\n")
f.write("-" * 80 + "\n")
# 写入对象统计信息(限制行数)
line_count = 0
for line in summary.format_(sum1):
if line_count >= 25: # 只显示前25行
break
f.write(line + "\n")
line_count += 1
f.flush()
except Exception as e:
logger.error(f"写入Python对象信息失败: {e}")
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write(f"\nPython对象分析失败: {e}\n")
f.flush()
def _append_class_analysis_simple(self, snapshot_file):
"""
简化的类实例内存使用情况分析
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("类实例内存使用情况 (简化版):\n")
f.write("-" * 80 + "\n")
f.write("正在分析中...\n")
f.flush()
try:
logger.debug("开始分析类实例内存使用情况")
def analyze_classes():
class_info = {}
processed_count = 0
# 获取所有对象(限制数量)
all_objects = muppy.get_objects()
if len(all_objects) > self._max_objects_to_analyze:
import random
all_objects = random.sample(all_objects, self._max_objects_to_analyze)
for obj in all_objects:
try:
# 跳过类对象本身
if isinstance(obj, type):
continue
# 获取对象的类名
obj_class = type(obj)
try:
if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'):
class_name = f"{obj_class.__module__}.{obj_class.__name__}"
else:
class_name = str(obj_class)
except:
class_name = f"<unknown_class_{id(obj_class)}>"
# 只计算对象本身的大小,避免深度计算
size_bytes = sys.getsizeof(obj)
if size_bytes < 100: # 跳过太小的对象
continue
size_mb = size_bytes / 1024 / 1024
processed_count += 1
if class_name in class_info:
class_info[class_name]['size_mb'] += size_mb
class_info[class_name]['count'] += 1
else:
class_info[class_name] = {
'name': class_name,
'size_mb': size_mb,
'count': 1
}
except Exception:
continue
# 按内存大小排序只返回前50个
sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True)
return sorted_classes[:50]
class_objects = self._run_with_timeout(analyze_classes)
# 更新文件内容
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", "")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
if class_objects:
for i, class_info in enumerate(class_objects, 1):
f.write(f"{i:3d}. {class_info['name']:<50} "
f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n")
else:
f.write("类实例分析超时或失败\n")
f.flush()
except Exception as e:
logger.error(f"获取类实例信息失败: {e}")
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
f.flush()
logger.debug("类实例分析已完成并写入")
def _append_variable_analysis_optimized(self, snapshot_file):
"""
优化的大内存变量详情分析
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("大内存变量详情 (优化版):\n")
f.write("-" * 80 + "\n")
f.write("正在分析中...\n")
f.flush()
try:
logger.debug("开始分析大内存变量")
def analyze_large_variables():
large_vars = []
processed_count = 0
calculated_objects = set()
# 获取所有对象(限制数量)
all_objects = muppy.get_objects()
if len(all_objects) > self._max_objects_to_analyze:
import random
all_objects = random.sample(all_objects, self._max_objects_to_analyze)
for obj in all_objects:
# 跳过类对象
if isinstance(obj, type):
continue
# 跳过已经计算过的对象
obj_id = id(obj)
if obj_id in calculated_objects:
continue
try:
# 首先使用 sys.getsizeof 快速筛选
shallow_size = sys.getsizeof(obj)
if shallow_size < self._large_object_threshold: # 只处理大于1MB的对象
continue
# 对于较大的对象,使用 asizeof 进行深度计算
size_bytes = asizeof.asizeof(obj)
# 只处理大于1MB的对象
if size_bytes < self._large_object_threshold:
continue
size_mb = size_bytes / 1024 / 1024
processed_count += 1
calculated_objects.add(obj_id)
# 获取对象信息
var_info = self._get_variable_info_simple(obj, size_mb)
if var_info:
large_vars.append(var_info)
# 如果已经找到足够多的大对象,可以提前结束
if len(large_vars) >= 50: # 限制数量
break
except Exception:
continue
# 按内存大小排序并返回前30个
large_vars.sort(key=lambda x: x['size_mb'], reverse=True)
return large_vars[:30]
large_variables = self._run_with_timeout(analyze_large_variables)
# 更新文件内容
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", "")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
if large_variables:
for i, var_info in enumerate(large_variables, 1):
f.write(
f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n")
else:
f.write("大内存变量分析超时或失败\n")
f.flush()
except Exception as e:
logger.error(f"获取大内存变量信息失败: {e}")
with open(snapshot_file, 'r', encoding='utf-8') as f:
content = f.read()
content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write(content)
f.flush()
logger.debug("大内存变量分析已完成并写入")
def _get_variable_info_simple(self, obj, size_mb):
"""
简化的变量信息获取
"""
try:
obj_type = type(obj).__name__
# 简化的变量名获取
var_name = f"{obj_type}_{id(obj)}"
# 生成描述性信息
if isinstance(obj, dict):
key_count = len(obj)
var_name = f"dict_{key_count}items_{id(obj)}"
elif isinstance(obj, (list, tuple, set)):
var_name = f"{obj_type}_{len(obj)}items_{id(obj)}"
elif isinstance(obj, str):
var_name = f"str_{len(obj)}chars_{id(obj)}"
elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'):
var_name = f"{obj.__class__.__name__}_{id(obj)}"
return {
'name': var_name,
'type': obj_type,
'size_mb': size_mb
}
except Exception:
return None
def _append_memory_leak_analysis(self, snapshot_file):
"""
分析内存泄漏和增长趋势
"""
with open(snapshot_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 80 + "\n")
f.write("内存泄漏分析:\n")
f.write("-" * 80 + "\n")
# 获取tracemalloc统计
current, peak = tracemalloc.get_traced_memory()
f.write(f"当前tracemalloc内存: {current / 1024 / 1024:.2f} MB\n")
f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n")
# 获取内存分配统计(限制数量)
try:
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
f.write("\n内存分配最多的位置 (前5个):\n")
f.write("-" * 80 + "\n")
for i, stat in enumerate(top_stats[:5], 1):
f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n")
f.write(f" {stat.traceback.format()}\n")
except Exception as e:
f.write(f"获取内存分配统计失败: {e}\n")
# 垃圾回收统计
f.write("\n垃圾回收统计:\n")
f.write("-" * 80 + "\n")
for i in range(3):
count = gc.get_count()[i]
f.write(f"GC代 {i}: {count}\n")
# 获取不可达对象数量
unreachable = len(gc.garbage)
f.write(f"不可达对象数量: {unreachable}\n")
f.flush()
logger.debug("内存泄漏分析已完成并写入")
def _cleanup_old_snapshots(self):
"""
清理过期的内存快照文件,只保留最近的指定数量文件
"""
try:
snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt"))
if len(snapshot_files) > self._keep_count:
# 按修改时间排序,删除最旧的文件
snapshot_files.sort(key=lambda x: x.stat().st_mtime)
for old_file in snapshot_files[:-self._keep_count]:
old_file.unlink()
logger.debug(f"已删除过期内存快照: {old_file}")
except Exception as e:
logger.error(f"清理过期快照失败: {e}")
def create_detailed_memory_analysis(self):
"""
创建详细的内存分析报告,专门用于诊断内存问题
"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
analysis_file = self._memory_snapshot_dir / f"detailed_memory_analysis_{timestamp}.txt"
logger.info(f"开始创建详细内存分析: {analysis_file}")
with open(analysis_file, 'w', encoding='utf-8') as f:
f.write(f"详细内存分析报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 100 + "\n\n")
# 1. 系统级内存分析
self._write_detailed_system_analysis(f)
# 2. Python对象深度分析优化版
self._write_detailed_python_analysis_optimized(f)
# 3. 内存映射详细分析(简化版)
self._write_detailed_memory_maps_simple(f)
# 4. 大对象分析(优化版)
self._write_detailed_large_objects_optimized(f)
# 5. 内存泄漏检测
self._write_memory_leak_detection(f)
logger.info(f"详细内存分析已保存: {analysis_file}")
return analysis_file
except Exception as e:
logger.error(f"创建详细内存分析失败: {e}")
return None
def _write_detailed_system_analysis(self, f):
"""
写入详细的系统内存分析
"""
f.write("1. 系统级内存分析\n")
f.write("-" * 50 + "\n")
process = psutil.Process()
memory_info = process.memory_info()
f.write(f"进程ID: {process.pid}\n")
f.write(f"进程名称: {process.name()}\n")
f.write(f"进程命令行: {' '.join(process.cmdline())}\n\n")
f.write("内存使用详情:\n")
f.write(f" RSS (物理内存): {memory_info.rss / 1024 / 1024:.2f} MB\n")
f.write(f" VMS (虚拟内存): {memory_info.vms / 1024 / 1024:.2f} MB\n")
f.write(f" 共享内存: {memory_info.shared / 1024 / 1024:.2f} MB\n")
f.write(f" 文本段: {memory_info.text / 1024 / 1024:.2f} MB\n")
f.write(f" 数据段: {memory_info.data / 1024 / 1024:.2f} MB\n")
f.write(f" 库内存: {memory_info.lib / 1024 / 1024:.2f} MB\n")
f.write(f" 脏页: {memory_info.dirty / 1024 / 1024:.2f} MB\n")
# 系统内存信息
system_memory = psutil.virtual_memory()
f.write(f"\n系统内存:\n")
f.write(f" 总内存: {system_memory.total / 1024 / 1024 / 1024:.2f} GB\n")
f.write(f" 可用内存: {system_memory.available / 1024 / 1024 / 1024:.2f} GB\n")
f.write(f" 使用率: {system_memory.percent:.2f}%\n")
f.write(f" 缓存: {system_memory.cached / 1024 / 1024 / 1024:.2f} GB\n")
f.write(f" 缓冲区: {system_memory.buffers / 1024 / 1024 / 1024:.2f} GB\n")
f.write("\n" + "=" * 100 + "\n\n")
def _write_detailed_python_analysis_optimized(self, f):
"""
写入优化的Python对象分析
"""
f.write("2. Python对象深度分析 (优化版)\n")
f.write("-" * 50 + "\n")
# 强制垃圾回收
collected = gc.collect()
f.write(f"垃圾回收清理对象数: {collected}\n\n")
# 获取所有对象(限制数量)
all_objects = muppy.get_objects()
if len(all_objects) > self._max_objects_to_analyze:
import random
all_objects = random.sample(all_objects, self._max_objects_to_analyze)
f.write(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象\n\n")
f.write(f"总对象数: {len(all_objects):,}\n")
# 按类型统计(简化版)
type_stats = {}
for obj in all_objects:
try:
obj_type = type(obj).__name__
if obj_type not in type_stats:
type_stats[obj_type] = {'count': 0, 'size': 0}
type_stats[obj_type]['count'] += 1
type_stats[obj_type]['size'] += sys.getsizeof(obj)
except:
continue
# 按大小排序
sorted_types = sorted(type_stats.items(), key=lambda x: x[1]['size'], reverse=True)
f.write("对象类型统计 (按内存大小排序前15个):\n")
f.write(f"{'类型':<20} {'数量':<10} {'总大小(MB)':<12} {'平均大小(B)':<12}\n")
f.write("-" * 60 + "\n")
total_python_memory = 0
for obj_type, stats in sorted_types[:15]: # 只显示前15个
size_mb = stats['size'] / 1024 / 1024
avg_size = stats['size'] / stats['count'] if stats['count'] > 0 else 0
total_python_memory += size_mb
f.write(f"{obj_type:<20} {stats['count']:<10,} {size_mb:<12.2f} {avg_size:<12.1f}\n")
f.write(f"\nPython对象总内存: {total_python_memory:.2f} MB\n")
# 计算未统计内存
process = psutil.Process()
total_memory = process.memory_info().rss / 1024 / 1024
unaccounted = total_memory - total_python_memory
f.write(f"未统计内存: {unaccounted:.2f} MB ({unaccounted/total_memory*100:.1f}%)\n")
f.write("\n" + "=" * 100 + "\n\n")
def _write_detailed_memory_maps_simple(self, f):
"""
写入简化的内存映射分析
"""
f.write("3. 内存映射分析 (简化版)\n")
f.write("-" * 50 + "\n")
try:
process = psutil.Process()
memory_maps = process.memory_maps()
# 按权限分类
perm_stats = {}
for mmap in memory_maps:
size_mb = mmap.size / 1024 / 1024
perms = mmap.perms
# 按权限统计
if perms not in perm_stats:
perm_stats[perms] = {'count': 0, 'size': 0}
perm_stats[perms]['count'] += 1
perm_stats[perms]['size'] += size_mb
f.write("按权限分类的内存映射:\n")
f.write(f"{'权限':<10} {'数量':<8} {'大小(MB)':<12}\n")
f.write("-" * 35 + "\n")
for perms, stats in sorted(perm_stats.items(), key=lambda x: x[1]['size'], reverse=True):
f.write(f"{perms:<10} {stats['count']:<8} {stats['size']:<12.2f}\n")
except Exception as e:
f.write(f"内存映射分析失败: {e}\n")
f.write("\n" + "=" * 100 + "\n\n")
def _write_detailed_large_objects_optimized(self, f):
"""
写入优化的大对象详细分析
"""
f.write("4. 大对象详细分析 (优化版)\n")
f.write("-" * 50 + "\n")
def analyze_large_objects():
large_objects = []
# 获取所有对象(限制数量)
all_objects = muppy.get_objects()
if len(all_objects) > self._max_objects_to_analyze:
import random
all_objects = random.sample(all_objects, self._max_objects_to_analyze)
for obj in all_objects:
try:
# 快速筛选
shallow_size = sys.getsizeof(obj)
if shallow_size < self._large_object_threshold:
continue
# 深度计算
size = asizeof.asizeof(obj)
if size > self._large_object_threshold:
large_objects.append((obj, size))
# 限制数量
if len(large_objects) >= 20:
break
except:
continue
return large_objects
large_objects = self._run_with_timeout(analyze_large_objects)
if large_objects:
f.write(f"大对象 (>1MB) 数量: {len(large_objects)}\n\n")
for i, (obj, size) in enumerate(large_objects, 1):
size_mb = size / 1024 / 1024
obj_type = type(obj).__name__
f.write(f"{i:2d}. {obj_type} - {size_mb:.2f} MB\n")
# 简化的对象信息
try:
if isinstance(obj, dict):
f.write(f" 字典项数: {len(obj)}\n")
elif isinstance(obj, (list, tuple)):
f.write(f" 元素数量: {len(obj)}\n")
elif isinstance(obj, str):
f.write(f" 字符串长度: {len(obj)}\n")
except:
pass
f.write("\n")
else:
f.write("大对象分析超时或未找到大对象\n")
f.write("=" * 100 + "\n\n")
def _write_memory_leak_detection(self, f):
"""
写入内存泄漏检测
"""
f.write("5. 内存泄漏检测\n")
f.write("-" * 50 + "\n")
# tracemalloc分析
current, peak = tracemalloc.get_traced_memory()
f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n")
f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n")
try:
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
f.write(f"\n内存分配最多的位置 (前10个):\n")
f.write("-" * 50 + "\n")
for i, stat in enumerate(top_stats[:10], 1):
f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n")
for line in stat.traceback.format():
f.write(f" {line}\n")
f.write("\n")
except Exception as e:
f.write(f"获取tracemalloc统计失败: {e}\n")
# 垃圾回收分析
f.write("垃圾回收分析:\n")
f.write("-" * 50 + "\n")
gc_counts = gc.get_count()
f.write(f"GC计数: {gc_counts}\n")
# 检查不可达对象
unreachable = len(gc.garbage)
f.write(f"不可达对象数量: {unreachable}\n")
if unreachable > 0:
f.write("不可达对象详情:\n")
for i, obj in enumerate(gc.garbage[:5], 1): # 只显示前5个
f.write(f" {i}. {type(obj).__name__} - {id(obj)}\n")
f.write("\n" + "=" * 100 + "\n\n")
def get_memory_summary(self) -> Dict[str, float]:
"""
获取内存使用摘要
"""
try:
process = psutil.Process()
memory_info = process.memory_info()
# 获取Python对象总内存简化版
all_objects = muppy.get_objects()
if len(all_objects) > 10000: # 限制对象数量
import random
all_objects = random.sample(all_objects, 10000)
sum1 = summary.summarize(all_objects)
python_total_mb = 0
for line in summary.format_(sum1):
if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'):
parts = line.split('|')
if len(parts) >= 3:
try:
size_str = parts[2].strip()
if 'MB' in size_str:
size_mb = float(size_str.replace('MB', '').strip())
python_total_mb += size_mb
except:
pass
total_memory = memory_info.rss / 1024 / 1024
unaccounted = total_memory - python_total_mb
return {
'total_memory_mb': total_memory,
'python_objects_mb': python_total_mb,
'unaccounted_mb': unaccounted,
'unaccounted_percent': (unaccounted / total_memory * 100) if total_memory > 0 else 0
}
except Exception as e:
logger.error(f"获取内存摘要失败: {e}")
return {}
def force_garbage_collection(self):
"""
强制垃圾回收并返回清理的对象数量
"""
try:
collected = gc.collect()
logger.info(f"强制垃圾回收完成,清理了 {collected} 个对象")
return collected
except Exception as e:
logger.error(f"强制垃圾回收失败: {e}")
return 0
def analyze_memory_growth(self, interval_seconds: int = 300) -> Dict[str, float]:
"""
分析内存增长趋势
:param interval_seconds: 分析间隔(秒)
:return: 内存增长信息
"""
try:
# 获取当前内存使用
current_summary = self.get_memory_summary()
# 等待指定时间
time.sleep(interval_seconds)
# 获取新的内存使用
new_summary = self.get_memory_summary()
if current_summary and new_summary:
growth_info = {
'total_growth_mb': new_summary['total_memory_mb'] - current_summary['total_memory_mb'],
'python_growth_mb': new_summary['python_objects_mb'] - current_summary['python_objects_mb'],
'unaccounted_growth_mb': new_summary['unaccounted_mb'] - current_summary['unaccounted_mb'],
'growth_rate_mb_per_hour': (new_summary['total_memory_mb'] - current_summary['total_memory_mb']) * 3600 / interval_seconds
}
logger.info(f"内存增长分析: 总增长 {growth_info['total_growth_mb']:.2f} MB, "
f"Python对象增长 {growth_info['python_growth_mb']:.2f} MB, "
f"未统计增长 {growth_info['unaccounted_growth_mb']:.2f} MB")
return growth_info
return {}
except Exception as e:
logger.error(f"分析内存增长失败: {e}")
return {}
# 使用示例
if __name__ == "__main__":
# 创建内存分析器实例
memory_helper = MemoryHelper()
# 获取内存摘要
summary = memory_helper.get_memory_summary()
print("内存使用摘要:")
for key, value in summary.items():
print(f" {key}: {value:.2f}")
# 创建详细分析报告
analysis_file = memory_helper.create_detailed_memory_analysis()
if analysis_file:
print(f"详细分析报告已保存到: {analysis_file}")
# 强制垃圾回收
collected = memory_helper.force_garbage_collection()
print(f"垃圾回收清理了 {collected} 个对象")

View File

@@ -5,7 +5,6 @@ from fastapi import FastAPI
from app.chain.system import SystemChain
from app.startup.command_initializer import init_command, stop_command, restart_command
from app.startup.memory_initializer import init_memory_manager, stop_memory_manager
from app.startup.modules_initializer import init_modules, stop_modules
from app.startup.monitor_initializer import stop_monitor, init_monitor
from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins
@@ -52,8 +51,6 @@ async def lifespan(app: FastAPI):
init_command()
# 初始化工作流
init_workflow()
# 初始化内存管理
init_memory_manager()
# 插件同步到本地
sync_plugins_task = asyncio.create_task(init_extra())
try:
@@ -71,8 +68,6 @@ async def lifespan(app: FastAPI):
print(str(e))
# 备份插件
SystemChain().backup_plugins()
# 停止内存管理器
stop_memory_manager()
# 停止工作流
stop_workflow()
# 停止命令

View File

@@ -1,15 +0,0 @@
from app.helper.memory import MemoryHelper
def init_memory_manager():
"""
初始化内存监控器
"""
MemoryHelper().start_monitoring()
def stop_memory_manager():
"""
停止内存监控器
"""
MemoryHelper().stop_monitoring()