From 79c637e003a0896f2457ae2181d8810151631739 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 18 Nov 2024 08:01:43 +0800 Subject: [PATCH] =?UTF-8?q?fix=20#3154=20=E7=9B=B8=E5=90=8C=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E9=81=BF=E5=85=8D=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/event.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index f61ac6f6..86c09116 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -84,6 +84,7 @@ class EventManager(metaclass=Singleton): self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 + self.__processing_events = {} # 用于记录当前正在处理的事件 {event_hash: event} def start(self): """ @@ -129,6 +130,14 @@ class EventManager(metaclass=Singleton): for handler in handlers.values() ) + @staticmethod + def __get_event_hash(event: Event) -> str: + """ + 计算事件的唯一标识符(hash) + """ + data_string = str(event.event_type.value) + str(event.event_data) + return str(uuid.uuid5(uuid.NAMESPACE_DNS, data_string)) + def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Union[Dict, ChainEventData]] = None, priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: """ @@ -139,6 +148,12 @@ class EventManager(metaclass=Singleton): :return: 如果是链式事件,返回处理后的事件数据;否则返回 None """ event = Event(etype, data, priority) + event_hash = self.__get_event_hash(event) + with self.__lock: + if event_hash in self.__processing_events: + logger.debug(f"Duplicate event ignored: {event}") + return None + self.__processing_events[event_hash] = event if isinstance(etype, EventType): self.__trigger_broadcast_event(event) elif isinstance(etype, ChainEventType): @@ -320,9 +335,14 @@ class EventManager(metaclass=Singleton): """ 触发链式事件,按顺序调用订阅的处理器,并记录处理耗时 """ - logger.debug(f"Triggering synchronous chain event: {event}") - dispatch = self.__dispatch_chain_event(event) - return event if dispatch else None + try: + logger.debug(f"Triggering synchronous chain event: {event}") + dispatch = self.__dispatch_chain_event(event) + return event if dispatch else None + finally: + event_hash = self.__get_event_hash(event) + with self.__lock: + self.__processing_events.pop(event_hash, None) def __trigger_broadcast_event(self, event: Event): """ @@ -363,6 +383,9 @@ class EventManager(metaclass=Singleton): return for handler_id, handler in handlers.items(): self.__executor.submit(self.__safe_invoke_handler, handler, event) + event_hash = self.__get_event_hash(event) + with self.__lock: + self.__processing_events.pop(event_hash, None) def __safe_invoke_handler(self, handler: Callable, event: Event): """