fix MessageQueueManager

This commit is contained in:
jxxghp
2025-03-10 10:02:32 +08:00
parent 81d08ca517
commit 6603a30e7e
4 changed files with 46 additions and 18 deletions

View File

@@ -24,7 +24,7 @@ from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.message import MessageHelper, MessageQueueManager
from app.helper.progress import ProgressHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper
@@ -479,6 +479,7 @@ def reload_module(_: User = Depends(get_current_active_superuser)):
"""
重新加载模块(仅管理员)
"""
MessageQueueManager().init_config()
ModuleManager().reload()
Scheduler().init()
Monitor().init()

View File

@@ -494,11 +494,6 @@ class ChainBase(metaclass=ABCMeta):
:param message: 消息体
:return: 成功或失败
"""
logger.info(f"发送消息channel={message.channel}"
f"source={message.source},"
f"title={message.title}, "
f"text={message.text}"
f"userid={message.userid}")
# 保存原消息
self.messagehelper.put(message, role="user", title=message.title)
self.messageoper.add(**message.dict())

View File

@@ -6,19 +6,22 @@ import threading
import time
from datetime import datetime
from typing import Any, Union
from typing import List, Dict, Optional, Callable
from typing import List, Optional, Callable
from app.utils.singleton import Singleton
from core.config import global_vars
from db.systemconfig_oper import SystemConfigOper
from log import logger
from schemas.types import SystemConfigKey
from app.core.config import global_vars
from app.db.systemconfig_oper import SystemConfigOper
from app.schemas.types import SystemConfigKey
from app.utils.singleton import Singleton, SingletonClass
from app.log import logger
class MessageQueueManager(metaclass=Singleton):
class MessageQueueManager(metaclass=SingletonClass):
"""
消息发送队列管理器
"""
schedule_periods: List[tuple[int, int, int, int]] = []
def __init__(
self,
send_callback: Optional[Callable] = None,
@@ -30,9 +33,8 @@ class MessageQueueManager(metaclass=Singleton):
:param send_callback: 实际发送消息的回调函数
:param check_interval: 时间检查间隔(秒)
"""
self.schedule_periods = self._parse_schedule(
SystemConfigOper().get(SystemConfigKey.NotificationSendTime)
)
self.init_config()
self.queue: queue.Queue[Any] = queue.Queue()
self.send_callback = send_callback
self.check_interval = check_interval
@@ -41,6 +43,14 @@ class MessageQueueManager(metaclass=Singleton):
self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.thread.start()
def init_config(self):
"""
初始化配置
"""
self.schedule_periods = self._parse_schedule(
SystemConfigOper().get(SystemConfigKey.NotificationSendTime)
)
@staticmethod
def _parse_schedule(periods: Union[list, dict]) -> List[tuple[int, int, int, int]]:
"""
@@ -106,9 +116,10 @@ class MessageQueueManager(metaclass=Singleton):
"""
if self.send_callback:
try:
logger.info(f"发送消息:{kwargs}")
self.send_callback(*args, **kwargs)
except Exception as e:
logger.error(str(e))
logger.error(f"发送消息错误:{str(e)}")
def _monitor_loop(self) -> None:
"""

View File

@@ -3,7 +3,7 @@ import abc
class Singleton(abc.ABCMeta, type):
"""
类单例模式
类单例模式(按参数)
"""
_instances: dict = {}
@@ -19,3 +19,24 @@ class AbstractSingleton(abc.ABC, metaclass=Singleton):
"""
抽像类单例模式
"""
pass
class SingletonClass(abc.ABCMeta, type):
"""
类单例模式(按类)
"""
_instances: dict = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(SingletonClass, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class AbstractSingletonClass(abc.ABC, metaclass=SingletonClass):
"""
抽像类单例模式(按类)
"""
pass