feat:实现非阻塞文件日志处理

This commit is contained in:
jxxghp
2025-07-31 19:34:50 +08:00
parent aea44c1d97
commit 713d44eac3

View File

@@ -1,7 +1,11 @@
import asyncio
import logging
import queue
import sys
import threading
from logging.handlers import RotatingFileHandler
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional
@@ -33,6 +37,14 @@ class LogConfigModel(BaseModel):
LOG_CONSOLE_FORMAT: str = "%(leveltext)s[%(name)s] %(asctime)s %(message)s"
# 文件日志格式
LOG_FILE_FORMAT: str = "%(levelname)s%(asctime)s - %(message)s"
# 异步文件写入队列大小
ASYNC_FILE_QUEUE_SIZE: int = 1000
# 异步文件写入线程数
ASYNC_FILE_WORKERS: int = 2
# 批量写入大小
BATCH_WRITE_SIZE: int = 50
# 写入超时时间(秒)
WRITE_TIMEOUT: float = 3.0
class LogSettings(BaseSettings, LogConfigModel):
@@ -64,6 +76,9 @@ class LogSettings(BaseSettings, LogConfigModel):
env_file_encoding = "utf-8"
# 实例化日志设置
log_settings = LogSettings()
# 日志级别颜色映射
level_name_colors = {
logging.DEBUG: lambda level_name: click.style(str(level_name), fg="cyan"),
@@ -88,6 +103,170 @@ class CustomFormatter(logging.Formatter):
return super().format(record)
class LogEntry:
"""
日志条目
"""
def __init__(self, level: str, message: str, file_path: Path, timestamp: datetime = None):
self.level = level
self.message = message
self.file_path = file_path
self.timestamp = timestamp or datetime.now()
class NonBlockingFileHandler:
"""
非阻塞文件处理器 - 透明地处理协程环境中的文件写入
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self._initialized = True
self._write_queue = queue.Queue(maxsize=log_settings.ASYNC_FILE_QUEUE_SIZE)
self._executor = ThreadPoolExecutor(max_workers=log_settings.ASYNC_FILE_WORKERS,
thread_name_prefix="LogWriter")
self._batch_buffer = {}
self._last_flush = {}
self._running = True
# 启动后台写入线程
self._write_thread = threading.Thread(target=self._batch_writer, daemon=True)
self._write_thread.start()
def write_log(self, level: str, message: str, file_path: Path):
"""
写入日志 - 自动检测协程环境并使用合适的方式
"""
entry = LogEntry(level, message, file_path)
# 检测是否在协程环境中
if self._is_in_event_loop():
# 在协程环境中,使用非阻塞方式
self._write_non_blocking(entry)
else:
# 不在协程环境中,直接同步写入
self._write_sync(entry)
@staticmethod
def _is_in_event_loop() -> bool:
"""
检测当前是否在事件循环中
"""
try:
loop = asyncio.get_running_loop()
return loop is not None
except RuntimeError:
return False
def _write_non_blocking(self, entry: LogEntry):
"""
非阻塞写入(用于协程环境)
"""
try:
self._write_queue.put_nowait(entry)
except queue.Full:
# 队列满时,使用线程池处理
self._executor.submit(self._write_sync, entry)
@staticmethod
def _write_sync(entry: LogEntry):
"""
同步写入日志
"""
try:
# 确保目录存在
entry.file_path.parent.mkdir(parents=True, exist_ok=True)
# 格式化时间戳
timestamp = entry.timestamp.strftime('%Y-%m-%d %H:%M:%S,') + f"{entry.timestamp.microsecond // 1000:03d}"
line = f"{entry.level.upper()}{timestamp} - {entry.message}\n"
# 写入文件
with open(entry.file_path, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
# 如果文件写入失败,至少输出到控制台
print(f"日志写入失败 {entry.file_path}: {e}")
print(f"{entry.level.upper()}{entry.timestamp} - {entry.message}")
def _batch_writer(self):
"""
后台批量写入线程
"""
while self._running:
try:
# 收集一批日志条目
batch = []
end_time = time.time() + log_settings.WRITE_TIMEOUT
while len(batch) < log_settings.BATCH_WRITE_SIZE and time.time() < end_time:
try:
remaining_time = max(0, end_time - time.time())
entry = self._write_queue.get(timeout=remaining_time)
batch.append(entry)
except queue.Empty:
break
if batch:
self._write_batch(batch)
except Exception as e:
print(f"批量写入线程错误: {e}")
time.sleep(0.1)
def _write_batch(self, batch: list):
"""
批量写入日志
"""
# 按文件分组
file_groups = {}
for entry in batch:
if entry.file_path not in file_groups:
file_groups[entry.file_path] = []
file_groups[entry.file_path].append(entry)
# 批量写入每个文件
for file_path, entries in file_groups.items():
try:
# 确保目录存在
file_path.parent.mkdir(parents=True, exist_ok=True)
# 批量写入
with open(file_path, 'a', encoding='utf-8') as f:
for entry in entries:
timestamp = entry.timestamp.strftime(
'%Y-%m-%d %H:%M:%S,') + f"{entry.timestamp.microsecond // 1000:03d}"
line = f"{entry.level.upper()}{timestamp} - {entry.message}\n"
f.write(line)
except Exception as e:
print(f"批量写入失败 {file_path}: {e}")
# 回退到逐个写入
for entry in entries:
self._write_sync(entry)
def shutdown(self):
"""
关闭文件处理器
"""
self._running = False
if hasattr(self, '_write_thread'):
self._write_thread.join(timeout=5)
if self._executor:
self._executor.shutdown(wait=True)
class LoggerManager:
"""
日志管理
@@ -98,6 +277,8 @@ class LoggerManager:
_default_log_file = "moviepilot.log"
# 线程锁
_lock = threading.Lock()
# 非阻塞文件处理器
_file_handler = NonBlockingFileHandler()
def get_logger(self, name: str) -> logging.Logger:
"""
@@ -112,8 +293,8 @@ class LoggerManager:
# 检查是否已经创建过这个 logger
_logger = self._loggers.get(logfile)
if not _logger:
# 如果没有,就使用现有的 __setup_logger 来创建一个新的
_logger = self.__setup_logger(log_file=logfile)
# 如果没有,就使用现有的 __setup_console_logger 来创建一个新的
_logger = self.__setup_console_logger(log_file=logfile)
self._loggers[logfile] = _logger
return _logger
@@ -171,13 +352,12 @@ class LoggerManager:
return caller_name or "log.py", plugin_name
@staticmethod
def __setup_logger(log_file: str):
def __setup_console_logger(log_file: str):
"""
初始化日志实例
初始化控制台日志实例(文件输出由 NonBlockingFileHandler 处理)
:param log_file日志文件相对路径
"""
log_file_path = log_settings.LOG_PATH / log_file
log_file_path.parent.mkdir(parents=True, exist_ok=True)
# 创建新实例
_logger = logging.getLogger(log_file_path.stem)
@@ -189,24 +369,12 @@ class LoggerManager:
for handler in _logger.handlers:
_logger.removeHandler(handler)
# 终端日志
# 只设置终端日志(文件日志由 NonBlockingFileHandler 处理)
console_handler = logging.StreamHandler()
console_formatter = CustomFormatter(log_settings.LOG_CONSOLE_FORMAT)
console_handler.setFormatter(console_formatter)
_logger.addHandler(console_handler)
# 文件日志
file_handler = RotatingFileHandler(
filename=log_file_path,
mode="a",
maxBytes=log_settings.LOG_MAX_FILE_SIZE_BYTES,
backupCount=log_settings.LOG_BACKUP_COUNT,
encoding="utf-8"
)
file_formatter = CustomFormatter(log_settings.LOG_FILE_FORMAT)
file_handler.setFormatter(file_formatter)
_logger.addHandler(file_handler)
# 禁止向父级log传递
_logger.propagate = False
@@ -226,22 +394,15 @@ class LoggerManager:
更新 Logger 的 handler 配置
:param _logger: 需要更新的 Logger 实例
"""
# 更新现有 handler
# 更新现有 handler(只有控制台 handler
for handler in _logger.handlers:
try:
if isinstance(handler, RotatingFileHandler):
# 更新最大文件大小和备份数量
handler.maxBytes = log_settings.LOG_MAX_FILE_SIZE_BYTES
handler.backupCount = log_settings.LOG_BACKUP_COUNT
# 更新日志文件输出格式
file_formatter = CustomFormatter(log_settings.LOG_FILE_FORMAT)
handler.setFormatter(file_formatter)
elif isinstance(handler, logging.StreamHandler):
if isinstance(handler, logging.StreamHandler):
# 更新控制台输出格式
console_formatter = CustomFormatter(log_settings.LOG_CONSOLE_FORMAT)
handler.setFormatter(console_formatter)
except Exception as e:
logger.error(f"Failed to update handler: {handler}. Error: {e}")
print(f"更新日志处理器失败: {handler}. 错误: {e}")
# 更新日志级别
_logger.setLevel(LoggerManager.__get_log_level())
@@ -260,6 +421,16 @@ class LoggerManager:
"""
# 获取调用者文件名和插件名
caller_name, plugin_name = self.__get_caller()
# 格式化消息
formatted_msg = f"{caller_name} - {msg}"
if args:
try:
formatted_msg = formatted_msg % args
except (TypeError, ValueError):
# 如果格式化失败,直接拼接
formatted_msg = f"{formatted_msg} {' '.join(str(arg) for arg in args)}"
# 区分插件日志
if plugin_name:
# 使用插件日志文件
@@ -267,16 +438,24 @@ class LoggerManager:
else:
# 使用默认日志文件
logfile = self._default_log_file
with LoggerManager._lock: # 添加锁
# 获取调用者的模块的logger
# 构建完整的日志文件路径
log_file_path = log_settings.LOG_PATH / logfile
# 使用非阻塞文件处理器写入文件日志
self._file_handler.write_log(method.upper(), formatted_msg, log_file_path)
# 同时保持控制台输出(使用标准 logging
with LoggerManager._lock:
_logger = self._loggers.get(logfile)
if not _logger:
_logger = self.__setup_logger(log_file=logfile)
_logger = self.__setup_console_logger(log_file=logfile)
self._loggers[logfile] = _logger
# 调用logger的方法打印日志
# 只在控制台输出,文件写入已由 _file_handler 处理
if hasattr(_logger, method):
log_method = getattr(_logger, method)
log_method(f"{caller_name} - {msg}", *args, **kwargs)
log_method(formatted_msg)
def info(self, msg: str, *args, **kwargs):
"""
@@ -314,9 +493,14 @@ class LoggerManager:
"""
self.logger("critical", msg, *args, **kwargs)
@classmethod
def shutdown(cls):
"""
关闭日志管理器,清理资源
"""
if cls._file_handler:
cls._file_handler.shutdown()
# 实例化日志设置
log_settings = LogSettings()
# 初始化日志管理
logger = LoggerManager()