Compare commits

...

55 Commits

Author SHA1 Message Date
jxxghp
4666b9051d 更新 version.py 2025-01-23 07:11:50 +08:00
jxxghp
56c524a822 Merge pull request #3792 from InfinityPacer/feature/cache 2025-01-23 06:55:31 +08:00
jxxghp
43e8df1b9f Merge pull request #3791 from InfinityPacer/feature/subscribe 2025-01-23 06:54:09 +08:00
InfinityPacer
dbc465f6e5 fix(cache): update tmdb match_web base_wait to 5 for finer control 2025-01-23 00:11:36 +08:00
InfinityPacer
bfbd3c527c fix(cache): ensure consistent parameter ordering in get_cache_key 2025-01-22 23:53:19 +08:00
InfinityPacer
412405f69b fix(subscribe): optimize site list retrieval in get_sub_sites 2025-01-22 23:22:15 +08:00
jxxghp
12b74eb04f 更新 subscribe.py 2025-01-22 22:50:51 +08:00
jxxghp
2305a6287a fix #3777 2025-01-22 22:23:15 +08:00
jxxghp
68245be081 fix meta 2025-01-22 22:20:17 +08:00
jxxghp
29e01294bd Merge pull request #3789 from InfinityPacer/feature/cache
fix(cache): enhance fanart image caching
2025-01-22 22:16:37 +08:00
InfinityPacer
d35bee54a6 fix(limit): log accurate wait time after triggering limit 2025-01-22 21:34:59 +08:00
InfinityPacer
bf63be18e4 fix(cache): enhance fanart image caching 2025-01-22 21:34:44 +08:00
jxxghp
3dc7adc61a 更新 scheduler.py 2025-01-22 19:39:02 +08:00
jxxghp
047d1e0afd Merge pull request #3788 from InfinityPacer/feature/cache
feat(cache): optimize serialization with type-based caching
2025-01-22 18:47:08 +08:00
InfinityPacer
7c017faf31 feat(cache): optimize serialization with type-based caching 2025-01-22 17:41:11 +08:00
jxxghp
7a59565761 fix 优化订阅匹配的识别量 2025-01-22 16:37:49 +08:00
jxxghp
9afb904d40 Merge pull request #3785 from InfinityPacer/feature/cache
fix(cache): enhance tmdb match_web rate-limiting and caching
2025-01-22 15:27:08 +08:00
jxxghp
8189de589a fix #3775 2025-01-22 15:21:10 +08:00
jxxghp
c458d7525d fix #3778 2025-01-22 15:01:24 +08:00
InfinityPacer
5c7bd95f6b fix(cache): enhance tmdb match_web rate-limiting and caching 2025-01-22 14:58:56 +08:00
InfinityPacer
70c4509682 feat(cache): add exists to check key presence in cache backends 2025-01-22 14:25:30 +08:00
jxxghp
f34e36c571 feat:Follow订阅分享人功能 2025-01-22 13:32:13 +08:00
jxxghp
5054ffe7e4 Merge pull request #3776 from kiri-to/patch-1 2025-01-21 19:38:30 +08:00
kiri-to
ed30933ca2 Update nexus_php.py
修复'站点数据刷新'时潜在429问题
2025-01-21 19:10:52 +08:00
jxxghp
2a4111ecce 更新 version.py 2025-01-21 12:56:09 +08:00
jxxghp
5bc8709605 fix 全x集未识别集数问题 2025-01-21 08:16:20 +08:00
jxxghp
efa2edf869 fix 2025-01-20 18:28:06 +08:00
jxxghp
5c1e972feb 更新 __init__.py 2025-01-20 16:02:53 +08:00
jxxghp
8c23e7a7b7 fix #3760 2025-01-20 08:30:29 +08:00
jxxghp
57183f8cdc Merge pull request #3759 from wikrin/v2 2025-01-20 07:13:26 +08:00
Attente
0481b49c04 refactor(app/helper): optimize module reloading mechanism 2025-01-19 22:40:07 +08:00
jxxghp
7eb9b5e92d Merge pull request #3755 from InfinityPacer/feature/cache 2025-01-19 12:56:56 +08:00
InfinityPacer
2a409d83d4 feat(redis): update redis maxmemory 2025-01-19 12:41:30 +08:00
jxxghp
785a3f5de8 Merge pull request #3752 from InfinityPacer/feature/cache 2025-01-19 08:06:50 +08:00
InfinityPacer
7c17c1c73b feat(redis): update comments 2025-01-19 05:18:49 +08:00
InfinityPacer
0ea429782c feat(redis): optimize serialize 2025-01-19 05:13:31 +08:00
InfinityPacer
7a8f880dbe feat(redis): optimize memory limit and cache cleanup 2025-01-19 04:28:16 +08:00
InfinityPacer
0a86b72110 feat(redis): add encoding for keys and optimize deletion 2025-01-19 04:28:16 +08:00
InfinityPacer
cb5c06ee7e feat(redis): add Redis support 2025-01-19 04:28:15 +08:00
InfinityPacer
9f22ce5cc0 feat(cache): remove maxsize from recommend_cache 2025-01-19 01:26:33 +08:00
jxxghp
86e1fbc28a Merge pull request #3751 from InfinityPacer/feature/cache 2025-01-19 01:02:54 +08:00
InfinityPacer
a5c5f7c718 feat(cache): enhance cache region and decorator 2025-01-19 00:55:45 +08:00
InfinityPacer
ff5d94782f fix(TMDB): adjust trending cache maxsize to 1024 2025-01-18 23:45:03 +08:00
jxxghp
58a1bd2c86 Merge pull request #3750 from wikrin/v2 2025-01-18 07:08:18 +08:00
jxxghp
f78ba6afb0 Merge pull request #3749 from InfinityPacer/feature/cache 2025-01-18 07:07:52 +08:00
Attente
331f3455f8 fix: 指定集数 2025-01-18 02:52:26 +08:00
InfinityPacer
ad0241b7f1 feat(cache): set default skip_empty to False 2025-01-18 02:44:56 +08:00
InfinityPacer
d9508533e1 feat(cache): add cache region support 2025-01-18 02:32:08 +08:00
InfinityPacer
6d2059447e feat(cache): enhance get_plugins to skip empty during network errors 2025-01-18 02:14:01 +08:00
InfinityPacer
11d4f27268 feat(cache): migrate cachetools usage to unified cache backend 2025-01-18 02:12:20 +08:00
InfinityPacer
a29f987649 feat(cache): add cache backend implementations and decorator support 2025-01-18 02:10:17 +08:00
jxxghp
3e692c790e Merge remote-tracking branch 'origin/v2' into v2 2025-01-17 20:31:40 +08:00
jxxghp
35cc214492 fix #3743 2025-01-17 20:31:23 +08:00
jxxghp
bae7bff70d fix #3744 2025-01-17 16:41:01 +08:00
jxxghp
71ef6f6a61 fix media_files Exception 2025-01-17 12:32:55 +08:00
38 changed files with 909 additions and 280 deletions

View File

