From 3d43750e9bda6d4d6ba8732b57004c9e39f7a456 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 10 Sep 2025 17:33:12 +0800 Subject: [PATCH] fix async event --- app/core/event.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/app/core/event.py b/app/core/event.py index 37defeaf..8dbd9c48 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,3 +1,4 @@ +import asyncio import importlib import inspect import random @@ -71,15 +72,26 @@ class EventManager(metaclass=Singleton): """ def __init__(self): - self.__executor = ThreadHelper() # 动态线程池,用于消费事件 - self.__consumer_threads = [] # 用于保存启动的事件消费者线程 - self.__event_queue = PriorityQueue() # 优先级队列 - self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者 - self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者 - self.__disabled_handlers = set() # 禁用的事件处理器集合 - self.__disabled_classes = set() # 禁用的事件处理器类集合 - self.__lock = threading.Lock() # 线程锁 - self.__event = threading.Event() # 退出事件 + # 动态线程池,用于消费事件 + self.__executor = ThreadHelper() + # 用于保存启动的事件消费者线程 + self.__consumer_threads = [] + # 优先级队列 + self.__event_queue = PriorityQueue() + # 广播事件的订阅者 + self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} + # 链式事件的订阅者 + self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} + # 禁用的事件处理器集合 + self.__disabled_handlers = set() + # 禁用的事件处理器类集合 + self.__disabled_classes = set() + # 线程锁 + self.__lock = threading.Lock() + # 退出事件 + self.__event = threading.Event() + # 当前事件循环 + self.loop = asyncio.get_event_loop() def start(self): """ @@ -438,7 +450,15 @@ class EventManager(metaclass=Singleton): isolated_event = Event(event_type=event.event_type, event_data=event_data_copy, priority=event.priority) - self.__executor.submit(self.__safe_invoke_handler, handler, isolated_event) + if inspect.iscoroutinefunction(handler): + # 对于异步函数,直接在事件循环中运行 + asyncio.run_coroutine_threadsafe( + self.__safe_invoke_handler_async(handler, isolated_event), + self.loop + ) + else: + # 对于同步函数,在线程池中运行 + self.__executor.submit(self.__safe_invoke_handler, handler, isolated_event) def __safe_invoke_handler(self, handler: Callable, event: Event): """