Files
MoviePilot/tests/test_message_notifications.py

228 lines
7.4 KiB
Python

import json
from unittest.mock import Mock
from app.db import SessionFactory
from app.db.message_oper import MessageOper
from app.db.models.message import Message
from app.chain import ChainBase
from app.core.context import Context, MediaInfo, TorrentInfo
from app.core.meta import MetaBase
from app.helper.message import MessageHelper
from app.schemas import Notification
from app.schemas.types import MediaType, NotificationType
def _clear_messages() -> None:
"""
清空消息表,隔离通知测试数据。
"""
with SessionFactory() as db:
db.query(Message).delete()
db.commit()
def _reset_message_helper(helper: MessageHelper) -> None:
"""
清空单例消息队列和去重缓存,避免用例间互相影响。
"""
while helper.get() is not None:
pass
helper._recent_notification_keys.clear()
def test_notification_history_only_lists_sent_messages() -> None:
"""
通知历史应返回已发送消息,包含通过消息链登记的智能体消息。
"""
_clear_messages()
oper = MessageOper()
oper.add(title="系统通知", text="下载完成", action=1, mtype=NotificationType.Download)
oper.add(title="用户消息", text="帮我搜索", action=0)
oper.add(title="智能体回复", text="已处理", action=1, mtype=NotificationType.Agent)
messages = MessageOper().list_by_page(page=1, count=10)
assert [message.title for message in messages if message.action == 1] == ["智能体回复", "系统通知"]
def test_web_message_history_returns_all_messages() -> None:
"""
Web 消息历史返回消息表中的全部记录。
"""
_clear_messages()
oper = MessageOper()
oper.add(title="智能体回复", text="已处理", action=1, mtype=NotificationType.Agent)
oper.add(title="用户消息", text="/ai 帮我处理", action=0)
oper.add(title="普通通知", text="下载完成", action=1, mtype=NotificationType.Download)
messages = MessageOper().list_by_page(page=1, count=10)
assert [message.title for message in messages] == ["普通通知", "用户消息", "智能体回复"]
def test_system_helper_message_only_enters_sse_queue() -> None:
"""
系统实时消息只进入前端 SSE 队列,不写入通知历史。
"""
_clear_messages()
helper = MessageHelper()
_reset_message_helper(helper)
helper.put("调度任务执行失败", role="system", title="系统错误")
assert MessageOper().list_by_page(page=1, count=10) == []
realtime_message = json.loads(helper.get())
assert realtime_message["type"] == "system"
assert realtime_message["title"] == "系统错误"
assert realtime_message["text"] == "调度任务执行失败"
def test_plugin_helper_message_deduplicates_recent_sse_messages() -> None:
"""
短时间内相同插件实时消息只应推送一次,不写入通知历史。
"""
_clear_messages()
helper = MessageHelper()
_reset_message_helper(helper)
helper.put("站点刷流任务出错,获取下载器实例失败,请检查配置", role="plugin", title="站点刷流")
helper.put("站点刷流任务出错,获取下载器实例失败,请检查配置", role="plugin", title="站点刷流")
assert MessageOper().list_by_page(page=1, count=10) == []
assert json.loads(helper.get())["title"] == "站点刷流"
assert helper.get() is None
def test_agent_helper_message_does_not_enter_sse_queue() -> None:
"""
智能体消息不进入前端 SSE 队列。
"""
helper = MessageHelper()
_reset_message_helper(helper)
helper.put("智能体回复", role="agent", title="MoviePilot助手")
assert helper.get() is None
def test_user_helper_message_does_not_enter_sse_queue() -> None:
"""
用户消息不进入前端 SSE 队列。
"""
helper = MessageHelper()
_reset_message_helper(helper)
helper.put("用户输入", role="user", title="admin")
assert helper.get() is None
def test_notification_post_message_is_persisted_without_sse_queue() -> None:
"""
业务通知通过消息链发送时只登记数据库,不进入前端 SSE 队列。
"""
_clear_messages()
helper = MessageHelper()
_reset_message_helper(helper)
chain = ChainBase()
chain.messagequeue.send_message = Mock()
chain.eventmanager.send_event = Mock()
chain.post_message(
Notification(
mtype=NotificationType.Download,
title="下载完成",
text="影片已加入下载器",
)
)
messages = MessageOper().list_by_page(page=1, count=10)
assert len(messages) == 1
assert messages[0].title == "下载完成"
assert messages[0].mtype == NotificationType.Download.value
assert helper.get() is None
chain.messagequeue.send_message.assert_called_once()
def test_agent_notification_post_message_is_persisted_without_sse_queue() -> None:
"""
智能体消息通过消息链发送时登记数据库,但不进入前端 SSE 队列。
"""
_clear_messages()
helper = MessageHelper()
_reset_message_helper(helper)
chain = ChainBase()
chain.messagequeue.send_message = Mock()
chain.eventmanager.send_event = Mock()
chain.post_message(
Notification(
mtype=NotificationType.Agent,
title="MoviePilot助手",
text="已完成处理",
)
)
messages = MessageOper().list_by_page(page=1, count=10)
assert len(messages) == 1
assert messages[0].title == "MoviePilot助手"
assert messages[0].mtype == NotificationType.Agent.value
assert helper.get() is None
chain.messagequeue.send_message.assert_called_once()
def test_transient_notification_post_message_skips_history_but_dispatches() -> None:
"""
标记为不保存历史的过程消息应跳过数据库登记,但仍正常派发。
"""
_clear_messages()
chain = ChainBase()
chain.messagequeue.send_message = Mock()
chain.eventmanager.send_event = Mock()
chain.post_message(
Notification(
title="请选择下载目录",
text="1. 默认目录",
save_history=False,
)
)
assert MessageOper().list_by_page(page=1, count=10) == []
assert "save_history" not in chain.eventmanager.send_event.call_args.kwargs["data"]
chain.eventmanager.send_event.assert_called_once()
chain.messagequeue.send_message.assert_called_once()
def test_transient_media_and_torrent_lists_skip_history_but_dispatch() -> None:
"""
传统交互候选列表标记为不保存历史时,只发送到渠道,不写入消息表。
"""
_clear_messages()
chain = ChainBase()
media = MediaInfo(type=MediaType.MOVIE, title="星际穿越", year="2014")
torrent = Context(
meta_info=MetaBase("星际穿越"),
media_info=media,
torrent_info=TorrentInfo(
title="星际穿越.2014.1080p",
site_name="TestSite",
enclosure="https://example.com/demo.torrent",
),
)
chain.messagequeue.send_message = Mock()
chain.post_medias_message(
Notification(title="请选择媒体", save_history=False),
medias=[media],
)
chain.post_torrents_message(
Notification(title="请选择资源", save_history=False),
torrents=[torrent],
)
assert MessageOper().list_by_page(page=1, count=10) == []
assert chain.messagequeue.send_message.call_count == 2