From ccaeb7662cf7335b4ba9f0ee13c96119641770df Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 18 Jun 2026 18:07:37 +0800 Subject: [PATCH] feat(redis): add configurable connection pool settings for Redis client --- app/core/config.py | 4 ++ app/helper/redis.py | 89 +++++++++++++++++++++------ tests/test_cache_system.py | 119 ++++++++++++++++++++++++++++++++++++- 3 files changed, 193 insertions(+), 19 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 5948f50a..598796cc 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -160,6 +160,10 @@ class ConfigModel(BaseModel): CACHE_BACKEND_URL: Optional[str] = "redis://localhost:6379" # Redis 缓存最大内存限制,未配置时,如开启大内存模式时为 "1024mb",未开启时为 "256mb" CACHE_REDIS_MAXMEMORY: Optional[str] = None + # Redis 连接池最大连接数 + CACHE_REDIS_MAX_CONNECTIONS: int = 256 + # Redis 连接池耗尽时等待可用连接的时间(秒) + CACHE_REDIS_POOL_TIMEOUT: int = 3 # 全局图片缓存,将媒体图片缓存到本地 GLOBAL_IMAGE_CACHE: bool = False # 全局图片缓存保留天数 diff --git a/app/helper/redis.py b/app/helper/redis.py index 23191735..e6031fbc 100644 --- a/app/helper/redis.py +++ b/app/helper/redis.py @@ -1,10 +1,12 @@ import asyncio import json import pickle +import threading from typing import Any, Optional, Generator, Tuple, AsyncGenerator, Union from urllib.parse import quote, unquote import redis +from redis.asyncio import BlockingConnectionPool as AsyncBlockingConnectionPool from redis.asyncio import Redis from app.core.config import settings @@ -83,7 +85,13 @@ class RedisHelper(ConfigReloadMixin, metaclass=Singleton): - 支持内存限制和淘汰策略设置 - 提供键名生成和区域管理功能 """ - CONFIG_WATCH = {"CACHE_BACKEND_TYPE", "CACHE_BACKEND_URL", "CACHE_REDIS_MAXMEMORY"} + CONFIG_WATCH = { + "CACHE_BACKEND_TYPE", + "CACHE_BACKEND_URL", + "CACHE_REDIS_MAXMEMORY", + "CACHE_REDIS_MAX_CONNECTIONS", + "CACHE_REDIS_POOL_TIMEOUT", + } def __init__(self): """ @@ -91,31 +99,46 @@ class RedisHelper(ConfigReloadMixin, metaclass=Singleton): """ self.redis_url = settings.CACHE_BACKEND_URL self.client = None + self._connect_lock = threading.RLock() def _connect(self): """ 建立Redis连接 """ + if self.client is not None: + return + client = None try: - if self.client is None: - self.client = redis.Redis.from_url( + with self._connect_lock: + if self.client is not None: + return + self.redis_url = settings.CACHE_BACKEND_URL + connection_pool = redis.BlockingConnectionPool.from_url( self.redis_url, decode_responses=False, socket_timeout=_socket_timeout, socket_connect_timeout=_socket_connect_timeout, health_check_interval=_health_check_interval, + max_connections=settings.CACHE_REDIS_MAX_CONNECTIONS, + timeout=settings.CACHE_REDIS_POOL_TIMEOUT, ) + client = redis.Redis(connection_pool=connection_pool) # 测试连接,确保Redis可用 - self.client.ping() + client.ping() + self.client = client logger.info(f"Successfully connected to Redis:{self.redis_url}") self.set_memory_limit() except Exception as e: + if client: + client.close() logger.error(f"Failed to connect to Redis: {e}") self.client = None raise RuntimeError("Redis connection failed") from e def on_config_changed(self): - self.close() + with self._connect_lock: + self.redis_url = settings.CACHE_BACKEND_URL + self.close() self._connect() def get_reload_name(self): @@ -296,10 +319,11 @@ class RedisHelper(ConfigReloadMixin, metaclass=Singleton): """ 关闭Redis客户端的连接池 """ - if self.client: - self.client.close() - self.client = None - logger.debug("Redis connection closed") + with self._connect_lock: + if self.client: + self.client.close() + self.client = None + logger.debug("Redis connection closed") class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton): @@ -313,7 +337,13 @@ class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton): - 提供键名生成和区域管理功能 - 所有操作都是异步的 """ - CONFIG_WATCH = {"CACHE_BACKEND_TYPE", "CACHE_BACKEND_URL", "CACHE_REDIS_MAXMEMORY"} + CONFIG_WATCH = { + "CACHE_BACKEND_TYPE", + "CACHE_BACKEND_URL", + "CACHE_REDIS_MAXMEMORY", + "CACHE_REDIS_MAX_CONNECTIONS", + "CACHE_REDIS_POOL_TIMEOUT", + } def __init__(self): """ @@ -322,31 +352,53 @@ class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton): self.redis_url = settings.CACHE_BACKEND_URL self.client: Optional[Redis] = None self._loop: Optional[asyncio.AbstractEventLoop] = None + self._connect_lock: Optional[asyncio.Lock] = None + self._connect_lock_loop: Optional[asyncio.AbstractEventLoop] = None + + def _get_connect_lock(self, current_loop: asyncio.AbstractEventLoop) -> asyncio.Lock: + """ + 获取当前事件循环对应的异步连接锁 + """ + if self._connect_lock is None or self._connect_lock_loop is not current_loop: + self._connect_lock = asyncio.Lock() + self._connect_lock_loop = current_loop + return self._connect_lock async def _connect(self): """ 建立异步Redis连接 """ + current_loop = asyncio.get_running_loop() + connect_lock = self._get_connect_lock(current_loop) + client = None try: - current_loop = asyncio.get_running_loop() - # 检测事件循环是否发生变化,如果变化则重新连接 - if self.client is not None and self._loop is not current_loop: - logger.debug("Event loop changed, reconnecting Redis (async)") - await self._close_client() - if self.client is None: - self.client = Redis.from_url( + async with connect_lock: + # 检测事件循环是否发生变化,如果变化则重新连接 + if self.client is not None and self._loop is not current_loop: + logger.debug("Event loop changed, reconnecting Redis (async)") + await self._close_client() + if self.client is not None: + return + self.redis_url = settings.CACHE_BACKEND_URL + connection_pool = AsyncBlockingConnectionPool.from_url( self.redis_url, decode_responses=False, socket_timeout=_socket_timeout, socket_connect_timeout=_socket_connect_timeout, health_check_interval=_health_check_interval, + max_connections=settings.CACHE_REDIS_MAX_CONNECTIONS, + timeout=settings.CACHE_REDIS_POOL_TIMEOUT, ) + client = Redis(connection_pool=connection_pool) self._loop = current_loop # 测试连接,确保Redis可用 - await self.client.ping() + await client.ping() + self.client = client logger.info(f"Successfully connected to Redis (async):{self.redis_url}") await self.set_memory_limit() except Exception as e: + if client: + await client.close() logger.error(f"Failed to connect to Redis (async): {e}") self.client = None self._loop = None @@ -365,6 +417,7 @@ class AsyncRedisHelper(ConfigReloadMixin, metaclass=Singleton): self._loop = None async def on_config_changed(self): + self.redis_url = settings.CACHE_BACKEND_URL await self._close_client() await self._connect() diff --git a/tests/test_cache_system.py b/tests/test_cache_system.py index 4cb9c4c9..06dc3a72 100644 --- a/tests/test_cache_system.py +++ b/tests/test_cache_system.py @@ -1,7 +1,8 @@ import asyncio from app.core.cache import AsyncFileBackend, FileBackend, MemoryBackend -from app.helper.redis import RedisHelper +from app.core.config import settings +from app.helper.redis import AsyncRedisHelper, RedisHelper def test_file_backend_items_keep_relative_keys_and_bytes(tmp_path): @@ -49,6 +50,122 @@ def test_redis_original_key_decodes_quoted_key(): assert RedisHelper._RedisHelper__get_original_key(redis_key) == "nested/poster one.jpg" +def test_redis_helper_uses_blocking_pool_settings(monkeypatch): + """ + Redis 同步客户端应使用阻塞连接池,避免并发峰值直接耗尽 Redis 连接数。 + """ + calls = {} + + class FakeClient: + """模拟同步 Redis 客户端。""" + + def __init__(self, connection_pool): + self.connection_pool = connection_pool + self.config_calls = [] + self.closed = False + + def ping(self): + """模拟 Redis ping。""" + calls["ping"] = True + + def config_set(self, key, value): + """记录 Redis 配置写入。""" + self.config_calls.append((key, value)) + + def close(self): + """标记客户端已关闭。""" + self.closed = True + + def fake_from_url(url, **kwargs): + """记录连接池构造参数。""" + calls["pool"] = {"url": url, **kwargs} + return "pool" + + monkeypatch.setattr(settings, "CACHE_BACKEND_URL", "redis://cache:6379/2") + monkeypatch.setattr(settings, "CACHE_REDIS_MAX_CONNECTIONS", 7) + monkeypatch.setattr(settings, "CACHE_REDIS_POOL_TIMEOUT", 3) + monkeypatch.setattr("app.helper.redis.redis.BlockingConnectionPool.from_url", fake_from_url) + monkeypatch.setattr("app.helper.redis.redis.Redis", FakeClient) + + helper = RedisHelper() + helper.close() + helper._connect() + + assert calls["pool"]["url"] == "redis://cache:6379/2" + assert calls["pool"]["max_connections"] == 7 + assert calls["pool"]["timeout"] == 3 + assert calls["pool"]["decode_responses"] is False + assert calls["ping"] is True + assert ("maxmemory-policy", "allkeys-lru") in helper.client.config_calls + + helper.close() + + +def test_async_redis_helper_uses_blocking_pool_settings(monkeypatch): + """ + Redis 异步客户端应使用阻塞连接池,避免高并发缓存读取立刻抛出连接耗尽错误。 + """ + calls = {} + + class FakeAsyncClient: + """模拟异步 Redis 客户端。""" + + def __init__(self, connection_pool): + self.connection_pool = connection_pool + self.config_calls = [] + self.closed = False + + async def ping(self): + """模拟 Redis ping。""" + calls["ping"] = True + + async def config_set(self, key, value): + """记录 Redis 配置写入。""" + self.config_calls.append((key, value)) + + async def close(self): + """标记客户端已关闭。""" + self.closed = True + + def fake_from_url(url, **kwargs): + """记录连接池构造参数。""" + calls["pool"] = {"url": url, **kwargs} + return "async_pool" + + async def run_connect(): + helper = AsyncRedisHelper() + await helper.close() + await helper._connect() + config_calls = list(helper.client.config_calls) + await helper.close() + return config_calls + + monkeypatch.setattr(settings, "CACHE_BACKEND_URL", "redis://cache:6379/3") + monkeypatch.setattr(settings, "CACHE_REDIS_MAX_CONNECTIONS", 9) + monkeypatch.setattr(settings, "CACHE_REDIS_POOL_TIMEOUT", 4) + monkeypatch.setattr("app.helper.redis.AsyncBlockingConnectionPool.from_url", fake_from_url) + monkeypatch.setattr("app.helper.redis.Redis", FakeAsyncClient) + + config_calls = asyncio.run(run_connect()) + + assert calls["pool"]["url"] == "redis://cache:6379/3" + assert calls["pool"]["max_connections"] == 9 + assert calls["pool"]["timeout"] == 4 + assert calls["pool"]["decode_responses"] is False + assert calls["ping"] is True + assert ("maxmemory-policy", "allkeys-lru") in config_calls + + +def test_redis_helpers_watch_pool_settings(): + """ + Redis 连接池配置变化应触发客户端重建。 + """ + assert "CACHE_REDIS_MAX_CONNECTIONS" in RedisHelper.CONFIG_WATCH + assert "CACHE_REDIS_POOL_TIMEOUT" in RedisHelper.CONFIG_WATCH + assert "CACHE_REDIS_MAX_CONNECTIONS" in AsyncRedisHelper.CONFIG_WATCH + assert "CACHE_REDIS_POOL_TIMEOUT" in AsyncRedisHelper.CONFIG_WATCH + + def test_async_file_backend_missing_region_has_no_items(tmp_path): """ 异步文件缓存缺失区域时应返回空迭代,而不是伪造空 key。