Merge pull request #4987 from Aqr-K/refactor/plugin-monitor

This commit is contained in:
jxxghp
2025-09-22 11:41:13 +08:00
committed by GitHub
2 changed files with 340 additions and 16 deletions

View File

@@ -27,7 +27,7 @@ from app.helper.sites import SitesHelper # noqa
from app.log import logger
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
from app.utils.limit import rate_limit_window
from app.utils.debouncer import debounce
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
@@ -47,33 +47,62 @@ class PluginMonitorHandler(FileSystemEventHandler):
if not event_path.name.endswith(".py") or "pycache" in event_path.parts:
return
# 读取插件根目录下的__init__.py文件读取class XXXX(_PluginBase)的类名
# 防抖模式下处理文件修改事件
self._handle_modification(event_path)
@debounce(interval=1.0, leading=False, source="PluginMonitorHandler", enable_logging=False)
def _handle_modification(self, event_path: Path):
"""
处理文件修改事件
:param event_path:
:return:
"""
logger.debug(f"防抖计时结束,开始处理文件修改事件: {event_path}")
# 解析插件ID
pid = self._get_plugin_id_from_path(event_path)
if not pid:
logger.debug(f"文件不属于任何有效插件,已忽略: {event_path}")
return
# 触发重载
self.__reload_plugin(pid)
@staticmethod
def _get_plugin_id_from_path(event_path: Path) -> Optional[str]:
"""
根据文件路径解析出插件的ID。
:param event_path: 被修改文件的 Path 对象。
:return: 插件ID字符串如果不是有效插件文件则返回 None。
"""
try:
plugins_root = settings.ROOT_PATH / "app" / "plugins"
# 确保修改的文件在 plugins 目录下
if plugins_root not in event_path.parents:
return
# 获取插件目录路径没有找到__init__.py时说明不是有效包跳过插件重载
# 插件重载目前没有支持app/plugins/plugin/package/__init__.py的场景这里也不做支持
return None
# 找到插件的根目录
plugin_dir = event_path.parent
while plugin_dir.parent != plugins_root:
plugin_dir = plugin_dir.parent
if plugin_dir == plugins_root: # 防止无限循环
break
init_file = plugin_dir / "__init__.py"
if not init_file.exists():
logger.debug(f"{plugin_dir} 下没有找到 __init__.py跳过插件重载")
return
return None
# 读取 __init__.py 文件,查找插件主类名
with open(init_file, "r", encoding="utf-8") as f:
lines = f.readlines()
pid = None
for line in lines:
if line.startswith("class") and "(_PluginBase)" in line:
pid = line.split("class ")[1].split("(_PluginBase)")[0].strip()
if pid:
self.__reload_plugin(pid)
for line in f:
if line.startswith("class") and "(_PluginBase)" in line:
# 解析出类名作为插件ID
return line.split("class ")[1].split("(_PluginBase)")[0].strip()
return None
except Exception as e:
logger.error(f"插件文件修改后重载出错:{str(e)}")
logger.error(f"从路径解析插件ID时出错: {e}")
return None
@staticmethod
@rate_limit_window(max_calls=1, window_seconds=2, source="PluginMonitor", enable_logging=False)
def __reload_plugin(pid):
"""
重新加载插件

295
app/utils/debounce.py Normal file
View File

@@ -0,0 +1,295 @@
import asyncio
import functools
import inspect
from abc import ABC, abstractmethod
from threading import Timer, Lock
from typing import Callable, Any, Optional
from app.log import logger
class BaseDebouncer(ABC):
"""
防抖器的抽象基类。定义了防抖器的基本接口和日志功能。
所有防抖器实现类必须继承此类并实现其抽象方法。
"""
def __init__(self, func: Callable, interval: float, *,
leading: bool = False, enable_logging: bool = False, source: str = ""):
"""
初始化防抖器实例。
:param func: 要防抖的函数或协程
:param interval: 防抖间隔,单位秒
:param leading: 是否启用前沿模式
:param enable_logging: 是否启用日志记录
:param source: 日志来源标识
"""
self.func = func
self.interval = interval
self.leading = leading
self.enable_logging = enable_logging
self.source = source
@abstractmethod
def __call__(self, *args, **kwargs) -> None:
"""
定义防抖调用的契约,子类必须实现。
"""
pass
@abstractmethod
def cancel(self) -> None:
"""
定义取消挂起调用的契约,子类必须实现。
"""
pass
def format_log(self, message: str) -> str:
"""
格式化日志消息,加入 source 前缀。
"""
return f"[{self.source}] {message}" if self.source else message
def log(self, level: str, message: str):
"""
根据日志级别记录日志。
"""
if self.enable_logging:
log_method = getattr(logger, level, logger.debug)
log_method(self.format_log(message))
def log_debug(self, message: str):
"""
记录调试日志。
"""
self.log("debug", message)
def log_info(self, message: str):
"""
记录信息日志。
"""
self.log("info", message)
def log_warning(self, message: str):
"""
记录警告日志。
"""
self.log("warning", message)
def error(self, message: str):
"""
记录错误日志。
"""
self.log("error", message)
def critical(self, message: str):
"""
记录严重错误日志。
"""
self.log("critical", message)
class Debouncer(BaseDebouncer):
"""
同步防抖实现类
"""
def __init__(self, *args, **kwargs):
"""
初始化防抖器实例。
"""
super().__init__(*args, **kwargs)
self.timer: Optional[Timer] = None
self.lock = Lock()
# 用于前沿模式,标记是否处于“冷却”或“不应期”
self.is_cooling_down = False
def __call__(self, *args, **kwargs) -> None:
"""
调用防抖函数。
:param args:
:param kwargs:
:return:
"""
with self.lock:
if self.leading:
self._call_leading(*args, **kwargs)
else:
self._call_trailing(*args, **kwargs)
def _call_leading(self, *args, **kwargs):
"""
前沿模式的逻辑。
"""
# 如果不在冷却期,则立即执行
if not self.is_cooling_down:
self.log_info("前沿模式: 立即执行函数。")
self.func(*args, **kwargs)
# 无论是否执行,都重置冷却计时器
if self.timer and self.timer.is_alive():
self.timer.cancel()
# 设置自己进入冷却期
self.is_cooling_down = True
# 在间隔结束后,将冷却状态解除
self.timer = Timer(self.interval, self._end_cool_down)
self.timer.start()
self.log_debug(f"前沿模式: 进入 {self.interval} 秒的冷却期。")
def _end_cool_down(self):
"""
计时器到期后,解除冷却状态
"""
with self.lock:
self.is_cooling_down = False
self.log_debug("前沿模式: 冷却时间结束,可以再次立即执行。")
def _call_trailing(self, *args, **kwargs):
"""
后沿模式的逻辑。
"""
# 【日志点】记录计时器被重置
if self.timer and self.timer.is_alive():
self.timer.cancel()
self.log_debug("后沿模式: 检测到新的调用,已重置计时器。")
def execute():
self.log_info("后沿模式: 计时结束,开始执行函数。")
self.func(*args, **kwargs)
self.timer = Timer(self.interval, execute)
self.timer.start()
self.log_debug(f"后沿模式: 计时器已启动,将在 {self.interval} 秒后执行。")
def cancel(self) -> None:
"""
取消任何挂起的调用,并重置状态。
"""
with self.lock:
if self.timer and self.timer.is_alive():
self.timer.cancel()
self.timer = None
self.log_info("防抖器被手动取消。")
self.is_cooling_down = False
class AsyncDebouncer(BaseDebouncer):
"""
异步防抖实现类。
"""
def __init__(self, *args, **kwargs):
"""
初始化异步防抖器实例。
"""
super().__init__(*args, **kwargs)
self.task: Optional[asyncio.Task] = None
self.lock = asyncio.Lock()
self.is_cooling_down = False
async def __call__(self, *args, **kwargs) -> None:
"""
异步调用防抖函数。
"""
async with self.lock:
if self.leading:
await self._call_leading(*args, **kwargs)
else:
await self._call_trailing(*args, **kwargs)
async def _call_leading(self, *args, **kwargs):
"""
前沿模式的逻辑。
"""
if not self.is_cooling_down:
self.log_info("前沿模式 (async): 立即执行协程。")
await self.func(*args, **kwargs)
if self.task and not self.task.done():
self.task.cancel()
self.is_cooling_down = True
self.task = asyncio.create_task(self._end_cool_down())
self.log_debug(f"前沿模式 (async): 进入 {self.interval} 秒的冷却期。")
async def _end_cool_down(self):
"""
计时器到期后,解除冷却状态
"""
await asyncio.sleep(self.interval)
async with self.lock:
self.is_cooling_down = False
self.log_debug("前沿模式 (async): 冷却时间结束。")
async def _call_trailing(self, *args, **kwargs):
"""
后沿模式的逻辑。
"""
if self.task and not self.task.done():
self.task.cancel()
self.log_debug("后沿模式 (async): 检测到新的调用,已取消旧任务。")
self.task = asyncio.create_task(self._delayed_execute(*args, **kwargs))
self.log_debug(f"后沿模式 (async): 任务已创建,将在 {self.interval} 秒后执行。")
async def _delayed_execute(self, *args, **kwargs):
"""
延迟执行实际的协程函数。
"""
try:
await asyncio.sleep(self.interval)
self.log_info("后沿模式 (async): 延迟结束,开始执行协程。")
await self.func(*args, **kwargs)
except asyncio.CancelledError:
# 任务被取消是正常行为,无需处理
pass
async def cancel(self) -> None:
"""
取消任何挂起的调用,并重置状态。
"""
async with self.lock:
if self.task and not self.task.done():
self.task.cancel()
self.task = None
self.log_info("异步防抖器被手动取消。")
self.is_cooling_down = False
def debounce(interval: float, *, leading: bool = False,
enable_logging: bool = False, source: str = "") -> Callable:
"""
支持同步和异步的防抖装饰器工厂。
"""
def decorator(func: Callable) -> Callable:
# 检查函数类型,并选择合适的引擎
if inspect.iscoroutinefunction(func):
# 异步函数,使用 AsyncDebouncer
instance = AsyncDebouncer(func, interval,
leading=leading,
enable_logging=enable_logging,
source=source)
@functools.wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
await instance(*args, **kwargs)
async_wrapper.cancel = instance.cancel
return async_wrapper
else:
# 同步函数,使用 Debouncer
instance = Debouncer(func, interval,
leading=leading,
enable_logging=enable_logging,
source=source)
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
instance(*args, **kwargs)
wrapper.cancel = instance.cancel
return wrapper
return decorator