diff --git a/app/core/config.py b/app/core/config.py index 0794c3a0..17cc23d8 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -274,12 +274,6 @@ class ConfigModel(BaseModel): REPO_GITHUB_TOKEN: Optional[str] = None # 大内存模式 BIG_MEMORY_MODE: bool = False - # 是否启用内存监控 - MEMORY_ANALYSIS: bool = False - # 内存快照间隔(分钟) - MEMORY_SNAPSHOT_INTERVAL: int = 30 - # 保留的内存快照文件数量 - MEMORY_SNAPSHOT_KEEP_COUNT: int = 20 # 全局图片缓存,将媒体图片缓存到本地 GLOBAL_IMAGE_CACHE: bool = False # 是否启用编码探测的性能模式 diff --git a/app/helper/memory.py b/app/helper/memory.py deleted file mode 100644 index 51648b7f..00000000 --- a/app/helper/memory.py +++ /dev/null @@ -1,940 +0,0 @@ -import gc -import sys -import threading -import time -import os -import tracemalloc -from datetime import datetime -from typing import Optional, Dict, List, Tuple -from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError - -import psutil -from pympler import muppy, summary, asizeof - -from app.core.config import settings -from app.core.event import eventmanager, Event -from app.log import logger -from app.schemas import ConfigChangeEventData -from app.schemas.types import EventType -from app.utils.singleton import Singleton - - -class TimeoutException(Exception): - """超时异常""" - pass - - -class MemoryHelper(metaclass=Singleton): - """ - 内存管理工具类,用于监控和优化内存使用 - """ - - def __init__(self): - # 检查间隔(秒) - 从配置获取,默认5分钟 - self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60 - self._monitoring = False - self._monitor_thread: Optional[threading.Thread] = None - # 内存快照保存目录 - self._memory_snapshot_dir = settings.LOG_PATH / "memory_snapshots" - # 保留的快照文件数量 - self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT - - # 性能优化参数 - self._max_objects_to_analyze = 50000 # 限制分析对象数量 - self._analysis_timeout = 30 # 分析超时时间(秒) - self._large_object_threshold = 1024 * 1024 # 大对象阈值(1MB) - - # 线程池用于超时控制 - self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="MemoryAnalysis") - - # 启用tracemalloc以获得更详细的内存信息 - if not tracemalloc.is_tracing(): - tracemalloc.start(25) # 保留25个帧 - - def _run_with_timeout(self, func, *args, **kwargs): - """ - 在超时限制下运行函数,使用ThreadPoolExecutor实现跨平台超时控制 - """ - try: - future = self._executor.submit(func, *args, **kwargs) - result = future.result(timeout=self._analysis_timeout) - return result - except FutureTimeoutError: - logger.warning(f"内存分析函数 {func.__name__} 超时 ({self._analysis_timeout}秒)") - return None - except Exception as e: - logger.error(f"内存分析函数 {func.__name__} 出错: {e}") - return None - - def __del__(self): - """析构函数,确保线程池正确关闭""" - try: - if hasattr(self, '_executor'): - self._executor.shutdown(wait=False) - except: - pass - - @eventmanager.register(EventType.ConfigChanged) - def handle_config_changed(self, event: Event): - """ - 处理配置变更事件,更新内存监控设置 - :param event: 事件对象 - """ - if not event: - return - event_data: ConfigChangeEventData = event.event_data - if event_data.key not in ['MEMORY_ANALYSIS', 'MEMORY_SNAPSHOT_INTERVAL', 'MEMORY_SNAPSHOT_KEEP_COUNT']: - return - - # 更新配置 - if event_data.key == 'MEMORY_SNAPSHOT_INTERVAL': - self._check_interval = settings.MEMORY_SNAPSHOT_INTERVAL * 60 - elif event_data.key == 'MEMORY_SNAPSHOT_KEEP_COUNT': - self._keep_count = settings.MEMORY_SNAPSHOT_KEEP_COUNT - self.stop_monitoring() - self.start_monitoring() - - def start_monitoring(self): - """ - 开始内存监控 - """ - if not settings.MEMORY_ANALYSIS: - return - if self._monitoring: - return - - # 创建内存快照目录 - self._memory_snapshot_dir.mkdir(parents=True, exist_ok=True) - - # 初始化内存分析器 - self._monitoring = True - self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) - self._monitor_thread.start() - logger.info("内存监控已启动") - - def stop_monitoring(self): - """ - 停止内存监控 - """ - self._monitoring = False - if self._monitor_thread: - self._monitor_thread.join(timeout=5) - logger.info("内存监控已停止") - - def _monitor_loop(self): - """ - 内存监控循环 - """ - logger.info("内存监控循环开始") - while self._monitoring: - try: - # 生成内存快照 - self._create_memory_snapshot() - time.sleep(self._check_interval) - except Exception as e: - logger.error(f"内存监控出错: {e}") - # 出错后等待1分钟再继续 - time.sleep(60) - logger.info("内存监控循环结束") - - def _create_memory_snapshot(self): - """ - 创建内存快照并保存到文件 - """ - try: - # 获取当前时间戳 - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - snapshot_file = self._memory_snapshot_dir / f"memory_snapshot_{timestamp}.txt" - - # 获取系统内存使用情况 - memory_usage = psutil.Process().memory_info().rss - - logger.info(f"开始创建内存快照: {snapshot_file}") - - # 第一步:写入基本信息和系统内存统计 - self._write_system_memory_info(snapshot_file, memory_usage) - - # 第二步:写入Python对象类型统计(优化版本) - self._write_python_objects_info_optimized(snapshot_file) - - # 第三步:分析并写入类实例内存使用情况(简化版本) - self._append_class_analysis_simple(snapshot_file) - - # 第四步:分析并写入大内存变量详情(优化版本) - self._append_variable_analysis_optimized(snapshot_file) - - # 第五步:分析内存泄漏和增长趋势 - self._append_memory_leak_analysis(snapshot_file) - - logger.info(f"内存快照已保存: {snapshot_file}, 当前内存使用: {memory_usage / 1024 / 1024:.2f} MB") - - # 清理过期的快照文件 - self._cleanup_old_snapshots() - - except Exception as e: - logger.error(f"创建内存快照失败: {e}") - - def _write_system_memory_info(self, snapshot_file, memory_usage): - """ - 写入系统内存信息 - """ - process = psutil.Process() - memory_info = process.memory_info() - memory_percent = process.memory_percent() - - # 获取系统总内存信息 - system_memory = psutil.virtual_memory() - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(f"内存快照时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - f.write("=" * 80 + "\n") - f.write("系统内存使用情况:\n") - f.write("-" * 80 + "\n") - f.write(f"当前进程内存使用: {memory_usage / 1024 / 1024:.2f} MB\n") - f.write(f"进程内存使用率: {memory_percent:.2f}%\n") - f.write(f"系统总内存: {system_memory.total / 1024 / 1024 / 1024:.2f} GB\n") - f.write(f"系统可用内存: {system_memory.available / 1024 / 1024 / 1024:.2f} GB\n") - f.write(f"系统内存使用率: {system_memory.percent:.2f}%\n") - f.write(f"进程RSS内存: {memory_info.rss / 1024 / 1024:.2f} MB\n") - f.write(f"进程VMS内存: {memory_info.vms / 1024 / 1024:.2f} MB\n") - f.write(f"进程共享内存: {memory_info.shared / 1024 / 1024:.2f} MB\n") - f.write(f"进程文本段: {memory_info.text / 1024 / 1024:.2f} MB\n") - f.write(f"进程数据段: {memory_info.data / 1024 / 1024:.2f} MB\n") - - f.flush() - - def _write_python_objects_info_optimized(self, snapshot_file): - """ - 优化的Python对象类型统计信息 - """ - try: - # 获取当前tracemalloc统计 - current, peak = tracemalloc.get_traced_memory() - - # 限制分析的对象数量 - all_objects = muppy.get_objects() - if len(all_objects) > self._max_objects_to_analyze: - # 随机采样,避免总是分析相同的对象 - import random - all_objects = random.sample(all_objects, self._max_objects_to_analyze) - logger.info(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象") - - # 使用超时机制 - def analyze_objects(): - sum1 = summary.summarize(all_objects) - return sum1 - - sum1 = self._run_with_timeout(analyze_objects) - if sum1 is None: - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("Python内存使用情况:\n") - f.write("-" * 80 + "\n") - f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") - f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - f.write("对象分析超时,跳过详细统计\n") - f.flush() - return - - # 计算Python对象总内存 - python_total_mb = 0 - for line in summary.format_(sum1): - if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'): - parts = line.split('|') - if len(parts) >= 3: - try: - size_str = parts[2].strip() - if 'MB' in size_str: - size_mb = float(size_str.replace('MB', '').strip()) - python_total_mb += size_mb - except: - pass - - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("Python内存使用情况:\n") - f.write("-" * 80 + "\n") - f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") - f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - f.write(f"Python对象总内存: {python_total_mb:.2f} MB\n") - f.write(f"分析对象数量: {len(all_objects):,}\n") - - f.write("\n对象类型统计 (前20个):\n") - f.write("-" * 80 + "\n") - # 写入对象统计信息(限制行数) - line_count = 0 - for line in summary.format_(sum1): - if line_count >= 25: # 只显示前25行 - break - f.write(line + "\n") - line_count += 1 - - f.flush() - - except Exception as e: - logger.error(f"写入Python对象信息失败: {e}") - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write(f"\nPython对象分析失败: {e}\n") - f.flush() - - def _append_class_analysis_simple(self, snapshot_file): - """ - 简化的类实例内存使用情况分析 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("类实例内存使用情况 (简化版):\n") - f.write("-" * 80 + "\n") - f.write("正在分析中...\n") - f.flush() - - try: - logger.debug("开始分析类实例内存使用情况") - - def analyze_classes(): - class_info = {} - processed_count = 0 - - # 获取所有对象(限制数量) - all_objects = muppy.get_objects() - if len(all_objects) > self._max_objects_to_analyze: - import random - all_objects = random.sample(all_objects, self._max_objects_to_analyze) - - for obj in all_objects: - try: - # 跳过类对象本身 - if isinstance(obj, type): - continue - - # 获取对象的类名 - obj_class = type(obj) - try: - if hasattr(obj_class, '__module__') and hasattr(obj_class, '__name__'): - class_name = f"{obj_class.__module__}.{obj_class.__name__}" - else: - class_name = str(obj_class) - except: - class_name = f"" - - # 只计算对象本身的大小,避免深度计算 - size_bytes = sys.getsizeof(obj) - if size_bytes < 100: # 跳过太小的对象 - continue - - size_mb = size_bytes / 1024 / 1024 - processed_count += 1 - - if class_name in class_info: - class_info[class_name]['size_mb'] += size_mb - class_info[class_name]['count'] += 1 - else: - class_info[class_name] = { - 'name': class_name, - 'size_mb': size_mb, - 'count': 1 - } - - except Exception: - continue - - # 按内存大小排序,只返回前50个 - sorted_classes = sorted(class_info.values(), key=lambda x: x['size_mb'], reverse=True) - return sorted_classes[:50] - - class_objects = self._run_with_timeout(analyze_classes) - - # 更新文件内容 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - content = content.replace("正在分析中...\n", "") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - - if class_objects: - for i, class_info in enumerate(class_objects, 1): - f.write(f"{i:3d}. {class_info['name']:<50} " - f"{class_info['size_mb']:>8.2f} MB ({class_info['count']} 个实例)\n") - else: - f.write("类实例分析超时或失败\n") - - f.flush() - - except Exception as e: - logger.error(f"获取类实例信息失败: {e}") - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - content = content.replace("正在分析中...\n", f"获取类实例信息失败: {e}\n") - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - f.flush() - - logger.debug("类实例分析已完成并写入") - - def _append_variable_analysis_optimized(self, snapshot_file): - """ - 优化的大内存变量详情分析 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("大内存变量详情 (优化版):\n") - f.write("-" * 80 + "\n") - f.write("正在分析中...\n") - f.flush() - - try: - logger.debug("开始分析大内存变量") - - def analyze_large_variables(): - large_vars = [] - processed_count = 0 - calculated_objects = set() - - # 获取所有对象(限制数量) - all_objects = muppy.get_objects() - if len(all_objects) > self._max_objects_to_analyze: - import random - all_objects = random.sample(all_objects, self._max_objects_to_analyze) - - for obj in all_objects: - # 跳过类对象 - if isinstance(obj, type): - continue - - # 跳过已经计算过的对象 - obj_id = id(obj) - if obj_id in calculated_objects: - continue - - try: - # 首先使用 sys.getsizeof 快速筛选 - shallow_size = sys.getsizeof(obj) - if shallow_size < self._large_object_threshold: # 只处理大于1MB的对象 - continue - - # 对于较大的对象,使用 asizeof 进行深度计算 - size_bytes = asizeof.asizeof(obj) - - # 只处理大于1MB的对象 - if size_bytes < self._large_object_threshold: - continue - - size_mb = size_bytes / 1024 / 1024 - processed_count += 1 - calculated_objects.add(obj_id) - - # 获取对象信息 - var_info = self._get_variable_info_simple(obj, size_mb) - if var_info: - large_vars.append(var_info) - - # 如果已经找到足够多的大对象,可以提前结束 - if len(large_vars) >= 50: # 限制数量 - break - - except Exception: - continue - - # 按内存大小排序并返回前30个 - large_vars.sort(key=lambda x: x['size_mb'], reverse=True) - return large_vars[:30] - - large_variables = self._run_with_timeout(analyze_large_variables) - - # 更新文件内容 - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - - content = content.replace("正在分析中...\n", "") - - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - - if large_variables: - for i, var_info in enumerate(large_variables, 1): - f.write( - f"{i:3d}. {var_info['name']:<30} {var_info['type']:<15} {var_info['size_mb']:>8.2f} MB\n") - else: - f.write("大内存变量分析超时或失败\n") - - f.flush() - - except Exception as e: - logger.error(f"获取大内存变量信息失败: {e}") - with open(snapshot_file, 'r', encoding='utf-8') as f: - content = f.read() - content = content.replace("正在分析中...\n", f"获取变量信息失败: {e}\n") - with open(snapshot_file, 'w', encoding='utf-8') as f: - f.write(content) - f.flush() - - logger.debug("大内存变量分析已完成并写入") - - def _get_variable_info_simple(self, obj, size_mb): - """ - 简化的变量信息获取 - """ - try: - obj_type = type(obj).__name__ - - # 简化的变量名获取 - var_name = f"{obj_type}_{id(obj)}" - - # 生成描述性信息 - if isinstance(obj, dict): - key_count = len(obj) - var_name = f"dict_{key_count}items_{id(obj)}" - elif isinstance(obj, (list, tuple, set)): - var_name = f"{obj_type}_{len(obj)}items_{id(obj)}" - elif isinstance(obj, str): - var_name = f"str_{len(obj)}chars_{id(obj)}" - elif hasattr(obj, '__class__') and hasattr(obj.__class__, '__name__'): - var_name = f"{obj.__class__.__name__}_{id(obj)}" - - return { - 'name': var_name, - 'type': obj_type, - 'size_mb': size_mb - } - - except Exception: - return None - - def _append_memory_leak_analysis(self, snapshot_file): - """ - 分析内存泄漏和增长趋势 - """ - with open(snapshot_file, 'a', encoding='utf-8') as f: - f.write("\n" + "=" * 80 + "\n") - f.write("内存泄漏分析:\n") - f.write("-" * 80 + "\n") - - # 获取tracemalloc统计 - current, peak = tracemalloc.get_traced_memory() - f.write(f"当前tracemalloc内存: {current / 1024 / 1024:.2f} MB\n") - f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - - # 获取内存分配统计(限制数量) - try: - snapshot = tracemalloc.take_snapshot() - top_stats = snapshot.statistics('lineno') - - f.write("\n内存分配最多的位置 (前5个):\n") - f.write("-" * 80 + "\n") - for i, stat in enumerate(top_stats[:5], 1): - f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n") - f.write(f" {stat.traceback.format()}\n") - - except Exception as e: - f.write(f"获取内存分配统计失败: {e}\n") - - # 垃圾回收统计 - f.write("\n垃圾回收统计:\n") - f.write("-" * 80 + "\n") - for i in range(3): - count = gc.get_count()[i] - f.write(f"GC代 {i}: {count} 次\n") - - # 获取不可达对象数量 - unreachable = len(gc.garbage) - f.write(f"不可达对象数量: {unreachable}\n") - - f.flush() - - logger.debug("内存泄漏分析已完成并写入") - - def _cleanup_old_snapshots(self): - """ - 清理过期的内存快照文件,只保留最近的指定数量文件 - """ - try: - snapshot_files = list(self._memory_snapshot_dir.glob("memory_snapshot_*.txt")) - if len(snapshot_files) > self._keep_count: - # 按修改时间排序,删除最旧的文件 - snapshot_files.sort(key=lambda x: x.stat().st_mtime) - for old_file in snapshot_files[:-self._keep_count]: - old_file.unlink() - logger.debug(f"已删除过期内存快照: {old_file}") - except Exception as e: - logger.error(f"清理过期快照失败: {e}") - - def create_detailed_memory_analysis(self): - """ - 创建详细的内存分析报告,专门用于诊断内存问题 - """ - try: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - analysis_file = self._memory_snapshot_dir / f"detailed_memory_analysis_{timestamp}.txt" - - logger.info(f"开始创建详细内存分析: {analysis_file}") - - with open(analysis_file, 'w', encoding='utf-8') as f: - f.write(f"详细内存分析报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - f.write("=" * 100 + "\n\n") - - # 1. 系统级内存分析 - self._write_detailed_system_analysis(f) - - # 2. Python对象深度分析(优化版) - self._write_detailed_python_analysis_optimized(f) - - # 3. 内存映射详细分析(简化版) - self._write_detailed_memory_maps_simple(f) - - # 4. 大对象分析(优化版) - self._write_detailed_large_objects_optimized(f) - - # 5. 内存泄漏检测 - self._write_memory_leak_detection(f) - - logger.info(f"详细内存分析已保存: {analysis_file}") - return analysis_file - - except Exception as e: - logger.error(f"创建详细内存分析失败: {e}") - return None - - def _write_detailed_system_analysis(self, f): - """ - 写入详细的系统内存分析 - """ - f.write("1. 系统级内存分析\n") - f.write("-" * 50 + "\n") - - process = psutil.Process() - memory_info = process.memory_info() - - f.write(f"进程ID: {process.pid}\n") - f.write(f"进程名称: {process.name()}\n") - f.write(f"进程命令行: {' '.join(process.cmdline())}\n\n") - - f.write("内存使用详情:\n") - f.write(f" RSS (物理内存): {memory_info.rss / 1024 / 1024:.2f} MB\n") - f.write(f" VMS (虚拟内存): {memory_info.vms / 1024 / 1024:.2f} MB\n") - f.write(f" 共享内存: {memory_info.shared / 1024 / 1024:.2f} MB\n") - f.write(f" 文本段: {memory_info.text / 1024 / 1024:.2f} MB\n") - f.write(f" 数据段: {memory_info.data / 1024 / 1024:.2f} MB\n") - f.write(f" 库内存: {memory_info.lib / 1024 / 1024:.2f} MB\n") - f.write(f" 脏页: {memory_info.dirty / 1024 / 1024:.2f} MB\n") - - # 系统内存信息 - system_memory = psutil.virtual_memory() - f.write(f"\n系统内存:\n") - f.write(f" 总内存: {system_memory.total / 1024 / 1024 / 1024:.2f} GB\n") - f.write(f" 可用内存: {system_memory.available / 1024 / 1024 / 1024:.2f} GB\n") - f.write(f" 使用率: {system_memory.percent:.2f}%\n") - f.write(f" 缓存: {system_memory.cached / 1024 / 1024 / 1024:.2f} GB\n") - f.write(f" 缓冲区: {system_memory.buffers / 1024 / 1024 / 1024:.2f} GB\n") - - f.write("\n" + "=" * 100 + "\n\n") - - def _write_detailed_python_analysis_optimized(self, f): - """ - 写入优化的Python对象分析 - """ - f.write("2. Python对象深度分析 (优化版)\n") - f.write("-" * 50 + "\n") - - # 强制垃圾回收 - collected = gc.collect() - f.write(f"垃圾回收清理对象数: {collected}\n\n") - - # 获取所有对象(限制数量) - all_objects = muppy.get_objects() - if len(all_objects) > self._max_objects_to_analyze: - import random - all_objects = random.sample(all_objects, self._max_objects_to_analyze) - f.write(f"对象数量过多,采样分析 {self._max_objects_to_analyze} 个对象\n\n") - - f.write(f"总对象数: {len(all_objects):,}\n") - - # 按类型统计(简化版) - type_stats = {} - for obj in all_objects: - try: - obj_type = type(obj).__name__ - if obj_type not in type_stats: - type_stats[obj_type] = {'count': 0, 'size': 0} - type_stats[obj_type]['count'] += 1 - type_stats[obj_type]['size'] += sys.getsizeof(obj) - except: - continue - - # 按大小排序 - sorted_types = sorted(type_stats.items(), key=lambda x: x[1]['size'], reverse=True) - - f.write("对象类型统计 (按内存大小排序,前15个):\n") - f.write(f"{'类型':<20} {'数量':<10} {'总大小(MB)':<12} {'平均大小(B)':<12}\n") - f.write("-" * 60 + "\n") - - total_python_memory = 0 - for obj_type, stats in sorted_types[:15]: # 只显示前15个 - size_mb = stats['size'] / 1024 / 1024 - avg_size = stats['size'] / stats['count'] if stats['count'] > 0 else 0 - total_python_memory += size_mb - f.write(f"{obj_type:<20} {stats['count']:<10,} {size_mb:<12.2f} {avg_size:<12.1f}\n") - - f.write(f"\nPython对象总内存: {total_python_memory:.2f} MB\n") - - # 计算未统计内存 - process = psutil.Process() - total_memory = process.memory_info().rss / 1024 / 1024 - unaccounted = total_memory - total_python_memory - f.write(f"未统计内存: {unaccounted:.2f} MB ({unaccounted/total_memory*100:.1f}%)\n") - - f.write("\n" + "=" * 100 + "\n\n") - - def _write_detailed_memory_maps_simple(self, f): - """ - 写入简化的内存映射分析 - """ - f.write("3. 内存映射分析 (简化版)\n") - f.write("-" * 50 + "\n") - - try: - process = psutil.Process() - memory_maps = process.memory_maps() - - # 按权限分类 - perm_stats = {} - - for mmap in memory_maps: - size_mb = mmap.size / 1024 / 1024 - perms = mmap.perms - - # 按权限统计 - if perms not in perm_stats: - perm_stats[perms] = {'count': 0, 'size': 0} - perm_stats[perms]['count'] += 1 - perm_stats[perms]['size'] += size_mb - - f.write("按权限分类的内存映射:\n") - f.write(f"{'权限':<10} {'数量':<8} {'大小(MB)':<12}\n") - f.write("-" * 35 + "\n") - for perms, stats in sorted(perm_stats.items(), key=lambda x: x[1]['size'], reverse=True): - f.write(f"{perms:<10} {stats['count']:<8} {stats['size']:<12.2f}\n") - - except Exception as e: - f.write(f"内存映射分析失败: {e}\n") - - f.write("\n" + "=" * 100 + "\n\n") - - def _write_detailed_large_objects_optimized(self, f): - """ - 写入优化的大对象详细分析 - """ - f.write("4. 大对象详细分析 (优化版)\n") - f.write("-" * 50 + "\n") - - def analyze_large_objects(): - large_objects = [] - - # 获取所有对象(限制数量) - all_objects = muppy.get_objects() - if len(all_objects) > self._max_objects_to_analyze: - import random - all_objects = random.sample(all_objects, self._max_objects_to_analyze) - - for obj in all_objects: - try: - # 快速筛选 - shallow_size = sys.getsizeof(obj) - if shallow_size < self._large_object_threshold: - continue - - # 深度计算 - size = asizeof.asizeof(obj) - if size > self._large_object_threshold: - large_objects.append((obj, size)) - - # 限制数量 - if len(large_objects) >= 20: - break - - except: - continue - - return large_objects - - large_objects = self._run_with_timeout(analyze_large_objects) - - if large_objects: - f.write(f"大对象 (>1MB) 数量: {len(large_objects)}\n\n") - - for i, (obj, size) in enumerate(large_objects, 1): - size_mb = size / 1024 / 1024 - obj_type = type(obj).__name__ - - f.write(f"{i:2d}. {obj_type} - {size_mb:.2f} MB\n") - - # 简化的对象信息 - try: - if isinstance(obj, dict): - f.write(f" 字典项数: {len(obj)}\n") - elif isinstance(obj, (list, tuple)): - f.write(f" 元素数量: {len(obj)}\n") - elif isinstance(obj, str): - f.write(f" 字符串长度: {len(obj)}\n") - except: - pass - - f.write("\n") - else: - f.write("大对象分析超时或未找到大对象\n") - - f.write("=" * 100 + "\n\n") - - def _write_memory_leak_detection(self, f): - """ - 写入内存泄漏检测 - """ - f.write("5. 内存泄漏检测\n") - f.write("-" * 50 + "\n") - - # tracemalloc分析 - current, peak = tracemalloc.get_traced_memory() - f.write(f"tracemalloc当前内存: {current / 1024 / 1024:.2f} MB\n") - f.write(f"tracemalloc峰值内存: {peak / 1024 / 1024:.2f} MB\n") - - try: - snapshot = tracemalloc.take_snapshot() - top_stats = snapshot.statistics('lineno') - - f.write(f"\n内存分配最多的位置 (前10个):\n") - f.write("-" * 50 + "\n") - for i, stat in enumerate(top_stats[:10], 1): - f.write(f"{i:2d}. {stat.count:>8} 个对象, {stat.size / 1024 / 1024:>8.2f} MB\n") - for line in stat.traceback.format(): - f.write(f" {line}\n") - f.write("\n") - except Exception as e: - f.write(f"获取tracemalloc统计失败: {e}\n") - - # 垃圾回收分析 - f.write("垃圾回收分析:\n") - f.write("-" * 50 + "\n") - gc_counts = gc.get_count() - f.write(f"GC计数: {gc_counts}\n") - - # 检查不可达对象 - unreachable = len(gc.garbage) - f.write(f"不可达对象数量: {unreachable}\n") - if unreachable > 0: - f.write("不可达对象详情:\n") - for i, obj in enumerate(gc.garbage[:5], 1): # 只显示前5个 - f.write(f" {i}. {type(obj).__name__} - {id(obj)}\n") - - f.write("\n" + "=" * 100 + "\n\n") - - def get_memory_summary(self) -> Dict[str, float]: - """ - 获取内存使用摘要 - """ - try: - process = psutil.Process() - memory_info = process.memory_info() - - # 获取Python对象总内存(简化版) - all_objects = muppy.get_objects() - if len(all_objects) > 10000: # 限制对象数量 - import random - all_objects = random.sample(all_objects, 10000) - - sum1 = summary.summarize(all_objects) - - python_total_mb = 0 - for line in summary.format_(sum1): - if '|' in line and line.strip() and not line.startswith('=') and not line.startswith('-'): - parts = line.split('|') - if len(parts) >= 3: - try: - size_str = parts[2].strip() - if 'MB' in size_str: - size_mb = float(size_str.replace('MB', '').strip()) - python_total_mb += size_mb - except: - pass - - total_memory = memory_info.rss / 1024 / 1024 - unaccounted = total_memory - python_total_mb - - return { - 'total_memory_mb': total_memory, - 'python_objects_mb': python_total_mb, - 'unaccounted_mb': unaccounted, - 'unaccounted_percent': (unaccounted / total_memory * 100) if total_memory > 0 else 0 - } - except Exception as e: - logger.error(f"获取内存摘要失败: {e}") - return {} - - def force_garbage_collection(self): - """ - 强制垃圾回收并返回清理的对象数量 - """ - try: - collected = gc.collect() - logger.info(f"强制垃圾回收完成,清理了 {collected} 个对象") - return collected - except Exception as e: - logger.error(f"强制垃圾回收失败: {e}") - return 0 - - def analyze_memory_growth(self, interval_seconds: int = 300) -> Dict[str, float]: - """ - 分析内存增长趋势 - :param interval_seconds: 分析间隔(秒) - :return: 内存增长信息 - """ - try: - # 获取当前内存使用 - current_summary = self.get_memory_summary() - - # 等待指定时间 - time.sleep(interval_seconds) - - # 获取新的内存使用 - new_summary = self.get_memory_summary() - - if current_summary and new_summary: - growth_info = { - 'total_growth_mb': new_summary['total_memory_mb'] - current_summary['total_memory_mb'], - 'python_growth_mb': new_summary['python_objects_mb'] - current_summary['python_objects_mb'], - 'unaccounted_growth_mb': new_summary['unaccounted_mb'] - current_summary['unaccounted_mb'], - 'growth_rate_mb_per_hour': (new_summary['total_memory_mb'] - current_summary['total_memory_mb']) * 3600 / interval_seconds - } - - logger.info(f"内存增长分析: 总增长 {growth_info['total_growth_mb']:.2f} MB, " - f"Python对象增长 {growth_info['python_growth_mb']:.2f} MB, " - f"未统计增长 {growth_info['unaccounted_growth_mb']:.2f} MB") - - return growth_info - - return {} - - except Exception as e: - logger.error(f"分析内存增长失败: {e}") - return {} - - -# 使用示例 -if __name__ == "__main__": - # 创建内存分析器实例 - memory_helper = MemoryHelper() - - # 获取内存摘要 - summary = memory_helper.get_memory_summary() - print("内存使用摘要:") - for key, value in summary.items(): - print(f" {key}: {value:.2f}") - - # 创建详细分析报告 - analysis_file = memory_helper.create_detailed_memory_analysis() - if analysis_file: - print(f"详细分析报告已保存到: {analysis_file}") - - # 强制垃圾回收 - collected = memory_helper.force_garbage_collection() - print(f"垃圾回收清理了 {collected} 个对象") diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index 64e0844f..3a50e8be 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -5,7 +5,6 @@ from fastapi import FastAPI from app.chain.system import SystemChain from app.startup.command_initializer import init_command, stop_command, restart_command -from app.startup.memory_initializer import init_memory_manager, stop_memory_manager from app.startup.modules_initializer import init_modules, stop_modules from app.startup.monitor_initializer import stop_monitor, init_monitor from app.startup.plugins_initializer import init_plugins, stop_plugins, sync_plugins @@ -52,8 +51,6 @@ async def lifespan(app: FastAPI): init_command() # 初始化工作流 init_workflow() - # 初始化内存管理 - init_memory_manager() # 插件同步到本地 sync_plugins_task = asyncio.create_task(init_extra()) try: @@ -71,8 +68,6 @@ async def lifespan(app: FastAPI): print(str(e)) # 备份插件 SystemChain().backup_plugins() - # 停止内存管理器 - stop_memory_manager() # 停止工作流 stop_workflow() # 停止命令 diff --git a/app/startup/memory_initializer.py b/app/startup/memory_initializer.py deleted file mode 100644 index 80719184..00000000 --- a/app/startup/memory_initializer.py +++ /dev/null @@ -1,15 +0,0 @@ -from app.helper.memory import MemoryHelper - - -def init_memory_manager(): - """ - 初始化内存监控器 - """ - MemoryHelper().start_monitoring() - - -def stop_memory_manager(): - """ - 停止内存监控器 - """ - MemoryHelper().stop_monitoring()