feat(event): improve event consumer logic for handling of events

This commit is contained in:
InfinityPacer
2024-09-20 20:37:29 +08:00
parent 688693b31f
commit 857383c8d0
3 changed files with 121 additions and 116 deletions

View File

@@ -1,8 +1,4 @@
import copy
import importlib
import threading
import traceback
from threading import Thread
from typing import Any, Union, Dict
from app.chain import ChainBase
@@ -12,11 +8,9 @@ from app.chain.subscribe import SubscribeChain
from app.chain.system import SystemChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.event import Event as ManagerEvent
from app.core.event import eventmanager, EventManager
from app.core.event import Event as ManagerEvent, eventmanager
from app.core.plugin import PluginManager
from app.helper.message import MessageHelper
from app.helper.thread import ThreadHelper
from app.log import logger
from app.scheduler import Scheduler
from app.schemas import Notification
@@ -41,12 +35,7 @@ class Command(metaclass=Singleton):
# 内建命令
_commands = {}
# 退出事件
_event = threading.Event()
def __init__(self):
# 事件管理器
self.eventmanager = EventManager()
# 插件管理器
self.pluginmanager = PluginManager()
# 处理链
@@ -55,8 +44,6 @@ class Command(metaclass=Singleton):
self.scheduler = Scheduler()
# 消息管理器
self.messagehelper = MessageHelper()
# 线程管理器
self.threader = ThreadHelper()
# 内置命令
self._commands = {
"/cookiecloud": {
@@ -172,72 +159,9 @@ class Command(metaclass=Singleton):
# 广播注册命令菜单
if not settings.DEV:
self.chain.register_commands(commands=self.get_commands())
# 消息处理线程
self._thread = Thread(target=self.__run)
# 启动事件处理线程
self._thread.start()
# 重启msg
SystemChain().restart_finish()
def __run(self):
"""
事件处理线程
"""
while not self._event.is_set():
event, handlers = self.eventmanager.get_event()
if event:
logger.info(f"处理事件:{event.event_type} - {handlers}")
if not handlers and event.event_callback:
event.event_callback()
for handler in handlers:
names = handler.__qualname__.split(".")
[class_name, method_name] = names
try:
if class_name in self.pluginmanager.get_plugin_ids():
# 插件事件
result = self.threader.submit(
self.pluginmanager.run_plugin_method,
class_name, method_name, copy.deepcopy(event)
)
if event.event_callback:
event.event_callback(result)
else:
# 检查全局变量中是否存在
if class_name not in globals():
# 导入模块除了插件和Command本身只有chain能响应事件
try:
module = importlib.import_module(
f"app.chain.{class_name[:-5].lower()}"
)
class_obj = getattr(module, class_name)()
except Exception as e:
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
continue
else:
# 通过类名创建类实例
class_obj = globals()[class_name]()
# 检查类是否存在并调用方法
if hasattr(class_obj, method_name):
self.threader.submit(
getattr(class_obj, method_name),
copy.deepcopy(event)
)
except Exception as e:
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
self.messagehelper.put(title=f"{event.event_type} 事件处理出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.eventmanager.send_event(
EventType.SystemError,
{
"type": "event",
"event_type": event.event_type,
"event_handle": f"{class_name}.{method_name}",
"error": str(e),
"traceback": traceback.format_exc()
}
)
def __run_command(self, command: Dict[str, any], data_str: str = "",
channel: MessageChannel = None, source: str = None, userid: Union[str, int] = None):
"""
@@ -292,18 +216,6 @@ class Command(metaclass=Singleton):
# 没有参数
command['func']()
def stop(self):
"""
停止事件处理线程
"""
logger.info("正在停止事件处理...")
self._event.set()
try:
self._thread.join()
logger.info("事件处理停止完成")
except Exception as e:
logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}")
def get_commands(self):
"""
获取命令列表
@@ -361,7 +273,7 @@ class Command(metaclass=Singleton):
"""
发送插件命令
"""
EventManager().send_event(etype, data)
eventmanager.send_event(etype, data)
@eventmanager.register(EventType.CommandExcute)
def command_event(self, event: ManagerEvent) -> None:

View File

@@ -1,10 +1,14 @@
import copy
import importlib
import inspect
import threading
import time
import traceback
import uuid
from queue import PriorityQueue, Empty
from typing import Callable, Dict, List, Union, Optional
from app.helper.message import MessageHelper
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas.types import EventType, ChainEventType
@@ -12,8 +16,7 @@ from app.utils.singleton import Singleton
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
MAX_EVENT_WORKER_POOL_SIZE = 50 # 最大事件工作线程池大小
EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的超时时间(秒)
EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒)
class Event:
@@ -55,11 +58,13 @@ class EventManager(metaclass=Singleton):
EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件
"""
def __init__(self, max_workers: int = MAX_EVENT_WORKER_POOL_SIZE):
"""
:param max_workers: 线程池最大工作线程数
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
# 退出事件
_event = threading.Event()
def __init__(self):
self.messagehelper = MessageHelper()
self.__executor = ThreadHelper() # 动态线程池,用于消费事件
self.__consumer_threads = [] # 用于保存启动的事件消费者线程
self.__event_queue = PriorityQueue() # 优先级队列
self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者
@@ -68,9 +73,30 @@ class EventManager(metaclass=Singleton):
self.__lock = threading.Lock() # 线程锁
self.__condition = threading.Condition(self.__lock) # 条件变量
def start(self):
"""
开始广播事件处理线程
"""
# 启动消费者线程用于处理广播事件
self._event.set()
for _ in range(MIN_EVENT_CONSUMER_THREADS):
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True)
thread.start()
self.__consumer_threads.append(thread) # 将线程对象保存到列表中
def stop(self):
"""
停止广播事件处理线程
"""
logger.info("正在停止事件处理...")
self._event.clear() # 停止广播事件处理
try:
# 通过遍历保存的线程来等待它们完成
for consumer_thread in self.__consumer_threads:
consumer_thread.join()
logger.info("事件处理停止完成")
except Exception as e:
logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}")
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
@@ -250,14 +276,15 @@ class EventManager(metaclass=Singleton):
:param event: 要调度的事件对象
"""
handlers = self.__chain_subscribers.get(event.event_type, {})
if not handlers:
return
self.__log_event_lifecycle(event, "started")
for handler_id, (priority, handler) in handlers.items():
start_time = time.time()
self.__safe_invoke_handler(handler, event)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
f"Handler {handler.__qualname__} (Priority: {priority}) , completed in {time.time() - start_time:.3f}s"
)
self.__log_event_lifecycle(event, "completed")
def __dispatch_broadcast_event(self, event: Event):
@@ -266,41 +293,91 @@ class EventManager(metaclass=Singleton):
:param event: 要调度的事件对象
"""
handlers = self.__broadcast_subscribers.get(event.event_type, {})
if not handlers:
return
for handler_id, handler in handlers.items():
self.__executor.submit(self.__safe_invoke_handler, handler, event)
def __safe_invoke_handler(self, handler: Callable, event: Event):
"""
安全调用事件处理器,捕获异常并记录日志
:param handler: 要调用的处理器
调用处理器,处理链式或广播事件
:param handler: 处理器
:param event: 事件对象
"""
if not self.__is_handler_enabled(handler):
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.")
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution")
return
# 根据事件类型判断是否需要深复制
is_broadcast_event = isinstance(event.event_type, EventType)
event_to_process = copy.deepcopy(event) if is_broadcast_event else event
names = handler.__qualname__.split(".")
class_name, method_name = names[0], names[1]
try:
handler(event)
from app.core.plugin import PluginManager
if class_name in PluginManager().get_plugin_ids():
# 定义一个插件调用函数
def plugin_callable():
PluginManager().run_plugin_method(class_name, method_name, event_to_process)
if is_broadcast_event:
self.__executor.submit(plugin_callable)
else:
plugin_callable()
else:
# 获取全局对象或模块类的实例
class_obj = self.__get_class_instance(class_name)
if class_obj and hasattr(class_obj, method_name):
method = getattr(class_obj, method_name)
if is_broadcast_event:
self.__executor.submit(method, event_to_process)
else:
method(event_to_process)
except Exception as e:
self.__handle_event_error(event, handler, e)
@staticmethod
def __get_class_instance(class_name: str):
"""
根据类名获取类实例,首先检查全局变量中是否存在该类,如果不存在则尝试动态导入模块。
:param class_name: 类的名称
:return: 类的实例
"""
# 检查类是否在全局变量中
if class_name in globals():
class_obj = globals()[class_name]()
else:
# 如果类不在全局变量中,尝试动态导入模块并创建实例
# 导入模块除了插件和Command只有chain能响应事件
try:
module = importlib.import_module(f"app.chain.{class_name[:-5].lower()}")
class_obj = getattr(module, class_name)()
except Exception as e:
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
return None
return class_obj
def __fixed_broadcast_consumer(self):
"""
固定的后台广播消费者线程,持续从队列中提取事件
"""
while True:
while not self._event.is_set():
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
with self.__condition:
# 当队列为空时,线程进入等待状态,直到有新事件到来
while self.__event_queue.empty():
# 阻塞等待,直到有事件插入
self.__condition.wait()
try:
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
logger.debug(f"Fixed consumer processing event: {event}")
self.__dispatch_broadcast_event(event)
except Empty:
logger.debug("Queue is empty, waiting for new events.")
logger.debug("Queue is empty, waiting for new events")
@staticmethod
def __log_event_lifecycle(event: Event, stage: str):
@@ -309,15 +386,28 @@ class EventManager(metaclass=Singleton):
"""
logger.debug(f"{stage} - {event}")
@staticmethod
def __handle_event_error(event: Event, handler: Callable, error: Exception):
def __handle_event_error(self, event: Event, handler: Callable, e: Exception):
"""
全局错误处理器,用于处理事件处理中的异常
"""
logger.error(
f"Global error handler: Event {event.event_type.value} failed in handler {handler.__name__}: {str(error)}")
# 可以将错误事件重新发送到事件队列或执行其他逻辑
# eventmanager.send_event(EventType.SystemError, {"error": str(error), "event_id": event.event_id})
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
names = handler.__qualname__.split(".")
class_name, method_name = names[0], names[1]
self.messagehelper.put(title=f"{event.event_type} 事件处理出错",
message=f"{class_name}.{method_name}{str(e)}",
role="system")
self.send_event(
EventType.SystemError,
{
"type": "event",
"event_type": event.event_type,
"event_handle": f"{class_name}.{method_name}",
"error": str(e),
"traceback": traceback.format_exc()
}
)
def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]):
"""

View File

@@ -30,6 +30,7 @@ except ImportError as e:
print(error_message, file=sys.stderr)
sys.exit(1)
from app.core.event import EventManager
from app.core.plugin import PluginManager
from app.db.init import init_db, update_db
from app.helper.thread import ThreadHelper
@@ -212,7 +213,7 @@ def shutdown_server():
PluginManager().stop()
PluginManager().stop_monitor()
# 停止事件消费
Command().stop()
EventManager().stop()
# 停止虚拟显示
DisplayHelper().stop()
# 停止定时服务
@@ -245,8 +246,10 @@ def start_module():
Monitor()
# 启动定时服务
Scheduler()
# 启动事件消费
# 加载命令
Command()
# 启动事件消费
EventManager().start()
# 初始化路由
init_routers()
# 启动前端服务