Compare commits

..

4 Commits
v2.13.14 ... v2

Author SHA1 Message Date
InfinityPacer
43e89ebf77 fix: respect explicit subscribe best version settings (#6001)
* fix(subscribe): respect explicit best version settings

* fix(subscribe): isolate delete event failures
2026-06-25 11:31:13 +08:00
xiaoQQya
bc52653ec1 fix: 修复观众加入日期获取问题 (#5999) 2026-06-24 20:47:23 +08:00
jxxghp
4233ebfba6 Persist notification clear markers 2026-06-24 16:58:08 +08:00
jxxghp
af32c5e9bb chore: bump moviepilot-rust to 0.1.11 2026-06-24 14:49:30 +08:00
13 changed files with 259 additions and 32 deletions

View File

@@ -1,4 +1,5 @@
import json import json
import time
from typing import Union, Any, List, Optional from typing import Union, Any, List, Optional
from fastapi import APIRouter, BackgroundTasks, Depends, Request from fastapi import APIRouter, BackgroundTasks, Depends, Request
@@ -13,16 +14,54 @@ from app.core.security import verify_token, verify_apitoken
from app.db import get_async_db from app.db import get_async_db
from app.db.models import User from app.db.models import User
from app.db.message_oper import MessageOper from app.db.message_oper import MessageOper
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser from app.db.user_oper import get_current_active_superuser
from app.helper.service import ServiceConfigHelper from app.helper.service import ServiceConfigHelper
from app.helper.webpush import is_webpush_subscription_gone from app.helper.webpush import is_webpush_subscription_gone
from app.log import logger from app.log import logger
from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt from app.modules.wechat.WXBizMsgCrypt3 import WXBizMsgCrypt
from app.schemas.types import MessageChannel from app.schemas.types import MessageChannel, SystemConfigKey
router = APIRouter() router = APIRouter()
def _normalize_notification_clear_timestamp(value: Any) -> int:
"""
规范化通知清理时间戳。
"""
try:
normalized_value = int(value or 0)
except (TypeError, ValueError):
return 0
return normalized_value if normalized_value > 0 else 0
def _get_notification_clear_before() -> schemas.NotificationClearBefore:
"""
读取通知中心清理时间配置。
"""
value = SystemConfigOper().get(SystemConfigKey.NotificationClearBefore)
if isinstance(value, dict):
return schemas.NotificationClearBefore(
all=_normalize_notification_clear_timestamp(value.get("all")),
system=_normalize_notification_clear_timestamp(value.get("system")),
media=_normalize_notification_clear_timestamp(value.get("media")),
)
return schemas.NotificationClearBefore(
all=_normalize_notification_clear_timestamp(value),
)
def _format_notification_clear_time(value: int) -> Optional[str]:
"""
将清理时间戳转换为消息表使用的时间字符串。
"""
if not value:
return None
timestamp = value / 1000 if value > 10000000000 else value
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
def start_message_chain(body: Any, form: Any, args: Any): def start_message_chain(body: Any, form: Any, args: Any):
""" """
启动链式任务 启动链式任务
@@ -140,10 +179,32 @@ async def get_notification_message(
""" """
获取系统发送的通知消息列表。 获取系统发送的通知消息列表。
""" """
messages = await MessageOper(db).async_list_sent_by_page(page=page, count=count) clear_before = _get_notification_clear_before()
messages = await MessageOper(db).async_list_sent_by_page(
page=page,
count=count,
all_clear_before=_format_notification_clear_time(clear_before.all),
system_clear_before=_format_notification_clear_time(clear_before.system),
media_clear_before=_format_notification_clear_time(clear_before.media),
)
return [schemas.NotificationHistoryItem(**message.to_dict()) for message in messages] return [schemas.NotificationHistoryItem(**message.to_dict()) for message in messages]
@router.delete("/notification", summary="清理通知消息", response_model=schemas.Response)
async def clear_notification_message(
scope: schemas.NotificationClearScope = schemas.NotificationClearScope.All,
_: schemas.TokenPayload = Depends(verify_token),
):
"""
记录通知中心清理时间,后续通知历史查询会在服务端过滤。
"""
clear_before = _get_notification_clear_before()
value = clear_before.model_dump()
value[scope.value] = int(time.time() * 1000)
await SystemConfigOper().async_set(SystemConfigKey.NotificationClearBefore, value)
return schemas.Response(success=True, data={"clear_before": value})
def wechat_verify( def wechat_verify(
echostr: str, echostr: str,
msg_signature: str, msg_signature: str,

View File

@@ -19,6 +19,7 @@ from app.db.models.user import User
from app.db.systemconfig_oper import SystemConfigOper from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user_async from app.db.user_oper import get_current_active_user_async
from app.helper.server import MoviePilotServerHelper from app.helper.server import MoviePilotServerHelper
from app.log import logger
from app.scheduler import Scheduler from app.scheduler import Scheduler
from app.schemas.types import MediaType, EventType, SystemConfigKey from app.schemas.types import MediaType, EventType, SystemConfigKey
@@ -41,6 +42,14 @@ def start_subscribe_add(
) )
def build_subscribe_event_payload(subscribe: Subscribe) -> dict:
"""
从 ORM 已加载字段构造订阅事件快照,避免异步接口里属性懒加载触发隐式 IO。
"""
values = subscribe.__dict__
return {column.name: values.get(column.name) for column in subscribe.__table__.columns}
@router.get("/", summary="查询所有订阅", response_model=List[schemas.Subscribe]) @router.get("/", summary="查询所有订阅", response_model=List[schemas.Subscribe])
async def read_subscribes( async def read_subscribes(
db: AsyncSession = Depends(get_async_db), db: AsyncSession = Depends(get_async_db),
@@ -349,16 +358,27 @@ async def delete_subscribe_by_mediaid(
subscribe = await Subscribe.async_get_by_mediaid(db, mediaid) subscribe = await Subscribe.async_get_by_mediaid(db, mediaid)
if subscribe: if subscribe:
delete_subscribes.append(subscribe) delete_subscribes.append(subscribe)
delete_events = []
for subscribe in delete_subscribes: for subscribe in delete_subscribes:
# 在删除之前获取订阅信息 subscribe_info = build_subscribe_event_payload(subscribe)
subscribe_info = subscribe.to_dict() subscribe_id = subscribe_info.get("id")
subscribe_id = subscribe.id if not subscribe_id:
await Subscribe.async_delete(db, subscribe_id) continue
# 发送事件 delete_events.append((subscribe_id, subscribe_info))
await eventmanager.async_send_event( await db.delete(subscribe)
EventType.SubscribeDeleted, try:
{"subscribe_id": subscribe_id, "subscribe_info": subscribe_info}, await db.commit()
) except Exception:
await db.rollback()
raise
for subscribe_id, subscribe_info in delete_events:
try:
await eventmanager.async_send_event(
EventType.SubscribeDeleted,
{"subscribe_id": subscribe_id, "subscribe_info": subscribe_info},
)
except Exception as err:
logger.error(f"发送订阅删除事件失败:{subscribe_id} - {err}", exc_info=True)
return schemas.Response(success=True) return schemas.Response(success=True)
@@ -726,8 +746,13 @@ async def delete_subscribe(
subscribe = await Subscribe.async_get(db, subscribe_id) subscribe = await Subscribe.async_get(db, subscribe_id)
if subscribe: if subscribe:
# 在删除之前获取订阅信息 # 在删除之前获取订阅信息
subscribe_info = subscribe.to_dict() subscribe_info = build_subscribe_event_payload(subscribe)
await Subscribe.async_delete(db, subscribe_id) await db.delete(subscribe)
try:
await db.commit()
except Exception:
await db.rollback()
raise
# 发送事件 # 发送事件
await eventmanager.async_send_event( await eventmanager.async_send_event(
EventType.SubscribeDeleted, EventType.SubscribeDeleted,
@@ -735,6 +760,6 @@ async def delete_subscribe(
) )
# 统计订阅 # 统计订阅
MoviePilotServerHelper.sub_done_async( MoviePilotServerHelper.sub_done_async(
{"tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid} {"tmdbid": subscribe_info.get("tmdbid"), "doubanid": subscribe_info.get("doubanid")}
) )
return schemas.Response(success=True) return schemas.Response(success=True)

View File

@@ -582,8 +582,8 @@ class SubscribeChain(ChainBase):
"include") else kwargs.get("include"), "include") else kwargs.get("include"),
'exclude': self.__get_default_subscribe_config(mtype, "exclude") if not kwargs.get( 'exclude': self.__get_default_subscribe_config(mtype, "exclude") if not kwargs.get(
"exclude") else kwargs.get("exclude"), "exclude") else kwargs.get("exclude"),
'best_version': self.__get_default_subscribe_config(mtype, "best_version") if not kwargs.get( 'best_version': self.__get_default_subscribe_config(mtype, "best_version")
"best_version") else kwargs.get("best_version"), if kwargs.get("best_version") is None else kwargs.get("best_version"),
'best_version_full': self.__get_default_subscribe_config(mtype, "best_version_full") 'best_version_full': self.__get_default_subscribe_config(mtype, "best_version_full")
if kwargs.get("best_version_full") is None else kwargs.get("best_version_full"), if kwargs.get("best_version_full") is None else kwargs.get("best_version_full"),
'search_imdbid': self.__get_default_subscribe_config(mtype, "search_imdbid") if not kwargs.get( 'search_imdbid': self.__get_default_subscribe_config(mtype, "search_imdbid") if not kwargs.get(

View File

@@ -114,9 +114,21 @@ class MessageOper(DbOper):
return await Message.async_list_by_page(self._db, page, count) return await Message.async_list_by_page(self._db, page, count)
async def async_list_sent_by_page( async def async_list_sent_by_page(
self, page: Optional[int] = 1, count: Optional[int] = 30 self,
page: Optional[int] = 1,
count: Optional[int] = 30,
all_clear_before: Optional[str] = None,
system_clear_before: Optional[str] = None,
media_clear_before: Optional[str] = None,
) -> list[Message]: ) -> list[Message]:
""" """
分页获取系统发送的通知消息。 分页获取系统发送的通知消息。
""" """
return await Message.async_list_sent_by_page(self._db, page, count) return await Message.async_list_sent_by_page(
self._db,
page,
count,
all_clear_before=all_clear_before,
system_clear_before=system_clear_before,
media_clear_before=media_clear_before,
)

View File

@@ -1,6 +1,6 @@
from typing import List, Optional from typing import List, Optional
from sqlalchemy import Column, Integer, String, JSON, Index, select from sqlalchemy import Column, Integer, String, JSON, Index, and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -81,14 +81,38 @@ class Message(Base):
@classmethod @classmethod
@async_db_query @async_db_query
async def async_list_sent_by_page( async def async_list_sent_by_page(
cls, db: AsyncSession, page: Optional[int] = 1, count: Optional[int] = 30 cls,
db: AsyncSession,
page: Optional[int] = 1,
count: Optional[int] = 30,
all_clear_before: Optional[str] = None,
system_clear_before: Optional[str] = None,
media_clear_before: Optional[str] = None,
) -> List["Message"]: ) -> List["Message"]:
""" """
分页获取系统发送的通知消息。 分页获取系统发送的通知消息。
""" """
statement = select(cls).where(cls.action == 1)
if all_clear_before:
statement = statement.where(cls.reg_time > all_clear_before)
if system_clear_before:
statement = statement.where(
or_(
and_(cls.image.isnot(None), cls.image != ""),
cls.reg_time > system_clear_before,
)
)
if media_clear_before:
statement = statement.where(
or_(
cls.image.is_(None),
cls.image == "",
cls.reg_time > media_clear_before,
)
)
result = await db.execute( result = await db.execute(
select(cls) statement
.where(cls.action == 1)
.order_by(cls.reg_time.desc(), cls.id.desc()) .order_by(cls.reg_time.desc(), cls.id.desc())
.offset((page - 1) * count) .offset((page - 1) * count)
.limit(count) .limit(count)

View File

@@ -284,7 +284,8 @@ class NexusPhpSiteUserInfo(SiteParserBase):
# 加入日期 # 加入日期
join_at_text = html.xpath( join_at_text = html.xpath(
'//tr/td[text()="加入日期" or text()="注册日期" or *[text()="加入日期"]]/following-sibling::td[1]//text()' '//tr/td[text()="加入日期" or text()="注册日期" or *[text()="加入日期"]]/following-sibling::td[1]//text()'
'|//div/b[text()="加入日期"]/../text()') '|//div/b[text()="加入日期"]/../text()'
'|//*[@id="outer"]/table/tr/td/div/div[1]/div[2]/div[3]/span[1]/span/@title')
if join_at_text: if join_at_text:
self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip()) self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip())

View File

@@ -7,6 +7,32 @@ from pydantic import BaseModel, Field, field_validator
from app.schemas.types import ContentType, NotificationType, MessageChannel from app.schemas.types import ContentType, NotificationType, MessageChannel
class NotificationClearScope(str, Enum):
"""
通知中心清理范围。
"""
# 全部消息
All = "all"
# 系统消息
System = "system"
# 媒体消息
Media = "media"
class NotificationClearBefore(BaseModel):
"""
通知中心按范围记录的清理时间。
"""
# 全部消息清理时间
all: int = 0
# 系统消息清理时间
system: int = 0
# 媒体消息清理时间
media: int = 0
class MessageResponse(BaseModel): class MessageResponse(BaseModel):
""" """
消息发送响应包含消息ID等信息用于后续编辑 消息发送响应包含消息ID等信息用于后续编辑

View File

@@ -62,9 +62,9 @@ class Subscribe(BaseModel):
# 下载器 # 下载器
downloader: Optional[str] = None downloader: Optional[str] = None
# 是否洗版 # 是否洗版
best_version: Optional[int] = 0 best_version: Optional[int] = None
# 是否只洗全集整包 # 是否只洗全集整包
best_version_full: Optional[int] = 0 best_version_full: Optional[int] = None
# 当前优先级 # 当前优先级
current_priority: Optional[int] = None current_priority: Optional[int] = None
# 洗版时已下载剧集的优先级状态 # 洗版时已下载剧集的优先级状态

View File

@@ -267,6 +267,8 @@ class SystemConfigKey(Enum):
AIAgentConfig = "AIAgentConfig" AIAgentConfig = "AIAgentConfig"
# 通知消息格式模板 # 通知消息格式模板
NotificationTemplates = "NotificationTemplates" NotificationTemplates = "NotificationTemplates"
# 通知中心清理时间
NotificationClearBefore = "NotificationClearBefore"
# 刮削开关设置 # 刮削开关设置
ScrapingSwitchs = "ScrapingSwitchs" ScrapingSwitchs = "ScrapingSwitchs"
# 插件安装统计 # 插件安装统计

View File

@@ -1,4 +1,4 @@
moviepilot-rust~=0.1.10 moviepilot-rust~=0.1.11
pydantic>=2.13.4,<3.0.0 pydantic>=2.13.4,<3.0.0
pydantic-settings>=2.14.1,<3.0.0 pydantic-settings>=2.14.1,<3.0.0
SQLAlchemy~=2.0.50 SQLAlchemy~=2.0.50

View File

@@ -420,7 +420,7 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| POST | `/api/v1/torrent/cache/refresh` | Refresh torrent cache | | POST | `/api/v1/torrent/cache/refresh` | Refresh torrent cache |
| POST | `/api/v1/torrent/cache/reidentify/{domain}/{torrent_hash}` | Re-identify torrent. Params: `tmdbid`, `doubanid` | | POST | `/api/v1/torrent/cache/reidentify/{domain}/{torrent_hash}` | Re-identify torrent. Params: `tmdbid`, `doubanid` |
### Message (6 endpoints) ### Message (8 endpoints)
| Method | Path | Description | | Method | Path | Description |
|--------|------|-------------| |--------|------|-------------|
@@ -428,6 +428,8 @@ All endpoints are under the base URL `{MP_HOST}`. Path parameters are shown as `
| GET | `/api/v1/message/` | Callback verification. Params: `token`, `echostr`, `msg_signature`, `timestamp`, `nonce`, `source` | | GET | `/api/v1/message/` | Callback verification. Params: `token`, `echostr`, `msg_signature`, `timestamp`, `nonce`, `source` |
| POST | `/api/v1/message/web` | Send web message. Params: `text` (required) | | POST | `/api/v1/message/web` | Send web message. Params: `text` (required) |
| GET | `/api/v1/message/web` | Get web messages. Params: `page`, `count` | | GET | `/api/v1/message/web` | Get web messages. Params: `page`, `count` |
| GET | `/api/v1/message/notification` | Get notification history. Params: `page`, `count`; server filters cleared history |
| DELETE | `/api/v1/message/notification` | Mark notification history as cleared. Params: `scope` (`all`, `system`, `media`) |
| POST | `/api/v1/message/webpush/subscribe` | WebPush subscribe. Body: Subscription JSON | | POST | `/api/v1/message/webpush/subscribe` | WebPush subscribe. Body: Subscription JSON |
| POST | `/api/v1/message/webpush/send` | Send WebPush notification. Body: SubscriptionMessage JSON | | POST | `/api/v1/message/webpush/send` | Send WebPush notification. Body: SubscriptionMessage JSON |

View File

@@ -1,15 +1,18 @@
import asyncio
import json import json
from unittest.mock import Mock from unittest.mock import Mock
from app.db import SessionFactory from app.api.endpoints.message import clear_notification_message, get_notification_message
from app.db.message_oper import MessageOper
from app.db.models.message import Message
from app.chain import ChainBase from app.chain import ChainBase
from app.core.context import Context, MediaInfo, TorrentInfo from app.core.context import Context, MediaInfo, TorrentInfo
from app.core.meta import MetaBase from app.core.meta import MetaBase
from app.db import AsyncSessionFactory, SessionFactory
from app.db.message_oper import MessageOper
from app.db.models.message import Message
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper from app.helper.message import MessageHelper
from app.schemas import Notification from app.schemas import Notification, NotificationClearScope
from app.schemas.types import MediaType, NotificationType from app.schemas.types import MediaType, NotificationType, SystemConfigKey
def _clear_messages() -> None: def _clear_messages() -> None:
@@ -19,6 +22,7 @@ def _clear_messages() -> None:
with SessionFactory() as db: with SessionFactory() as db:
db.query(Message).delete() db.query(Message).delete()
db.commit() db.commit()
SystemConfigOper().delete(SystemConfigKey.NotificationClearBefore)
def _reset_message_helper(helper: MessageHelper) -> None: def _reset_message_helper(helper: MessageHelper) -> None:
@@ -30,6 +34,15 @@ def _reset_message_helper(helper: MessageHelper) -> None:
helper._recent_notification_keys.clear() helper._recent_notification_keys.clear()
def _set_message_time(title: str, reg_time: str) -> None:
"""
调整测试消息时间,避免消息写入时的当前秒影响清理边界断言。
"""
with SessionFactory() as db:
db.query(Message).filter(Message.title == title).update({"reg_time": reg_time})
db.commit()
def test_notification_history_only_lists_sent_messages() -> None: def test_notification_history_only_lists_sent_messages() -> None:
""" """
通知历史应返回已发送消息,包含通过消息链登记的智能体消息。 通知历史应返回已发送消息,包含通过消息链登记的智能体消息。
@@ -58,6 +71,48 @@ def test_web_message_history_returns_all_messages() -> None:
assert [message.title for message in messages] == ["普通通知", "用户消息", "智能体回复"] assert [message.title for message in messages] == ["普通通知", "用户消息", "智能体回复"]
def test_notification_clear_marker_filters_history_across_requests() -> None:
"""
通知清理时间写入后端后,后续通知历史查询应直接返回过滤后的结果。
"""
_clear_messages()
oper = MessageOper()
oper.add(
title="旧系统通知",
text="任务失败",
action=1,
mtype=NotificationType.Other,
)
oper.add(
title="旧媒体通知",
text="影片入库",
image="https://example.com/poster.jpg",
action=1,
)
_set_message_time("旧系统通知", "2026-01-01 00:00:00")
_set_message_time("旧媒体通知", "2026-01-01 00:00:00")
asyncio.run(clear_notification_message(scope=NotificationClearScope.Media))
oper.add(
title="新媒体通知",
text="影片入库",
image="https://example.com/new.jpg",
action=1,
)
_set_message_time("新媒体通知", "2999-01-01 00:00:00")
async def _load_titles() -> list[str]:
"""
通过异步接口读取通知标题。
"""
async with AsyncSessionFactory() as db:
messages = await get_notification_message(db=db)
return [message.title for message in messages]
assert asyncio.run(_load_titles()) == ["新媒体通知", "旧系统通知"]
def test_system_helper_message_only_enters_sse_queue() -> None: def test_system_helper_message_only_enters_sse_queue() -> None:
""" """
系统实时消息只进入前端 SSE 队列,不写入通知历史。 系统实时消息只进入前端 SSE 队列,不写入通知历史。

View File

@@ -372,6 +372,25 @@ class SubscribeChainTest(TestCase):
media_info=SimpleNamespace(type=MediaType.TV, tmdb_id=1, douban_id=None), media_info=SimpleNamespace(type=MediaType.TV, tmdb_id=1, douban_id=None),
) )
def test_default_kwargs_respects_explicit_zero_best_version(self):
"""显式关闭洗版时必须保留 0仅未传值才应用默认订阅规则。"""
def _default_config(_mtype, key):
return 1 if key in {"best_version", "best_version_full"} else None
with patch.object(SubscribeChain, "_SubscribeChain__get_default_subscribe_config", side_effect=_default_config):
explicit = SubscribeChain()._SubscribeChain__get_default_kwargs(
MediaType.TV,
best_version=0,
best_version_full=0,
)
omitted = SubscribeChain()._SubscribeChain__get_default_kwargs(MediaType.TV)
self.assertEqual(explicit["best_version"], 0)
self.assertEqual(explicit["best_version_full"], 0)
self.assertEqual(omitted["best_version"], 1)
self.assertEqual(omitted["best_version_full"], 1)
def test_format_subscribe_progress_preserves_special_season_zero(self): def test_format_subscribe_progress_preserves_special_season_zero(self):
"""订阅列表展示必须把 S0 当作合法季号,而不是回退到第 1 季。""" """订阅列表展示必须把 S0 当作合法季号,而不是回退到第 1 季。"""
subscribe = self._build_subscribe(season=0, total_episode=5, lack_episode=2) subscribe = self._build_subscribe(season=0, total_episode=5, lack_episode=2)