@@ -15,10 +15,11 @@ from app.db import get_db
from app.db.models.subscribe import Subscribe
from app.db.models.subscribehistory import SubscribeHistory
from app.db.models.user import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.helper.subscribe import SubscribeHelper
from app.scheduler import Scheduler
from app.schemas.types import MediaType, EventType
from app.schemas.types import MediaType, EventType, SystemConfigKey
router = APIRouter()
@@ -497,6 +498,42 @@ def subscribe_fork(
return result
@router.get("/follow", summary="查询已Follow的订阅分享人", response_model=List[str])
def followed_subscribers(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询已Follow的订阅分享人
"""
return SystemConfigOper().get(SystemConfigKey.FollowSubscribers) or []
@router.post("/follow", summary="Follow订阅分享人", response_model=schemas.Response)
def follow_subscriber(
share_uid: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
Follow订阅分享人
"""
subscribers = SystemConfigOper().get(SystemConfigKey.FollowSubscribers) or []
if share_uid and share_uid not in subscribers:
subscribers.append(share_uid)
SystemConfigOper().set(SystemConfigKey.FollowSubscribers, subscribers)
return schemas.Response(success=True)
@router.delete("/follow", summary="取消Follow订阅分享人", response_model=schemas.Response)
def unfollow_subscriber(
share_uid: str = None,
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
取消Follow订阅分享人
"""
subscribers = SystemConfigOper().get(SystemConfigKey.FollowSubscribers) or []
if share_uid and share_uid in subscribers:
subscribers.remove(share_uid)
SystemConfigOper().set(SystemConfigKey.FollowSubscribers, subscribers)
return schemas.Response(success=True)
@router.get("/shares", summary="查询分享的订阅", response_model=List[schemas.SubscribeShare])
def popular_subscribes(
name: str = None,

View File

@@ -680,6 +680,14 @@ def arr_add_series(tv: schemas.SonarrSeries,
)
@arr_router.put("/series", summary="更新剧集订阅")
def arr_update_series(tv: schemas.SonarrSeries) -> Any:
"""
更新Sonarr剧集订阅
"""
return arr_add_series(tv)
@arr_router.delete("/series/{tid}", summary="删除剧集订阅")
def arr_remove_series(tid: int, db: Session = Depends(get_db), _: str = Depends(verify_apikey)) -> Any:
"""

View File

@@ -531,6 +531,9 @@ class ChainBase(metaclass=ABCMeta):
# 管理员发过了,此消息不发了
logger.info(f"用户 {send_message.username} 不存在,消息无法发送到对应用户")
continue
elif send_message.username == settings.SUPERUSER:
# 管理员同名已发送
admin_sended = True
else:
# 按原消息发送全体
if not admin_sended:

View File

@@ -1,9 +1,8 @@
import threading
from typing import List, Union, Optional, Generator
from cachetools import cached, TTLCache
from app.chain import ChainBase
from app.core.cache import cached
from app.core.config import global_vars
from app.db.mediaserver_oper import MediaServerOper
from app.helper.service import ServiceConfigHelper
@@ -94,7 +93,7 @@ class MediaServerChain(ChainBase):
"""
return self.run_module("mediaserver_latest", count=count, server=server, username=username)
@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(maxsize=1, ttl=3600)
def get_latest_wallpapers(self, server: str = None, count: int = 10,
remote: bool = True, username: str = None) -> List[str]:
"""

View File

@@ -1,18 +1,15 @@
import inspect
import io
import tempfile
from functools import wraps
from pathlib import Path
from typing import Any, Callable, List
from typing import Any, List
from PIL import Image
from cachetools import TTLCache
from cachetools.keys import hashkey
from app.chain import ChainBase
from app.chain.bangumi import BangumiChain
from app.chain.douban import DoubanChain
from app.chain.tmdb import TmdbChain
from app.core.cache import cache_backend, cached
from app.core.config import settings, global_vars
from app.log import logger
from app.schemas import MediaType
@@ -23,42 +20,7 @@ from app.utils.singleton import Singleton
# 推荐相关的专用缓存
recommend_ttl = 24 * 3600
recommend_cache = TTLCache(maxsize=256, ttl=recommend_ttl)
# 推荐缓存装饰器,避免偶发网络获取数据为空时,页面由于缓存为空长时间渲染异常问题
def cached_with_empty_check(func: Callable):
"""
缓存装饰器,用于缓存函数的返回结果,仅在结果非空时进行缓存
:param func: 被装饰的函数
:return: 包装后的函数
"""
@wraps(func)
def wrapper(*args, **kwargs):
signature = inspect.signature(func)
resolved_kwargs = {}
# 获取默认值并结合传递的参数(如果有)
for param, value in signature.parameters.items():
if param in kwargs:
# 使用显式传递的参数
resolved_kwargs[param] = kwargs[param]
elif value.default is not inspect.Parameter.empty:
# 没有传递参数时使用默认值
resolved_kwargs[param] = value.default
# 使用 cachetools 缓存,构造缓存键
cache_key = f"{func.__name__}_{hashkey(*args, **resolved_kwargs)}"
if cache_key in recommend_cache:
return recommend_cache[cache_key]
result = func(*args, **kwargs)
# 如果返回值为空,则不缓存
if result in [None, [], {}]:
return result
recommend_cache[cache_key] = result
return result
return wrapper
recommend_cache_region = "recommend"
class RecommendChain(ChainBase, metaclass=Singleton):
@@ -78,7 +40,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
刷新推荐
"""
logger.debug("Starting to refresh Recommend data.")
recommend_cache.clear()
cache_backend.clear(region=recommend_cache_region)
logger.debug("Recommend Cache has been cleared.")
# 推荐来源方法
@@ -194,7 +156,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}")
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def tmdb_movies(self, sort_by: str = "popularity.desc", with_genres: str = "",
with_original_language: str = "", page: int = 1) -> Any:
"""
@@ -208,7 +170,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [movie.to_dict() for movie in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def tmdb_tvs(self, sort_by: str = "popularity.desc", with_genres: str = "",
with_original_language: str = "zh|en|ja|ko", page: int = 1) -> Any:
"""
@@ -222,7 +184,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [tv.to_dict() for tv in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def tmdb_trending(self, page: int = 1) -> Any:
"""
TMDB流行趋势
@@ -231,7 +193,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [info.to_dict() for info in infos] if infos else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def bangumi_calendar(self, page: int = 1, count: int = 30) -> Any:
"""
Bangumi每日放送
@@ -240,7 +202,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in medias[(page - 1) * count: page * count]] if medias else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_movie_showing(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣正在热映
@@ -249,7 +211,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_movies(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any:
"""
豆瓣最新电影
@@ -259,7 +221,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_tvs(self, sort: str = "R", tags: str = "", page: int = 1, count: int = 30) -> Any:
"""
豆瓣最新电视剧
@@ -269,7 +231,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_movie_top250(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣电影TOP250
@@ -278,7 +240,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_weekly_chinese(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣国产剧集榜
@@ -287,7 +249,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_weekly_global(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣全球剧集榜
@@ -296,7 +258,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_animation(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣热门动漫
@@ -305,7 +267,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in tvs] if tvs else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_movie_hot(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣热门电影
@@ -314,7 +276,7 @@ class RecommendChain(ChainBase, metaclass=Singleton):
return [media.to_dict() for media in movies] if movies else []
@log_execution_time(logger=logger)
@cached_with_empty_check
@cached(ttl=recommend_ttl, region=recommend_cache_region)
def douban_tv_hot(self, page: int = 1, count: int = 30) -> Any:
"""
豆瓣热门电视剧

View File

@@ -6,6 +6,7 @@ import time
from datetime import datetime
from typing import Dict, List, Optional, Union, Tuple
from app import schemas
from app.chain import ChainBase
from app.chain.download import DownloadChain
from app.chain.media import MediaChain
@@ -27,8 +28,6 @@ from app.helper.message import MessageHelper
from app.helper.subscribe import SubscribeHelper
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import NotExistMediaInfo, Notification, SubscrbieInfo, SubscribeEpisodeInfo, SubscribeDownloadFileInfo, \
SubscribeLibraryFileInfo
from app.schemas.types import MediaType, SystemConfigKey, MessageChannel, NotificationType, EventType
from app.utils.singleton import Singleton
@@ -173,14 +172,14 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
logger.error(f'{mediainfo.title_year} {err_msg}')
if not exist_ok and message:
# 失败发回原用户
self.post_message(Notification(channel=channel,
source=source,
mtype=NotificationType.Subscribe,
title=f"{mediainfo.title_year} {metainfo.season} "
f"添加订阅失败!",
text=f"{err_msg}",
image=mediainfo.get_message_image(),
userid=userid))
self.post_message(schemas.Notification(channel=channel,
source=source,
mtype=NotificationType.Subscribe,
title=f"{mediainfo.title_year} {metainfo.season} "
f"添加订阅失败!",
text=f"{err_msg}",
image=mediainfo.get_message_image(),
userid=userid))
return None, err_msg
elif message:
logger.info(f'{mediainfo.title_year} {metainfo.season} 添加订阅成功')
@@ -193,12 +192,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
else:
link = settings.MP_DOMAIN('#/subscribe/movie?tab=mysub')
# 订阅成功按规则发送消息
self.post_message(Notification(mtype=NotificationType.Subscribe,
title=f"{mediainfo.title_year} {metainfo.season} 已添加订阅",
text=text,
image=mediainfo.get_message_image(),
link=link,
username=username))
self.post_message(schemas.Notification(mtype=NotificationType.Subscribe,
title=f"{mediainfo.title_year} {metainfo.season} 已添加订阅",
text=text,
image=mediainfo.get_message_image(),
link=link,
username=username))
# 发送事件
EventManager().send_event(EventType.SubscribeAdded, {
"subscribe_id": sid,
@@ -409,7 +408,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
def finish_subscribe_or_not(self, subscribe: Subscribe, meta: MetaBase, mediainfo: MediaInfo,
downloads: List[Context] = None,
lefts: Dict[Union[int | str], Dict[int, NotExistMediaInfo]] = None,
lefts: Dict[Union[int | str], Dict[int, schemas.NotExistMediaInfo]] = None,
force: bool = False):
"""
判断是否应完成订阅
@@ -464,18 +463,16 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"""
# 从系统配置获取默认订阅站点
default_sites = self.systemconfig.get(SystemConfigKey.RssSites) or []
# 如果订阅未指定站点信息,直接返回默认站点
# 如果订阅未指定站点,直接返回默认站点
if not subscribe.sites:
return default_sites
# 如果默认订阅站点未设置,直接返回订阅指定站点
if not default_sites:
return subscribe.sites or []
# 尝试解析订阅中的站点数据
user_sites = subscribe.sites
# 计算 user_sites 和 default_sites 的交集
intersection_sites = [site for site in user_sites if site in default_sites]
# 如果交集与原始订阅不一致,更新数据库
if set(intersection_sites) != set(user_sites):
self.subscribeoper.update(subscribe.id, {
"sites": intersection_sites
})
# 如果交集为空,返回默认站点
return intersection_sites if intersection_sites else default_sites
@@ -574,9 +571,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
continue
# 有自定义识别词时,需要判断是否需要重新识别
apply_words = None
if custom_words_list:
_, apply_words = WordsMatcher().prepare(torrent_info.title,
# 使用org_string应用一次后理论上不能再次应用
_, apply_words = WordsMatcher().prepare(torrent_meta.org_string,
custom_words=custom_words_list)
if apply_words:
logger.info(
@@ -584,6 +581,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 重新识别元数据
torrent_meta = MetaInfo(title=torrent_info.title, subtitle=torrent_info.description,
custom_words=custom_words_list)
# 更新元数据缓存
context.meta_info = torrent_meta
# 媒体信息需要重新识别
torrent_mediainfo = None
@@ -594,8 +593,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
torrent_mediainfo = self.recognize_media(meta=torrent_meta)
if torrent_mediainfo:
# 更新种子缓存
if not apply_words:
context.media_info = torrent_mediainfo
context.media_info = torrent_mediainfo
else:
# 通过标题匹配兜底
logger.warn(
@@ -607,9 +605,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
logger.info(
f'{mediainfo.title_year} 通过标题匹配到可选资源:{torrent_info.site_name} - {torrent_info.title}')
torrent_mediainfo = mediainfo
# 更新种子缓存
if not apply_words:
context.media_info = mediainfo
context.media_info = torrent_mediainfo
else:
continue
@@ -784,6 +780,63 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
})
logger.info(f'{subscribe.name} 订阅元数据更新完成')
def follow(self):
"""
刷新follow的用户分享并自动添加订阅
"""
follow_users: List[str] = self.systemconfig.get(SystemConfigKey.FollowSubscribers)
if not follow_users:
return
share_subs = self.subscribehelper.get_shares()
logger.info(f'开始刷新follow用户分享订阅 ...')
success_count = 0
for share_sub in share_subs:
uid = share_sub.get("share_uid")
if uid and uid in follow_users:
# 订阅已存在则跳过
if self.subscribeoper.exists(tmdbid=share_sub.get("tmdbid"),
doubanid=share_sub.get("doubanid"),
season=share_sub.get("season")):
continue
# 去除无效属性
for key in list(share_sub.keys()):
if not hasattr(schemas.Subscribe(), key):
share_sub.pop(key)
# 类型转换
subscribe_in = schemas.Subscribe(**share_sub)
mtype = MediaType(subscribe_in.type)
# 豆瓣标题处理
if subscribe_in.doubanid or subscribe_in.bangumiid:
meta = MetaInfo(subscribe_in.name)
subscribe_in.name = meta.name
subscribe_in.season = meta.begin_season
# 标题转换
if subscribe_in.name:
title = subscribe_in.name
else:
title = None
sid, message = SubscribeChain().add(mtype=mtype,
title=title,
year=subscribe_in.year,
tmdbid=subscribe_in.tmdbid,
season=subscribe_in.season,
doubanid=subscribe_in.doubanid,
bangumiid=subscribe_in.bangumiid,
username="订阅分享",
best_version=subscribe_in.best_version,
save_path=subscribe_in.save_path,
search_imdbid=subscribe_in.search_imdbid,
custom_words=subscribe_in.custom_words,
media_category=subscribe_in.media_category,
filter_groups=subscribe_in.filter_groups,
exist_ok=True)
if sid:
success_count += 1
logger.info(f'follow用户分享订阅 {title} 添加成功')
else:
logger.error(f'follow用户分享订阅 {title} 添加失败:{message}')
logger.info(f'follow用户分享订阅刷新完成共添加 {success_count} 个订阅')
def __update_subscribe_note(self, subscribe: Subscribe, downloads: List[Context]):
"""
更新已下载信息到note字段
@@ -840,7 +893,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
return note
return []
def __update_lack_episodes(self, lefts: Dict[Union[int, str], Dict[int, NotExistMediaInfo]],
def __update_lack_episodes(self, lefts: Dict[Union[int, str], Dict[int, schemas.NotExistMediaInfo]],
subscribe: Subscribe,
mediainfo: MediaInfo,
update_date: bool = False):
@@ -895,11 +948,11 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
else:
link = settings.MP_DOMAIN('#/subscribe/movie?tab=mysub')
# 完成订阅按规则发送消息
self.post_message(Notification(mtype=NotificationType.Subscribe,
title=f'{mediainfo.title_year} {meta.season} 已完成{msgstr}',
image=mediainfo.get_message_image(),
link=link,
username=subscribe.username))
self.post_message(schemas.Notification(mtype=NotificationType.Subscribe,
title=f'{mediainfo.title_year} {meta.season} 已完成{msgstr}',
image=mediainfo.get_message_image(),
link=link,
username=subscribe.username))
# 发送事件
EventManager().send_event(EventType.SubscribeComplete, {
"subscribe_id": subscribe.id,
@@ -919,9 +972,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"""
subscribes = self.subscribeoper.list()
if not subscribes:
self.post_message(Notification(channel=channel,
source=source,
title='没有任何订阅!', userid=userid))
self.post_message(schemas.Notification(channel=channel,
source=source,
title='没有任何订阅!', userid=userid))
return
title = f"共有 {len(subscribes)} 个订阅,回复对应指令操作: " \
f"\n- 删除订阅:/subscribe_delete [id]" \
@@ -937,8 +990,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
f"[{subscribe.total_episode - (subscribe.lack_episode or subscribe.total_episode)}"
f"/{subscribe.total_episode}]")
# 发送列表
self.post_message(Notification(channel=channel, source=source,
title=title, text='\n'.join(messages), userid=userid))
self.post_message(schemas.Notification(channel=channel, source=source,
title=title, text='\n'.join(messages), userid=userid))
def remote_delete(self, arg_str: str, channel: MessageChannel,
userid: Union[str, int] = None, source: str = None):
@@ -946,9 +999,9 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
删除订阅
"""
if not arg_str:
self.post_message(Notification(channel=channel, source=source,
title="请输入正确的命令格式:/subscribe_delete [id]"
"[id]为订阅编号", userid=userid))
self.post_message(schemas.Notification(channel=channel, source=source,
title="请输入正确的命令格式:/subscribe_delete [id]"
"[id]为订阅编号", userid=userid))
return
arg_strs = str(arg_str).split()
for arg_str in arg_strs:
@@ -958,8 +1011,8 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
subscribe_id = int(arg_str)
subscribe = self.subscribeoper.get(subscribe_id)
if not subscribe:
self.post_message(Notification(channel=channel, source=source,
title=f"订阅编号 {subscribe_id} 不存在!", userid=userid))
self.post_message(schemas.Notification(channel=channel, source=source,
title=f"订阅编号 {subscribe_id} 不存在!", userid=userid))
return
# 删除订阅
self.subscribeoper.delete(subscribe_id)
@@ -973,13 +1026,13 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
@staticmethod
def __get_subscribe_no_exits(subscribe_name: str,
no_exists: Dict[Union[int, str], Dict[int, NotExistMediaInfo]],
no_exists: Dict[Union[int, str], Dict[int, schemas.NotExistMediaInfo]],
mediakey: Union[str, int],
begin_season: int,
total_episode: int,
start_episode: int,
downloaded_episodes: List[int] = None
) -> Tuple[bool, Dict[Union[int, str], Dict[int, NotExistMediaInfo]]]:
) -> Tuple[bool, Dict[Union[int, str], Dict[int, schemas.NotExistMediaInfo]]]:
"""
根据订阅开始集数和总集数结合TMDB信息计算当前订阅的缺失集数
:param subscribe_name: 订阅名称
@@ -1029,7 +1082,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 与原集列表取交集
episodes = list(set(episode_list).intersection(set(new_episodes)))
# 更新集合
no_exists[mediakey][begin_season] = NotExistMediaInfo(
no_exists[mediakey][begin_season] = schemas.NotExistMediaInfo(
season=begin_season,
episodes=episodes,
total_episode=total_episode,
@@ -1056,7 +1109,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
if not episodes:
return True, {}
# 更新集合
no_exists[mediakey][begin_season] = NotExistMediaInfo(
no_exists[mediakey][begin_season] = schemas.NotExistMediaInfo(
season=begin_season,
episodes=episodes,
total_episode=total,
@@ -1070,7 +1123,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 如果存在已下载剧集,则差集为空时,说明所有均已存在
if not episodes:
return True, {}
no_exists[mediakey][begin_season] = NotExistMediaInfo(
no_exists[mediakey][begin_season] = schemas.NotExistMediaInfo(
season=begin_season,
episodes=episodes,
total_episode=total_episode,
@@ -1157,7 +1210,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
"min_seeders_time": default_rule.get("min_seeders_time"),
}.items() if value is not None}
def subscribe_files_info(self, subscribe: Subscribe) -> Optional[SubscrbieInfo]:
def subscribe_files_info(self, subscribe: Subscribe) -> Optional[schemas.SubscrbieInfo]:
"""
订阅相关的下载和文件信息
"""
@@ -1165,10 +1218,10 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
return
# 返回订阅数据
subscribe_info = SubscrbieInfo()
subscribe_info = schemas.SubscrbieInfo()
# 所有集的数据
episodes: Dict[int, SubscribeEpisodeInfo] = {}
episodes: Dict[int, schemas.SubscribeEpisodeInfo] = {}
if subscribe.tmdbid and subscribe.type == MediaType.TV.value:
# 查询TMDB中的集信息
tmdb_episodes = self.tmdbchain.tmdb_episodes(
@@ -1177,7 +1230,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
)
if tmdb_episodes:
for episode in tmdb_episodes:
info = SubscribeEpisodeInfo()
info = schemas.SubscribeEpisodeInfo()
info.title = episode.name
info.description = episode.overview
info.backdrop = f"https://{settings.TMDB_IMAGE_DOMAIN}/t/p/w500${episode.still_path}"
@@ -1185,12 +1238,12 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
elif subscribe.type == MediaType.TV.value:
# 根据开始结束集计算集信息
for i in range(subscribe.start_episode or 1, subscribe.total_episode + 1):
info = SubscribeEpisodeInfo()
info = schemas.SubscribeEpisodeInfo()
info.title = f'{i}'
episodes[i] = info
else:
# 电影
info = SubscribeEpisodeInfo()
info = schemas.SubscribeEpisodeInfo()
info.title = subscribe.name
episodes[0] = info
@@ -1205,7 +1258,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 识别文件名
file_meta = MetaInfo(file.filepath)
# 下载文件信息
file_info = SubscribeDownloadFileInfo(
file_info = schemas.SubscribeDownloadFileInfo(
torrent_title=his.torrent_name,
site_name=his.torrent_site,
downloader=file.downloader,
@@ -1248,7 +1301,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 识别文件名
file_meta = MetaInfo(fileitem.path)
# 媒体库文件信息
file_info = SubscribeLibraryFileInfo(
file_info = schemas.SubscribeLibraryFileInfo(
storage=fileitem.storage,
file_path=fileitem.path,
)
@@ -1308,7 +1361,7 @@ class SubscribeChain(ChainBase, metaclass=Singleton):
# 对于电视剧,构造缺失的媒体信息
no_exists = {
mediakey: {
subscribe.season: NotExistMediaInfo(
subscribe.season: schemas.NotExistMediaInfo(
season=subscribe.season,
episodes=[],
total_episode=subscribe.total_episode,

View File

@@ -1,10 +1,9 @@
import random
from typing import Optional, List
from cachetools import cached, TTLCache
from app import schemas
from app.chain import ChainBase
from app.core.cache import cached
from app.core.context import MediaInfo
from app.schemas import MediaType
from app.utils.singleton import Singleton
@@ -119,7 +118,7 @@ class TmdbChain(ChainBase, metaclass=Singleton):
"""
return self.run_module("tmdb_person_credits", person_id=person_id, page=page)
@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(maxsize=1, ttl=3600)
def get_random_wallpager(self) -> Optional[str]:
"""
获取随机壁纸缓存1个小时
@@ -133,7 +132,7 @@ class TmdbChain(ChainBase, metaclass=Singleton):
return info.backdrop_path
return None
@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(maxsize=1, ttl=3600)
def get_trending_wallpapers(self, num: int = 10) -> List[str]:
"""
获取所有流行壁纸

View File

@@ -453,9 +453,12 @@ class TransferChain(ChainBase, metaclass=Singleton):
if transferinfo.transfer_type in ["move"]:
# 所有成功的业务
tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season)
# 记录已处理的种子hash
processed_hashes = set()
for t in tasks:
# 下载器hash
if t.download_hash:
if t.download_hash and t.download_hash not in processed_hashes:
processed_hashes.add(t.download_hash)
if self.remove_torrents(t.download_hash, downloader=t.downloader):
logger.info(f"移动模式删除种子成功:{t.download_hash} ")
# 删除残留目录

552
app/core/cache.py Normal file
View File

@@ -0,0 +1,552 @@
import inspect
import json
import pickle
from abc import ABC, abstractmethod
from functools import wraps
from typing import Any, Dict, Optional
from urllib.parse import quote
import redis
from cachetools import TTLCache
from cachetools.keys import hashkey
from app.core.config import settings
from app.log import logger
# 默认缓存区
DEFAULT_CACHE_REGION = "DEFAULT"
class CacheBackend(ABC):
"""
缓存后端基类,定义通用的缓存接口
"""
@abstractmethod
def set(self, key: str, value: Any, ttl: int, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存
:param key: 缓存的键
:param value: 缓存的值
:param ttl: 缓存的存活时间,单位秒
:param region: 缓存的区
:param kwargs: 其他参数
"""
pass
@abstractmethod
def exists(self, key: str, region: str = DEFAULT_CACHE_REGION) -> bool:
"""
判断缓存键是否存在
:param key: 缓存的键
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
pass
@abstractmethod
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存
:param key: 缓存的键
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
pass
@abstractmethod
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
删除缓存
:param key: 缓存的键
:param region: 缓存的区
"""
pass
@abstractmethod
def clear(self, region: Optional[str] = None) -> None:
"""
清除指定区域的缓存或全部缓存
:param region: 缓存的区
"""
pass
@abstractmethod
def close(self) -> None:
"""
关闭缓存连接
"""
pass
@staticmethod
def get_region(region: str = DEFAULT_CACHE_REGION):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
@staticmethod
def get_cache_key(func, args, kwargs):
"""
获取缓存的键,通过哈希函数对函数的参数进行处理
:param func: 被装饰的函数
:param args: 位置参数
:param kwargs: 关键字参数
:return: 缓存键
"""
signature = inspect.signature(func)
# 绑定传入的参数并应用默认值
bound = signature.bind(*args, **kwargs)
bound.apply_defaults()
# 忽略第一个参数,如果它是实例(self)或类(cls)
parameters = list(signature.parameters.keys())
if parameters and parameters[0] in ("self", "cls"):
bound.arguments.pop(parameters[0], None)
# 按照函数签名顺序提取参数值列表
keys = [
bound.arguments[param] for param in signature.parameters if param in bound.arguments
]
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
class CacheToolsBackend(CacheBackend):
"""
基于 `cachetools.TTLCache` 实现的缓存后端
特性:
- 支持动态设置缓存的 TTLTime To Live存活时间和最大条目数Maxsize
- 缓存实例按区域region划分不同 region 拥有独立的缓存实例
- 同一 region 共享相同的 TTL 和 Maxsize设置时只能作用于整个 region
限制:
- 不支持按 `key` 独立隔离 TTL 和 Maxsize仅支持作用于 region 级别
"""
def __init__(self, maxsize: int = 1000, ttl: int = 1800):
"""
初始化缓存实例
:param maxsize: 缓存的最大条目数
:param ttl: 默认缓存存活时间,单位秒
"""
self.maxsize = maxsize
self.ttl = ttl
# 存储各个 region 的缓存实例region -> TTLCache
self._region_caches: Dict[str, TTLCache] = {}
def __get_region_cache(self, region: str) -> Optional[TTLCache]:
"""
获取指定区域的缓存实例,如果不存在则返回 None
"""
region = self.get_region(region)
return self._region_caches.get(region)
def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存值支持每个 key 独立配置 TTL 和 Maxsize
:param key: 缓存的键
:param value: 缓存的值
:param ttl: 缓存的存活时间,单位秒如果未传入则使用默认值
:param region: 缓存的区
:param kwargs: maxsize: 缓存的最大条目数如果未传入则使用默认值
"""
ttl = ttl or self.ttl
maxsize = kwargs.get("maxsize", self.maxsize)
region = self.get_region(region)
# 如果该 key 尚未有缓存实例,则创建一个新的 TTLCache 实例
region_cache = self._region_caches.setdefault(region, TTLCache(maxsize=maxsize, ttl=ttl))
# 设置缓存值
region_cache[key] = value
def exists(self, key: str, region: str = DEFAULT_CACHE_REGION) -> bool:
"""
判断缓存键是否存在
:param key: 缓存的键
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return False
return key in region_cache
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Any:
"""
获取缓存的值
:param key: 缓存的键
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
return region_cache.get(key)
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
删除缓存
:param key: 缓存的键
:param region: 缓存的区
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return None
del region_cache[key]
def clear(self, region: Optional[str] = None) -> None:
"""
清除指定区域的缓存或全部缓存
:param region: 缓存的区
"""
if region:
# 清理指定缓存区
region_cache = self.__get_region_cache(region)
if region_cache:
region_cache.clear()
logger.info(f"Cleared cache for region: {region}")
else:
# 清除所有区域的缓存
for region_cache in self._region_caches.values():
region_cache.clear()
logger.info("Cleared all cache")
def close(self) -> None:
"""
内存缓存不需要关闭资源
"""
pass
class RedisBackend(CacheBackend):
"""
基于 Redis 实现的缓存后端,支持通过 Redis 存储缓存
特性:
- 支持动态设置缓存的 TTLTime To Live存活时间
- 支持分区域region管理缓存不同的 region 采用独立的命名空间
- 支持自定义最大内存限制maxmemory和内存淘汰策略如 allkeys-lru
限制:
- 由于 Redis 的分布式特性,写入和读取可能受到网络延迟的影响
- Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存
"""
# 类型缓存集合,针对非容器简单类型
_complex_serializable_types = set()
_simple_serializable_types = set()
def __init__(self, redis_url: str = "redis://localhost", ttl: int = 1800):
"""
初始化 Redis 缓存实例
:param redis_url: Redis 服务的 URL
:param ttl: 缓存的存活时间,单位秒
"""
self.redis_url = redis_url
self.ttl = ttl
try:
self.client = redis.Redis.from_url(
redis_url,
decode_responses=False,
socket_timeout=30,
socket_connect_timeout=5,
health_check_interval=60,
)
# 测试连接,确保 Redis 可用
self.client.ping()
logger.debug(f"Successfully connected to Redis")
self.set_memory_limit()
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
raise RuntimeError("Redis connection failed") from e
def set_memory_limit(self, policy: str = "allkeys-lru"):
"""
动态设置 Redis 最大内存和内存淘汰策略
:param policy: 淘汰策略(如 'allkeys-lru'
"""
try:
# 如果有显式值,则直接使用,为 0 时说明不限制,如果未配置,开启 BIG_MEMORY_MODE 时为 "1024mb",未开启时为 "256mb"
maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb")
self.client.config_set("maxmemory", maxmemory)
self.client.config_set("maxmemory-policy", policy)
logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}")
except Exception as e:
logger.error(f"Failed to set Redis maxmemory or policy: {e}")
@staticmethod
def is_container_type(t):
return t in (list, dict, tuple, set)
@classmethod
def serialize(cls, value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
vt = type(value)
# 针对非容器类型使用缓存策略
if not cls.is_container_type(vt):
# 如果已知需要复杂序列化
if vt in cls._complex_serializable_types:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 如果已知可以简单序列化
if vt in cls._simple_serializable_types:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
# 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化
try:
json_data = json.dumps(value).encode("utf-8")
cls._simple_serializable_types.add(vt)
return b"JSON" + b"\x00" + json_data
except TypeError:
cls._complex_serializable_types.add(vt)
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 针对容器类型,每次尝试简单序列化,不使用缓存
else:
try:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
except TypeError:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
@classmethod
def deserialize(cls, value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
# @staticmethod
# def serialize(value: Any) -> bytes:
# return msgpack.packb(value, use_bin_type=True)
#
# @staticmethod
# def deserialize(value: bytes) -> Any:
# return msgpack.unpackb(value, raw=False)
def get_redis_key(self, region: str, key: str) -> str:
"""
获取缓存 Key
"""
# 使用 region 作为缓存键的一部分
region = self.get_region(quote(region))
return f"{region}:key:{quote(key)}"
def set(self, key: str, value: Any, ttl: int = None, region: str = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存
:param key: 缓存的键
:param value: 缓存的值
:param ttl: 缓存的存活时间,单位秒如果未传入则使用默认值
:param region: 缓存的区
:param kwargs: kwargs
"""
try:
ttl = ttl or self.ttl
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
kwargs.pop("maxsize", None)
self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
except Exception as e:
logger.error(f"Failed to set key: {key} in region: {region}, error: {e}")
def exists(self, key: str, region: str = DEFAULT_CACHE_REGION) -> bool:
"""
判断缓存键是否存在
:param key: 缓存的键
:param region: 缓存的区
:return: 存在返回 True否则返回 False
"""
try:
redis_key = self.get_redis_key(region, key)
return self.client.exists(redis_key) == 1
except Exception as e:
logger.error(f"Failed to exists key: {key} region: {region}, error: {e}")
return False
def get(self, key: str, region: str = DEFAULT_CACHE_REGION) -> Optional[Any]:
"""
获取缓存的值
:param key: 缓存的键
:param region: 缓存的区
:return: 返回缓存的值,如果缓存不存在返回 None
"""
try:
redis_key = self.get_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
return self.deserialize(value) # noqa
return None
except Exception as e:
logger.error(f"Failed to get key: {key} in region: {region}, error: {e}")
return None
def delete(self, key: str, region: str = DEFAULT_CACHE_REGION) -> None:
"""
删除缓存
:param key: 缓存的键
:param region: 缓存的区
"""
try:
redis_key = self.get_redis_key(region, key)
self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}")
def clear(self, region: Optional[str] = None) -> None:
"""
清除指定区域的缓存或全部缓存
:param region: 缓存的区
"""
try:
if region:
cache_region = self.get_region(quote(region))
redis_key = f"{cache_region}:key:*"
# self.client.delete(*self.client.keys(redis_key))
with self.client.pipeline() as pipe:
for key in self.client.scan_iter(redis_key):
pipe.delete(key)
pipe.execute()
logger.info(f"Cleared Redis cache for region: {region}")
else:
self.client.flushdb()
logger.info("Cleared all Redis cache")
except Exception as e:
logger.error(f"Failed to clear cache, region: {region}, error: {e}")
def close(self) -> None:
"""
关闭 Redis 客户端的连接池
"""
if self.client:
self.client.close()
def get_cache_backend(maxsize: int = 1000, ttl: int = 1800) -> CacheBackend:
"""
根据配置获取缓存后端实例
:param maxsize: 缓存的最大条目数
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回缓存后端实例
"""
cache_type = settings.CACHE_BACKEND_TYPE
logger.debug(f"Cache backend type from settings: {cache_type}")
if cache_type == "redis":
redis_url = settings.CACHE_BACKEND_URL
if redis_url:
try:
logger.debug(f"Attempting to use RedisBackend with URL: {redis_url}, TTL: {ttl}")
return RedisBackend(redis_url=redis_url, ttl=ttl)
except RuntimeError:
logger.warning("Falling back to CacheToolsBackend due to Redis connection failure.")
else:
logger.debug("Cache backend type is redis, but no valid REDIS_URL found. "
"Falling back to CacheToolsBackend.")
# 如果不是 Redis回退到内存缓存
logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}")
return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
def cached(region: Optional[str] = None, maxsize: int = 1000, ttl: int = 1800,
skip_none: bool = True, skip_empty: bool = False):
"""
自定义缓存装饰器,支持为每个 key 动态传递 maxsize 和 ttl
:param region: 缓存的区
:param maxsize: 缓存的最大条目数,默认值为 1000
:param ttl: 缓存的存活时间,单位秒,默认值为 1800
:param skip_none: 跳过 None 缓存,默认为 True
:param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False
:return: 装饰器函数
"""
def should_cache(value: Any) -> bool:
"""
判断是否应该缓存结果,如果返回值是 None 或空值则不缓存
:param value: 要判断的缓存值
:return: 是否缓存结果
"""
if skip_none and value is None:
return False
# if skip_empty and value in [None, [], {}, "", set()]:
if skip_empty and not value:
return False
return True
def decorator(func):
# 获取缓存区
cache_region = region if region is not None else f"{func.__module__}.{func.__name__}"
@wraps(func)
def wrapper(*args, **kwargs):
# 获取缓存键
cache_key = cache_backend.get_cache_key(func, args, kwargs)
# 尝试获取缓存
cached_value = cache_backend.get(cache_key, region=cache_region)
if should_cache(cached_value):
return cached_value
# 执行函数并缓存结果
result = func(*args, **kwargs)
# 判断是否需要缓存
if not should_cache(result):
return result
# 设置缓存(如果有传入的 maxsize 和 ttl则覆盖默认值
cache_backend.set(cache_key, result, ttl=ttl, maxsize=maxsize, region=cache_region)
return result
def cache_clear():
"""
清理缓存区
"""
# 清理缓存区
cache_backend.clear(region=cache_region)
wrapper.cache_region = cache_region
wrapper.cache_clear = cache_clear
return wrapper
return decorator
# 缓存后端实例
cache_backend = get_cache_backend()
def close_cache() -> None:
"""
关闭缓存后端连接并清理资源
"""
try:
if cache_backend:
cache_backend.close()
logger.info("Cache backend closed successfully.")
except Exception as e:
logger.info(f"Error while closing cache backend: {e}")

View File

@@ -71,6 +71,12 @@ class ConfigModel(BaseModel):
DB_TIMEOUT: int = 60
# SQLite 是否启用 WAL 模式,默认关闭
DB_WAL_ENABLE: bool = False
# 缓存类型,支持 cachetools 和 redis默认使用 cachetools
CACHE_BACKEND_TYPE: str = "cachetools"
# 缓存连接字符串,仅外部缓存(如 Redis、Memcached需要
CACHE_BACKEND_URL: Optional[str] = None
# Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb"
CACHE_REDIS_MAXMEMORY: Optional[str] = None
# 配置文件目录
CONFIG_DIR: Optional[str] = None
# 超级管理员
@@ -351,7 +357,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
return default, True
@validator('*', pre=True, always=True)
def generic_type_validator(cls, value: Any, field): # noqa
def generic_type_validator(cls, value: Any, field): # noqa
"""
通用校验器,尝试将配置值转换为期望的类型
"""

View File

@@ -69,7 +69,7 @@ class MetaBase(object):
_subtitle_flag = False
_title_episodel_re = r"Episode\s+(\d{1,4})"
_subtitle_season_re = r"(?<![全共]\s*)[第\s]+([0-9一二三四五六七八九十S\-]+)\s*季(?!\s*[全共])"
_subtitle_season_all_re = r"[全共]\s*([0-9一二三四五六七八九十]+)\s*季|([0-9一二三四五六七八九十]+)\s*季\s*全"
_subtitle_season_all_re = r"[全共]\s*([0-9一二三四五六七八九十]+)\s*季"
_subtitle_episode_re = r"(?<![全共]\s*)[第\s]+([0-9一二三四五六七八九十百零EP]+)\s*[集话話期幕](?!\s*[全共])"
_subtitle_episode_between_re = r"[第]*\s*([0-9一二三四五六七八九十百零]+)\s*[集话話期幕]?\s*-\s*第*\s*([0-9一二三四五六七八九十百零]+)\s*[集话話期幕]"
_subtitle_episode_all_re = r"([0-9一二三四五六七八九十百零]+)\s*集\s*全|[全共]\s*([0-9一二三四五六七八九十百零]+)\s*[集话話期幕]"
@@ -247,7 +247,7 @@ class MetaBase(object):
self.type = MediaType.TV
self._subtitle_flag = True
return
# x集全
# x集全/全x集
episode_all_str = re.search(r'%s' % self._subtitle_episode_all_re, title_text, re.IGNORECASE)
if episode_all_str:
episode_all = episode_all_str.group(1)
@@ -259,8 +259,6 @@ class MetaBase(object):
except Exception as err:
logger.debug(f'识别集失败:{str(err)} - {traceback.format_exc()}')
return
self.begin_episode = None
self.end_episode = None
self.type = MediaType.TV
self._subtitle_flag = True
return

View File

@@ -67,27 +67,6 @@ class UserOper(DbOper):
def get_permissions(self, name: str) -> dict:
"""
获取用户权限
{
"admin": "管理员",
"usermanage": "用户管理",
"dashboard": "仪表板",
"ranking": "推荐榜单",
"resource": {
"search": "搜索站点资源",
"download": "下载站点资源",
},
"subscribe": {
"request": "提交订阅请求",
"autopass": "订阅请求自动批准"
"approve": "审批订阅请求",
"calendar": "查看订阅日历",
"manage": "管理所有订阅"
},
"downloading": {
"view": "查看正在下载任务",
"manager": "管理正在下载任务"
}
}
"""
user = User.get_by_name(self._db, name)
if user:

View File

@@ -99,7 +99,7 @@ class FormatParser(object):
# `details` 格式为 `X`
start_ep = self.__offset.replace("EP", str(self._start_ep))
return int(eval(start_ep)), None, self.part
else:
elif not self._format:
# `details` 格式为 `X,X`
start_ep = self.__offset.replace("EP", str(self._start_ep))
end_ep = self.__offset.replace("EP", str(self._end_ep))

View File

@@ -64,13 +64,12 @@ class ModuleHelper:
def reload_sub_modules(parent_module, parent_module_name):
"""重新加载一级子模块"""
for sub_importer, sub_module_name, sub_is_pkg in pkgutil.walk_packages(parent_module.__path__):
full_sub_module_name = f'{parent_module_name}.{sub_module_name}'
for sub_importer, sub_module_name, sub_is_pkg in pkgutil.walk_packages(parent_module.__path__, parent_module_name+'.'):
try:
full_sub_module = importlib.import_module(full_sub_module_name)
full_sub_module = importlib.import_module(sub_module_name)
importlib.reload(full_sub_module)
except Exception as sub_err:
logger.debug(f'加载子模块 {full_sub_module_name} 失败:{str(sub_err)} - {traceback.format_exc()}')
logger.debug(f'加载子模块 {sub_module_name} 失败:{str(sub_err)} - {traceback.format_exc()}')
# 遍历包中的所有子模块
for importer, package_name, is_pkg in pkgutil.iter_modules(packages.__path__):

View File

@@ -4,11 +4,11 @@ import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Set
from cachetools import TTLCache, cached
from packaging.specifiers import SpecifierSet, InvalidSpecifier
from packaging.version import Version, InvalidVersion
from pkg_resources import Requirement, working_set
from app.core.cache import cached
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
@@ -38,24 +38,26 @@ class PluginHelper(metaclass=Singleton):
if self.install_report():
self.systemconfig.set(SystemConfigKey.PluginInstallReport, "1")
@cached(cache=TTLCache(maxsize=1000, ttl=1800))
def get_plugins(self, repo_url: str, package_version: str = None) -> Dict[str, dict]:
@cached(maxsize=1000, ttl=1800)
def get_plugins(self, repo_url: str, package_version: str = None) -> Optional[Dict[str, dict]]:
"""
获取Github所有最新插件列表
:param repo_url: Github仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如果不指定则获取 v1 版本
"""
if not repo_url:
return {}
return None
user, repo = self.get_repo_info(repo_url)
if not user or not repo:
return {}
return None
raw_url = self._base_url.format(user=user, repo=repo)
package_url = f"{raw_url}package.{package_version}.json" if package_version else f"{raw_url}package.json"
res = self.__request_with_fallback(package_url, headers=settings.REPO_GITHUB_HEADERS(repo=f"{user}/{repo}"))
if res is None:
return None
if res:
try:
return json.loads(res.text)
@@ -113,7 +115,7 @@ class PluginHelper(metaclass=Singleton):
return None, None
return user, repo
@cached(cache=TTLCache(maxsize=1, ttl=1800))
@cached(maxsize=1, ttl=1800)
def get_statistic(self) -> Dict:
"""
获取插件安装统计

View File

@@ -1,8 +1,7 @@
from threading import Thread
from typing import List, Tuple
from cachetools import TTLCache, cached
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.subscribe_oper import SubscribeOper
from app.db.systemconfig_oper import SystemConfigOper
@@ -31,7 +30,7 @@ class SubscribeHelper(metaclass=Singleton):
_sub_fork = f"{settings.MP_SERVER_HOST}/subscribe/fork/%s"
_shares_cache = TTLCache(maxsize=20, ttl=1800)
_shares_cache_region = "subscribe_share"
def __init__(self):
self.systemconfig = SystemConfigOper()
@@ -41,7 +40,7 @@ class SubscribeHelper(metaclass=Singleton):
if self.sub_report():
self.systemconfig.set(SystemConfigKey.SubscribeReport, "1")
@cached(cache=TTLCache(maxsize=20, ttl=1800))
@cached(maxsize=20, ttl=1800)
def get_statistic(self, stype: str, page: int = 1, count: int = 30) -> List[dict]:
"""
获取订阅统计数据
@@ -129,6 +128,7 @@ class SubscribeHelper(metaclass=Singleton):
return False, "订阅不存在"
subscribe_dict = subscribe.to_dict()
subscribe_dict.pop("id")
cache_backend.clear(region=self._shares_cache_region)
res = RequestUtils(proxies=settings.PROXY, content_type="application/json",
timeout=10).post(self._sub_share,
json={
@@ -142,7 +142,7 @@ class SubscribeHelper(metaclass=Singleton):
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
self._shares_cache.clear()
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
@@ -160,7 +160,7 @@ class SubscribeHelper(metaclass=Singleton):
return False, "连接MoviePilot服务器失败"
if res.ok:
# 清除 get_shares 的缓存,以便实时看到结果
self._shares_cache.clear()
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
@@ -181,8 +181,8 @@ class SubscribeHelper(metaclass=Singleton):
else:
return False, res.json().get("message")
@cached(cache=_shares_cache)
def get_shares(self, name: str, page: int = 1, count: int = 30) -> List[dict]:
@cached(region=_shares_cache_region)
def get_shares(self, name: str = None, page: int = 1, count: int = 30) -> List[dict]:
"""
获取订阅分享数据
"""

View File

@@ -1,8 +1,8 @@
from datetime import datetime
import requests
from cachetools import TTLCache, cached
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
@@ -29,7 +29,7 @@ class BangumiApi(object):
pass
@classmethod
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["bangumi"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["bangumi"], ttl=settings.CACHE_CONF["meta"])
def __invoke(cls, url, **kwargs):
req_url = cls._base_url + url
params = {}
@@ -188,7 +188,8 @@ class BangumiApi(object):
获取人物参演作品
"""
ret_list = []
result = self.__invoke(self._urls["person_credits"] % person_id, _ts=datetime.strftime(datetime.now(), '%Y%m%d'))
result = self.__invoke(self._urls["person_credits"] % person_id,
_ts=datetime.strftime(datetime.now(), '%Y%m%d'))
if result:
for item in result:
ret_list.append(item)

View File

@@ -7,8 +7,8 @@ from random import choice
from urllib import parse
import requests
from cachetools import TTLCache, cached
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
@@ -174,14 +174,14 @@ class DoubanApi(metaclass=Singleton):
).digest()
).decode()
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"])
def __invoke_recommend(self, url: str, **kwargs) -> dict:
"""
推荐/发现类API
"""
return self.__invoke(url, **kwargs)
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"])
def __invoke_search(self, url: str, **kwargs) -> dict:
"""
搜索类API
@@ -216,7 +216,7 @@ class DoubanApi(metaclass=Singleton):
return resp.json()
return resp.json() if resp else {}
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["douban"], ttl=settings.CACHE_CONF["meta"])
def __post(self, url: str, **kwargs) -> dict:
"""
POST请求

View File

@@ -1,8 +1,7 @@
import re
from typing import Optional, Tuple, Union
from cachetools import TTLCache, cached
from app.core.cache import cached
from app.core.context import MediaInfo, settings
from app.log import logger
from app.modules import _ModuleBase
@@ -11,7 +10,6 @@ from app.utils.http import RequestUtils
class FanartModule(_ModuleBase):
"""
{
"name": "The Wheel of Time",
@@ -384,7 +382,7 @@ class FanartModule(_ModuleBase):
continue
if not isinstance(images, list):
continue
# 图片属性xx_path
image_name = self.__name(name)
if image_name.startswith("season"):
@@ -422,7 +420,7 @@ class FanartModule(_ModuleBase):
return result
@classmethod
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["fanart"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["fanart"], ttl=settings.CACHE_CONF["meta"], skip_none=False)
def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]:
if media_type == MediaType.MOVIE:
image_url = cls._movie_url % queryid

View File

@@ -368,7 +368,7 @@ class FileManagerModule(_ModuleBase):
# 覆盖模式
overwrite_mode = target_directory.overwrite_mode
# 是否需要刮削
need_scrape = scrape or target_directory.scraping
need_scrape = target_directory.scraping if scrape is None else scrape
# 目标存储类型
if not target_storage:
target_storage = target_directory.library_storage
@@ -920,18 +920,6 @@ class FileManagerModule(_ModuleBase):
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
# 计算重命名中的文件夹层数
rename_format_level = len(rename_format.split("/")) - 1
if rename_format_level < 1:
# 重命名格式不合法
logger.error(f"重命名格式不合法:{rename_format}")
return TransferInfo(success=False,
message=f"重命名格式不合法",
fileitem=fileitem,
transfer_type=transfer_type,
need_notify=need_notify)
# 判断是否为文件夹
if fileitem.type == "dir":
# 整理整个目录,一般为蓝光原盘
@@ -1011,12 +999,15 @@ class FileManagerModule(_ModuleBase):
overflag = False
# 目的操作对象
target_oper: StorageBase = self.__get_storage_oper(target_storage)
# 计算重命名中的文件夹层级
rename_format_level = len(rename_format.split("/")) - 1
folder_path = new_file.parents[rename_format_level - 1]
# 目标目录
target_diritem = target_oper.get_folder(new_file.parents[rename_format_level - 1])
target_diritem = target_oper.get_folder(folder_path)
if not target_diritem:
logger.error(f"目标目录 {new_file.parents[rename_format_level - 1]} 获取失败")
logger.error(f"目标目录 {folder_path} 获取失败")
return TransferInfo(success=False,
message=f"目标目录 {new_file.parents[rename_format_level - 1]} 获取失败",
message=f"目标目录 {folder_path} 获取失败",
fileitem=fileitem,
fail_list=[fileitem.path],
transfer_type=transfer_type,
@@ -1256,10 +1247,6 @@ class FileManagerModule(_ModuleBase):
# 重命名格式
rename_format = settings.TV_RENAME_FORMAT \
if mediainfo.type == MediaType.TV else settings.MOVIE_RENAME_FORMAT
# 计算重命名中的文件夹层数
rename_format_level = len(rename_format.split("/")) - 1
if rename_format_level < 1:
continue
# 获取路径(重命名路径)
target_path = self.get_rename_path(
path=dir_path,
@@ -1267,13 +1254,19 @@ class FileManagerModule(_ModuleBase):
rename_dict=self.__get_naming_dict(meta=MetaInfo(mediainfo.title),
mediainfo=mediainfo)
)
# 计算重命名中的文件夹层数
rename_format_level = len(rename_format.split("/")) - 1
# 取相对路径的第1层目录
media_path = target_path.parents[rename_format_level - 1]
# 检索媒体文件
fileitem = storage_oper.get_item(media_path)
if not fileitem:
continue
media_files = self.list_files(fileitem, True)
try:
media_files = self.list_files(fileitem, True)
except Exception as e:
logger.debug(f"获取媒体文件列表失败:{str(e)}")
continue
if media_files:
for media_file in media_files:
if f".{media_file.extension.lower()}" in settings.RMT_MEDIAEXT:

View File

@@ -102,7 +102,7 @@ class StorageBase(metaclass=ABCMeta):
"""
获取父目录
"""
return self.get_folder(Path(fileitem.path).parent)
return self.get_item(Path(fileitem.path).parent)
@abstractmethod
def delete(self, fileitem: schemas.FileItem) -> bool:

View File

@@ -4,10 +4,10 @@ from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict
from cachetools import cached, TTLCache
from requests import Response
from app import schemas
from app.core.cache import cached
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
@@ -67,7 +67,7 @@ class Alist(StorageBase, metaclass=Singleton):
return self.__generate_token
@property
@cached(cache=TTLCache(maxsize=1, ttl=60 * 60 * 24 * 2 - 60 * 5))
@cached(maxsize=1, ttl=60 * 60 * 24 * 2 - 60 * 5)
def __generate_token(self) -> str:
"""
使用账号密码生成一个临时token

View File

@@ -208,9 +208,16 @@ class NexusPhpSiteUserInfo(SiteParserBase):
# 是否存在下页数据
next_page = None
next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁") or contains(.//text(), ">")]/@href')
if next_page_text:
next_page = next_page_text[-1].strip()
# fix up page url
#防止识别到详情页
while next_page_text:
next_page = next_page_text.pop().strip()
if not next_page.startswith('details.php'):
break;
next_page = None
# fix up page url
if next_page:
if self.userid not in next_page:
next_page = f'{next_page}&userid={self.userid}&type=seeding'

View File

@@ -3,13 +3,13 @@ from pathlib import Path
from typing import List, Optional, Dict, Tuple, Generator, Any, Union
from urllib.parse import quote_plus
from cachetools import TTLCache, cached
from plexapi import media
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from requests import Response, Session
from app import schemas
from app.core.cache import cached
from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
@@ -83,7 +83,7 @@ class Plex:
logger.error(f"Authentication failed: {e}")
return None
@cached(cache=TTLCache(maxsize=100, ttl=86400))
@cached(maxsize=100, ttl=86400)
def __get_library_images(self, library_key: str, mtype: int) -> Optional[List[str]]:
"""
获取媒体服务器最近添加的媒体的图片列表
@@ -293,7 +293,7 @@ class Plex:
season_episodes[episode.seasonNumber].append(episode.index)
return videos.key, season_episodes
def get_remote_image_by_id(self,
def get_remote_image_by_id(self,
item_id: str,
image_type: str,
depth: int = 0,

View File

@@ -340,7 +340,7 @@ class Slack:
return ""
conversation_id = ""
try:
for result in self._client.conversations_list():
for result in self._client.conversations_list(types="public_channel,private_channel"):
if conversation_id:
break
for channel in result["channels"]:

View File

@@ -3,13 +3,15 @@ from typing import Optional, List
from urllib.parse import quote
import zhconv
from cachetools import TTLCache, cached
from lxml import etree
from app.core.cache import cached
from app.core.config import settings
from app.log import logger
from app.schemas import APIRateLimitException
from app.schemas.types import MediaType
from app.utils.http import RequestUtils
from app.utils.limit import rate_limit_exponential
from app.utils.string import StringUtils
from .tmdbv3api import TMDb, Search, Movie, TV, Season, Episode, Discover, Trending, Person, Collection
from .tmdbv3api.exceptions import TMDbException
@@ -491,7 +493,8 @@ class TmdbApi:
return ret_info
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["tmdb"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["tmdb"], ttl=settings.CACHE_CONF["meta"])
@rate_limit_exponential(source="match_tmdb_web", base_wait=5, max_wait=1800, enable_logging=True)
def match_web(self, name: str, mtype: MediaType) -> Optional[dict]:
"""
搜索TMDB网站直接抓取结果结果只有一条时才返回
@@ -504,51 +507,56 @@ class TmdbApi:
return {}
logger.info("正在从TheDbMovie网站查询%s ..." % name)
tmdb_url = "https://www.themoviedb.org/search?query=%s" % quote(name)
res = RequestUtils(timeout=5, ua=settings.USER_AGENT).get_res(url=tmdb_url)
if res and res.status_code == 200:
html_text = res.text
if not html_text:
return None
try:
tmdb_links = []
html = etree.HTML(html_text)
if mtype == MediaType.TV:
links = html.xpath("//a[@data-id and @data-media-type='tv']/@href")
else:
links = html.xpath("//a[@data-id]/@href")
for link in links:
if not link or (not link.startswith("/tv") and not link.startswith("/movie")):
continue
if link not in tmdb_links:
tmdb_links.append(link)
if len(tmdb_links) == 1:
tmdbinfo = self.get_info(
mtype=MediaType.TV if tmdb_links[0].startswith("/tv") else MediaType.MOVIE,
tmdbid=tmdb_links[0].split("/")[-1])
if tmdbinfo:
if mtype == MediaType.TV and tmdbinfo.get('media_type') != MediaType.TV:
return {}
if tmdbinfo.get('media_type') == MediaType.MOVIE:
logger.info("%s 从WEB识别到 电影TMDBID=%s, 名称=%s, 上映日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('title'),
tmdbinfo.get('release_date')))
else:
logger.info("%s 从WEB识别到 电视剧TMDBID=%s, 名称=%s, 首播日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('name'),
tmdbinfo.get('first_air_date')))
return tmdbinfo
elif len(tmdb_links) > 1:
logger.info("%s TMDB网站返回数据过多%s" % (name, len(tmdb_links)))
else:
logger.info("%s TMDB网站未查询到媒体信息" % name)
except Exception as err:
logger.error(f"从TheDbMovie网站查询出错{str(err)}")
return None
return None
res = RequestUtils(timeout=5, ua=settings.USER_AGENT, proxies=settings.PROXY).get_res(url=tmdb_url)
if res is None:
return None
if res.status_code == 429:
raise APIRateLimitException("触发TheDbMovie网站限流获取媒体信息失败")
if res.status_code != 200:
return {}
html_text = res.text
if not html_text:
return {}
try:
tmdb_links = []
html = etree.HTML(html_text)
if mtype == MediaType.TV:
links = html.xpath("//a[@data-id and @data-media-type='tv']/@href")
else:
links = html.xpath("//a[@data-id]/@href")
for link in links:
if not link or (not link.startswith("/tv") and not link.startswith("/movie")):
continue
if link not in tmdb_links:
tmdb_links.append(link)
if len(tmdb_links) == 1:
tmdbinfo = self.get_info(
mtype=MediaType.TV if tmdb_links[0].startswith("/tv") else MediaType.MOVIE,
tmdbid=tmdb_links[0].split("/")[-1])
if tmdbinfo:
if mtype == MediaType.TV and tmdbinfo.get('media_type') != MediaType.TV:
return {}
if tmdbinfo.get('media_type') == MediaType.MOVIE:
logger.info("%s 从WEB识别到 电影TMDBID=%s, 名称=%s, 上映日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('title'),
tmdbinfo.get('release_date')))
else:
logger.info("%s 从WEB识别到 电视剧TMDBID=%s, 名称=%s, 首播日期=%s" % (
name,
tmdbinfo.get('id'),
tmdbinfo.get('name'),
tmdbinfo.get('first_air_date')))
return tmdbinfo
elif len(tmdb_links) > 1:
logger.info("%s TMDB网站返回数据过多%s" % (name, len(tmdb_links)))
else:
logger.info("%s TMDB网站未查询到媒体信息" % name)
except Exception as err:
logger.error(f"从TheDbMovie网站查询出错{str(err)}")
return {}
return {}
def get_info(self,
mtype: MediaType,
@@ -678,14 +686,14 @@ class TmdbApi:
else:
en_title = __get_tmdb_lang_title(tmdb_info, "US")
tmdb_info['en_title'] = en_title or org_title
# 查找香港台湾译名
tmdb_info['hk_title'] = __get_tmdb_lang_title(tmdb_info, "HK")
tmdb_info['tw_title'] = __get_tmdb_lang_title(tmdb_info, "TW")
# 查找新加坡名(用于替代中文名)
tmdb_info['sg_title'] = __get_tmdb_lang_title(tmdb_info, "SG") or org_title
def __get_movie_detail(self,
tmdbid: int,
append_to_response: str = "images,"

View File

@@ -1,5 +1,5 @@
from app.core.cache import cached
from ..tmdb import TMDb
from cachetools import cached, TTLCache
try:
from urllib import urlencode
@@ -13,7 +13,7 @@ class Discover(TMDb):
"tv": "/discover/tv"
}
@cached(cache=TTLCache(maxsize=1, ttl=43200))
@cached(maxsize=1, ttl=43200)
def discover_movies(self, params_tuple):
"""
Discover movies by different types of data like average rating, number of votes, genres and certifications.
@@ -23,7 +23,7 @@ class Discover(TMDb):
params = dict(params_tuple)
return self._request_obj(self._urls["movies"], urlencode(params), key="results", call_cached=False)
@cached(cache=TTLCache(maxsize=1, ttl=43200))
@cached(maxsize=1, ttl=43200)
def discover_tv_shows(self, params_tuple):
"""
Discover TV shows by different types of data like average rating, number of votes, genres,

View File

@@ -1,4 +1,4 @@
from cachetools import cached, TTLCache
from app.core.cache import cached
from ..tmdb import TMDb
@@ -6,7 +6,7 @@ from ..tmdb import TMDb
class Trending(TMDb):
_urls = {"trending": "/trending/%s/%s"}
@cached(cache=TTLCache(maxsize=1, ttl=43200))
@cached(maxsize=1024, ttl=43200)
def _trending(self, media_type="all", time_window="day", page=1):
"""
Get trending, TTLCache 12 hours

View File

@@ -7,8 +7,8 @@ from datetime import datetime
import requests
import requests.exceptions
from cachetools import TTLCache, cached
from app.core.cache import cached
from app.core.config import settings
from app.utils.http import RequestUtils
from .exceptions import TMDbException
@@ -137,7 +137,7 @@ class TMDb(object):
def cache(self, cache):
os.environ[self.TMDB_CACHE_ENABLED] = str(cache)
@cached(cache=TTLCache(maxsize=settings.CACHE_CONF["tmdb"], ttl=settings.CACHE_CONF["meta"]))
@cached(maxsize=settings.CACHE_CONF["tmdb"], ttl=settings.CACHE_CONF["meta"])
def cached_request(self, method, url, data, json,
_ts=datetime.strftime(datetime.now(), '%Y%m%d')):
"""

View File

@@ -68,7 +68,7 @@ class WebPushModule(_ModuleBase, _MessageBase):
webpush_users = conf.config.get("WEBPUSH_USERNAME") or ""
if webpush_users:
# 设定了接收用户时,非该用户的消息不接收
if not message.userid or message.userid not in webpush_users.split(","):
if not message.username or message.username not in webpush_users.split(","):
continue
if not message.title and not message.text:
logger.warn("标题和内容不能同时为空")

View File

@@ -15,7 +15,6 @@ from app.chain.recommend import RecommendChain
from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.tmdb import TmdbChain
from app.chain.torrents import TorrentsChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.event import EventManager
@@ -93,6 +92,11 @@ class Scheduler(metaclass=Singleton):
"func": SubscribeChain().refresh,
"running": False,
},
"subscribe_follow": {
"name": "关注的订阅分享",
"func": SubscribeChain().follow,
"running": False,
},
"transfer": {
"name": "下载文件整理",
"func": TransferChain().process,
@@ -242,6 +246,18 @@ class Scheduler(metaclass=Singleton):
}
)
# 关注订阅分享每1小时
self._scheduler.add_job(
self.start,
"interval",
id="subscribe_follow",
name="关注的订阅分享",
hours=1,
kwargs={
'job_id': 'subscribe_follow'
}
)
# 下载器文件转移每5分钟
self._scheduler.add_job(
self.start,
@@ -549,7 +565,6 @@ class Scheduler(metaclass=Singleton):
"""
清理缓存
"""
TorrentsChain().clear_cache()
SchedulerChain().clear_cache()
def user_auth(self):

View File

@@ -135,6 +135,8 @@ class SystemConfigKey(Enum):
DefaultTvSubscribeConfig = "DefaultTvSubscribeConfig"
# 用户站点认证参数
UserSiteAuthParams = "UserSiteAuthParams"
# Follow订阅分享者
FollowSubscribers = "FollowSubscribers"
# 处理进度Key字典

View File

@@ -2,6 +2,7 @@ import sys
from fastapi import FastAPI
from app.core.cache import close_cache
from app.core.config import global_vars, settings
from app.core.module import ModuleManager
from app.log import logger
@@ -129,6 +130,8 @@ def shutdown_modules(_: FastAPI):
Monitor().stop()
# 停止线程池
ThreadHelper().shutdown()
# 停止缓存连接
close_cache()
# 停止数据库连接
close_database()
# 停止前端服务

View File

@@ -156,7 +156,8 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
with self.lock:
self.next_allowed_time = current_time + self.current_wait
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")
wait_time = self.next_allowed_time - current_time
self.log_warning(f"触发限流,将在 {wait_time:.2f} 秒后允许继续调用")
# 时间窗口限流器

View File

@@ -1,6 +1,6 @@
from typing import Optional, List
from cachetools import TTLCache, cached
from app.core.cache import cached
from app.utils.http import RequestUtils
@@ -75,7 +75,7 @@ class WebUtils:
return ""
@staticmethod
@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(maxsize=1, ttl=3600)
def get_bing_wallpaper() -> Optional[str]:
"""
获取Bing每日壁纸
@@ -93,7 +93,7 @@ class WebUtils:
return None
@staticmethod
@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(maxsize=1, ttl=3600)
def get_bing_wallpapers(num: int = 7) -> List[str]:
"""
获取7天的Bing每日壁纸

View File

@@ -64,4 +64,5 @@ python-cookietools==0.0.2.1
aligo~=6.2.4
aiofiles~=24.1.0
jieba~=0.42.1
rsa~=4.9
rsa~=4.9
redis~=5.2.1

View File

@@ -968,7 +968,7 @@ meta_cases = [{
"year": "2023",
"part": "",
"season": "S02",
"episode": "",
"episode": "E01-E08",
"restype": "WEB-DL",
"pix": "2160p",
"video_codec": "H265",
@@ -1016,7 +1016,7 @@ meta_cases = [{
"year": "2019",
"part": "",
"season": "S01",
"episode": "",
"episode": "E01-E36",
"restype": "WEB-DL",
"pix": "2160p",
"video_codec": "H265",

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.2.2'
FRONTEND_VERSION = 'v2.2.2'
APP_VERSION = 'v2.2.4'
FRONTEND_VERSION = 'v2.2.4'