Compare commits

..

2 Commits
v2.13.14 ... v2

Author SHA1 Message Date
jxxghp
4233ebfba6 Persist notification clear markers 2026-06-24 16:58:08 +08:00
jxxghp
af32c5e9bb chore: bump moviepilot-rust to 0.1.11 2026-06-24 14:49:30 +08:00
8 changed files with 197 additions and 15 deletions

View File

@@ -1,4 +1,5 @@
import json
import time
from typing import Union, Any, List, Optional
from fastapi import APIRouter, BackgroundTasks, Depends, Request
@@ -13,16 +14,54 @@ from app.core.security import verify_token, verify_apitoken
from app.db import get_async_db
from app.db.models import User
from app.db.message_oper import MessageOper
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.helper.service import ServiceConfigHelper
from app.helper.webpush import is_webpush_subscription_gone
from app.log import logger
from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt
from app.schemas.types import MessageChannel
from app.schemas.types import MessageChannel, SystemConfigKey
router = APIRouter()
def _normalize_notification_clear_timestamp(value: Any) -> int:
"""
规范化通知清理时间戳。
"""
try:
normalized_value = int(value or 0)
except (TypeError, ValueError):
return 0
return normalized_value if normalized_value > 0 else 0
def _get_notification_clear_before() -> schemas.NotificationClearBefore:
"""
读取通知中心清理时间配置。
"""
value = SystemConfigOper().get(SystemConfigKey.NotificationClearBefore)
if isinstance(value, dict):
return schemas.NotificationClearBefore(
all=_normalize_notification_clear_timestamp(value.get("all")),
system=_normalize_notification_clear_timestamp(value.get("system")),
media=_normalize_notification_clear_timestamp(value.get("media")),
)
return schemas.NotificationClearBefore(
all=_normalize_notification_clear_timestamp(value),
)
def _format_notification_clear_time(value: int) -> Optional[str]:
"""
将清理时间戳转换为消息表使用的时间字符串。
"""
if not value:
return None
timestamp = value / 1000 if value > 10000000000 else value
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
def start_message_chain(body: Any, form: Any, args: Any):
"""
启动链式任务
@@ -140,10 +179,32 @@ async def get_notification_message(
"""
获取系统发送的通知消息列表。
"""
messages = await MessageOper(db).async_list_sent_by_page(page=page, count=count)
clear_before = _get_notification_clear_before()
messages = await MessageOper(db).async_list_sent_by_page(
page=page,
count=count,
all_clear_before=_format_notification_clear_time(clear_before.all),
system_clear_before=_format_notification_clear_time(clear_before.system),
media_clear_before=_format_notification_clear_time(clear_before.media),
)
return [schemas.NotificationHistoryItem(**message.to_dict()) for message in messages]
@router.delete("/notification", summary="清理通知消息", response_model=schemas.Response)
async def clear_notification_message(
scope: schemas.NotificationClearScope = schemas.NotificationClearScope.All,
_: schemas.TokenPayload = Depends(verify_token),
):
"""
记录通知中心清理时间,后续通知历史查询会在服务端过滤。
"""
clear_before = _get_notification_clear_before()
value = clear_before.model_dump()
value[scope.value] = int(time.time() * 1000)
await SystemConfigOper().async_set(SystemConfigKey.NotificationClearBefore, value)
return schemas.Response(success=True, data={"clear_before": value})
def wechat_verify(
echostr: str,
msg_signature: str,

View File

@@ -114,9 +114,21 @@ class MessageOper(DbOper):
return await Message.async_list_by_page(self._db, page, count)
async def async_list_sent_by_page(
self, page: Optional[int] = 1, count: Optional[int] = 30
self,
page: Optional[int] = 1,
count: Optional[int] = 30,
all_clear_before: Optional[str] = None,
system_clear_before: Optional[str] = None,
media_clear_before: Optional[str] = None,
) -> list[Message]:
"""
分页获取系统发送的通知消息。
"""
return await Message.async_list_sent_by_page(self._db, page, count)
return await Message.async_list_sent_by_page(
self._db,
page,
count,
all_clear_before=all_clear_before,
system_clear_before=system_clear_before,
media_clear_before=media_clear_before,
)

View File

@@ -1,6 +1,6 @@
from typing import List, Optional
from sqlalchemy import Column, Integer, String, JSON, Index, select
from sqlalchemy import Column, Integer, String, JSON, Index, and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
@@ -81,14 +81,38 @@ class Message(Base):
@classmethod
@async_db_query
async def async_list_sent_by_page(
cls, db: AsyncSession, page: Optional[int] = 1, count: Optional[int] = 30
cls,
db: AsyncSession,
page: Optional[int] = 1,
count: Optional[int] = 30,
all_clear_before: Optional[str] = None,
system_clear_before: Optional[str] = None,
media_clear_before: Optional[str] = None,
) -> List["Message"]:
"""
分页获取系统发送的通知消息。
"""
statement = select(cls).where(cls.action == 1)
if all_clear_before:
statement = statement.where(cls.reg_time > all_clear_before)
if system_clear_before:
statement = statement.where(
or_(
and_(cls.image.isnot(None), cls.image != ""),
cls.reg_time > system_clear_before,
)
)
if media_clear_before:
statement = statement.where(
or_(
cls.image.is_(None),
cls.image == "",
cls.reg_time > media_clear_before,
)
)
result = await db.execute(
select(cls)
.where(cls.action == 1)
statement
.order_by(cls.reg_time.desc(), cls.id.desc())
.offset((page - 1) * count)
.limit(count)

View File

@@ -7,6 +7,32 @@ from pydantic import BaseModel, Field, field_validator
from app.schemas.types import ContentType, NotificationType, MessageChannel
class NotificationClearScope(str, Enum):
"""
通知中心清理范围。
"""
# 全部消息
All = "all"
# 系统消息
System = "system"
# 媒体消息
Media = "media"
class NotificationClearBefore(BaseModel):
"""
通知中心按范围记录的清理时间。
"""
# 全部消息清理时间
all: int = 0
# 系统消息清理时间
system: int = 0
# 媒体消息清理时间
media: int = 0
class MessageResponse(BaseModel):
"""
消息发送响应包含消息ID等信息用于后续编辑

