feat(event): optimize handler

This commit is contained in:
InfinityPacer
2024-09-20 16:26:45 +08:00
parent 3bee5a8a86
commit be63e9ed15
2 changed files with 115 additions and 106 deletions

View File

@@ -1,3 +1,4 @@
import inspect
import threading
import time
import uuid
@@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton):
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_queue = PriorityQueue() # 优先级队列
self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[
ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器)
self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
@@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton):
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]:
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
"""
发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 ChainEventType)
@@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton):
else:
logger.error(f"Unknown event type: {etype}")
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None],
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
priority: int = DEFAULT_EVENT_PRIORITY):
"""
注册事件处理器,将处理器添加到对应的事件订阅列表中
@@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton):
self.__broadcast_subscribers[event_type].append(handler)
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]):
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
"""
移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 ChainEventType)
@@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton):
self.__broadcast_subscribers[event_type].remove(handler)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
def disable_event_handler(self, target: Union[Callable, type]):
"""
禁用指定名称的事件处理器或事件处理类,防止其响应事件
:param handler_name: 要禁用的事件处理器名称
:param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器
禁用指定的事件处理器或事件处理
:param target: 处理器函数或类
"""
if class_name:
self.__disabled_classes.add(class_name)
logger.debug(f"Disabled event handler class: {class_name}")
identifier = self.__get_handler_identifier(target)
if isinstance(target, type):
self.__disabled_classes.add(identifier)
logger.debug(f"Disabled event handler class: {identifier}")
else:
self.__disabled_handlers.add(handler_name)
logger.debug(f"Disabled event handler: {handler_name}")
self.__disabled_handlers.add(identifier)
logger.debug(f"Disabled event handler: {identifier}")
def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
def enable_event_handler(self, target: Union[Callable, type]):
"""
启用指定名称的事件处理器或事件处理类,使其可以继续响应事件
:param handler_name: 要启用的事件处理器名称
:param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器
启用指定的事件处理器或事件处理
:param target: 处理器函数或类
"""
if class_name:
self.__disabled_classes.discard(class_name)
logger.debug(f"Enabled event handler class: {class_name}")
identifier = self.__get_handler_identifier(target)
if isinstance(target, type):
self.__disabled_classes.discard(identifier)
logger.debug(f"Enabled event handler class: {identifier}")
else:
self.__disabled_handlers.discard(handler_name)
logger.debug(f"Enabled event handler: {handler_name}")
self.__disabled_handlers.discard(identifier)
logger.debug(f"Enabled event handler: {identifier}")
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
"""
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 ChainEventType)
:return: 返回是否存在可用的处理器
"""
if isinstance(etype, ChainEventType):
handlers = self.__chain_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for _, handler in handlers
)
else:
handlers = self.__broadcast_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for handler in handlers
)
def visualize_handlers(self) -> List[Dict[str, str]]:
def visualize_handlers(self) -> List[Dict]:
"""
可视化所有事件处理器,包括是否被禁用的状态
:return: 处理器列表,包含处理器名称、类名和状态
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
"""
handler_info = []
with self.__lock:
for event_type, handlers in self.__broadcast_subscribers.items():
for handler in handlers:
class_name = handler.__qualname__.split(".")[0]
status = (
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
)
handler_info.append({
"event_type": event_type.value,
"handler_name": handler.__name__,
"class_name": class_name,
"status": status
})
for event_type, handlers in self.__chain_subscribers.items():
for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0]
status = (
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
)
handler_info.append({
"event_type": event_type.value,
"handler_name": handler.__name__,
"class_name": class_name,
"priority": priority,
"status": status
})
# 统一处理广播事件和链式事件
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
for handler_data in subscribers:
if isinstance(subscribers, dict):
priority, handler = handler_data
else:
priority = None
handler = handler_data
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
# 检查处理器的启用状态
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
# 构建处理器信息字典
handler_dict = {
"event_type": event_type.value,
"handler_identifier": handler_id,
"status": status
}
if priority is not None:
handler_dict["priority"] = priority
handler_info.append(handler_dict)
return handler_info
def __trigger_chain_event(self, event: Event) -> Dict:
@staticmethod
def __get_handler_identifier(target: Union[Callable, type]) -> str:
"""
触发链式事件,按顺序调用订阅的处理器
:param event: 处理的事件对象
:return: 返回处理后的事件数据
获取处理器或处理器类的唯一标识符,包括模块名和类名
:param target: 处理器函数或类
:return: 唯一标识符
"""
if isinstance(target, type):
# 如果是类,使用模块名和类名
module_name = target.__module__
class_name = target.__qualname__
return f"{module_name}.{class_name}"
else:
# 如果是函数或方法,使用 inspect.getmodule 来获取模块名
module = inspect.getmodule(target)
module_name = module.__name__ if module else "unknown_module"
qualname = target.__qualname__
return f"{module_name}.{qualname}"
def __is_handler_enabled(self, handler: Callable) -> bool:
"""
检查处理器是否已启用(没有被禁用)
:param handler: 处理器函数
:return: 如果处理器启用则返回 True否则返回 False
"""
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
# 获取处理器所属类的唯一标识符
class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
return False
return True
def __trigger_chain_event(self, event: Event) -> Event:
"""
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
"""
logger.debug(f"Triggering synchronous chain event: {event}")
self.__dispatch_chain_event(event)
return event.event_data
return event
def __trigger_broadcast_event(self, event: Event):
"""
@@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton):
def __dispatch_chain_event(self, event: Event):
"""
同步方式调度链式事件,按优先级顺序逐个调用事件处理器
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, [])
self.__log_event_lifecycle(event, "started")
for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0]
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
start_time = time.time()
try:
handler(event.event_data)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
except Exception as e:
self.__handle_event_error(event, handler, e)
start_time = time.time()
self.__safe_invoke_handler(handler, event)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
self.__log_event_lifecycle(event, "completed")
def __dispatch_broadcast_event(self, event: Event):
@@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton):
"""
handlers = self.__broadcast_subscribers.get(event.event_type, [])
for handler in handlers:
class_name = handler.__qualname__.split(".")[0]
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
self.__executor.submit(self.__safe_invoke_handler, handler, event)
self.__executor.submit(self.__safe_invoke_handler, handler, event)
def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event):
def __safe_invoke_handler(self, handler: Callable, event: Event):
"""
安全调用事件处理器,捕获异常并记录日志
:param handler: 要调用的处理器
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.")
return
try:
handler(event.event_data)
handler(event)
except Exception as e:
self.__handle_event_error(event, handler, e)
@@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton):
- 或事件类型成员的列表
"""
def decorator(f: Callable[[Dict], None]):
def decorator(f: Callable):
event_list = []
# 如果传入的是列表,处理每个事件类型