From 713d44eac359c1f575d9dfacb1a12dbea88d264f Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 31 Jul 2025 19:34:50 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=AE=9E=E7=8E=B0=E9=9D=9E?= =?UTF-8?q?=E9=98=BB=E5=A1=9E=E6=96=87=E4=BB=B6=E6=97=A5=E5=BF=97=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/log.py | 256 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 220 insertions(+), 36 deletions(-) diff --git a/app/log.py b/app/log.py index a855830a..9f9d9e83 100644 --- a/app/log.py +++ b/app/log.py @@ -1,7 +1,11 @@ +import asyncio import logging +import queue import sys import threading -from logging.handlers import RotatingFileHandler +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional @@ -33,6 +37,14 @@ class LogConfigModel(BaseModel): LOG_CONSOLE_FORMAT: str = "%(leveltext)s[%(name)s] %(asctime)s %(message)s" # 文件日志格式 LOG_FILE_FORMAT: str = "【%(levelname)s】%(asctime)s - %(message)s" + # 异步文件写入队列大小 + ASYNC_FILE_QUEUE_SIZE: int = 1000 + # 异步文件写入线程数 + ASYNC_FILE_WORKERS: int = 2 + # 批量写入大小 + BATCH_WRITE_SIZE: int = 50 + # 写入超时时间(秒) + WRITE_TIMEOUT: float = 3.0 class LogSettings(BaseSettings, LogConfigModel): @@ -64,6 +76,9 @@ class LogSettings(BaseSettings, LogConfigModel): env_file_encoding = "utf-8" +# 实例化日志设置 +log_settings = LogSettings() + # 日志级别颜色映射 level_name_colors = { logging.DEBUG: lambda level_name: click.style(str(level_name), fg="cyan"), @@ -88,6 +103,170 @@ class CustomFormatter(logging.Formatter): return super().format(record) +class LogEntry: + """ + 日志条目 + """ + + def __init__(self, level: str, message: str, file_path: Path, timestamp: datetime = None): + self.level = level + self.message = message + self.file_path = file_path + self.timestamp = timestamp or datetime.now() + + +class NonBlockingFileHandler: + """ + 非阻塞文件处理器 - 透明地处理协程环境中的文件写入 + """ + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if hasattr(self, '_initialized'): + return + + self._initialized = True + self._write_queue = queue.Queue(maxsize=log_settings.ASYNC_FILE_QUEUE_SIZE) + self._executor = ThreadPoolExecutor(max_workers=log_settings.ASYNC_FILE_WORKERS, + thread_name_prefix="LogWriter") + self._batch_buffer = {} + self._last_flush = {} + self._running = True + + # 启动后台写入线程 + self._write_thread = threading.Thread(target=self._batch_writer, daemon=True) + self._write_thread.start() + + def write_log(self, level: str, message: str, file_path: Path): + """ + 写入日志 - 自动检测协程环境并使用合适的方式 + """ + entry = LogEntry(level, message, file_path) + + # 检测是否在协程环境中 + if self._is_in_event_loop(): + # 在协程环境中,使用非阻塞方式 + self._write_non_blocking(entry) + else: + # 不在协程环境中,直接同步写入 + self._write_sync(entry) + + @staticmethod + def _is_in_event_loop() -> bool: + """ + 检测当前是否在事件循环中 + """ + try: + loop = asyncio.get_running_loop() + return loop is not None + except RuntimeError: + return False + + def _write_non_blocking(self, entry: LogEntry): + """ + 非阻塞写入(用于协程环境) + """ + try: + self._write_queue.put_nowait(entry) + except queue.Full: + # 队列满时,使用线程池处理 + self._executor.submit(self._write_sync, entry) + + @staticmethod + def _write_sync(entry: LogEntry): + """ + 同步写入日志 + """ + try: + # 确保目录存在 + entry.file_path.parent.mkdir(parents=True, exist_ok=True) + + # 格式化时间戳 + timestamp = entry.timestamp.strftime('%Y-%m-%d %H:%M:%S,') + f"{entry.timestamp.microsecond // 1000:03d}" + line = f"【{entry.level.upper()}】{timestamp} - {entry.message}\n" + + # 写入文件 + with open(entry.file_path, 'a', encoding='utf-8') as f: + f.write(line) + except Exception as e: + # 如果文件写入失败,至少输出到控制台 + print(f"日志写入失败 {entry.file_path}: {e}") + print(f"【{entry.level.upper()}】{entry.timestamp} - {entry.message}") + + def _batch_writer(self): + """ + 后台批量写入线程 + """ + while self._running: + try: + # 收集一批日志条目 + batch = [] + end_time = time.time() + log_settings.WRITE_TIMEOUT + + while len(batch) < log_settings.BATCH_WRITE_SIZE and time.time() < end_time: + try: + remaining_time = max(0, end_time - time.time()) + entry = self._write_queue.get(timeout=remaining_time) + batch.append(entry) + except queue.Empty: + break + + if batch: + self._write_batch(batch) + + except Exception as e: + print(f"批量写入线程错误: {e}") + time.sleep(0.1) + + def _write_batch(self, batch: list): + """ + 批量写入日志 + """ + # 按文件分组 + file_groups = {} + for entry in batch: + if entry.file_path not in file_groups: + file_groups[entry.file_path] = [] + file_groups[entry.file_path].append(entry) + + # 批量写入每个文件 + for file_path, entries in file_groups.items(): + try: + # 确保目录存在 + file_path.parent.mkdir(parents=True, exist_ok=True) + + # 批量写入 + with open(file_path, 'a', encoding='utf-8') as f: + for entry in entries: + timestamp = entry.timestamp.strftime( + '%Y-%m-%d %H:%M:%S,') + f"{entry.timestamp.microsecond // 1000:03d}" + line = f"【{entry.level.upper()}】{timestamp} - {entry.message}\n" + f.write(line) + except Exception as e: + print(f"批量写入失败 {file_path}: {e}") + # 回退到逐个写入 + for entry in entries: + self._write_sync(entry) + + def shutdown(self): + """ + 关闭文件处理器 + """ + self._running = False + if hasattr(self, '_write_thread'): + self._write_thread.join(timeout=5) + if self._executor: + self._executor.shutdown(wait=True) + + class LoggerManager: """ 日志管理 @@ -98,6 +277,8 @@ class LoggerManager: _default_log_file = "moviepilot.log" # 线程锁 _lock = threading.Lock() + # 非阻塞文件处理器 + _file_handler = NonBlockingFileHandler() def get_logger(self, name: str) -> logging.Logger: """ @@ -112,8 +293,8 @@ class LoggerManager: # 检查是否已经创建过这个 logger _logger = self._loggers.get(logfile) if not _logger: - # 如果没有,就使用现有的 __setup_logger 来创建一个新的 - _logger = self.__setup_logger(log_file=logfile) + # 如果没有,就使用现有的 __setup_console_logger 来创建一个新的 + _logger = self.__setup_console_logger(log_file=logfile) self._loggers[logfile] = _logger return _logger @@ -171,13 +352,12 @@ class LoggerManager: return caller_name or "log.py", plugin_name @staticmethod - def __setup_logger(log_file: str): + def __setup_console_logger(log_file: str): """ - 初始化日志实例 + 初始化控制台日志实例(文件输出由 NonBlockingFileHandler 处理) :param log_file:日志文件相对路径 """ log_file_path = log_settings.LOG_PATH / log_file - log_file_path.parent.mkdir(parents=True, exist_ok=True) # 创建新实例 _logger = logging.getLogger(log_file_path.stem) @@ -189,24 +369,12 @@ class LoggerManager: for handler in _logger.handlers: _logger.removeHandler(handler) - # 终端日志 + # 只设置终端日志(文件日志由 NonBlockingFileHandler 处理) console_handler = logging.StreamHandler() console_formatter = CustomFormatter(log_settings.LOG_CONSOLE_FORMAT) console_handler.setFormatter(console_formatter) _logger.addHandler(console_handler) - # 文件日志 - file_handler = RotatingFileHandler( - filename=log_file_path, - mode="a", - maxBytes=log_settings.LOG_MAX_FILE_SIZE_BYTES, - backupCount=log_settings.LOG_BACKUP_COUNT, - encoding="utf-8" - ) - file_formatter = CustomFormatter(log_settings.LOG_FILE_FORMAT) - file_handler.setFormatter(file_formatter) - _logger.addHandler(file_handler) - # 禁止向父级log传递 _logger.propagate = False @@ -226,22 +394,15 @@ class LoggerManager: 更新 Logger 的 handler 配置 :param _logger: 需要更新的 Logger 实例 """ - # 更新现有 handler + # 更新现有 handler(只有控制台 handler) for handler in _logger.handlers: try: - if isinstance(handler, RotatingFileHandler): - # 更新最大文件大小和备份数量 - handler.maxBytes = log_settings.LOG_MAX_FILE_SIZE_BYTES - handler.backupCount = log_settings.LOG_BACKUP_COUNT - # 更新日志文件输出格式 - file_formatter = CustomFormatter(log_settings.LOG_FILE_FORMAT) - handler.setFormatter(file_formatter) - elif isinstance(handler, logging.StreamHandler): + if isinstance(handler, logging.StreamHandler): # 更新控制台输出格式 console_formatter = CustomFormatter(log_settings.LOG_CONSOLE_FORMAT) handler.setFormatter(console_formatter) except Exception as e: - logger.error(f"Failed to update handler: {handler}. Error: {e}") + print(f"更新日志处理器失败: {handler}. 错误: {e}") # 更新日志级别 _logger.setLevel(LoggerManager.__get_log_level()) @@ -260,6 +421,16 @@ class LoggerManager: """ # 获取调用者文件名和插件名 caller_name, plugin_name = self.__get_caller() + + # 格式化消息 + formatted_msg = f"{caller_name} - {msg}" + if args: + try: + formatted_msg = formatted_msg % args + except (TypeError, ValueError): + # 如果格式化失败,直接拼接 + formatted_msg = f"{formatted_msg} {' '.join(str(arg) for arg in args)}" + # 区分插件日志 if plugin_name: # 使用插件日志文件 @@ -267,16 +438,24 @@ class LoggerManager: else: # 使用默认日志文件 logfile = self._default_log_file - with LoggerManager._lock: # 添加锁 - # 获取调用者的模块的logger + + # 构建完整的日志文件路径 + log_file_path = log_settings.LOG_PATH / logfile + + # 使用非阻塞文件处理器写入文件日志 + self._file_handler.write_log(method.upper(), formatted_msg, log_file_path) + + # 同时保持控制台输出(使用标准 logging) + with LoggerManager._lock: _logger = self._loggers.get(logfile) if not _logger: - _logger = self.__setup_logger(log_file=logfile) + _logger = self.__setup_console_logger(log_file=logfile) self._loggers[logfile] = _logger - # 调用logger的方法打印日志 + + # 只在控制台输出,文件写入已由 _file_handler 处理 if hasattr(_logger, method): log_method = getattr(_logger, method) - log_method(f"{caller_name} - {msg}", *args, **kwargs) + log_method(formatted_msg) def info(self, msg: str, *args, **kwargs): """ @@ -314,9 +493,14 @@ class LoggerManager: """ self.logger("critical", msg, *args, **kwargs) + @classmethod + def shutdown(cls): + """ + 关闭日志管理器,清理资源 + """ + if cls._file_handler: + cls._file_handler.shutdown() -# 实例化日志设置 -log_settings = LogSettings() # 初始化日志管理 logger = LoggerManager()