import json import pickle from typing import Any, Optional from urllib.parse import quote import redis from app.core.config import settings from app.log import logger class RedisHelper: """ Redis连接和操作助手类 特性: - 管理Redis连接池和客户端 - 提供序列化和反序列化功能 - 支持内存限制和淘汰策略设置 - 提供键名生成和区域管理功能 """ # 类型缓存集合,针对非容器简单类型 _complex_serializable_types = set() _simple_serializable_types = set() def __init__(self, redis_url: Optional[str] = "redis://localhost"): """ 初始化Redis助手实例 :param redis_url: Redis服务的URL """ self.redis_url = redis_url self.client = None self._connect() def _connect(self): """ 建立Redis连接 """ try: self.client = redis.Redis.from_url( self.redis_url, decode_responses=False, socket_timeout=30, socket_connect_timeout=5, health_check_interval=60, ) # 测试连接,确保Redis可用 self.client.ping() logger.debug(f"Successfully connected to Redis") self.set_memory_limit() except Exception as e: logger.error(f"Failed to connect to Redis: {e}") raise RuntimeError("Redis connection failed") from e def set_memory_limit(self, policy: Optional[str] = "allkeys-lru"): """ 动态设置Redis最大内存和内存淘汰策略 :param policy: 淘汰策略(如'allkeys-lru') """ try: # 如果有显式值,则直接使用,为0时说明不限制,如果未配置,开启BIG_MEMORY_MODE时为"1024mb",未开启时为"256mb" maxmemory = settings.CACHE_REDIS_MAXMEMORY or ("1024mb" if settings.BIG_MEMORY_MODE else "256mb") self.client.config_set("maxmemory", maxmemory) self.client.config_set("maxmemory-policy", policy) logger.debug(f"Redis maxmemory set to {maxmemory}, policy: {policy}") except Exception as e: logger.error(f"Failed to set Redis maxmemory or policy: {e}") @staticmethod def is_container_type(t): """ 判断是否为容器类型 """ return t in (list, dict, tuple, set) @classmethod def serialize(cls, value: Any) -> bytes: """ 将值序列化为二进制数据,根据序列化方式标识格式 """ vt = type(value) # 针对非容器类型使用缓存策略 if not cls.is_container_type(vt): # 如果已知需要复杂序列化 if vt in cls._complex_serializable_types: return b"PICKLE" + b"\x00" + pickle.dumps(value) # 如果已知可以简单序列化 if vt in cls._simple_serializable_types: json_data = json.dumps(value).encode("utf-8") return b"JSON" + b"\x00" + json_data # 对于未知的非容器类型,尝试简单序列化,如抛出异常,再使用复杂序列化 try: json_data = json.dumps(value).encode("utf-8") cls._simple_serializable_types.add(vt) return b"JSON" + b"\x00" + json_data except TypeError: cls._complex_serializable_types.add(vt) return b"PICKLE" + b"\x00" + pickle.dumps(value) # 针对容器类型,每次尝试简单序列化,不使用缓存 else: try: json_data = json.dumps(value).encode("utf-8") return b"JSON" + b"\x00" + json_data except TypeError: return b"PICKLE" + b"\x00" + pickle.dumps(value) @classmethod def deserialize(cls, value: bytes) -> Any: """ 将二进制数据反序列化为原始值,根据格式标识区分序列化方式 """ format_marker, data = value.split(b"\x00", 1) if format_marker == b"JSON": return json.loads(data.decode("utf-8")) elif format_marker == b"PICKLE": return pickle.loads(data) else: raise ValueError("Unknown serialization format") @staticmethod def get_region(region: Optional[str] = "DEFAULT"): """ 获取缓存的区 """ return f"region:{region}" if region else "region:default" def get_redis_key(self, region: str, key: str) -> str: """ 获取缓存Key """ # 使用region作为缓存键的一部分 region = self.get_region(quote(region)) return f"{region}:key:{quote(key)}" def set(self, key: str, value: Any, ttl: Optional[int] = None, region: Optional[str] = "DEFAULT", **kwargs) -> None: """ 设置缓存 :param key: 缓存的键 :param value: 缓存的值 :param ttl: 缓存的存活时间,单位秒 :param region: 缓存的区 :param kwargs: 其他参数 """ try: redis_key = self.get_redis_key(region, key) # 对值进行序列化 serialized_value = self.serialize(value) kwargs.pop("maxsize", None) self.client.set(redis_key, serialized_value, ex=ttl, **kwargs) except Exception as e: logger.error(f"Failed to set key: {key} in region: {region}, error: {e}") def exists(self, key: str, region: Optional[str] = "DEFAULT") -> bool: """ 判断缓存键是否存在 :param key: 缓存的键 :param region: 缓存的区 :return: 存在返回True,否则返回False """ try: redis_key = self.get_redis_key(region, key) return self.client.exists(redis_key) == 1 except Exception as e: logger.error(f"Failed to exists key: {key} region: {region}, error: {e}") return False def get(self, key: str, region: Optional[str] = "DEFAULT") -> Optional[Any]: """ 获取缓存的值 :param key: 缓存的键 :param region: 缓存的区 :return: 返回缓存的值,如果缓存不存在返回None """ try: redis_key = self.get_redis_key(region, key) value = self.client.get(redis_key) if value is not None: return self.deserialize(value) return None except Exception as e: logger.error(f"Failed to get key: {key} in region: {region}, error: {e}") return None def delete(self, key: str, region: Optional[str] = "DEFAULT") -> None: """ 删除缓存 :param key: 缓存的键 :param region: 缓存的区 """ try: redis_key = self.get_redis_key(region, key) self.client.delete(redis_key) except Exception as e: logger.error(f"Failed to delete key: {key} in region: {region}, error: {e}") def clear(self, region: Optional[str] = None) -> None: """ 清除指定区域的缓存或全部缓存 :param region: 缓存的区 """ try: if region: cache_region = self.get_region(quote(region)) redis_key = f"{cache_region}:key:*" with self.client.pipeline() as pipe: for key in self.client.scan_iter(redis_key): pipe.delete(key) pipe.execute() logger.info(f"Cleared Redis cache for region: {region}") else: self.client.flushdb() logger.info("Cleared all Redis cache") except Exception as e: logger.error(f"Failed to clear cache, region: {region}, error: {e}") def close(self) -> None: """ 关闭Redis客户端的连接池 """ if self.client: self.client.close() logger.debug("Redis connection closed")