mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-28 03:02:34 +08:00
feat(event): add visualization and enhance handler
This commit is contained in:
@@ -62,6 +62,7 @@ class EventManager(metaclass=Singleton):
|
||||
self.__event_queue = PriorityQueue() # 优先级队列
|
||||
self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表
|
||||
self.__disabled_handlers = set() # 禁用的事件处理器集合
|
||||
self.__disabled_classes = set() # 禁用的事件处理器类集合
|
||||
self.__lock = threading.Lock() # 线程锁
|
||||
self.__condition = threading.Condition(self.__lock) # 条件变量
|
||||
|
||||
@@ -114,21 +115,31 @@ class EventManager(metaclass=Singleton):
|
||||
event_kind = Event.get_event_kind(event_type)
|
||||
logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
|
||||
|
||||
def disable_event_handler(self, handler_name: str):
|
||||
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
|
||||
"""
|
||||
禁用指定名称的事件处理器,防止其响应事件
|
||||
禁用指定名称的事件处理器或事件处理类,防止其响应事件
|
||||
:param handler_name: 要禁用的事件处理器名称
|
||||
:param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器
|
||||
"""
|
||||
self.__disabled_handlers.add(handler_name)
|
||||
logger.debug(f"Disabled event handler: {handler_name}")
|
||||
if class_name:
|
||||
self.__disabled_classes.add(class_name)
|
||||
logger.debug(f"Disabled event handler class: {class_name}")
|
||||
else:
|
||||
self.__disabled_handlers.add(handler_name)
|
||||
logger.debug(f"Disabled event handler: {handler_name}")
|
||||
|
||||
def enable_event_handler(self, handler_name: str):
|
||||
def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
|
||||
"""
|
||||
启用指定名称的事件处理器,使其可以继续响应事件
|
||||
启用指定名称的事件处理器或事件处理类,使其可以继续响应事件
|
||||
:param handler_name: 要启用的事件处理器名称
|
||||
:param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器
|
||||
"""
|
||||
self.__disabled_handlers.discard(handler_name)
|
||||
logger.debug(f"Enabled event handler: {handler_name}")
|
||||
if class_name:
|
||||
self.__disabled_classes.discard(class_name)
|
||||
logger.debug(f"Enabled event handler class: {class_name}")
|
||||
else:
|
||||
self.__disabled_handlers.discard(handler_name)
|
||||
logger.debug(f"Enabled event handler: {handler_name}")
|
||||
|
||||
def check(self, etype: Union[EventType, SyncEventType]) -> bool:
|
||||
"""
|
||||
@@ -139,7 +150,34 @@ class EventManager(metaclass=Singleton):
|
||||
if etype not in self.__subscribers:
|
||||
return False
|
||||
handlers = self.__subscribers.get(etype, [])
|
||||
return any(handler.__name__ not in self.__disabled_handlers for handler in handlers)
|
||||
return any(
|
||||
handler.__name__ not in self.__disabled_handlers and
|
||||
handler.__qualname__.split(".")[0] not in self.__disabled_classes
|
||||
for handler in handlers
|
||||
)
|
||||
|
||||
def visualize_handlers(self) -> List[Dict[str, str]]:
|
||||
"""
|
||||
可视化所有事件处理器,包括是否被禁用的状态
|
||||
:return: 处理器列表,包含处理器名称、类名和状态
|
||||
"""
|
||||
handler_info = []
|
||||
with (self.__lock):
|
||||
for event_type, handlers in self.__subscribers.items():
|
||||
for handler in handlers:
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
status = (
|
||||
"disabled"
|
||||
if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes
|
||||
else "enabled"
|
||||
)
|
||||
handler_info.append({
|
||||
"event_type": event_type.value,
|
||||
"handler_name": handler.__name__,
|
||||
"class_name": class_name,
|
||||
"status": status
|
||||
})
|
||||
return handler_info
|
||||
|
||||
def __trigger_event(self, event: Event) -> Dict:
|
||||
"""
|
||||
@@ -166,8 +204,12 @@ class EventManager(metaclass=Singleton):
|
||||
"""
|
||||
handlers = self.__subscribers.get(event.event_type, [])
|
||||
for handler in handlers:
|
||||
if handler.__name__ not in self.__disabled_handlers:
|
||||
handler(event.event_data)
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
|
||||
try:
|
||||
handler(event.event_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling event {event.event_type}: {str(e)}", exc_info=True)
|
||||
|
||||
def __dispatch_event_async(self, event: Event):
|
||||
"""
|
||||
@@ -176,8 +218,21 @@ class EventManager(metaclass=Singleton):
|
||||
"""
|
||||
handlers = self.__subscribers.get(event.event_type, [])
|
||||
for handler in handlers:
|
||||
if handler.__name__ not in self.__disabled_handlers:
|
||||
self.__executor.submit(handler, event.event_data)
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
|
||||
self.__executor.submit(self.__safe_invoke_handler, handler, event)
|
||||
|
||||
@staticmethod
|
||||
def __safe_invoke_handler(handler: Callable[[Dict], None], event: Event):
|
||||
"""
|
||||
安全调用事件处理器,捕获异常并记录日志
|
||||
:param handler: 要调用的处理器
|
||||
:param event: 事件对象
|
||||
"""
|
||||
try:
|
||||
handler(event.event_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in asynchronous handler {handler.__name__}: {str(e)}", exc_info=True)
|
||||
|
||||
def __fixed_consumer(self):
|
||||
"""
|
||||
|
||||
@@ -7,6 +7,7 @@ import traceback
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
|
||||
|
||||
from app.helper.sites import SitesHelper
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers import Observer
|
||||
|
||||
@@ -17,7 +18,6 @@ from app.db.plugindata_oper import PluginDataOper
|
||||
from app.db.systemconfig_oper import SystemConfigOper
|
||||
from app.helper.module import ModuleHelper
|
||||
from app.helper.plugin import PluginHelper
|
||||
from app.helper.sites import SitesHelper
|
||||
from app.log import logger
|
||||
from app.schemas.types import SystemConfigKey
|
||||
from app.utils.crypto import RSAUtils
|
||||
@@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton):
|
||||
# 未安装的不加载
|
||||
if plugin_id not in installed_plugins:
|
||||
# 设置事件状态为不可用
|
||||
eventmanager.disable_events_hander(plugin_id)
|
||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
||||
continue
|
||||
# 生成实例
|
||||
plugin_obj = plugin()
|
||||
@@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton):
|
||||
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
|
||||
# 启用的插件才设置事件注册状态可用
|
||||
if plugin_obj.get_state():
|
||||
eventmanager.enable_events_hander(plugin_id)
|
||||
eventmanager.enable_event_handler(class_name=plugin_id)
|
||||
else:
|
||||
eventmanager.disable_events_hander(plugin_id)
|
||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
||||
except Exception as err:
|
||||
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
|
||||
|
||||
@@ -184,10 +184,10 @@ class PluginManager(metaclass=Singleton):
|
||||
self._running_plugins[plugin_id].init_plugin(conf)
|
||||
if self._running_plugins[plugin_id].get_state():
|
||||
# 设置启用的插件事件注册状态可用
|
||||
eventmanager.enable_events_hander(plugin_id)
|
||||
eventmanager.enable_event_handler(class_name=plugin_id)
|
||||
else:
|
||||
# 设置事件状态为不可用
|
||||
eventmanager.disable_events_hander(plugin_id)
|
||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
||||
|
||||
def stop(self, pid: str = None):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user