diff --git a/app/modules/discord/__init__.py b/app/modules/discord/__init__.py
index 3ceaafbf..82d48288 100644
--- a/app/modules/discord/__init__.py
+++ b/app/modules/discord/__init__.py
@@ -102,6 +102,52 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析 Discord 管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("DISCORD_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls,
+ config: Optional[dict],
+ *user_ids: Optional[Union[str, int]],
+ ) -> bool:
+ """
+ 判断 Discord 命令或命令型按钮回调是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in admins for candidate in candidates)
+
+ @staticmethod
+ def _send_admin_denied(
+ client: Optional[Discord],
+ userid: Optional[Union[str, int]],
+ chat_id: Optional[Union[str, int]] = None,
+ ) -> None:
+ """
+ 向 Discord 非管理员用户发送命令拒绝提示。
+ """
+ if client and userid:
+ client.send_msg(
+ title="只有管理员才有权限执行此命令",
+ userid=str(userid),
+ original_chat_id=str(chat_id) if chat_id else None,
+ )
+
def message_parser(
self, source: str, body: Any, form: Any, args: Any
) -> Optional[CommingMessage]:
@@ -119,6 +165,7 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
client_config = self.get_config(source)
if not client_config:
return None
+ client: Discord = self.get_instance(client_config.name)
try:
msg_json: dict = json.loads(body)
except Exception as e:
@@ -137,6 +184,11 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
message_id = msg_json.get("message_id")
chat_id = msg_json.get("chat_id")
if callback_data and userid:
+ if str(callback_data).strip().startswith("/") and self._should_reject_admin_command(
+ client_config.config, userid, username
+ ):
+ self._send_admin_denied(client, userid, chat_id)
+ return None
logger.info(
f"收到来自 {client_config.name} 的 Discord 按钮回调:"
f"userid={userid}, username={username}, callback_data={callback_data}"
@@ -161,6 +213,11 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
audio_refs = self._extract_audio_refs(msg_json)
files = self._extract_files(msg_json)
if (text or images or audio_refs or files) and userid:
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, userid, username
+ ):
+ self._send_admin_denied(client, userid, chat_id)
+ return None
logger.info(
f"收到来自 {client_config.name} 的 Discord 消息:"
f"userid={userid}, username={username}, text={text}, "
diff --git a/app/modules/feishu/feishu.py b/app/modules/feishu/feishu.py
index e284dae8..2aa362cc 100644
--- a/app/modules/feishu/feishu.py
+++ b/app/modules/feishu/feishu.py
@@ -5,7 +5,7 @@ import tempfile
import threading
import uuid
from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
import lark_oapi as lark
@@ -107,6 +107,19 @@ class Feishu:
self._api_client = self._build_api_client()
self._start_ws_client()
+ def _should_reject_admin_command(
+ self, *user_ids: Optional[Union[str, int]]
+ ) -> bool:
+ """判断飞书命令或命令型按钮回调是否应因非管理员身份被拒绝。"""
+ if not self._admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in self._admins for candidate in candidates)
+
def _build_api_client(self) -> lark.Client:
"""构建飞书 OpenAPI client,用于发送和编辑消息。"""
return (
@@ -494,6 +507,16 @@ class Feishu:
callback_data = message.get("callback_data")
if not callback_data:
return None
+ if str(callback_data).strip().startswith("/") and self._should_reject_admin_command(
+ open_id, user_id
+ ):
+ self.send_text(
+ "只有管理员才有权限执行此命令",
+ userid=str(userid),
+ chat_id=message.get("chat_id"),
+ receive_id_type="open_id" if open_id else "user_id",
+ )
+ return None
return CommingMessage(
channel=MessageChannel.Feishu,
source=self._name,
@@ -522,7 +545,7 @@ class Feishu:
if not text and not images and not audio_refs and not files:
return None
- if text.startswith("/") and self._admins and str(userid) not in self._admins:
+ if text.startswith("/") and self._should_reject_admin_command(open_id, user_id):
self.send_text(
"只有管理员才有权限执行此命令",
userid=str(userid),
diff --git a/app/modules/qqbot/__init__.py b/app/modules/qqbot/__init__.py
index 0a5557b3..1016e8a1 100644
--- a/app/modules/qqbot/__init__.py
+++ b/app/modules/qqbot/__init__.py
@@ -82,6 +82,46 @@ class QQBotModule(_ModuleBase, _MessageBase[QQBot]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析 QQ 管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("QQBOT_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls,
+ config: Optional[dict],
+ *user_ids: Optional[Union[str, int]],
+ ) -> bool:
+ """
+ 判断 QQ 斜杠命令是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in admins for candidate in candidates)
+
+ @staticmethod
+ def _send_admin_denied(
+ client: Optional[QQBot], userid: Optional[Union[str, int]]
+ ) -> None:
+ """
+ 向 QQ 非管理员用户发送命令拒绝提示。
+ """
+ if client and userid:
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=str(userid))
+
def message_parser(
self, source: str, body: Any, form: Any, args: Any
) -> Optional[CommingMessage]:
@@ -92,6 +132,7 @@ class QQBotModule(_ModuleBase, _MessageBase[QQBot]):
client_config = self.get_config(source)
if not client_config:
return None
+ client: QQBot = self.get_instance(client_config.name)
try:
if isinstance(body, bytes):
msg_body = json.loads(body)
@@ -116,6 +157,11 @@ class QQBotModule(_ModuleBase, _MessageBase[QQBot]):
user_openid = author.get("user_openid", "")
if not user_openid:
return None
+ if content.startswith("/") and self._should_reject_admin_command(
+ client_config.config, user_openid
+ ):
+ self._send_admin_denied(client, user_openid)
+ return None
logger.info(
f"收到 QQ 私聊消息: userid={user_openid}, "
f"text={(content or '')[:50]}..., images={len(images) if images else 0}, "
@@ -137,6 +183,11 @@ class QQBotModule(_ModuleBase, _MessageBase[QQBot]):
group_openid = msg_body.get("group_openid", "")
# 群聊用 group:group_openid 作为 userid,便于回复时识别
userid = f"group:{group_openid}" if group_openid else member_openid
+ if content.startswith("/") and self._should_reject_admin_command(
+ client_config.config, member_openid, userid
+ ):
+ self._send_admin_denied(client, userid)
+ return None
logger.info(
f"收到 QQ 群消息: group={group_openid}, userid={member_openid}, "
f"text={(content or '')[:50]}..., images={len(images) if images else 0}, "
diff --git a/app/modules/slack/__init__.py b/app/modules/slack/__init__.py
index dd7a9ba4..17228027 100644
--- a/app/modules/slack/__init__.py
+++ b/app/modules/slack/__init__.py
@@ -82,6 +82,44 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析 Slack 管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("SLACK_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls,
+ config: Optional[dict],
+ *user_ids: Optional[Union[str, int]],
+ ) -> bool:
+ """
+ 判断 Slack 命令或命令型按钮回调是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in admins for candidate in candidates)
+
+ @staticmethod
+ def _send_admin_denied(client: Optional[Slack], userid: Optional[Union[str, int]]) -> None:
+ """
+ 向 Slack 非管理员用户发送命令拒绝提示。
+ """
+ if client and userid:
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=str(userid))
+
def message_parser(
self, source: str, body: Any, form: Any, args: Any
) -> Optional[CommingMessage]:
@@ -209,6 +247,7 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
client_config = self.get_config(source)
if not client_config:
return None
+ client: Slack = self.get_instance(client_config.name)
try:
msg_json = json.loads(body)
while isinstance(msg_json, str):
@@ -229,6 +268,11 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
userid = msg_json.get("user")
text = msg_json.get("text")
username = msg_json.get("user")
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, userid, username
+ ):
+ self._send_admin_denied(client, userid)
+ return None
message_id = msg_json.get("ts")
chat_id = msg_json.get("channel")
images = self._extract_images(msg_json)
@@ -240,6 +284,11 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
# 使用CALLBACK前缀标识按钮回调
text = f"CALLBACK:{callback_data}"
username = msg_json.get("user", {}).get("name")
+ if str(callback_data).strip().startswith("/") and self._should_reject_admin_command(
+ client_config.config, userid, username
+ ):
+ self._send_admin_denied(client, userid)
+ return None
# 获取原消息信息用于编辑
message_info = msg_json.get("message", {})
@@ -275,6 +324,11 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
flags=re.IGNORECASE,
).strip()
username = ""
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, userid
+ ):
+ self._send_admin_denied(client, userid)
+ return None
message_id = msg_json.get("event", {}).get("ts")
chat_id = msg_json.get("event", {}).get("channel")
images = self._extract_images(msg_json.get("event", {}))
@@ -284,11 +338,19 @@ class SlackModule(_ModuleBase, _MessageBase[Slack]):
userid = msg_json.get("user", {}).get("id")
text = msg_json.get("callback_id")
username = msg_json.get("user", {}).get("username")
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, userid, username
+ ):
+ self._send_admin_denied(client, userid)
+ return None
elif msg_json.get("command"):
userid = msg_json.get("user_id")
text = msg_json.get("command")
username = msg_json.get("user_name")
chat_id = msg_json.get("channel_id")
+ if self._should_reject_admin_command(client_config.config, userid, username):
+ self._send_admin_denied(client, userid)
+ return None
else:
return None
logger.info(
diff --git a/app/modules/synologychat/__init__.py b/app/modules/synologychat/__init__.py
index 3658ae62..6eedd75f 100644
--- a/app/modules/synologychat/__init__.py
+++ b/app/modules/synologychat/__init__.py
@@ -88,6 +88,46 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析 Synology Chat 管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("SYNOLOGYCHAT_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls,
+ config: Optional[dict],
+ *user_ids: Optional[Union[str, int]],
+ ) -> bool:
+ """
+ 判断 Synology Chat 斜杠命令是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in admins for candidate in candidates)
+
+ @staticmethod
+ def _send_admin_denied(
+ client: Optional[SynologyChat], userid: Optional[Union[str, int]]
+ ) -> None:
+ """
+ 向 Synology Chat 非管理员用户发送命令拒绝提示。
+ """
+ if client and userid:
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=str(userid))
+
def message_parser(self, source: str, body: Any, form: Any,
args: Any) -> Optional[CommingMessage]:
"""
@@ -127,6 +167,11 @@ class SynologyChatModule(_ModuleBase, _MessageBase[SynologyChat]):
audio_refs = self._extract_audio_refs(message)
files = self._extract_files(message)
if (text or images or audio_refs or files) and user_id:
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, user_id, user_name
+ ):
+ self._send_admin_denied(client, user_id)
+ return None
logger.info(
f"收到来自 {client_config.name} 的SynologyChat消息:"
f"userid={user_id}, username={user_name}, text={text}, "
diff --git a/app/modules/telegram/__init__.py b/app/modules/telegram/__init__.py
index 20aa6184..4da3aa1c 100644
--- a/app/modules/telegram/__init__.py
+++ b/app/modules/telegram/__init__.py
@@ -77,6 +77,36 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析 Telegram 管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("TELEGRAM_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls,
+ config: Optional[dict],
+ *user_ids: Optional[Union[str, int]],
+ ) -> bool:
+ """
+ 判断 Telegram 命令或命令型按钮回调是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in admins for candidate in candidates)
+
def message_parser(
self, source: str, body: Any, form: Any, args: Any
) -> Optional[CommingMessage]:
@@ -149,16 +179,15 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
if message:
# 处理按钮回调
if "callback_query" in message:
- return self._handle_callback_query(message, client_config)
+ return self._handle_callback_query(message, client_config, client)
# 处理普通消息
return self._handle_text_message(message, client_config, client)
return None
- @staticmethod
def _handle_callback_query(
- message: dict, client_config: NotificationConf
+ self, message: dict, client_config: NotificationConf, client: Telegram
) -> Optional[CommingMessage]:
"""
处理按钮回调查询
@@ -170,6 +199,17 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
user_name = user_info.get("username")
if callback_data and user_id:
+ if str(callback_data).strip().startswith("/") and self._should_reject_admin_command(
+ client_config.config, user_id, user_name
+ ):
+ if client:
+ client.answer_callback_query(
+ callback_query_id=callback_query.get("id"),
+ text="只有管理员才有权限执行此命令",
+ show_alert=True,
+ )
+ return None
+
logger.info(
f"收到来自 {client_config.name} 的Telegram按钮回调:"
f"userid={user_id}, username={user_name}, callback_data={callback_data}"
@@ -237,16 +277,10 @@ class TelegramModule(_ModuleBase, _MessageBase[Telegram]):
else None
)
- admin_users = client_config.config.get("TELEGRAM_ADMINS")
user_list = client_config.config.get("TELEGRAM_USERS")
- config_chat_id = client_config.config.get("TELEGRAM_CHAT_ID")
if cleaned_text and cleaned_text.startswith("/"):
- if (
- admin_users
- and str(user_id) not in admin_users.split(",")
- and str(user_id) != config_chat_id
- ):
+ if self._should_reject_admin_command(client_config.config, user_id, user_name):
client.send_msg(
title="只有管理员才有权限执行此命令", userid=user_id
)
diff --git a/app/modules/vocechat/__init__.py b/app/modules/vocechat/__init__.py
index 78ec5c74..076ec332 100644
--- a/app/modules/vocechat/__init__.py
+++ b/app/modules/vocechat/__init__.py
@@ -87,6 +87,46 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析 VoceChat 管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("VOCECHAT_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls,
+ config: Optional[dict],
+ *user_ids: Optional[Union[str, int]],
+ ) -> bool:
+ """
+ 判断 VoceChat 斜杠命令是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ candidates = [
+ str(user_id).strip()
+ for user_id in user_ids
+ if user_id is not None and str(user_id).strip()
+ ]
+ return not any(candidate in admins for candidate in candidates)
+
+ @staticmethod
+ def _send_admin_denied(
+ client: Optional[VoceChat], userid: Optional[Union[str, int]]
+ ) -> None:
+ """
+ 向 VoceChat 非管理员用户发送命令拒绝提示。
+ """
+ if client and userid:
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=str(userid))
+
def message_parser(self, source: str, body: Any, form: Any,
args: Any) -> Optional[CommingMessage]:
"""
@@ -120,6 +160,7 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
client_config = self.get_config(source)
if not client_config:
return None
+ client: VoceChat = self.get_instance(client_config.name)
# 报文体
msg_body = json.loads(body)
# 类型
@@ -149,6 +190,11 @@ class VoceChatModule(_ModuleBase, _MessageBase[VoceChat]):
# 处理消息内容
if (text or images or audio_refs or files) and userid:
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, msg_body.get("from_uid"), userid
+ ):
+ self._send_admin_denied(client, userid)
+ return None
logger.info(
f"收到来自 {client_config.name} 的VoceChat消息:"
f"userid={userid}, text={text}, images={len(images) if images else 0}, "
diff --git a/app/modules/wechat/__init__.py b/app/modules/wechat/__init__.py
index 0533664d..580a0087 100644
--- a/app/modules/wechat/__init__.py
+++ b/app/modules/wechat/__init__.py
@@ -66,6 +66,29 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
def _is_bot_mode(config: dict) -> bool:
return (config or {}).get("WECHAT_MODE", "app") == "bot"
+ @staticmethod
+ def _get_admins(config: Optional[dict]) -> List[str]:
+ """
+ 解析企业微信管理员配置,兼容逗号分隔和首尾空白。
+ """
+ return [
+ admin.strip()
+ for admin in str((config or {}).get("WECHAT_ADMINS") or "").split(",")
+ if admin.strip()
+ ]
+
+ @classmethod
+ def _should_reject_admin_command(
+ cls, config: Optional[dict], user_id: Optional[str]
+ ) -> bool:
+ """
+ 判断企业微信菜单或斜杠命令是否应因非管理员身份被拒绝。
+ """
+ admins = cls._get_admins(config)
+ if not admins:
+ return False
+ return str(user_id or "").strip() not in admins
+
@classmethod
def _create_client(cls, conf):
if cls._is_bot_mode(conf.config):
@@ -171,13 +194,10 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
audio_refs = None
files = None
if msg_type == "event" and event == "click":
- # 校验用户有权限执行交互命令
- if client_config.config.get('WECHAT_ADMINS'):
- wechat_admins = client_config.config.get('WECHAT_ADMINS').split(',')
- if wechat_admins and not any(
- user_id == admin_user for admin_user in wechat_admins):
- client.send_msg(title="用户无权限执行菜单命令", userid=user_id)
- return None
+ # 企业微信菜单最终会转成命令文本,需与斜杠命令使用一致的管理员校验。
+ if self._should_reject_admin_command(client_config.config, user_id):
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
+ return None
# 根据EventKey执行命令
content = DomUtils.tag_value(root_node, "EventKey")
logger.info(f"收到来自 {client_config.name} 的微信事件:userid={user_id}, event={content}")
@@ -221,6 +241,12 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
else:
return None
+ if content and content.startswith("/") and self._should_reject_admin_command(
+ client_config.config, user_id
+ ):
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
+ return None
+
if content or images or audio_refs or files:
# 处理消息内容
return CommingMessage(channel=MessageChannel.Wechat, source=client_config.name,
@@ -274,17 +300,13 @@ class WechatModule(_ModuleBase, _MessageBase[WeChat]):
if text:
text = re.sub(r"@\S+", "", text).strip()
- if text and text.startswith("/") and client_config.config.get('WECHAT_ADMINS'):
- wechat_admins = [
- admin.strip()
- for admin in client_config.config.get('WECHAT_ADMINS', '').split(',')
- if admin.strip()
- ]
- if wechat_admins and sender not in wechat_admins:
- client: WeChatBot = self.get_instance(client_config.name)
- if client:
- client.send_msg(title="只有管理员才有权限执行此命令", userid=sender)
- return None
+ if text and text.startswith("/") and self._should_reject_admin_command(
+ client_config.config, sender
+ ):
+ client: WeChatBot = self.get_instance(client_config.name)
+ if client:
+ client.send_msg(title="只有管理员才有权限执行此命令", userid=sender)
+ return None
if not text and not images and not audio_refs and not files:
return None
diff --git a/app/modules/wechatclawbot/__init__.py b/app/modules/wechatclawbot/__init__.py
index f95e4763..c4bce8f2 100644
--- a/app/modules/wechatclawbot/__init__.py
+++ b/app/modules/wechatclawbot/__init__.py
@@ -181,7 +181,9 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]):
for admin in str(client_config.config.get("WECHATCLAWBOT_ADMINS") or "").split(",")
if admin.strip()
]
- if text.startswith("/") and admins and user_id not in admins:
+ callback_data = text[9:].strip() if text.startswith("CALLBACK:") else ""
+ is_admin_command = text.startswith("/") or callback_data.startswith("/")
+ if is_admin_command and admins and user_id not in admins:
client = self.get_instance(client_config.name)
if client:
client.send_msg(title="只有管理员才有权限执行此命令", userid=user_id)
diff --git a/tests/test_message_channel_permissions.py b/tests/test_message_channel_permissions.py
new file mode 100644
index 00000000..06cc5f7f
--- /dev/null
+++ b/tests/test_message_channel_permissions.py
@@ -0,0 +1,403 @@
+import json
+import unittest
+from types import SimpleNamespace
+from unittest.mock import Mock, patch
+
+from app.modules.feishu.feishu import Feishu
+from app.modules.discord import DiscordModule
+from app.modules.qqbot import QQBotModule
+from app.modules.slack import SlackModule
+from app.modules.synologychat import SynologyChatModule
+from app.modules.telegram import TelegramModule
+from app.modules.vocechat import VoceChatModule
+from app.modules.wechatclawbot import WechatClawBotModule
+
+
+class TestMessageChannelPermissions(unittest.TestCase):
+ """消息渠道管理员权限测试。"""
+
+ def test_feishu_command_callback_blocks_non_admin(self):
+ """飞书命令型按钮回调应拦截非管理员。"""
+ with (
+ patch.object(Feishu, "_build_api_client", return_value=Mock()),
+ patch.object(Feishu, "_start_ws_client"),
+ ):
+ client = Feishu(
+ FEISHU_APP_ID="app-id",
+ FEISHU_APP_SECRET="app-secret",
+ FEISHU_ADMINS="ou_admin",
+ name="feishu-test",
+ )
+
+ with patch.object(client, "send_text", return_value={"success": True}) as send_text:
+ message = client.parse_message(
+ {
+ "type": "cardAction",
+ "callback_data": "/sites",
+ "message_id": "om_1",
+ "chat_id": "oc_1",
+ "sender": {
+ "open_id": "ou_user",
+ "user_id": "u_user",
+ "name": "tester",
+ },
+ }
+ )
+
+ self.assertIsNone(message)
+ send_text.assert_called_once_with(
+ "只有管理员才有权限执行此命令",
+ userid="ou_user",
+ chat_id="oc_1",
+ receive_id_type="open_id",
+ )
+
+ def test_telegram_command_callback_blocks_non_admin(self):
+ """Telegram 命令型按钮回调应拦截非管理员。"""
+ module = TelegramModule()
+ client = SimpleNamespace(answer_callback_query=Mock(), bot_username=None)
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="telegram-test", config={"TELEGRAM_ADMINS": "10001"}
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="telegram-test",
+ body=json.dumps(
+ {
+ "callback_query": {
+ "id": "callback-1",
+ "from": {"id": 10002, "username": "tester"},
+ "data": "/sites",
+ "message": {"message_id": 12, "chat": {"id": "-100"}},
+ }
+ }
+ ),
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.answer_callback_query.assert_called_once_with(
+ callback_query_id="callback-1",
+ text="只有管理员才有权限执行此命令",
+ show_alert=True,
+ )
+
+ def test_slack_command_callback_blocks_non_admin(self):
+ """Slack 命令型按钮回调应拦截非管理员。"""
+ module = SlackModule()
+ client = SimpleNamespace(send_msg=Mock())
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="slack-test", config={"SLACK_ADMINS": "UADMIN"}
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="slack-test",
+ body=json.dumps(
+ {
+ "type": "block_actions",
+ "user": {"id": "UUSER", "name": "tester"},
+ "actions": [{"value": "/sites"}],
+ "message": {"ts": "1710000000.000100"},
+ "container": {"channel_id": "C01"},
+ }
+ ),
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="UUSER"
+ )
+
+ def test_discord_command_interaction_blocks_non_admin(self):
+ """Discord 命令型按钮回调应拦截非管理员。"""
+ module = DiscordModule()
+ client = SimpleNamespace(send_msg=Mock())
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="discord-test", config={"DISCORD_ADMINS": "admin-id"}
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="discord-test",
+ body=json.dumps(
+ {
+ "type": "interaction",
+ "userid": "user-id",
+ "username": "tester",
+ "callback_data": "/sites",
+ "message_id": "msg-1",
+ "chat_id": "chat-1",
+ }
+ ),
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令",
+ userid="user-id",
+ original_chat_id="chat-1",
+ )
+
+ def test_non_command_callbacks_allow_non_admin(self):
+ """非命令型按钮回调不应套用管理员限制。"""
+ with (
+ patch.object(Feishu, "_build_api_client", return_value=Mock()),
+ patch.object(Feishu, "_start_ws_client"),
+ ):
+ feishu = Feishu(
+ FEISHU_APP_ID="app-id",
+ FEISHU_APP_SECRET="app-secret",
+ FEISHU_ADMINS="ou_admin",
+ name="feishu-test",
+ )
+ with patch.object(feishu, "send_text") as send_text:
+ feishu_message = feishu.parse_message(
+ {
+ "type": "cardAction",
+ "callback_data": "sites:req:refresh",
+ "sender": {"open_id": "ou_user", "user_id": "u_user"},
+ }
+ )
+ self.assertIsNotNone(feishu_message)
+ send_text.assert_not_called()
+
+ telegram_module = TelegramModule()
+ telegram_client = SimpleNamespace(answer_callback_query=Mock(), bot_username=None)
+ with patch.object(
+ telegram_module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="telegram-test", config={"TELEGRAM_ADMINS": "10001"}
+ ),
+ ), patch.object(telegram_module, "get_instance", return_value=telegram_client):
+ telegram_message = telegram_module.message_parser(
+ source="telegram-test",
+ body=json.dumps(
+ {
+ "callback_query": {
+ "id": "callback-1",
+ "from": {"id": 10002, "username": "tester"},
+ "data": "sites:req:refresh",
+ }
+ }
+ ),
+ form={},
+ args={},
+ )
+ self.assertIsNotNone(telegram_message)
+ telegram_client.answer_callback_query.assert_not_called()
+
+ slack_module = SlackModule()
+ slack_client = SimpleNamespace(send_msg=Mock())
+ with patch.object(
+ slack_module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="slack-test", config={"SLACK_ADMINS": "UADMIN"}
+ ),
+ ), patch.object(slack_module, "get_instance", return_value=slack_client):
+ slack_message = slack_module.message_parser(
+ source="slack-test",
+ body=json.dumps(
+ {
+ "type": "block_actions",
+ "user": {"id": "UUSER", "name": "tester"},
+ "actions": [{"value": "sites:req:refresh"}],
+ "message": {"ts": "1710000000.000100"},
+ "container": {"channel_id": "C01"},
+ }
+ ),
+ form={},
+ args={},
+ )
+ self.assertIsNotNone(slack_message)
+ slack_client.send_msg.assert_not_called()
+
+ discord_module = DiscordModule()
+ discord_client = SimpleNamespace(send_msg=Mock())
+ with patch.object(
+ discord_module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="discord-test", config={"DISCORD_ADMINS": "admin-id"}
+ ),
+ ), patch.object(discord_module, "get_instance", return_value=discord_client):
+ discord_message = discord_module.message_parser(
+ source="discord-test",
+ body=json.dumps(
+ {
+ "type": "interaction",
+ "userid": "user-id",
+ "username": "tester",
+ "callback_data": "sites:req:refresh",
+ }
+ ),
+ form={},
+ args={},
+ )
+ self.assertIsNotNone(discord_message)
+ discord_client.send_msg.assert_not_called()
+
+ clawbot_module = WechatClawBotModule()
+ clawbot_client = SimpleNamespace(send_msg=Mock())
+ with patch.object(
+ clawbot_module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="wechatclawbot-test",
+ config={"WECHATCLAWBOT_ADMINS": "admin-user"},
+ ),
+ ), patch.object(clawbot_module, "get_instance", return_value=clawbot_client):
+ clawbot_message = clawbot_module.message_parser(
+ source="wechatclawbot-test",
+ body={
+ "__channel__": "wechatclawbot",
+ "userid": "normal-user",
+ "text": "CALLBACK:sites:req:refresh",
+ },
+ form={},
+ args={},
+ )
+ self.assertIsNotNone(clawbot_message)
+ clawbot_client.send_msg.assert_not_called()
+
+ def test_qq_slash_command_blocks_non_admin(self):
+ """QQ 斜杠命令应拦截非管理员。"""
+ module = QQBotModule()
+ client = SimpleNamespace(send_msg=Mock())
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="qq-test", config={"QQBOT_ADMINS": "admin-openid"}
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="qq-test",
+ body={
+ "type": "C2C_MESSAGE_CREATE",
+ "content": "/sites",
+ "author": {"user_openid": "user-openid"},
+ },
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="user-openid"
+ )
+
+ def test_vocechat_slash_command_blocks_non_admin(self):
+ """VoceChat 斜杠命令应拦截非管理员。"""
+ module = VoceChatModule()
+ client = SimpleNamespace(send_msg=Mock())
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="vocechat-test",
+ config={"VOCECHAT_ADMINS": "UID#1", "channel_id": "2"},
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="vocechat-test",
+ body=json.dumps(
+ {
+ "detail": {
+ "type": "normal",
+ "content_type": "text/plain",
+ "content": "/sites",
+ },
+ "from_uid": 3,
+ "target": {"uid": 1},
+ }
+ ),
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="UID#3"
+ )
+
+ def test_synologychat_slash_command_blocks_non_admin(self):
+ """Synology Chat 斜杠命令应拦截非管理员。"""
+ module = SynologyChatModule()
+ client = SimpleNamespace(check_token=Mock(return_value=True), send_msg=Mock())
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="synology-test", config={"SYNOLOGYCHAT_ADMINS": "admin"}
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="synology-test",
+ body={},
+ form={
+ "token": "token",
+ "text": "/sites",
+ "user_id": "42",
+ "username": "tester",
+ },
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="42"
+ )
+
+ def test_wechatclawbot_command_callback_blocks_non_admin(self):
+ """微信 ClawBot 命令型回调消息应拦截非管理员。"""
+ module = WechatClawBotModule()
+ client = SimpleNamespace(send_msg=Mock())
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="wechatclawbot-test",
+ config={"WECHATCLAWBOT_ADMINS": "admin-user"},
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="wechatclawbot-test",
+ body={
+ "__channel__": "wechatclawbot",
+ "userid": "normal-user",
+ "text": "CALLBACK:/sites",
+ },
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="normal-user"
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_wechat_permissions.py b/tests/test_wechat_permissions.py
new file mode 100644
index 00000000..e30af221
--- /dev/null
+++ b/tests/test_wechat_permissions.py
@@ -0,0 +1,153 @@
+import json
+import unittest
+from types import SimpleNamespace
+from unittest.mock import Mock, patch
+
+from app.modules.wechat import WechatModule
+
+
+class TestWechatPermissions(unittest.TestCase):
+ """企业微信命令权限控制测试。"""
+
+ def _parse_encrypted_xml(self, xml_message: bytes, config: dict, client: SimpleNamespace):
+ """
+ 使用模拟解密结果解析企业微信自建应用回调。
+ """
+ module = WechatModule()
+ crypt = Mock()
+ crypt.DecryptMsg.return_value = (0, xml_message)
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(name="wechat-test", config=config),
+ ), patch.object(
+ module, "get_instance", return_value=client
+ ), patch(
+ "app.modules.wechat.WXBizMsgCrypt",
+ return_value=crypt,
+ ):
+ return module.message_parser(
+ source="wechat-test",
+ body=b"encrypted",
+ form={},
+ args={"msg_signature": "sig", "timestamp": "1", "nonce": "n"},
+ )
+
+ def test_menu_click_blocks_non_admin(self):
+ """
+ 非管理员点击企业微信菜单时应被拦截。
+ """
+ client = SimpleNamespace(send_msg=Mock())
+ message = self._parse_encrypted_xml(
+ b"""
+
+
+
+
+
+
+ """,
+ {
+ "WECHAT_TOKEN": "token",
+ "WECHAT_ENCODING_AESKEY": "encoding",
+ "WECHAT_CORPID": "corpid",
+ "WECHAT_ADMINS": "user-1",
+ },
+ client,
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="user-2"
+ )
+
+ def test_menu_click_allows_admin_with_padded_config(self):
+ """
+ 管理员配置含空格时,菜单权限判断仍应正确放行。
+ """
+ client = SimpleNamespace(send_msg=Mock())
+ message = self._parse_encrypted_xml(
+ b"""
+
+
+
+
+
+
+ """,
+ {
+ "WECHAT_TOKEN": "token",
+ "WECHAT_ENCODING_AESKEY": "encoding",
+ "WECHAT_CORPID": "corpid",
+ "WECHAT_ADMINS": " admin-1, user-1 ",
+ },
+ client,
+ )
+
+ self.assertIsNotNone(message)
+ self.assertEqual(message.text, "/sites")
+ client.send_msg.assert_not_called()
+
+ def test_text_command_blocks_non_admin(self):
+ """
+ 非管理员发送企业微信斜杠命令时应被拦截。
+ """
+ client = SimpleNamespace(send_msg=Mock())
+ message = self._parse_encrypted_xml(
+ b"""
+
+
+
+
+
+ """,
+ {
+ "WECHAT_TOKEN": "token",
+ "WECHAT_ENCODING_AESKEY": "encoding",
+ "WECHAT_CORPID": "corpid",
+ "WECHAT_ADMINS": "user-1",
+ },
+ client,
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="user-2"
+ )
+
+ def test_bot_text_command_blocks_non_admin(self):
+ """
+ 企业微信智能机器人模式也应拦截非管理员斜杠命令。
+ """
+ module = WechatModule()
+ client = SimpleNamespace(send_msg=Mock())
+ body = json.dumps(
+ {
+ "body": {
+ "from": {"userid": "user-2"},
+ "msgtype": "text",
+ "text": {"content": "/sites"},
+ }
+ }
+ )
+
+ with patch.object(
+ module,
+ "get_config",
+ return_value=SimpleNamespace(
+ name="wechat-bot-test",
+ config={"WECHAT_MODE": "bot", "WECHAT_ADMINS": "user-1"},
+ ),
+ ), patch.object(module, "get_instance", return_value=client):
+ message = module.message_parser(
+ source="wechat-bot-test",
+ body=body,
+ form={},
+ args={},
+ )
+
+ self.assertIsNone(message)
+ client.send_msg.assert_called_once_with(
+ title="只有管理员才有权限执行此命令", userid="user-2"
+ )