View File

@@ -267,6 +267,8 @@ class SystemConfigKey(Enum):
AIAgentConfig = "AIAgentConfig"
# 通知消息格式模板
NotificationTemplates = "NotificationTemplates"
# 通知中心清理时间
NotificationClearBefore = "NotificationClearBefore"
# 刮削开关设置
ScrapingSwitchs = "ScrapingSwitchs"
# 插件安装统计

View File

@@ -1,4 +1,4 @@
moviepilot-rust~=0.1.10
moviepilot-rust~=0.1.11
pydantic>=2.13.4,<3.0.0
pydantic-settings>=2.14.1,<3.0.0
SQLAlchemy~=2.0.50

View File

@@ -420,7 +420,7 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| POST | `/api/v1/torrent/cache/refresh` | Refresh torrent cache |
| POST | `/api/v1/torrent/cache/reidentify/{domain}/{torrent_hash}` | Re-identify torrent. Params: `tmdbid`, `doubanid` |
### Message (6 endpoints)
### Message (8 endpoints)
| Method | Path | Description |
|--------|------|-------------|
@@ -428,6 +428,8 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| GET | `/api/v1/message/` | Callback verification. Params: `token`, `echostr`, `msg_signature`, `timestamp`, `nonce`, `source` |
| POST | `/api/v1/message/web` | Send web message. Params: `text` (required) |
| GET | `/api/v1/message/web` | Get web messages. Params: `page`, `count` |
| GET | `/api/v1/message/notification` | Get notification history. Params: `page`, `count`; server filters cleared history |
| DELETE | `/api/v1/message/notification` | Mark notification history as cleared. Params: `scope` (`all`, `system`, `media`) |
| POST | `/api/v1/message/webpush/subscribe` | WebPush subscribe. Body: Subscription JSON |
| POST | `/api/v1/message/webpush/send` | Send WebPush notification. Body: SubscriptionMessage JSON |

View File

@@ -1,15 +1,18 @@
import asyncio
import json
from unittest.mock import Mock
from app.db import SessionFactory
from app.db.message_oper import MessageOper
from app.db.models.message import Message
from app.api.endpoints.message import clear_notification_message, get_notification_message
from app.chain import ChainBase
from app.core.context import Context, MediaInfo, TorrentInfo
from app.core.meta import MetaBase
from app.db import AsyncSessionFactory, SessionFactory
from app.db.message_oper import MessageOper
from app.db.models.message import Message
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.schemas import Notification
from app.schemas.types import MediaType, NotificationType
from app.schemas import Notification, NotificationClearScope
from app.schemas.types import MediaType, NotificationType, SystemConfigKey
def _clear_messages() -> None:
@@ -19,6 +22,7 @@ def _clear_messages() -> None:
with SessionFactory() as db:
db.query(Message).delete()
db.commit()
SystemConfigOper().delete(SystemConfigKey.NotificationClearBefore)
def _reset_message_helper(helper: MessageHelper) -> None:
@@ -30,6 +34,15 @@ def _reset_message_helper(helper: MessageHelper) -> None:
helper._recent_notification_keys.clear()
def _set_message_time(title: str, reg_time: str) -> None:
"""
调整测试消息时间,避免消息写入时的当前秒影响清理边界断言。
"""
with SessionFactory() as db:
db.query(Message).filter(Message.title == title).update({"reg_time": reg_time})
db.commit()
def test_notification_history_only_lists_sent_messages() -> None:
"""
通知历史应返回已发送消息,包含通过消息链登记的智能体消息。
@@ -58,6 +71,48 @@ def test_web_message_history_returns_all_messages() -> None:
assert [message.title for message in messages] == ["普通通知", "用户消息", "智能体回复"]
def test_notification_clear_marker_filters_history_across_requests() -> None:
"""
通知清理时间写入后端后,后续通知历史查询应直接返回过滤后的结果。
"""
_clear_messages()
oper = MessageOper()
oper.add(
title="旧系统通知",
text="任务失败",
action=1,
mtype=NotificationType.Other,
)
oper.add(
title="旧媒体通知",
text="影片入库",
image="https://example.com/poster.jpg",
action=1,
)
_set_message_time("旧系统通知", "2026-01-01 00:00:00")
_set_message_time("旧媒体通知", "2026-01-01 00:00:00")
asyncio.run(clear_notification_message(scope=NotificationClearScope.Media))
oper.add(
title="新媒体通知",
text="影片入库",
image="https://example.com/new.jpg",
action=1,
)
_set_message_time("新媒体通知", "2999-01-01 00:00:00")
async def _load_titles() -> list[str]:
"""
通过异步接口读取通知标题。
"""
async with AsyncSessionFactory() as db:
messages = await get_notification_message(db=db)
return [message.title for message in messages]
assert asyncio.run(_load_titles()) == ["新媒体通知", "旧系统通知"]
def test_system_helper_message_only_enters_sse_queue() -> None:
"""
系统实时消息只进入前端 SSE 队列,不写入通知历史。