refactor: Refactor the cache system

jxxghp
2025-08-20 17:35:32 +08:00
parent 2a7a7239d7
commit 055c8e26f0
12 changed files with 693 additions and 618 deletions

View File

@@ -10,7 +10,6 @@ import aiofiles
import pillow_avif # noqa 用于自动注册AVIF支持
from PIL import Image
from anyio import Path as AsyncPath
from app.helper.sites import SitesHelper # noqa # noqa
from fastapi import APIRouter, Body, Depends, HTTPException, Header, Request, Response
from fastapi.responses import StreamingResponse
@@ -29,7 +28,9 @@ from app.db.user_oper import get_current_active_superuser, get_current_active_su
from app.helper.mediaserver import MediaServerHelper
from app.helper.message import MessageHelper
from app.helper.progress import ProgressHelper
from app.helper.redis import AsyncRedisHelper
from app.helper.rule import RuleHelper
from app.helper.sites import SitesHelper # noqa # noqa
from app.helper.subscribe import SubscribeHelper
from app.helper.system import SystemHelper
from app.log import logger
@@ -48,7 +49,7 @@ router = APIRouter()
async def fetch_image(
url: str,
proxy: bool = False,
use_disk_cache: bool = False,
use_cache: bool = False,
if_none_match: Optional[str] = None,
allowed_domains: Optional[set[str]] = None) -> Response:
"""
@@ -64,37 +65,55 @@ async def fetch_image(
if not SecurityUtils.is_safe_url(url, allowed_domains):
raise HTTPException(status_code=404, detail="Unsafe URL")
# 后续观察系统性能表现如果发现磁盘缓存和HTTP缓存无法满足高并发情况下的响应速度需求可以考虑重新引入内存缓存
cache_path: Optional[AsyncPath] = None
if use_disk_cache:
# 生成缓存路径
base_path = AsyncPath(settings.CACHE_PATH)
sanitized_path = SecurityUtils.sanitize_url_path(url)
cache_path = base_path / "images" / sanitized_path
# 缓存路径
sanitized_path = SecurityUtils.sanitize_url_path(url)
base_path = AsyncPath(settings.CACHE_PATH)
cache_path = base_path / "images" / sanitized_path
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
if not cache_path.suffix:
cache_path = cache_path.with_suffix(".jpg")
# 确保缓存路径和文件类型合法
if not await SecurityUtils.async_is_safe_path(base_path=base_path,
user_path=cache_path,
allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES):
raise HTTPException(status_code=400, detail="Invalid cache path or file type")
# 目前暂不考虑磁盘缓存文件是否过期,后续通过缓存清理机制处理
if cache_path and await cache_path.exists():
try:
async with aiofiles.open(cache_path, 'rb') as f:
content = await f.read()
if use_cache:
if settings.CACHE_BACKEND_TYPE == "redis":
# 使用Redis缓存
redis_helper = AsyncRedisHelper()
content = await redis_helper.get(sanitized_path, region="image_cache")
if content:
# 检查 If-None-Match
etag = HashUtils.md5(content)
headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7)
if if_none_match == etag:
headers = RequestUtils.generate_cache_headers()
return Response(status_code=304, headers=headers)
return Response(content=content, media_type="image/jpeg", headers=headers)
except Exception as e:
# 如果读取磁盘缓存发生异常,这里仅记录日志,尝试再次请求远端进行处理
logger.debug(f"Failed to read cache file {cache_path}: {e}")
# 返回缓存图片
headers = RequestUtils.generate_cache_headers(etag)
return Response(
content=content,
media_type=UrlUtils.get_mime_type(url, "image/jpeg"),
headers=headers
)
else:
# 使用磁盘缓存
if not cache_path.suffix:
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
cache_path = cache_path.with_suffix(".jpg")
# 确保缓存路径和文件类型合法
if not await SecurityUtils.async_is_safe_path(base_path=base_path,
user_path=cache_path,
allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES):
raise HTTPException(status_code=400, detail="Invalid cache path or file type")
# 目前暂不考虑磁盘缓存文件是否过期,通过缓存清理机制处理
if cache_path and await cache_path.exists():
try:
# 读取磁盘缓存图片返回
async with aiofiles.open(cache_path, 'rb') as f:
content = await f.read()
etag = HashUtils.md5(content)
headers = RequestUtils.generate_cache_headers(etag, max_age=86400 * 7)
if if_none_match == etag:
return Response(status_code=304, headers=headers)
return Response(content=content, media_type="image/jpeg", headers=headers)
except Exception as e:
# 如果读取磁盘缓存发生异常,这里仅记录日志,尝试再次请求远端进行处理
logger.debug(f"Failed to read cache file {cache_path}: {e}")
# 请求远程图片
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
@@ -112,22 +131,28 @@ async def fetch_image(
logger.debug(f"Invalid image format for URL {url}: {e}")
raise HTTPException(status_code=502, detail="Invalid image format")
# 获取请求响应头
response_headers = response.headers
cache_control_header = response_headers.get("Cache-Control", "")
cache_directive, max_age = RequestUtils.parse_cache_control(cache_control_header)
# 如果需要使用磁盘缓存,则保存到磁盘
if use_disk_cache and cache_path:
try:
if not await cache_path.parent.exists():
await cache_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
await tmp_file.write(content)
temp_path = AsyncPath(tmp_file.name)
await temp_path.replace(cache_path)
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path}: {e}")
# 保存缓存
if use_cache:
if settings.CACHE_BACKEND_TYPE == "redis":
# 保存到Redis缓存
redis_helper = AsyncRedisHelper()
await redis_helper.set(sanitized_path, content, region="image_cache")
else:
# 保存到磁盘缓存
try:
if not await cache_path.parent.exists():
await cache_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
await tmp_file.write(content)
temp_path = AsyncPath(tmp_file.name)
await temp_path.replace(cache_path)
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path}: {e}")
# 检查 If-None-Match
etag = HashUtils.md5(content)
@@ -135,8 +160,8 @@ async def fetch_image(
headers = RequestUtils.generate_cache_headers(etag, cache_directive, max_age)
return Response(status_code=304, headers=headers)
# 响应
headers = RequestUtils.generate_cache_headers(etag, cache_directive, max_age)
return Response(
content=content,
media_type=response_headers.get("Content-Type") or UrlUtils.get_mime_type(url, "image/jpeg"),
@@ -159,7 +184,7 @@ async def proxy_img(
hosts = [config.config.get("host") for config in MediaServerHelper().get_configs().values() if
config and config.config and config.config.get("host")]
allowed_domains = set(settings.SECURITY_IMAGE_DOMAINS) | set(hosts)
return await fetch_image(url=imgurl, proxy=proxy, use_disk_cache=cache,
return await fetch_image(url=imgurl, proxy=proxy, use_cache=cache,
if_none_match=if_none_match, allowed_domains=allowed_domains)
@@ -174,7 +199,7 @@ async def cache_img(
"""
# 如果没有启用全局图片缓存,则不使用磁盘缓存
proxy = "doubanio.com" not in url
return await fetch_image(url=url, proxy=proxy, use_disk_cache=settings.GLOBAL_IMAGE_CACHE,
return await fetch_image(url=url, proxy=proxy, use_cache=settings.GLOBAL_IMAGE_CACHE,
if_none_match=if_none_match)
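The branch introduced above selects the cache medium from settings rather than from the caller: with use_cache enabled, a Redis backend stores the raw image bytes under the sanitized URL path in the "image_cache" region, while the default backend keeps the bytes as a file below CACHE_PATH/images. A minimal sketch of that read path, detached from the endpoint (the helper name read_cached_image is illustrative and not part of this commit):

import aiofiles
from typing import Optional
from anyio import Path as AsyncPath
from app.core.config import settings
from app.helper.redis import AsyncRedisHelper

async def read_cached_image(sanitized_path: str, cache_path: AsyncPath) -> Optional[bytes]:
    if settings.CACHE_BACKEND_TYPE == "redis":
        # Redis path: bytes live in the "image_cache" region, keyed by the sanitized URL path
        return await AsyncRedisHelper().get(sanitized_path, region="image_cache")
    # Disk path: fall back to the file under CACHE_PATH/images
    if await cache_path.exists():
        async with aiofiles.open(cache_path, "rb") as f:
            return await f.read()
    return None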

View File

@@ -13,6 +13,7 @@ from fastapi.concurrency import run_in_threadpool
from qbittorrentapi import TorrentFilesList
from transmission_rpc import File
from app.core.cache import get_cache_backend
from app.core.config import settings
from app.core.context import Context, MediaInfo, TorrentInfo
from app.core.event import EventManager
@@ -22,7 +23,6 @@ from app.core.plugin import PluginManager
from app.db.message_oper import MessageOper
from app.db.user_oper import UserOper
from app.helper.message import MessageHelper, MessageQueueManager, MessageTemplateHelper
from app.helper.redis import RedisHelper
from app.helper.service import ServiceConfigHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent, ExistMediaInfo, DownloadingTorrent, CommingMessage, Notification, \
@@ -48,23 +48,17 @@ class ChainBase(metaclass=ABCMeta):
send_callback=self.run_module
)
self.pluginmanager = PluginManager()
# 初始化Redis缓存助手
self._redis_helper = None
if settings.CACHE_BACKEND_TYPE == "redis":
try:
self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL)
except RuntimeError as e:
self._redis_helper = None
logger.warning(f"Redis缓存初始化失败将使用本地缓存: {e}")
# 文件类缓存,保留1个月
self._cache = get_cache_backend(ttl=30 * 24 * 3600)
def load_cache(self, filename: str) -> Any:
"""
加载缓存优先从Redis读取没有数据时从本地读取兼容存量未迁移数据
"""
# 如果Redis可用优先从Redis读取
if self._redis_helper:
if self._cache.is_redis():
try:
cache_data = self._redis_helper.get(filename, region="chain_cache")
cache_data = self._cache.get(filename, region="chain_cache")
if cache_data is not None:
logger.debug(f"从Redis加载缓存: {filename}")
return cache_data
@@ -86,9 +80,9 @@ class ChainBase(metaclass=ABCMeta):
异步加载缓存优先从Redis读取没有数据时从本地读取兼容存量未迁移数据
"""
# 如果Redis可用优先从Redis读取
if self._redis_helper:
if self._cache.is_redis():
try:
cache_data = self._redis_helper.get(filename, region="chain_cache")
cache_data = self._cache.get(filename, region="chain_cache")
if cache_data is not None:
logger.debug(f"从Redis异步加载缓存: {filename}")
return cache_data
@@ -111,9 +105,9 @@ class ChainBase(metaclass=ABCMeta):
异步保存缓存优先保存到Redis同时保存到本地作为备份
"""
# 如果Redis可用优先保存到Redis
if self._redis_helper:
if self._cache.is_redis():
try:
self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache")
self._cache.set(filename, cache, region="chain_cache")
logger.debug(f"异步保存缓存到Redis: {filename}")
except Exception as e:
logger.warning(f"异步保存缓存到Redis失败: {e}")
@@ -130,9 +124,9 @@ class ChainBase(metaclass=ABCMeta):
保存缓存优先保存到Redis同时保存到本地作为备份
"""
# 如果Redis可用优先保存到Redis
if self._redis_helper:
if self._cache.is_redis():
try:
self._redis_helper.set(filename, cache, ttl=86400, region="chain_cache")
self._cache.set(filename, cache, region="chain_cache")
logger.debug(f"保存缓存到Redis: {filename}")
except Exception as e:
logger.warning(f"保存缓存到Redis失败: {e}")
@@ -149,9 +143,9 @@ class ChainBase(metaclass=ABCMeta):
删除缓存同时删除Redis和本地缓存
"""
# 如果Redis可用删除Redis缓存
if self._redis_helper:
if self._cache.is_redis():
try:
self._redis_helper.delete(filename, region="chain_cache")
self._cache.delete(filename, region="chain_cache")
logger.debug(f"删除Redis缓存: {filename}")
except Exception as e:
logger.warning(f"删除Redis缓存失败: {e}")
@@ -170,9 +164,9 @@ class ChainBase(metaclass=ABCMeta):
异步删除缓存同时删除Redis和本地缓存
"""
# 如果Redis可用删除Redis缓存
if self._redis_helper:
if self._cache.is_redis():
try:
self._redis_helper.delete(filename, region="chain_cache")
self._cache.delete(filename, region="chain_cache")
logger.debug(f"异步删除Redis缓存: {filename}")
except Exception as e:
logger.warning(f"异步删除Redis缓存失败: {e}")
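After this hunk, ChainBase no longer owns a RedisHelper: it builds one backend via get_cache_backend(ttl=30 * 24 * 3600) and branches on is_redis(), keeping the local read as a fallback for legacy data. A condensed, illustrative version of that read path (the local fallback is a stub here, since the diff does not show it):

from typing import Any, Optional
from app.core.cache import get_cache_backend

def load_chain_cache(filename: str) -> Optional[Any]:
    # Redis-first read, mirroring ChainBase.load_cache in shape only
    cache = get_cache_backend(ttl=30 * 24 * 3600)
    if cache.is_redis():
        data = cache.get(filename, region="chain_cache")
        if data is not None:
            return data
    return _load_local_stub(filename)

def _load_local_stub(filename: str) -> Optional[Any]:
    # Placeholder for the file-based fallback kept for not-yet-migrated data
    return None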

View File

@@ -1,48 +1,166 @@
import asyncio
import io
import tempfile
from pathlib import Path
from typing import List, Optional
import aiofiles
import pillow_avif # noqa 用于自动注册AVIF支持
from PIL import Image
from anyio import Path as AsyncPath
from app.chain import ChainBase
from app.chain.bangumi import BangumiChain
from app.chain.douban import DoubanChain
from app.chain.tmdb import TmdbChain
from app.core.cache import cache_backend, cached
from app.core.cache import cached
from app.core.config import settings, global_vars
from app.log import logger
from app.schemas import MediaType
from app.utils.asyncio import AsyncUtils
from app.utils.common import log_execution_time
from app.utils.http import AsyncRequestUtils
from app.utils.http import RequestUtils
from app.utils.security import SecurityUtils
from app.utils.singleton import Singleton
# 推荐相关的专用缓存
recommend_ttl = 24 * 3600
recommend_cache_region = "recommend"
class RecommendChain(ChainBase, metaclass=Singleton):
"""
推荐处理链,单例运行
"""
# 推荐数据的缓存页数
# 推荐缓存时间
recommend_ttl = 24 * 3600
# 推荐缓存页数
cache_max_pages = 5
# 推荐缓存区域
recommend_cache_region = "recommend"
def refresh_recommend(self):
"""
刷新推荐数据 - 同步包装器
刷新推荐
"""
logger.debug("Starting to refresh Recommend data.")
self._cache.clear(region=self.recommend_cache_region)
logger.debug("Recommend Cache has been cleared.")
# 推荐来源方法
recommend_methods = [
self.tmdb_movies,
self.tmdb_tvs,
self.tmdb_trending,
self.bangumi_calendar,
self.douban_movie_showing,
self.douban_movies,
self.douban_tvs,
self.douban_movie_top250,
self.douban_tv_weekly_chinese,
self.douban_tv_weekly_global,
self.douban_tv_animation,
self.douban_movie_hot,
self.douban_tv_hot,
]
# 缓存并刷新所有推荐数据
recommends = []
# 记录哪些方法已完成
methods_finished = set()
# 这里避免区间内连续调用相同来源,因此遍历方案为每页遍历所有推荐来源,再进行页数遍历
for page in range(1, self.cache_max_pages + 1):
for method in recommend_methods:
if global_vars.is_system_stopped:
return
if method in methods_finished:
continue
logger.debug(f"Fetch {method.__name__} data for page {page}.")
data = method(page=page)
if not data:
logger.debug("All recommendation methods have finished fetching data. Ending pagination early.")
methods_finished.add(method)
continue
recommends.extend(data)
# 如果所有方法都已经完成,提前结束循环
if len(methods_finished) == len(recommend_methods):
break
# 缓存收集到的海报
self.__cache_posters(recommends)
logger.debug("Recommend data refresh completed.")
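The refresh loop above is page-major on purpose: every source is asked for page 1 before any source is asked for page 2, which avoids hammering one upstream repeatedly, and a source is retired as soon as it returns no data. A small standalone sketch of just that traversal (generic, outside the class):

def crawl_page_major(sources, max_pages=5):
    """Illustrative traversal only: fetch each page from every source in turn,
    mark a source finished once it returns nothing, stop when all are finished."""
    finished, collected = set(), []
    for page in range(1, max_pages + 1):
        for fetch in sources:
            if fetch in finished:
                continue
            data = fetch(page=page)
            if not data:
                finished.add(fetch)
                continue
            collected.extend(data)
        if len(finished) == len(sources):
            break
    return collected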
def __cache_posters(self, datas: List[dict]):
"""
提取 poster_path 并缓存图片
:param datas: 数据列表
"""
if not settings.GLOBAL_IMAGE_CACHE:
return
for data in datas:
if global_vars.is_system_stopped:
return
poster_path = data.get("poster_path")
if poster_path:
poster_url = poster_path.replace("original", "w500")
logger.debug(f"Caching poster image: {poster_url}")
self.__fetch_and_save_image(poster_url)
def __fetch_and_save_image(self, url: str):
"""
请求并保存图片
:param url: 图片路径
"""
# 生成缓存路径
sanitized_path = SecurityUtils.sanitize_url_path(url)
cache_path = settings.CACHE_PATH / "images" / sanitized_path
if self._cache.is_redis():
if self._cache.get(sanitized_path, region=self.recommend_cache_region):
logger.debug(f"Cache hit: Image already exists for URL: {url}")
return
else:
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
if not cache_path.suffix:
cache_path = cache_path.with_suffix(".jpg")
# 确保缓存路径和文件类型合法
if not SecurityUtils.is_safe_path(settings.CACHE_PATH, cache_path, settings.SECURITY_IMAGE_SUFFIXES):
logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}")
return
# 本地存在缓存图片,则直接跳过
if cache_path.exists():
logger.debug(f"Cache hit: Image already exists at {cache_path}")
return
# 请求远程图片
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
proxies = settings.PROXY if not referer else None
response = RequestUtils(ua=settings.NORMAL_USER_AGENT, proxies=proxies, referer=referer).get_res(url=url)
if not response:
logger.debug(f"Empty response for URL: {url}")
return
# 验证下载的内容是否为有效图片
try:
AsyncUtils.run_async(self.async_refresh_recommend())
Image.open(io.BytesIO(response.content)).verify()
except Exception as e:
logger.error(f"刷新推荐数据失败:{str(e)}")
raise
logger.debug(f"Invalid image format for URL {url}: {e}")
return
if self._cache.is_redis():
# 如果是Redis缓存直接存储到缓存中
try:
self._cache.set(sanitized_path, response.content, region=self.recommend_cache_region)
logger.debug(f"Successfully cached image for URL: {url} in Redis.")
except Exception as e:
logger.debug(f"Failed to cache image for URL {url} in Redis: {e}")
else:
# 如果是本地文件缓存,写入到指定路径
try:
if not cache_path.parent.exists():
cache_path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
tmp_file.write(response.content)
temp_path = Path(tmp_file.name)
temp_path.replace(cache_path)
logger.debug(f"Successfully cached image at {cache_path} for URL: {url}")
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}")
@log_execution_time(logger=logger)
@cached(ttl=recommend_ttl, region=recommend_cache_region)
@@ -199,162 +317,6 @@ class RecommendChain(ChainBase, metaclass=Singleton):
tvs = DoubanChain().tv_hot(page=page, count=count)
return [media.to_dict() for media in tvs] if tvs else []
# 异步版本的方法
async def async_refresh_recommend(self):
"""
异步刷新推荐
"""
logger.debug("Starting to async refresh Recommend data.")
cache_backend.clear(region=recommend_cache_region)
logger.debug("Recommend Cache has been cleared.")
# 推荐来源方法
recommend_methods = [
self.async_tmdb_movies,
self.async_tmdb_tvs,
self.async_tmdb_trending,
self.async_bangumi_calendar,
self.async_douban_movie_showing,
self.async_douban_movies,
self.async_douban_tvs,
self.async_douban_movie_top250,
self.async_douban_tv_weekly_chinese,
self.async_douban_tv_weekly_global,
self.async_douban_tv_animation,
self.async_douban_movie_hot,
self.async_douban_tv_hot,
]
# 缓存并刷新所有推荐数据
recommends = []
# 记录哪些方法已完成
methods_finished = set()
# 这里避免区间内连续调用相同来源,因此遍历方案为每页遍历所有推荐来源,再进行页数遍历
for page in range(1, self.cache_max_pages + 1):
# 为每个页面并发执行所有方法
tasks = []
for method in recommend_methods:
if global_vars.is_system_stopped:
return
if method in methods_finished:
continue
tasks.append(self._async_fetch_method_data(method, page, methods_finished))
# 并发执行所有任务
if tasks:
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, list) and result:
recommends.extend(result)
# 如果所有方法都已经完成,提前结束循环
if len(methods_finished) == len(recommend_methods):
break
# 缓存收集到的海报
await self.__async_cache_posters(recommends)
logger.debug("Async recommend data refresh completed.")
@staticmethod
async def _async_fetch_method_data(method, page: int, methods_finished: set):
"""
异步获取方法数据的辅助函数
"""
try:
logger.debug(f"Async fetch {method.__name__} data for page {page}.")
data = await method(page=page)
if not data:
logger.debug(f"Method {method.__name__} finished fetching data. Ending pagination early.")
methods_finished.add(method)
return []
return data
except Exception as e:
logger.error(f"Error fetching data from {method.__name__}: {e}")
methods_finished.add(method)
return []
async def __async_cache_posters(self, datas: List[dict]):
"""
异步提取 poster_path 并缓存图片
:param datas: 数据列表
"""
if not settings.GLOBAL_IMAGE_CACHE:
return
tasks = []
for data in datas:
if global_vars.is_system_stopped:
return
poster_path = data.get("poster_path")
if poster_path:
poster_url = poster_path.replace("original", "w500")
logger.debug(f"Async caching poster image: {poster_url}")
tasks.append(self.__async_fetch_and_save_image(poster_url))
# 并发缓存图片
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
@staticmethod
async def __async_fetch_and_save_image(url: str):
"""
异步请求并保存图片
:param url: 图片路径
"""
if not settings.GLOBAL_IMAGE_CACHE or not url:
return
# 生成缓存路径
base_path = AsyncPath(settings.CACHE_PATH)
sanitized_path = SecurityUtils.sanitize_url_path(url)
cache_path = base_path / "images" / sanitized_path
# 没有文件类型,则添加后缀,在恶意文件类型和实际需求下的折衷选择
if not cache_path.suffix:
cache_path = cache_path.with_suffix(".jpg")
# 确保缓存路径和文件类型合法
if not await SecurityUtils.async_is_safe_path(base_path=base_path,
user_path=cache_path,
allowed_suffixes=settings.SECURITY_IMAGE_SUFFIXES):
logger.debug(f"Invalid cache path or file type for URL: {url}, sanitized path: {sanitized_path}")
return
# 本地存在缓存图片,则直接跳过
if await cache_path.exists():
logger.debug(f"Cache hit: Image already exists at {cache_path}")
return
# 请求远程图片
referer = "https://movie.douban.com/" if "doubanio.com" in url else None
proxies = settings.PROXY if not referer else None
response = await AsyncRequestUtils(ua=settings.NORMAL_USER_AGENT,
proxies=proxies, referer=referer).get_res(url=url)
if not response:
logger.debug(f"Empty response for URL: {url}")
return
# 验证下载的内容是否为有效图片
try:
Image.open(io.BytesIO(response.content)).verify()
except Exception as e:
logger.debug(f"Invalid image format for URL {url}: {e}")
return
if not cache_path:
return
try:
if not await cache_path.parent.exists():
await cache_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.tempfile.NamedTemporaryFile(dir=cache_path.parent, delete=False) as tmp_file:
await tmp_file.write(response.content)
temp_path = AsyncPath(tmp_file.name)
await temp_path.replace(cache_path)
logger.debug(f"Successfully cached image at {cache_path} for URL: {url}")
except Exception as e:
logger.debug(f"Failed to write cache file {cache_path} for URL {url}: {e}")
@log_execution_time(logger=logger)
@cached(ttl=recommend_ttl, region=recommend_cache_region)
async def async_tmdb_movies(self, sort_by: Optional[str] = "popularity.desc",

View File

@@ -23,7 +23,8 @@ class CacheBackend(ABC):
"""
@abstractmethod
def set(self, key: str, value: Any, ttl: int, region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
"""
设置缓存
@@ -76,6 +77,16 @@ class CacheBackend(ABC):
"""
pass
@abstractmethod
def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
"""
获取指定区域的所有缓存项
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
pass
@abstractmethod
def close(self) -> None:
"""
@@ -114,6 +125,10 @@ class CacheBackend(ABC):
# 使用有序参数生成缓存键
return f"{func.__name__}_{hashkey(*keys)}"
@staticmethod
def is_redis() -> bool:
return settings.CACHE_BACKEND_TYPE == "redis"
class CacheToolsBackend(CacheBackend):
"""
@@ -128,7 +143,7 @@ class CacheToolsBackend(CacheBackend):
- 不支持按 `key` 独立隔离 TTL 和 Maxsize仅支持作用于 region 级别
"""
def __init__(self, maxsize: Optional[int] = 512, ttl: Optional[int] = 1800):
def __init__(self, maxsize: Optional[int] = 1024, ttl: Optional[int] = 1800):
"""
初始化缓存实例
@@ -226,6 +241,18 @@ class CacheToolsBackend(CacheBackend):
region_cache.clear()
logger.info("Cleared all cache")
def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
"""
获取指定区域的所有缓存项
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
region_cache = self.__get_region_cache(region)
if region_cache is None:
return {}
return dict(region_cache.items())
def close(self) -> None:
"""
内存缓存不需要关闭资源
@@ -247,20 +274,14 @@ class RedisBackend(CacheBackend):
- Pickle 反序列化可能存在安全风险,需进一步重构调用来源,避免复杂对象缓存
"""
def __init__(self, redis_url: Optional[str] = "redis://localhost", ttl: Optional[int] = 1800):
def __init__(self, ttl: Optional[int] = 1800):
"""
初始化 Redis 缓存实例
:param redis_url: Redis 服务的 URL
:param ttl: 缓存的存活时间,单位秒
"""
self.ttl = ttl
self._redis_helper = None
try:
self.redis_helper = RedisHelper(redis_url=redis_url)
except RuntimeError as e:
logger.warning(f"Redis缓存初始化失败: {e}")
raise e
self.redis_helper = RedisHelper()
def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = DEFAULT_CACHE_REGION, **kwargs) -> None:
@@ -313,6 +334,15 @@ class RedisBackend(CacheBackend):
"""
self.redis_helper.clear(region=region)
def items(self, region: Optional[str] = DEFAULT_CACHE_REGION) -> Dict[str, Any]:
"""
获取指定区域的所有缓存项
:param region: 缓存的区
:return: 返回一个字典,包含所有缓存键值对
"""
return self.redis_helper.items(region=region)
def close(self) -> None:
"""
关闭 Redis 客户端的连接池
@@ -324,28 +354,16 @@ def get_cache_backend(maxsize: Optional[int] = 512, ttl: Optional[int] = 1800) -
"""
根据配置获取缓存后端实例
:param maxsize: 缓存的最大条目数
:param maxsize: 缓存的最大条目数仅使用cachetools时生效
:param ttl: 缓存的默认存活时间,单位秒
:return: 返回缓存后端实例
"""
cache_type = settings.CACHE_BACKEND_TYPE
logger.debug(f"Cache backend type from settings: {cache_type}")
if cache_type == "redis":
redis_url = settings.CACHE_BACKEND_URL
if redis_url:
try:
logger.debug(f"Attempting to use RedisBackend with URL: {redis_url}, TTL: {ttl}")
return RedisBackend(redis_url=redis_url, ttl=ttl)
except RuntimeError:
logger.warning("Falling back to CacheToolsBackend due to Redis connection failure.")
else:
logger.debug("Cache backend type is redis, but no valid REDIS_URL found. "
"Falling back to CacheToolsBackend.")
# 如果不是 Redis回退到内存缓存
logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}")
return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
if settings.CACHE_BACKEND_TYPE == "redis":
logger.debug(f"Attempting to use RedisBackend, TTL: {ttl}")
return RedisBackend(ttl=ttl)
else:
logger.debug(f"Using CacheToolsBackend with default maxsize: {maxsize}, TTL: {ttl}")
return CacheToolsBackend(maxsize=maxsize, ttl=ttl)
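With this change the factory no longer reads CACHE_BACKEND_URL itself; connection details are resolved inside the RedisHelper singleton, so callers only pass sizing and TTL hints. A hedged usage sketch (the "demo" region and values are made up for illustration):

from app.core.cache import get_cache_backend

backend = get_cache_backend(maxsize=256, ttl=3600)   # maxsize only matters for CacheToolsBackend
backend.set("greeting", "hello", region="demo")      # ttl omitted, falls back to the backend default
print(backend.get("greeting", region="demo"))        # -> "hello"
print(backend.is_redis())                            # True only when CACHE_BACKEND_TYPE == "redis"
backend.clear(region="demo")
backend.close()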
class TTLCache:
@@ -369,12 +387,12 @@ class TTLCache:
self.ttl = ttl
self._backend = get_cache_backend(maxsize=maxsize, ttl=ttl)
def __getitem__(self, key):
def __getitem__(self, key: str):
"""
获取缓存项
"""
try:
value = self._backend.get(str(key))
value = self._backend.get(key)
if value is not None:
return value
except Exception as e:
@@ -382,40 +400,40 @@ class TTLCache:
raise KeyError(key)
def __setitem__(self, key, value):
def __setitem__(self, key: str, value: Any):
"""
设置缓存项
"""
try:
self._backend.set(str(key), value, ttl=self.ttl)
self._backend.set(key, value, ttl=self.ttl)
except Exception as e:
logger.warning(f"缓存设置失败: {e}")
def __delitem__(self, key):
def __delitem__(self, key: str):
"""
删除缓存项
"""
try:
self._backend.delete(str(key))
self._backend.delete(key)
except Exception as e:
logger.warning(f"缓存删除失败: {e}")
def __contains__(self, key):
def __contains__(self, key: str):
"""
检查键是否存在
"""
try:
return self._backend.exists(str(key))
return self._backend.exists(key)
except Exception as e:
logger.warning(f"缓存检查失败: {e}")
return False
def get(self, key, default=None):
def get(self, key: str, default: Any = None):
"""
获取缓存项,如果不存在返回默认值
"""
try:
value = self._backend.get(str(key))
value = self._backend.get(key)
if value is not None:
return value
except Exception as e:
@@ -442,10 +460,6 @@ class TTLCache:
logger.warning(f"缓存关闭失败: {e}")
# 缓存后端实例
cache_backend = get_cache_backend()
def cached(region: Optional[str] = None, maxsize: Optional[int] = 512, ttl: Optional[int] = 1800,
skip_none: Optional[bool] = True, skip_empty: Optional[bool] = False):
"""
@@ -458,6 +472,8 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 512, ttl: Opti
:param skip_empty: 跳过空值缓存(如 None, [], {}, "", set()),默认为 False
:return: 装饰器函数
"""
# 缓存后端实例
cache_backend = get_cache_backend(maxsize=maxsize, ttl=ttl)
def should_cache(value: Any) -> bool:
"""
@@ -554,15 +570,3 @@ def cached(region: Optional[str] = None, maxsize: Optional[int] = 512, ttl: Opti
return wrapper
return decorator
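The decorator now builds its backend per decoration from maxsize/ttl instead of sharing the removed module-level cache_backend, and the SubscribeHelper/WorkflowHelper hunks later in this commit rely on decorated callables exposing cache_clear(). A hedged sketch with made-up function and region names:

from app.core.cache import cached

@cached(region="demo_shares", ttl=600)
def fetch_shares(page: int) -> list:
    # stand-in for a slow remote call
    return [f"share-{page}-{i}" for i in range(3)]

fetch_shares(1)              # computed and stored in the "demo_shares" region
fetch_shares(1)              # served from the cache backend
fetch_shares.cache_clear()   # drops this function's cached entries, as the helpers below do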
def close_cache() -> None:
"""
关闭缓存后端连接并清理资源
"""
try:
if cache_backend:
cache_backend.close()
logger.info("Cache backend closed successfully.")
except Exception as e:
logger.info(f"Error while closing cache backend: {e}")

View File

@@ -4,17 +4,19 @@ from typing import Any, Optional
from urllib.parse import quote
import redis
from redis.asyncio import Redis
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.schemas import ConfigChangeEventData
from app.schemas.types import EventType
from app.utils.singleton import Singleton
class RedisHelper:
class RedisHelper(metaclass=Singleton):
"""
Redis连接和操作助手类
Redis连接和操作助手类,单例模式
特性:
- 管理Redis连接池和客户端
@@ -27,32 +29,30 @@ class RedisHelper:
_complex_serializable_types = set()
_simple_serializable_types = set()
def __init__(self, redis_url: Optional[str] = "redis://localhost"):
def __init__(self):
"""
初始化Redis助手实例
:param redis_url: Redis服务的URL
"""
self.redis_url = redis_url
self.redis_url = settings.CACHE_BACKEND_URL
self.client = None
self._connect()
def _connect(self):
"""
建立Redis连接
"""
try:
self.client = redis.Redis.from_url(
self.redis_url,
decode_responses=False,
socket_timeout=30,
socket_connect_timeout=5,
health_check_interval=60,
)
# 测试连接确保Redis可用
self.client.ping()
logger.info(f"Successfully connected to Redis: {self.redis_url}")
self.set_memory_limit()
if self.client is None:
self.client = redis.Redis.from_url(
self.redis_url,
decode_responses=False,
socket_timeout=30,
socket_connect_timeout=5,
health_check_interval=60,
)
# 测试连接确保Redis可用
self.client.ping()
logger.info(f"Successfully connected to Redis: {self.redis_url}")
self.set_memory_limit()
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
raise RuntimeError("Redis connection failed") from e
@@ -69,6 +69,7 @@ class RedisHelper:
if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']:
return
logger.info("配置变更重连Redis...")
self.close()
self._connect()
def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"):
@@ -164,6 +165,7 @@ class RedisHelper:
:param kwargs: 其他参数
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
@@ -181,6 +183,7 @@ class RedisHelper:
:return: 存在返回True否则返回False
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
return self.client.exists(redis_key) == 1
except Exception as e:
@@ -196,6 +199,7 @@ class RedisHelper:
:return: 返回缓存的值如果缓存不存在返回None
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
value = self.client.get(redis_key)
if value is not None:
@@ -213,6 +217,7 @@ class RedisHelper:
:param region: 缓存的区
"""
try:
self._connect()
redis_key = self.get_redis_key(region, key)
self.client.delete(redis_key)
except Exception as e:
@@ -225,6 +230,7 @@ class RedisHelper:
:param region: 缓存的区
"""
try:
self._connect()
if region:
cache_region = self.get_region(quote(region))
redis_key = f"{cache_region}:key:*"
@@ -239,10 +245,302 @@ class RedisHelper:
except Exception as e:
logger.error(f"Failed to clear cache, region: {region}, error: {e}")
def items(self, region: Optional[str] = None):
"""
获取指定区域的所有缓存键值对
:param region: 缓存的区
:return: 返回键值对生成器
"""
try:
self._connect()
if region:
cache_region = self.get_region(quote(region))
redis_key = f"{cache_region}:key:*"
for key in self.client.scan_iter(redis_key):
value = self.client.get(key)
if value is not None:
yield key, self.deserialize(value)
else:
for key in self.client.scan_iter("*"):
value = self.client.get(key)
if value is not None:
yield key, self.deserialize(value)
except Exception as e:
logger.error(f"Failed to get items from Redis, region: {region}, error: {e}")
def test(self) -> bool:
"""
测试Redis连接性
"""
try:
self._connect()
return True
except Exception as e:
logger.error(f"Redis connection test failed: {e}")
return False
def close(self) -> None:
"""
关闭Redis客户端的连接池
"""
if self.client:
self.client.close()
self.client = None
logger.debug("Redis connection closed")
class AsyncRedisHelper(metaclass=Singleton):
"""
异步Redis连接和操作助手类单例模式
特性:
- 管理异步Redis连接池和客户端
- 提供序列化和反序列化功能
- 支持内存限制和淘汰策略设置
- 提供键名生成和区域管理功能
- 所有操作都是异步的
"""
# 类型缓存集合,针对非容器简单类型
_complex_serializable_types = set()
_simple_serializable_types = set()
def __init__(self, redis_url: Optional[str] = "redis://localhost"):
"""
初始化异步Redis助手实例
:param redis_url: Redis服务的URL
"""
self.redis_url = redis_url
self.client: Optional[Redis] = None
async def _connect(self):
"""
建立异步Redis连接
"""
try:
if self.client is None:
self.client = Redis.from_url(
self.redis_url,
decode_responses=False,
socket_timeout=30,
socket_connect_timeout=5,
health_check_interval=60,
)
# 测试连接确保Redis可用
await self.client.ping()
logger.info(f"Successfully connected to Redis (async): {self.redis_url}")
await self.set_memory_limit()
except Exception as e:
logger.error(f"Failed to connect to Redis (async): {e}")
raise RuntimeError("Redis async connection failed") from e
@eventmanager.register(EventType.ConfigChanged)
async def handle_config_changed(self, event: Event):
"""
处理配置变更事件更新Redis设置
:param event: 事件对象
"""
if not event:
return
event_data: ConfigChangeEventData = event.event_data
if event_data.key not in ['CACHE_BACKEND_TYPE', 'CACHE_BACKEND_URL', 'CACHE_REDIS_MAXMEMORY']:
return
logger.info("配置变更重连Redis (async)...")
await self.close()
await self._connect()
async def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"):
"""
动态设置Redis最大内存和内存淘汰策略
:param policy: 淘汰策略(如'allkeys-lru'
"""
try:
# 如果有显式值则直接使用为0时说明不限制如果未配置开启BIG_MEMORY_MODE时为"1024mb",未开启时为"256mb"
maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb")
await self.client.config_set("maxmemory", maxmemory)
await self.client.config_set("maxmemory-policy", policy)
logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy} (async)")
except Exception as e:
logger.error(f"Failed to set Redis maxmemory or policy (async): {e}")
@staticmethod
def is_container_type(t):
"""
判断是否为容器类型
"""
return t in (list, dict, tuple, set)
@classmethod
def serialize(cls, value: Any) -> bytes:
"""
将值序列化为二进制数据,根据序列化方式标识格式
"""
vt = type(value)
# 针对非容器类型使用缓存策略
if not cls.is_container_type(vt):
# 如果已知需要复杂序列化
if vt in cls._complex_serializable_types:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 如果已知可以简单序列化
if vt in cls._simple_serializable_types:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
# 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化
try:
json_data = json.dumps(value).encode("utf-8")
cls._simple_serializable_types.add(vt)
return b"JSON" + b"\x00" + json_data
except TypeError:
cls._complex_serializable_types.add(vt)
return b"PICKLE" + b"\x00" + pickle.dumps(value)
# 针对容器类型,每次尝试简单序列化,不使用缓存
else:
try:
json_data = json.dumps(value).encode("utf-8")
return b"JSON" + b"\x00" + json_data
except TypeError:
return b"PICKLE" + b"\x00" + pickle.dumps(value)
@classmethod
def deserialize(cls, value: bytes) -> Any:
"""
将二进制数据反序列化为原始值,根据格式标识区分序列化方式
"""
format_marker, data = value.split(b"\x00", 1)
if format_marker == b"JSON":
return json.loads(data.decode("utf-8"))
elif format_marker == b"PICKLE":
return pickle.loads(data)
else:
raise ValueError("Unknown serialization format")
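Every stored value is written as a format marker, a NUL separator, then the payload: JSON where the value serializes cleanly, pickle otherwise, with the type cached so the decision is only made once per non-container type. A small round trip using the classmethods above (the sample dict is arbitrary):

payload = AsyncRedisHelper.serialize({"id": 123, "title": "demo"})
assert payload.startswith(b"JSON\x00")
assert AsyncRedisHelper.deserialize(payload) == {"id": 123, "title": "demo"}

class NotJsonSerializable:
    pass

blob = AsyncRedisHelper.serialize(NotJsonSerializable())
assert blob.startswith(b"PICKLE\x00")   # falls back to pickle and remembers the type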
@staticmethod
def get_region(region: Optional[str] = "DEFAULT"):
"""
获取缓存的区
"""
return f"region:{region}" if region else "region:default"
def get_redis_key(self, region: str, key: str) -> str:
"""
获取缓存Key
"""
# 使用region作为缓存键的一部分
region = self.get_region(quote(region))
return f"{region}:key:{quote(key)}"
async def set(self, key: str, value: Any, ttl: Optional[int] = None,
region: Optional[str] = "DEFAULT", **kwargs) -> None:
"""
异步设置缓存
:param key: 缓存的键
:param value: 缓存的值
:param ttl: 缓存的存活时间,单位秒
:param region: 缓存的区
:param kwargs: 其他参数
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
# 对值进行序列化
serialized_value = self.serialize(value)
kwargs.pop("maxsize", None)
await self.client.set(redis_key, serialized_value, ex=ttl, **kwargs)
except Exception as e:
logger.error(f"Failed to set key (async): {key} in region: {region}, error: {e}")
async def exists(self, key: str, region: Optional[str] = "DEFAULT") -> bool:
"""
异步判断缓存键是否存在
:param key: 缓存的键
:param region: 缓存的区
:return: 存在返回True否则返回False
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
result = await self.client.exists(redis_key)
return result == 1
except Exception as e:
logger.error(f"Failed to exists key (async): {key} region: {region}, error: {e}")
return False
async def get(self, key: str, region: Optional[str] = "DEFAULT") -> Optional[Any]:
"""
异步获取缓存的值
:param key: 缓存的键
:param region: 缓存的区
:return: 返回缓存的值如果缓存不存在返回None
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
value = await self.client.get(redis_key)
if value is not None:
return self.deserialize(value)
return None
except Exception as e:
logger.error(f"Failed to get key (async): {key} in region: {region}, error: {e}")
return None
async def delete(self, key: str, region: Optional[str] = "DEFAULT") -> None:
"""
异步删除缓存
:param key: 缓存的键
:param region: 缓存的区
"""
try:
await self._connect()
redis_key = self.get_redis_key(region, key)
await self.client.delete(redis_key)
except Exception as e:
logger.error(f"Failed to delete key (async): {key} in region: {region}, error: {e}")
async def clear(self, region: Optional[str] = None) -> None:
"""
异步清除指定区域的缓存或全部缓存
:param region: 缓存的区
"""
try:
await self._connect()
if region:
cache_region = self.get_region(quote(region))
redis_key = f"{cache_region}:key:*"
async with self.client.pipeline() as pipe:
async for key in self.client.scan_iter(redis_key):
await pipe.delete(key)
await pipe.execute()
logger.info(f"Cleared Redis cache for region (async): {region}")
else:
await self.client.flushdb()
logger.info("Cleared all Redis cache (async)")
except Exception as e:
logger.error(f"Failed to clear cache (async), region: {region}, error: {e}")
async def test(self) -> bool:
"""
异步测试Redis连接性
"""
try:
await self._connect()
return True
except Exception as e:
logger.error(f"Redis async connection test failed: {e}")
return False
async def close(self) -> None:
"""
关闭异步Redis客户端的连接池
"""
if self.client:
await self.client.close()
self.client = None
logger.debug("Redis async connection closed")
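Connections are now lazy: _connect() runs inside each operation, so constructing the helper is cheap and the first awaited call is what actually dials Redis. A hedged usage sketch (assumes a reachable Redis at the configured or default URL; the key and region mirror the image-cache call sites earlier in this commit):

import asyncio
from app.helper.redis import AsyncRedisHelper

async def demo() -> bytes:
    redis = AsyncRedisHelper()                      # no connection yet; _connect() is lazy
    await redis.set("posters/abc.jpg", b"\x89PNG", region="image_cache")
    data = await redis.get("posters/abc.jpg", region="image_cache")
    await redis.close()
    return data

print(asyncio.run(demo()))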

View File

@@ -1,7 +1,7 @@
from threading import Thread
from typing import List, Tuple, Optional
from app.core.cache import cached, cache_backend
from app.core.cache import cached
from app.core.config import settings
from app.db.subscribe_oper import SubscribeOper
from app.db.systemconfig_oper import SystemConfigOper
@@ -111,7 +111,12 @@ class SubscribeHelper(metaclass=WeakSingleton):
if res and res.status_code == 200:
# 清除缓存
if clear_cache:
cache_backend.clear(region=self._shares_cache_region)
self.get_shares.cache_clear()
self.get_statistic.cache_clear()
self.get_share_statistics.cache_clear()
self.async_get_shares.cache_clear()
self.async_get_statistic.cache_clear()
self.async_get_share_statistics.cache_clear()
return True, ""
else:
return False, res.json().get("message")

View File

@@ -1,7 +1,7 @@
import json
from typing import List, Tuple, Optional
from app.core.cache import cached, cache_backend
from app.core.cache import cached
from app.core.config import settings
from app.db.models import Workflow
from app.db.workflow_oper import WorkflowOper
@@ -89,7 +89,8 @@ class WorkflowHelper(metaclass=WeakSingleton):
if success:
# 清除缓存
if clear_cache:
cache_backend.clear(region=self._shares_cache_region)
self.get_shares.cache_clear()
self.async_get_shares.cache_clear()
return True, ""
else:
try:

View File

@@ -1,24 +1,19 @@
import pickle
import random
import time
import traceback
from pathlib import Path
from threading import RLock
from typing import Optional
from app.core.cache import get_cache_backend
from app.core.config import settings
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo
from app.helper.redis import RedisHelper
from app.log import logger
from app.schemas.types import MediaType
from app.utils.singleton import WeakSingleton
lock = RLock()
CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp"
EXPIRE_TIMESTAMP = settings.CONF.meta
class DoubanCache(metaclass=WeakSingleton):
"""
@@ -34,32 +29,23 @@ class DoubanCache(metaclass=WeakSingleton):
_douban_cache_expire: bool = True
def __init__(self):
# 初始化Redis缓存助手
self._redis_helper = None
if settings.CACHE_BACKEND_TYPE == "redis":
try:
self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL)
except RuntimeError as e:
logger.warning(f"豆瓣缓存Redis初始化失败将使用本地缓存: {e}")
self._redis_helper = None
# 加载本地缓存数据
self._meta_path = settings.TEMP_PATH / "__douban_cache__"
if not self._redis_helper:
self._meta_data = self.__load(self._meta_path)
self.maxsize = settings.CONF.douban
self.ttl = settings.CONF.meta
self.region = "__douban_cache__"
self._meta_filepath = settings.TEMP_PATH / self.region
# 初始化缓存
self._cache = get_cache_backend(maxsize=self.maxsize, ttl=self.ttl)
# 非Redis加载本地缓存数据
if not self._cache.is_redis():
for key, value in self.__load(self._meta_filepath).items():
self._cache.set(key, value)
def clear(self):
"""
清空所有豆瓣缓存
"""
with lock:
self._meta_data = {}
# 如果Redis可用同时清理Redis缓存
if self._redis_helper:
try:
self._redis_helper.clear(region="douban_cache")
logger.debug("已清理豆瓣Redis缓存")
except Exception as e:
logger.warning(f"清理豆瓣Redis缓存失败: {e}")
self._cache.clear(region=self.region)
@staticmethod
def __get_key(meta: MetaBase) -> str:
@@ -74,28 +60,8 @@ class DoubanCache(metaclass=WeakSingleton):
根据KEY值获取缓存值
"""
key = self.__get_key(meta)
if self._redis_helper:
# 如果Redis可用从Redis读取
try:
redis_data = self._redis_helper.get(key, region="douban_cache")
return redis_data or {}
except Exception as e:
logger.warning(f"从Redis获取豆瓣缓存失败: {e}")
else:
# Redis不可用时从内存缓存读取
with lock:
info: dict = self._meta_data.get(key)
if info:
# 检查过期时间
expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR)
if not expire or int(time.time()) < expire:
info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
self._meta_data[key] = info
elif expire and self._douban_cache_expire:
self.delete(key)
return info or {}
return {}
with lock:
return self._cache.get(key, region=self.region) or {}
def delete(self, key: str) -> dict:
"""
@@ -103,18 +69,12 @@ class DoubanCache(metaclass=WeakSingleton):
@param key: 缓存key
@return: 被删除的缓存内容
"""
if self._redis_helper:
# 如果Redis可用删除Redis缓存
try:
self._redis_helper.delete(key, region="douban_cache")
return {}
except Exception as e:
logger.warning(f"删除豆瓣Redis缓存失败: {e}")
return {}
else:
# Redis不可用时删除内存缓存
with lock:
return self._meta_data.pop(key, {})
with lock:
redis_data = self._cache.get(key, region=self.region)
if redis_data:
self._cache.delete(key, region=self.region)
return redis_data
return {}
def modify(self, key: str, title: str) -> dict:
"""
@@ -123,24 +83,13 @@ class DoubanCache(metaclass=WeakSingleton):
@param title: 标题
@return: 被修改后缓存内容
"""
if self._redis_helper:
# 如果Redis可用修改Redis缓存
try:
redis_data = self._redis_helper.get(key, region="douban_cache")
if redis_data:
redis_data['title'] = title
self._redis_helper.set(key, redis_data, ttl=EXPIRE_TIMESTAMP, region="douban_cache")
return redis_data
except Exception as e:
logger.warning(f"修改豆瓣Redis缓存失败: {e}")
with lock:
redis_data = self._cache.get(key, region=self.region)
if redis_data:
redis_data["title"] = title
self._cache.set(key, redis_data, region=self.region)
return redis_data
return {}
else:
# Redis不可用时修改内存缓存
with lock:
if self._meta_data.get(key):
self._meta_data[key]['title'] = title
self._meta_data[key][CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
return self._meta_data.get(key)
@staticmethod
def __load(path: Path) -> dict:
@@ -152,10 +101,9 @@ class DoubanCache(metaclass=WeakSingleton):
with open(path, 'rb') as f:
data = pickle.load(f)
return data
return {}
except Exception as e:
logger.error(f"加载缓存失败: {str(e)} - {traceback.format_exc()}")
return {}
return {}
def update(self, meta: MetaBase, info: dict) -> None:
"""
@@ -184,99 +132,41 @@ class DoubanCache(metaclass=WeakSingleton):
if not poster_path and info.get("cover"):
poster_path = info.get("cover").get("url")
if self._redis_helper:
# 如果Redis可用保存到Redis
cache_data = {
with lock:
self._cache.set(self.__get_key(meta), {
"id": info.get("id"),
"type": mtype,
"year": cache_year,
"title": cache_title,
"poster_path": poster_path
}
try:
self._redis_helper.set(self.__get_key(meta), cache_data, ttl=EXPIRE_TIMESTAMP,
region="douban_cache")
except Exception as e:
logger.warning(f"保存豆瓣缓存到Redis失败: {e}")
else:
# Redis不可用时保存到内存缓存
with lock:
cache_data = {
"id": info.get("id"),
"type": mtype,
"year": cache_year,
"title": cache_title,
"poster_path": poster_path,
CACHE_EXPIRE_TIMESTAMP_STR: int(time.time()) + EXPIRE_TIMESTAMP
}
self._meta_data[self.__get_key(meta)] = cache_data
}, region=self.region)
elif info is not None:
# None时不缓存此时代表网络错误允许重复请求
if self._redis_helper:
try:
self._redis_helper.set(self.__get_key(meta), {'id': "0"}, ttl=EXPIRE_TIMESTAMP,
region="douban_cache")
except Exception as e:
logger.warning(f"保存豆瓣缓存到Redis失败: {e}")
else:
with lock:
self._meta_data[self.__get_key(meta)] = {'id': "0"}
with lock:
self._cache.set(self.__get_key(meta), {
"id": 0
}, region=self.region)
def save(self, force: Optional[bool] = False) -> None:
"""
保存缓存数据到文件
"""
# 如果Redis可用,不需要保存到本地文件
if self._redis_helper:
# Redis不需要保存到本地文件
if self._cache.is_redis():
return
# Redis不可用时保存到本地文件
meta_data = self.__load(self._meta_path)
new_meta_data = {k: v for k, v in self._meta_data.items() if v.get("id")}
# 本地文件
meta_data = self.__load(self._meta_filepath)
# 当前缓存数据(去除无法识别)
new_meta_data = {k: v for k, v in self._cache.items(region=self.region) if v.get("id")}
if not force \
and not self._random_sample(new_meta_data) \
and meta_data.keys() == new_meta_data.keys():
return
with open(self._meta_path, 'wb') as f:
# 写入本地
with open(self._meta_filepath, 'wb') as f:
pickle.dump(new_meta_data, f, pickle.HIGHEST_PROTOCOL) # noqa
def _random_sample(self, new_meta_data: dict) -> bool:
"""
采样分析是否需要保存
"""
ret = False
if len(new_meta_data) < 25:
keys = list(new_meta_data.keys())
for k in keys:
info = new_meta_data.get(k)
expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR)
if not expire:
ret = True
info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
elif int(time.time()) >= expire:
ret = True
if self._douban_cache_expire:
new_meta_data.pop(k)
else:
count = 0
keys = random.sample(sorted(new_meta_data.keys()), 25)
for k in keys:
info = new_meta_data.get(k)
expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR)
if not expire:
ret = True
info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
elif int(time.time()) >= expire:
ret = True
if self._douban_cache_expire:
new_meta_data.pop(k)
count += 1
if count >= 5:
ret |= self._random_sample(new_meta_data)
return ret
def __del__(self):
self.save()
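When the backend is not Redis, DoubanCache (and TmdbCache below) still persist their cache region to a pickle file under TEMP_PATH and reload it into the backend on startup. A stripped-down sketch of that snapshot cycle (function names are illustrative; only identified entries with a truthy "id" are kept, as in save() above):

import pickle
from pathlib import Path

def load_snapshot(path: Path) -> dict:
    # Mirrors __load(): a missing or unreadable file simply yields an empty cache
    try:
        if path.exists():
            with open(path, "rb") as f:
                return pickle.load(f)
    except Exception:
        pass
    return {}

def save_snapshot(path: Path, cache, region: str) -> None:
    # Mirrors save(): drop unidentified entries, then dump the region to disk
    data = {k: v for k, v in cache.items(region=region) if v.get("id")}
    with open(path, "wb") as f:
        pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)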

View File

@@ -51,9 +51,10 @@ class RedisModule(_ModuleBase):
"""
if settings.CACHE_BACKEND_TYPE != "redis":
return None
redis_helper = RedisHelper()
try:
redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL)
if redis_helper.test():
return True, ""
return False, "Redis连接失败请检查配置"
finally:
redis_helper.close()
except RuntimeError as e:
return False, f"Redis连接失败{e}"
return True, ""

View File

@@ -1,22 +1,17 @@
import pickle
import random
import time
import traceback
from pathlib import Path
from threading import RLock
from app.core.cache import get_cache_backend
from app.core.config import settings
from app.core.meta import MetaBase
from app.helper.redis import RedisHelper
from app.log import logger
from app.schemas.types import MediaType
from app.utils.singleton import WeakSingleton
lock = RLock()
CACHE_EXPIRE_TIMESTAMP_STR = "cache_expire_timestamp"
EXPIRE_TIMESTAMP = settings.CONF.meta
class TmdbCache(metaclass=WeakSingleton):
"""
@@ -32,33 +27,23 @@ class TmdbCache(metaclass=WeakSingleton):
_tmdb_cache_expire: bool = True
def __init__(self):
# 初始化Redis缓存助手
self._redis_helper = None
if settings.CACHE_BACKEND_TYPE == "redis":
try:
self._redis_helper = RedisHelper(redis_url=settings.CACHE_BACKEND_URL)
except RuntimeError as e:
logger.warning(f"TMDB缓存Redis初始化失败将使用本地缓存: {e}")
self._redis_helper = None
# 加载缓存数据
self._meta_path = settings.TEMP_PATH / "__tmdb_cache__"
if not self._redis_helper:
self._meta_data = self.__load(self._meta_path)
self.maxsize = settings.CONF.douban
self.ttl = settings.CONF.meta
self.region = "__tmdb_cache__"
self._meta_filepath = settings.TEMP_PATH / self.region
# 初始化缓存
self._cache = get_cache_backend(maxsize=self.maxsize, ttl=self.ttl)
# 非Redis加载本地缓存数据
if not self._cache.is_redis():
for key, value in self.__load(self._meta_filepath).items():
self._cache.set(key, value)
def clear(self):
"""
清空所有TMDB缓存
"""
with lock:
self._meta_data = {}
# 如果Redis可用同时清理Redis缓存
if self._redis_helper:
try:
self._redis_helper.clear(region="tmdb_cache")
logger.debug("已清理TMDB Redis缓存")
except Exception as e:
logger.warning(f"清理TMDB Redis缓存失败: {e}")
self._cache.clear(region=self.region)
@staticmethod
def __get_key(meta: MetaBase) -> str:
@@ -73,27 +58,8 @@ class TmdbCache(metaclass=WeakSingleton):
"""
key = self.__get_key(meta)
if self._redis_helper:
# 如果Redis可用从Redis读取
try:
redis_data = self._redis_helper.get(key, region="tmdb_cache")
return redis_data or {}
except Exception as e:
logger.warning(f"从Redis获取TMDB缓存失败: {e}")
else:
# Redis不可用时从内存缓存读取
with lock:
info: dict = self._meta_data.get(key)
if info:
# 检查过期时间
expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR)
if not expire or int(time.time()) < expire:
info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
self._meta_data[key] = info
elif expire and self._tmdb_cache_expire:
self.delete(key)
return info or {}
return {}
with lock:
return self._cache.get(key, region=self.region) or {}
def delete(self, key: str) -> dict:
"""
@@ -101,18 +67,12 @@ class TmdbCache(metaclass=WeakSingleton):
@param key: 缓存key
@return: 被删除的缓存内容
"""
if self._redis_helper:
# 如果Redis可用删除Redis缓存
try:
self._redis_helper.delete(key, region="tmdb_cache")
return {}
except Exception as e:
logger.warning(f"删除TMDB Redis缓存失败: {e}")
return {}
else:
# Redis不可用时删除内存缓存
with lock:
return self._meta_data.pop(key, {})
with lock:
redis_data = self._cache.get(key, region=self.region)
if redis_data:
self._cache.delete(key, region=self.region)
return redis_data
return {}
def modify(self, key: str, title: str) -> dict:
"""
@@ -121,24 +81,13 @@ class TmdbCache(metaclass=WeakSingleton):
@param title: 标题
@return: 被修改后缓存内容
"""
if self._redis_helper:
# 如果Redis可用修改Redis缓存
try:
redis_data = self._redis_helper.get(key, region="tmdb_cache")
if redis_data:
redis_data['title'] = title
self._redis_helper.set(key, redis_data, ttl=EXPIRE_TIMESTAMP, region="tmdb_cache")
return redis_data
except Exception as e:
logger.warning(f"修改TMDB Redis缓存失败: {e}")
with lock:
redis_data = self._cache.get(key, region=self.region)
if redis_data:
redis_data['title'] = title
self._cache.set(key, redis_data, region=self.region)
return redis_data
return {}
else:
# Redis不可用时修改内存缓存
with lock:
if self._meta_data.get(key):
self._meta_data[key]['title'] = title
self._meta_data[key][CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
return self._meta_data.get(key)
@staticmethod
def __load(path: Path) -> dict:
@@ -158,6 +107,7 @@ class TmdbCache(metaclass=WeakSingleton):
"""
新增或更新缓存条目
"""
key = self.__get_key(meta)
if info:
# 缓存标题
cache_title = info.get("title") \
@@ -168,8 +118,8 @@ class TmdbCache(metaclass=WeakSingleton):
if cache_year:
cache_year = cache_year[:4]
if self._redis_helper:
# 如果Redis可用保存到Redis
with lock:
# 缓存数据
cache_data = {
"id": info.get("id"),
"type": info.get("media_type"),
@@ -178,89 +128,32 @@ class TmdbCache(metaclass=WeakSingleton):
"poster_path": info.get("poster_path"),
"backdrop_path": info.get("backdrop_path")
}
try:
self._redis_helper.set(self.__get_key(meta), cache_data, ttl=EXPIRE_TIMESTAMP, region="tmdb_cache")
except Exception as e:
logger.warning(f"保存TMDB缓存到Redis失败: {e}")
else:
# Redis不可用时保存到内存缓存
with lock:
cache_data = {
"id": info.get("id"),
"type": info.get("media_type"),
"year": cache_year,
"title": cache_title,
"poster_path": info.get("poster_path"),
"backdrop_path": info.get("backdrop_path"),
CACHE_EXPIRE_TIMESTAMP_STR: int(time.time()) + EXPIRE_TIMESTAMP
}
self._meta_data[self.__get_key(meta)] = cache_data
self._cache.set(key, cache_data, region=self.region)
elif info is not None:
# None时不缓存此时代表网络错误允许重复请求
if self._redis_helper:
try:
self._redis_helper.set(self.__get_key(meta), {'id': 0}, ttl=EXPIRE_TIMESTAMP, region="tmdb_cache")
except Exception as e:
logger.warning(f"保存TMDB缓存到Redis失败: {e}")
else:
with lock:
self._meta_data[self.__get_key(meta)] = {'id': 0}
with lock:
self._cache.set(key, {"id": 0}, region=self.region)
def save(self, force: bool = False) -> None:
"""
保存缓存数据到文件
"""
# 如果Redis可用,不需要保存到本地文件
if self._redis_helper:
# Redis不需要保存到本地文件
if self._cache.is_redis():
return
# Redis不可用时保存到本地文件
meta_data = self.__load(self._meta_path)
new_meta_data = {k: v for k, v in self._meta_data.items() if v.get("id")}
meta_data = self.__load(self._meta_filepath)
# 当前缓存,去除无法识别
new_meta_data = {k: v for k, v in self._cache.items(region=self.region) if v.get("id")}
if not force \
and not self._random_sample(new_meta_data) \
and meta_data.keys() == new_meta_data.keys():
return
with open(self._meta_path, 'wb') as f:
with open(self._meta_filepath, 'wb') as f:
pickle.dump(new_meta_data, f, pickle.HIGHEST_PROTOCOL) # type: ignore
def _random_sample(self, new_meta_data: dict) -> bool:
"""
采样分析是否需要保存
"""
ret = False
if len(new_meta_data) < 25:
keys = list(new_meta_data.keys())
for k in keys:
info = new_meta_data.get(k)
expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR)
if not expire:
ret = True
info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
elif int(time.time()) >= expire:
ret = True
if self._tmdb_cache_expire:
new_meta_data.pop(k)
else:
count = 0
keys = random.sample(sorted(new_meta_data.keys()), 25)
for k in keys:
info = new_meta_data.get(k)
expire = info.get(CACHE_EXPIRE_TIMESTAMP_STR)
if not expire:
ret = True
info[CACHE_EXPIRE_TIMESTAMP_STR] = int(time.time()) + EXPIRE_TIMESTAMP
elif int(time.time()) >= expire:
ret = True
if self._tmdb_cache_expire:
new_meta_data.pop(k)
count += 1
if count >= 5:
ret |= self._random_sample(new_meta_data)
return ret
def __del__(self):
self.save()

View File

@@ -18,7 +18,7 @@ from app.chain.subscribe import SubscribeChain
from app.chain.transfer import TransferChain
from app.chain.workflow import WorkflowChain
from app.core.config import settings
from app.core.event import EventManager, eventmanager, Event
from app.core.event import eventmanager, Event
from app.core.plugin import PluginManager
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper

View File

@@ -1,5 +1,7 @@
import sys
from app.helper.redis import RedisHelper, AsyncRedisHelper
# SitesHelper涉及资源包拉取提前引入并容错提示
try:
from app.helper.sites import SitesHelper # noqa
@@ -12,7 +14,6 @@ except ImportError as e:
from app.utils.system import SystemUtils
from app.log import logger
from app.core.config import settings
from app.core.cache import close_cache
from app.core.module import ModuleManager
from app.core.event import EventManager
from app.helper.thread import ThreadHelper
@@ -119,8 +120,9 @@ async def stop_modules():
ThreadHelper().shutdown()
# 停止消息服务
stop_message()
# 停止缓存连接
close_cache()
# 关闭Redis缓存连接
RedisHelper().close()
await AsyncRedisHelper().close()
# 停止数据库连接
await close_database()
# 停止前端服务