diff --git a/app/core/event.py b/app/core/event.py index f61ac6f6..86c09116 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -84,6 +84,7 @@ class EventManager(metaclass=Singleton): self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 + self.__processing_events = {} # 用于记录当前正在处理的事件 {event_hash: event} def start(self): """ @@ -129,6 +130,14 @@ class EventManager(metaclass=Singleton): for handler in handlers.values() ) + @staticmethod + def __get_event_hash(event: Event) -> str: + """ + 计算事件的唯一标识符(hash) + """ + data_string = str(event.event_type.value) + str(event.event_data) + return str(uuid.uuid5(uuid.NAMESPACE_DNS, data_string)) + def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Union[Dict, ChainEventData]] = None, priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]: """ @@ -139,6 +148,12 @@ class EventManager(metaclass=Singleton): :return: 如果是链式事件,返回处理后的事件数据;否则返回 None """ event = Event(etype, data, priority) + event_hash = self.__get_event_hash(event) + with self.__lock: + if event_hash in self.__processing_events: + logger.debug(f"Duplicate event ignored: {event}") + return None + self.__processing_events[event_hash] = event if isinstance(etype, EventType): self.__trigger_broadcast_event(event) elif isinstance(etype, ChainEventType): @@ -320,9 +335,14 @@ class EventManager(metaclass=Singleton): """ 触发链式事件,按顺序调用订阅的处理器,并记录处理耗时 """ - logger.debug(f"Triggering synchronous chain event: {event}") - dispatch = self.__dispatch_chain_event(event) - return event if dispatch else None + try: + logger.debug(f"Triggering synchronous chain event: {event}") + dispatch = self.__dispatch_chain_event(event) + return event if dispatch else None + finally: + event_hash = self.__get_event_hash(event) + with self.__lock: + self.__processing_events.pop(event_hash, None) def __trigger_broadcast_event(self, event: Event): """ @@ -363,6 +383,9 @@ class EventManager(metaclass=Singleton): return for handler_id, handler in handlers.items(): self.__executor.submit(self.__safe_invoke_handler, handler, event) + event_hash = self.__get_event_hash(event) + with self.__lock: + self.__processing_events.pop(event_hash, None) def __safe_invoke_handler(self, handler: Callable, event: Event): """