feat: normalize internal system user ID in notification dispatch

- Add SYSTEM_INTERNAL_USER_ID constant and helpers to app.utils.identity
- Ensure internal user ID is normalized to None before dispatching notifications, preventing misrouting to external channels
- Refactor MessageChain to use normalization for all message dispatch methods
- Add tests for internal user ID normalization and notification dispatch behavior
This commit is contained in:
jxxghp
2026-04-21 14:32:14 +08:00
parent 1282ad5004
commit ae15eac0f8
4 changed files with 146 additions and 25 deletions

View File

@@ -34,6 +34,7 @@ from app.log import logger
from app.schemas import Notification, NotificationType
from app.schemas.message import ChannelCapabilityManager, ChannelCapability
from app.schemas.types import MessageChannel
from app.utils.identity import SYSTEM_INTERNAL_USER_ID
class AgentChain(ChainBase):
@@ -543,16 +544,12 @@ class MoviePilotAgent:
"""
通过原渠道发送消息给用户
"""
user_id = self.user_id
if self.user_id == "system":
user_id = None
await AgentChain().async_post_message(
Notification(
channel=self.channel,
source=self.source,
mtype=NotificationType.Agent,
userid=user_id,
userid=self.user_id,
username=self.username,
title=title,
text=message,
@@ -853,7 +850,7 @@ class AgentManager:
try:
# 每次使用唯一的 session_id避免共享上下文
session_id = f"__agent_heartbeat_{uuid.uuid4().hex[:12]}__"
user_id = "system"
user_id = SYSTEM_INTERNAL_USER_ID
logger.info("智能体心跳唤醒:开始检查待处理任务...")
@@ -948,7 +945,7 @@ class AgentManager:
return
session_id = f"__agent_retry_transfer_batch_{uuid.uuid4().hex[:8]}__"
user_id = "system"
user_id = SYSTEM_INTERNAL_USER_ID
ids_str = ", ".join(str(i) for i in history_ids)
logger.info(
@@ -1107,7 +1104,7 @@ class AgentManager:
手动触发单条历史记录的 AI 整理。
"""
session_id = f"__agent_manual_redo_{history_id}_{uuid.uuid4().hex[:8]}__"
user_id = "system"
user_id = SYSTEM_INTERNAL_USER_ID
agent = MoviePilotAgent(
session_id=session_id,
user_id=user_id,

View File

@@ -38,6 +38,7 @@ from app.schemas import (
TransferDirectoryConf,
MessageResponse,
)
from app.utils.identity import normalize_internal_user_id
from app.schemas.category import CategoryConfig
from app.schemas.types import (
TorrentStatus,
@@ -119,6 +120,21 @@ class ChainBase(metaclass=ABCMeta):
"""
self.filecache.delete(filename)
@staticmethod
def _normalize_notification_for_dispatch(
message: Notification
) -> Notification:
"""
规范化待发送的通知消息。
后台任务会复用内部占位用户ID作为会话身份这里在真正发送前清空
让消息重新走默认通知路由或基于 targets 的目标解析。
"""
dispatch_message = copy.deepcopy(message)
dispatch_message.userid = normalize_internal_user_id(
dispatch_message.userid
)
return dispatch_message
async def async_remove_cache(self, filename: str) -> None:
"""
异步删除缓存同时删除Redis和本地缓存
@@ -1119,10 +1135,13 @@ class ChainBase(metaclass=ABCMeta):
# 保存消息
self.messagehelper.put(message, role="user", title=message.title)
self.messageoper.add(**message.model_dump())
dispatch_message = self._normalize_notification_for_dispatch(message)
# 发送消息按设置隔离
if not message.userid and message.mtype:
if not dispatch_message.userid and dispatch_message.mtype:
# 消息隔离设置
notify_action = ServiceConfigHelper.get_notification_switch(message.mtype)
notify_action = ServiceConfigHelper.get_notification_switch(
dispatch_message.mtype
)
if notify_action:
# 'admin' 'user,admin' 'user' 'all'
actions = notify_action.split(",")
@@ -1131,7 +1150,7 @@ class ChainBase(metaclass=ABCMeta):
send_orignal = False
useroper = UserOper()
for action in actions:
send_message = copy.deepcopy(message)
send_message = copy.deepcopy(dispatch_message)
if action == "admin" and not admin_sended:
# 仅发送管理员
logger.info(f"{send_message.mtype} 的消息已设置发送给管理员")
@@ -1186,13 +1205,13 @@ class ChainBase(metaclass=ABCMeta):
# 发送消息事件
self.eventmanager.send_event(
etype=EventType.NoticeMessage,
data={**message.model_dump(), "type": message.mtype},
data={**dispatch_message.model_dump(), "type": dispatch_message.mtype},
)
# 按原消息发送
self.messagequeue.send_message(
"post_message",
message=message,
immediately=True if message.userid else False,
message=dispatch_message,
immediately=True if dispatch_message.userid else False,
**kwargs,
)
@@ -1233,10 +1252,13 @@ class ChainBase(metaclass=ABCMeta):
# 保存消息
self.messagehelper.put(message, role="user", title=message.title)
await self.messageoper.async_add(**message.model_dump())
dispatch_message = self._normalize_notification_for_dispatch(message)
# 发送消息按设置隔离
if not message.userid and message.mtype:
if not dispatch_message.userid and dispatch_message.mtype:
# 消息隔离设置
notify_action = ServiceConfigHelper.get_notification_switch(message.mtype)
notify_action = ServiceConfigHelper.get_notification_switch(
dispatch_message.mtype
)
if notify_action:
# 'admin' 'user,admin' 'user' 'all'
actions = notify_action.split(",")
@@ -1245,7 +1267,7 @@ class ChainBase(metaclass=ABCMeta):
send_orignal = False
useroper = UserOper()
for action in actions:
send_message = copy.deepcopy(message)
send_message = copy.deepcopy(dispatch_message)
if action == "admin" and not admin_sended:
# 仅发送管理员
logger.info(f"{send_message.mtype} 的消息已设置发送给管理员")
@@ -1300,13 +1322,13 @@ class ChainBase(metaclass=ABCMeta):
# 发送消息事件
await self.eventmanager.async_send_event(
etype=EventType.NoticeMessage,
data={**message.model_dump(), "type": message.mtype},
data={**dispatch_message.model_dump(), "type": dispatch_message.mtype},
)
# 按原消息发送
await self.messagequeue.async_send_message(
"post_message",
message=message,
immediately=True if message.userid else False,
message=dispatch_message,
immediately=True if dispatch_message.userid else False,
**kwargs,
)
@@ -1324,11 +1346,12 @@ class ChainBase(metaclass=ABCMeta):
message, role="user", note=note_list, title=message.title
)
self.messageoper.add(**message.model_dump(), note=note_list)
dispatch_message = self._normalize_notification_for_dispatch(message)
return self.messagequeue.send_message(
"post_medias_message",
message=message,
message=dispatch_message,
medias=medias,
immediately=True if message.userid else False,
immediately=True if dispatch_message.userid else False,
)
def post_torrents_message(
@@ -1345,11 +1368,12 @@ class ChainBase(metaclass=ABCMeta):
message, role="user", note=note_list, title=message.title
)
self.messageoper.add(**message.model_dump(), note=note_list)
dispatch_message = self._normalize_notification_for_dispatch(message)
return self.messagequeue.send_message(
"post_torrents_message",
message=message,
message=dispatch_message,
torrents=torrents,
immediately=True if message.userid else False,
immediately=True if dispatch_message.userid else False,
)
def delete_message(
@@ -1411,7 +1435,10 @@ class ChainBase(metaclass=ABCMeta):
:param message: 消息体
:return: 消息响应包含message_id, chat_id等
"""
return self.run_module("send_direct_message", message=message)
return self.run_module(
"send_direct_message",
message=self._normalize_notification_for_dispatch(message),
)
def metadata_img(
self,

27
app/utils/identity.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Optional, Union
# 后台任务会话使用的内部占位用户ID。
# 它只用于在 agent/memory/session 侧标识“系统触发的任务”,
# 不能直接作为真实消息接收人下发到 Telegram/企业微信 等通知渠道。
SYSTEM_INTERNAL_USER_ID = "system"
def is_internal_user_id(userid: Optional[Union[str, int]]) -> bool:
"""
判断是否为系统内部占位用户ID。
"""
return (
isinstance(userid, str)
and userid.strip().lower() == SYSTEM_INTERNAL_USER_ID
)
def normalize_internal_user_id(
userid: Optional[Union[str, int]]
) -> Optional[Union[str, int]]:
"""
将系统内部占位用户ID归一化为 None避免被通知渠道误认为真实接收人。
"""
if is_internal_user_id(userid):
return None
return userid

View File

@@ -0,0 +1,70 @@
import sys
import unittest
from types import ModuleType
from unittest.mock import patch
sys.modules.setdefault("qbittorrentapi", ModuleType("qbittorrentapi"))
setattr(sys.modules["qbittorrentapi"], "TorrentFilesList", list)
sys.modules.setdefault("transmission_rpc", ModuleType("transmission_rpc"))
setattr(sys.modules["transmission_rpc"], "File", object)
sys.modules.setdefault("psutil", ModuleType("psutil"))
from app.chain.message import MessageChain
from app.schemas import Notification
from app.utils.identity import (
SYSTEM_INTERNAL_USER_ID,
is_internal_user_id,
normalize_internal_user_id,
)
class TestSystemNotificationDispatch(unittest.TestCase):
def test_internal_userid_identity_helpers(self):
self.assertTrue(is_internal_user_id(SYSTEM_INTERNAL_USER_ID))
self.assertTrue(is_internal_user_id(" System "))
self.assertIsNone(normalize_internal_user_id(SYSTEM_INTERNAL_USER_ID))
self.assertEqual(normalize_internal_user_id("10001"), "10001")
def test_post_message_normalizes_internal_userid_before_queueing(self):
chain = MessageChain()
message = Notification(
userid=SYSTEM_INTERNAL_USER_ID,
username="admin",
title="后台报告",
text="任务完成",
)
with patch("app.chain.MessageTemplateHelper.render", return_value=message), patch.object(
chain.messagehelper, "put"
), patch.object(chain.messageoper, "add"), patch.object(
chain.eventmanager, "send_event"
) as send_event, patch.object(
chain.messagequeue, "send_message"
) as send_message:
chain.post_message(message)
event_payload = send_event.call_args.kwargs["data"]
queued_message = send_message.call_args.kwargs["message"]
self.assertIsNone(event_payload["userid"])
self.assertIsNone(queued_message.userid)
self.assertFalse(send_message.call_args.kwargs["immediately"])
def test_send_direct_message_normalizes_internal_userid(self):
chain = MessageChain()
message = Notification(
userid=SYSTEM_INTERNAL_USER_ID,
username="admin",
title="后台报告",
text="任务完成",
)
with patch.object(chain, "run_module") as run_module:
chain.send_direct_message(message)
sent_message = run_module.call_args.kwargs["message"]
self.assertIsNone(sent_message.userid)
if __name__ == "__main__":
unittest.main()