Files
MyGoNavi/internal/redis/redis_impl.go
Syngnat 494484eb92 Release/0.5.1 (#149)
* 🐛 fix(data-viewer): 修复ClickHouse尾部分页异常并增强DuckDB复杂类型兼容

- DataViewer 新增 ClickHouse 反向分页策略,修复最后页与倒数页查询失败
- DuckDB 查询失败时按列类型生成安全 SELECT,复杂类型转 VARCHAR 重试
- 分页状态统一使用 currentPage 回填,避免页码与总数推导不一致
- 增强查询异常日志与重试路径,降低大表场景卡顿与误报

*  feat(frontend-driver): 驱动管理支持快速搜索并优化信息展示

- 新增搜索框,支持按 DuckDB/ClickHouse 等关键字快速定位驱动
- 显示“匹配 x / y”统计与无结果提示
- 优化头部区域排版,提升透明/暗色场景下的视觉对齐

* 🔧 fix(connection-modal): 修复多数据源URI导入解析并校正Oracle服务名校验

- 新增单主机URI解析映射,兼容 postgres/postgresql、sqlserver、redis、tdengine、dameng(dm)、kingbase、highgo、vastbase、clickhouse、oracle
- 抽取 parseSingleHostUri 复用逻辑,统一 host/port/user/password/database 回填行为
- Oracle 连接新增服务名必填校验,移除“服务名为空回退用户名”的隐式逻辑
- 连接弹窗补充 Oracle 服务名输入项与 URI 示例

* 🐛 fix(query-export): 修复查询结果导出卡住并统一按数据源能力控制导出路径

- 查询结果页导出增加稳定兜底,异常时确保 loading 关闭避免持续转圈
- DataGrid 导出逻辑按数据源能力分流,优先走后端 ExportQuery 并保留结果集导出降级
- QueryEditor 传递结果导出 SQL,保证查询结果导出范围与当前结果一致
- 后端补充 ExportData/ExportQuery 关键日志,提升导出链路可观测性

* 🐛 fix(precision): 修复查询链路与分页统计的大整数精度丢失

- 代理响应数据解码改为 UseNumber,避免默认 float64 吞精度
- 统一归一化 json.Number 与超界整数,超出 JS 安全范围转字符串
- 修复 DataViewer 总数解析,超大值不再误转 Number 参与分页
- refs #142

* 🐛 fix(driver-manager): 修复驱动管理网络告警重复并强化代理引导

- 新增下载链路域名探测,区分“GitHub可达但驱动下载链路不可达”
- 网络不可达场景仅保留红色强提醒,移除重复二级告警
- 强提醒增加“打开全局代理设置”入口,优先引导使用 GoNavi 全局代理
- 统一网络检测与目录说明提示图标尺寸,修复加载期视觉不一致
- refs #141

* ♻️ refactor(frontend-interaction): 统一标签拖拽与暗色主题交互实现

- 重构Tab拖拽排序实现,统一为可配置拖拽引擎
- 规范拖拽与点击事件边界,提升交互一致性
- 统一多组件暗色透明样式策略,减少硬编码色值
- 提升Redis/表格/连接面板在透明模式下的观感一致性
- refs #144

* ♻️ refactor(update-state): 重构在线更新状态流并按版本统一进度展示

- 重构更新检查与下载状态同步流程,减少前后端状态分叉
- 进度展示严格绑定 latestVersion,避免跨版本状态串用
- 优化 about 打开场景的静默检查状态回填逻辑
- 统一下载弹窗关闭/后台隐藏行为
- 保持现有安装流程并补齐目录打开能力

* 🎨 style(sidebar-log): 将SQL执行日志入口调整为悬浮胶囊样式

- 移除侧栏底部整条日志入口容器
- 新增悬浮按钮阴影/边框/透明背景并适配明暗主题
- 为树区域预留底部空间避免入口遮挡内容

*  feat(redis-cluster): 支持集群模式逻辑多库隔离与 0-15 库切换

- 前端恢复 Redis 集群场景下 db0-db15 的数据库选择与展示
- 后端新增集群逻辑库命名空间前缀映射,统一 key/pattern 读写隔离
- 覆盖扫描、读取、写入、删除、重命名等核心操作的键映射规则
- 集群命令通道支持 SELECT 逻辑切库与 FLUSHDB 逻辑库清空
- refs #145

*  feat(DataGrid): 大数据表虚拟滚动性能优化及UI一致性修复

- 启用动态虚拟滚动(数据量≥500行自动切换),解决万行数据表卡顿问题
- 虚拟模式下EditableCell改用div渲染,CSS选择器从元素级改为类级适配虚拟DOM
- 修复虚拟模式双水平滚动条:样式化rc-virtual-list内置滚动条为胶囊外观,禁用自定义外部滚动条
- 为rc-virtual-list水平滚动条添加鼠标滚轮支持(MutationObserver + marginLeft驱动)
- 修复白色主题透明模式下列名悬浮Tooltip对比度不足的问题
- 新增白色主题全局滚动条样式适配透明模式(App.css)
- App.tsx主题token与组件样式优化
- refs #147

* 🔧 chore(app): 清理 App.tsx 类型告警并收敛前端壳层实现

- 清除未使用代码和冗余状态
- 替换弃用 API 以消除 IDE 提示
- 显式处理浮动 Promise 避免告警
- 保持现有更新检查和代理设置行为不变

---------

Co-authored-by: Syngnat <yangguofeng919@gmail.com>
2026-03-03 14:35:17 +08:00

1279 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package redis
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"GoNavi-Wails/internal/connection"
"GoNavi-Wails/internal/logger"
"GoNavi-Wails/internal/ssh"
"github.com/redis/go-redis/v9"
)
// RedisClientImpl implements RedisClient using go-redis
type RedisClientImpl struct {
client redis.UniversalClient
singleClient *redis.Client
clusterClient *redis.ClusterClient
config connection.ConnectionConfig
currentDB int
isCluster bool
seedAddrs []string
forwarder *ssh.LocalForwarder
}
const (
redisScanDefaultTargetCount int64 = 2000
redisScanMaxTargetCount int64 = 10000
redisScanMinStepCount int64 = 200
redisScanMaxStepCount int64 = 2000
redisScanMaxRounds = 64
redisScanMaxDuration = 12 * time.Second
redisSearchMaxTargetCount int64 = 1000
redisSearchMaxStepCount int64 = 1000
redisSearchMaxRounds = 16
redisSearchMaxDuration = 3 * time.Second
)
// NewRedisClient creates a new Redis client instance
func NewRedisClient() RedisClient {
return &RedisClientImpl{}
}
func normalizeRedisTimeout(timeoutSeconds int) time.Duration {
if timeoutSeconds <= 0 {
return 30 * time.Second
}
return time.Duration(timeoutSeconds) * time.Second
}
func normalizeRedisSeedAddress(raw string, defaultPort int) (string, error) {
addr := strings.TrimSpace(raw)
if addr == "" {
return "", fmt.Errorf("Redis 节点地址不能为空")
}
if _, _, err := net.SplitHostPort(addr); err == nil {
return addr, nil
}
if !strings.Contains(addr, ":") {
return net.JoinHostPort(addr, strconv.Itoa(defaultPort)), nil
}
// 尝试兼容 host:port 但端口格式异常的场景。
host, port, ok := strings.Cut(addr, ":")
if !ok {
return "", fmt.Errorf("无效 Redis 节点地址: %s", addr)
}
host = strings.TrimSpace(host)
port = strings.TrimSpace(port)
if host == "" {
return "", fmt.Errorf("无效 Redis 节点地址: %s", addr)
}
if _, err := strconv.Atoi(port); err != nil {
return "", fmt.Errorf("无效 Redis 端口: %s", addr)
}
return net.JoinHostPort(host, port), nil
}
func buildRedisSeedAddrs(config connection.ConnectionConfig) ([]string, error) {
defaultPort := config.Port
if defaultPort <= 0 {
defaultPort = 6379
}
candidates := make([]string, 0, 1+len(config.Hosts))
if strings.TrimSpace(config.Host) != "" {
candidates = append(candidates, fmt.Sprintf("%s:%d", strings.TrimSpace(config.Host), defaultPort))
}
candidates = append(candidates, config.Hosts...)
seen := make(map[string]struct{}, len(candidates))
addrs := make([]string, 0, len(candidates))
for _, candidate := range candidates {
normalized, err := normalizeRedisSeedAddress(candidate, defaultPort)
if err != nil {
return nil, err
}
if _, exists := seen[normalized]; exists {
continue
}
seen[normalized] = struct{}{}
addrs = append(addrs, normalized)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("Redis 连接地址不能为空")
}
return addrs, nil
}
func (r *RedisClientImpl) redisNamespacePrefixForDB(index int) string {
if !r.isCluster || index <= 0 {
return ""
}
// Redis Cluster 仅支持物理 db0这里用固定前缀模拟逻辑库隔离。
return fmt.Sprintf("__gonavi_db_%d__:", index)
}
func (r *RedisClientImpl) redisNamespacePrefix() string {
return r.redisNamespacePrefixForDB(r.currentDB)
}
func (r *RedisClientImpl) toPhysicalKey(key string) string {
trimmed := strings.TrimSpace(key)
if trimmed == "" {
return ""
}
prefix := r.redisNamespacePrefix()
if prefix == "" || strings.HasPrefix(trimmed, prefix) {
return trimmed
}
return prefix + trimmed
}
func (r *RedisClientImpl) toPhysicalPattern(pattern string) string {
normalized := strings.TrimSpace(pattern)
if normalized == "" {
normalized = "*"
}
prefix := r.redisNamespacePrefix()
if prefix == "" {
return normalized
}
return prefix + normalized
}
func (r *RedisClientImpl) toPhysicalKeys(keys []string) []string {
if len(keys) == 0 {
return nil
}
result := make([]string, 0, len(keys))
for _, key := range keys {
physical := r.toPhysicalKey(key)
if physical == "" {
continue
}
result = append(result, physical)
}
return result
}
func (r *RedisClientImpl) toDisplayKey(key string) string {
prefix := r.redisNamespacePrefix()
if prefix == "" {
return key
}
return strings.TrimPrefix(key, prefix)
}
// Connect establishes a connection to Redis
func (r *RedisClientImpl) Connect(config connection.ConnectionConfig) error {
r.config = config
if r.config.RedisDB < 0 || r.config.RedisDB > 15 {
r.config.RedisDB = 0
}
r.currentDB = r.config.RedisDB
r.forwarder = nil
r.client = nil
r.singleClient = nil
r.clusterClient = nil
r.isCluster = false
seedAddrs, err := buildRedisSeedAddrs(config)
if err != nil {
return err
}
r.seedAddrs = append([]string(nil), seedAddrs...)
topology := strings.ToLower(strings.TrimSpace(config.Topology))
r.isCluster = topology == "cluster" || len(seedAddrs) > 1
if r.isCluster && config.UseSSH {
return fmt.Errorf("Redis 集群模式暂不支持 SSH 隧道,请关闭 SSH 后重试")
}
timeout := normalizeRedisTimeout(config.Timeout)
if r.isCluster {
opts := &redis.ClusterOptions{
Addrs: seedAddrs,
Username: strings.TrimSpace(config.User),
Password: config.Password,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
clusterClient := redis.NewClusterClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := clusterClient.Ping(ctx).Err(); err != nil {
clusterClient.Close()
return fmt.Errorf("Redis 集群连接失败: %w", err)
}
r.client = clusterClient
r.clusterClient = clusterClient
logger.Infof("Redis 集群连接成功: seeds=%s 逻辑库=db%d", strings.Join(seedAddrs, ","), r.currentDB)
return nil
}
addr := seedAddrs[0]
if config.UseSSH {
forwarder, err := ssh.GetOrCreateLocalForwarder(config.SSH, config.Host, config.Port)
if err != nil {
return fmt.Errorf("创建 SSH 隧道失败: %w", err)
}
r.forwarder = forwarder
addr = forwarder.LocalAddr
logger.Infof("Redis 通过 SSH 隧道连接: %s -> %s:%d", addr, config.Host, config.Port)
}
opts := &redis.Options{
Addr: addr,
Username: strings.TrimSpace(config.User),
Password: config.Password,
DB: r.currentDB,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
singleClient := redis.NewClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := singleClient.Ping(ctx).Err(); err != nil {
singleClient.Close()
return fmt.Errorf("Redis 连接失败: %w", err)
}
r.client = singleClient
r.singleClient = singleClient
logger.Infof("Redis 连接成功: %s DB=%d", addr, r.currentDB)
return nil
}
// Close closes the Redis connection
func (r *RedisClientImpl) Close() error {
if r.client != nil {
err := r.client.Close()
r.client = nil
r.singleClient = nil
r.clusterClient = nil
r.isCluster = false
r.seedAddrs = nil
r.forwarder = nil
return err
}
return nil
}
// Ping tests the connection
func (r *RedisClientImpl) Ping() error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Ping(ctx).Err()
}
// ScanKeys scans keys matching a pattern
func (r *RedisClientImpl) ScanKeys(pattern string, cursor uint64, count int64) (*RedisScanResult, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
if pattern == "" {
pattern = "*"
}
physicalPattern := r.toPhysicalPattern(pattern)
isSearchPattern := pattern != "*"
targetCount := normalizeRedisScanTargetCount(count)
scanStepCount := normalizeRedisScanStepCount(targetCount)
maxRounds := redisScanMaxRounds
maxDuration := redisScanMaxDuration
if isSearchPattern {
if targetCount > redisSearchMaxTargetCount {
targetCount = redisSearchMaxTargetCount
}
if scanStepCount > redisSearchMaxStepCount {
scanStepCount = redisSearchMaxStepCount
}
maxRounds = redisSearchMaxRounds
maxDuration = redisSearchMaxDuration
}
ctx, cancel := context.WithTimeout(context.Background(), maxDuration+5*time.Second)
defer cancel()
currentCursor := cursor
round := 0
scanStartedAt := time.Now()
keys := make([]string, 0, int(targetCount))
seen := make(map[string]struct{}, int(targetCount))
for len(keys) < int(targetCount) {
if time.Since(scanStartedAt) >= maxDuration {
break
}
batch, nextCursor, err := r.client.Scan(ctx, currentCursor, physicalPattern, scanStepCount).Result()
if err != nil {
return nil, err
}
for _, key := range batch {
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
keys = append(keys, key)
if len(keys) >= int(targetCount) {
break
}
}
currentCursor = nextCursor
round++
if currentCursor == 0 || round >= maxRounds {
break
}
}
return &RedisScanResult{
Keys: r.loadRedisKeyInfos(ctx, keys),
Cursor: strconv.FormatUint(currentCursor, 10),
}, nil
}
func normalizeRedisScanTargetCount(count int64) int64 {
if count <= 0 {
return redisScanDefaultTargetCount
}
if count > redisScanMaxTargetCount {
return redisScanMaxTargetCount
}
return count
}
func normalizeRedisScanStepCount(targetCount int64) int64 {
if targetCount < redisScanMinStepCount {
return redisScanMinStepCount
}
if targetCount > redisScanMaxStepCount {
return redisScanMaxStepCount
}
return targetCount
}
func (r *RedisClientImpl) loadRedisKeyInfos(ctx context.Context, keys []string) []RedisKeyInfo {
result := make([]RedisKeyInfo, 0, len(keys))
if len(keys) == 0 {
return result
}
pipe := r.client.Pipeline()
typeResults := make([]*redis.StatusCmd, len(keys))
ttlResults := make([]*redis.DurationCmd, len(keys))
for i, key := range keys {
typeResults[i] = pipe.Type(ctx, key)
ttlResults[i] = pipe.TTL(ctx, key)
}
_, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
for _, key := range keys {
keyType, typeErr := r.client.Type(ctx, key).Result()
if typeErr != nil && typeErr != redis.Nil {
keyType = ""
}
ttlValue, ttlErr := r.client.TTL(ctx, key).Result()
if ttlErr != nil && ttlErr != redis.Nil {
ttlValue = -2
}
result = append(result, RedisKeyInfo{
Key: r.toDisplayKey(key),
Type: keyType,
TTL: toRedisTTLSeconds(ttlValue),
})
}
return result
}
for i, key := range keys {
result = append(result, RedisKeyInfo{
Key: r.toDisplayKey(key),
Type: typeResults[i].Val(),
TTL: toRedisTTLSeconds(ttlResults[i].Val()),
})
}
return result
}
func toRedisTTLSeconds(ttl time.Duration) int64 {
if ttl == -1 {
return -1
}
if ttl == -2 {
return -2
}
return int64(ttl.Seconds())
}
// GetKeyType returns the type of a key
func (r *RedisClientImpl) GetKeyType(key string) (string, error) {
if r.client == nil {
return "", fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Type(ctx, r.toPhysicalKey(key)).Result()
}
// GetTTL returns the TTL of a key in seconds
func (r *RedisClientImpl) GetTTL(key string) (int64, error) {
if r.client == nil {
return 0, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ttl, err := r.client.TTL(ctx, r.toPhysicalKey(key)).Result()
if err != nil {
return 0, err
}
if ttl == -1 {
return -1, nil // No expiry
} else if ttl == -2 {
return -2, nil // Key doesn't exist
}
return int64(ttl.Seconds()), nil
}
// SetTTL sets the TTL of a key
func (r *RedisClientImpl) SetTTL(key string, ttl int64) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if ttl < 0 {
// Remove expiry
return r.client.Persist(ctx, r.toPhysicalKey(key)).Err()
}
return r.client.Expire(ctx, r.toPhysicalKey(key), time.Duration(ttl)*time.Second).Err()
}
// DeleteKeys deletes one or more keys
func (r *RedisClientImpl) DeleteKeys(keys []string) (int64, error) {
if r.client == nil {
return 0, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
physicalKeys := r.toPhysicalKeys(keys)
if len(physicalKeys) == 0 {
return 0, nil
}
return r.client.Del(ctx, physicalKeys...).Result()
}
// RenameKey renames a key
func (r *RedisClientImpl) RenameKey(oldKey, newKey string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Rename(ctx, r.toPhysicalKey(oldKey), r.toPhysicalKey(newKey)).Err()
}
// KeyExists checks if a key exists
func (r *RedisClientImpl) KeyExists(key string) (bool, error) {
if r.client == nil {
return false, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
n, err := r.client.Exists(ctx, r.toPhysicalKey(key)).Result()
return n > 0, err
}
// GetValue gets the value of a key with automatic type detection
func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
keyType, err := r.GetKeyType(key)
if err != nil {
return nil, err
}
ttl, _ := r.GetTTL(key)
physicalKey := r.toPhysicalKey(key)
result := &RedisValue{
Type: keyType,
TTL: ttl,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
switch keyType {
case "string":
val, err := r.client.Get(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
result.Value = val
result.Length = int64(len(val))
case "hash":
val, err := r.client.HGetAll(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
result.Value = val
result.Length = int64(len(val))
case "list":
length, err := r.client.LLen(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
// Get first 1000 items
limit := int64(1000)
if length < limit {
limit = length
}
val, err := r.client.LRange(ctx, physicalKey, 0, limit-1).Result()
if err != nil {
return nil, err
}
result.Value = val
result.Length = length
case "set":
length, err := r.client.SCard(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
// Get members using SMembers (limited by Redis server)
members, err := r.client.SMembers(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
result.Value = members
result.Length = length
case "zset":
length, err := r.client.ZCard(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
// Get first 1000 members with scores
limit := int64(1000)
if length < limit {
limit = length
}
val, err := r.client.ZRangeWithScores(ctx, physicalKey, 0, limit-1).Result()
if err != nil {
return nil, err
}
members := make([]ZSetMember, len(val))
for i, z := range val {
members[i] = ZSetMember{
Member: z.Member.(string),
Score: z.Score,
}
}
result.Value = members
result.Length = length
case "stream":
length, err := r.client.XLen(ctx, physicalKey).Result()
if err != nil {
return nil, err
}
result.Length = length
if length == 0 {
result.Value = []StreamEntry{}
break
}
limit := int64(1000)
if length < limit {
limit = length
}
val, err := r.client.XRangeN(ctx, physicalKey, "-", "+", limit).Result()
if err != nil {
return nil, err
}
result.Value = toStreamEntries(val)
default:
return nil, fmt.Errorf("不支持的 Redis 数据类型: %s", keyType)
}
return result, nil
}
// GetString gets a string value
func (r *RedisClientImpl) GetString(key string) (string, error) {
if r.client == nil {
return "", fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.Get(ctx, r.toPhysicalKey(key)).Result()
}
// SetString sets a string value with optional TTL
func (r *RedisClientImpl) SetString(key, value string, ttl int64) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var expiration time.Duration
if ttl > 0 {
expiration = time.Duration(ttl) * time.Second
}
return r.client.Set(ctx, r.toPhysicalKey(key), value, expiration).Err()
}
// GetHash gets all fields of a hash
func (r *RedisClientImpl) GetHash(key string) (map[string]string, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.HGetAll(ctx, r.toPhysicalKey(key)).Result()
}
// SetHashField sets a field in a hash
func (r *RedisClientImpl) SetHashField(key, field, value string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.HSet(ctx, r.toPhysicalKey(key), field, value).Err()
}
// DeleteHashField deletes fields from a hash
func (r *RedisClientImpl) DeleteHashField(key string, fields ...string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.HDel(ctx, r.toPhysicalKey(key), fields...).Err()
}
// GetList gets a range of elements from a list
func (r *RedisClientImpl) GetList(key string, start, stop int64) ([]string, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.LRange(ctx, r.toPhysicalKey(key), start, stop).Result()
}
// ListPush pushes values to the end of a list
func (r *RedisClientImpl) ListPush(key string, values ...string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
args := make([]interface{}, len(values))
for i, v := range values {
args[i] = v
}
return r.client.RPush(ctx, r.toPhysicalKey(key), args...).Err()
}
// ListSet sets the value at an index in a list
func (r *RedisClientImpl) ListSet(key string, index int64, value string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.LSet(ctx, r.toPhysicalKey(key), index, value).Err()
}
// GetSet gets all members of a set
func (r *RedisClientImpl) GetSet(key string) ([]string, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.SMembers(ctx, r.toPhysicalKey(key)).Result()
}
// SetAdd adds members to a set
func (r *RedisClientImpl) SetAdd(key string, members ...string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
args := make([]interface{}, len(members))
for i, m := range members {
args[i] = m
}
return r.client.SAdd(ctx, r.toPhysicalKey(key), args...).Err()
}
// SetRemove removes members from a set
func (r *RedisClientImpl) SetRemove(key string, members ...string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
args := make([]interface{}, len(members))
for i, m := range members {
args[i] = m
}
return r.client.SRem(ctx, r.toPhysicalKey(key), args...).Err()
}
// GetZSet gets members with scores from a sorted set
func (r *RedisClientImpl) GetZSet(key string, start, stop int64) ([]ZSetMember, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
val, err := r.client.ZRangeWithScores(ctx, r.toPhysicalKey(key), start, stop).Result()
if err != nil {
return nil, err
}
members := make([]ZSetMember, len(val))
for i, z := range val {
members[i] = ZSetMember{
Member: z.Member.(string),
Score: z.Score,
}
}
return members, nil
}
// ZSetAdd adds members to a sorted set
func (r *RedisClientImpl) ZSetAdd(key string, members ...ZSetMember) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
zMembers := make([]redis.Z, len(members))
for i, m := range members {
zMembers[i] = redis.Z{
Score: m.Score,
Member: m.Member,
}
}
return r.client.ZAdd(ctx, r.toPhysicalKey(key), zMembers...).Err()
}
// ZSetRemove removes members from a sorted set
func (r *RedisClientImpl) ZSetRemove(key string, members ...string) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
args := make([]interface{}, len(members))
for i, m := range members {
args[i] = m
}
return r.client.ZRem(ctx, r.toPhysicalKey(key), args...).Err()
}
// GetStream gets stream entries in a range
func (r *RedisClientImpl) GetStream(key, start, stop string, count int64) ([]StreamEntry, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
if start == "" {
start = "-"
}
if stop == "" {
stop = "+"
}
if count <= 0 {
count = 1000
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
val, err := r.client.XRangeN(ctx, r.toPhysicalKey(key), start, stop, count).Result()
if err != nil {
return nil, err
}
return toStreamEntries(val), nil
}
// StreamAdd adds an entry to a stream
func (r *RedisClientImpl) StreamAdd(key string, fields map[string]string, id string) (string, error) {
if r.client == nil {
return "", fmt.Errorf("Redis 客户端未连接")
}
if len(fields) == 0 {
return "", fmt.Errorf("Stream 字段不能为空")
}
if id == "" {
id = "*"
}
values := make(map[string]interface{}, len(fields))
for field, value := range fields {
values[field] = value
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
newID, err := r.client.XAdd(ctx, &redis.XAddArgs{
Stream: r.toPhysicalKey(key),
ID: id,
Values: values,
}).Result()
if err != nil {
return "", err
}
return newID, nil
}
// StreamDelete deletes entries from a stream by IDs
func (r *RedisClientImpl) StreamDelete(key string, ids ...string) (int64, error) {
if r.client == nil {
return 0, fmt.Errorf("Redis 客户端未连接")
}
if len(ids) == 0 {
return 0, fmt.Errorf("Stream ID 不能为空")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.XDel(ctx, r.toPhysicalKey(key), ids...).Result()
}
func toStreamEntries(messages []redis.XMessage) []StreamEntry {
entries := make([]StreamEntry, 0, len(messages))
for _, msg := range messages {
fields := make(map[string]string, len(msg.Values))
for field, value := range msg.Values {
fields[field] = fmt.Sprint(value)
}
entries = append(entries, StreamEntry{
ID: msg.ID,
Fields: fields,
})
}
return entries
}
func parseRedisCommandGetKeysResult(result interface{}) []string {
items, ok := result.([]interface{})
if !ok || len(items) == 0 {
return nil
}
keys := make([]string, 0, len(items))
for _, item := range items {
switch v := item.(type) {
case string:
if v != "" {
keys = append(keys, v)
}
case []byte:
text := string(v)
if text != "" {
keys = append(keys, text)
}
}
}
return keys
}
func (r *RedisClientImpl) rewriteCommandArgsForNamespace(ctx context.Context, args []string) []string {
if !r.isCluster || r.currentDB <= 0 || len(args) == 0 {
return args
}
command := strings.ToUpper(strings.TrimSpace(args[0]))
if command == "COMMAND" || command == "SELECT" || command == "FLUSHDB" {
return args
}
probeArgs := make([]interface{}, 0, len(args)+2)
probeArgs = append(probeArgs, "COMMAND", "GETKEYS")
for _, arg := range args {
probeArgs = append(probeArgs, arg)
}
result, err := r.client.Do(ctx, probeArgs...).Result()
if err != nil {
return args
}
keyCandidates := parseRedisCommandGetKeysResult(result)
if len(keyCandidates) == 0 {
return args
}
rewritten := append([]string(nil), args...)
used := make([]bool, len(rewritten))
for _, key := range keyCandidates {
for i := 1; i < len(rewritten); i++ {
if used[i] {
continue
}
if rewritten[i] != key {
continue
}
rewritten[i] = r.toPhysicalKey(rewritten[i])
used[i] = true
break
}
}
return rewritten
}
// ExecuteCommand executes a raw Redis command
func (r *RedisClientImpl) ExecuteCommand(args []string) (interface{}, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
if len(args) == 0 {
return nil, fmt.Errorf("命令不能为空")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if r.isCluster {
command := strings.ToUpper(strings.TrimSpace(args[0]))
switch command {
case "SELECT":
if len(args) < 2 {
return nil, fmt.Errorf("SELECT 命令缺少数据库索引")
}
index, err := strconv.Atoi(strings.TrimSpace(args[1]))
if err != nil {
return nil, fmt.Errorf("无效数据库索引: %s", args[1])
}
if index < 0 || index > 15 {
return nil, fmt.Errorf("数据库索引必须在 0-15 之间")
}
r.currentDB = index
r.config.RedisDB = index
return "OK", nil
case "FLUSHDB":
if err := r.FlushDB(); err != nil {
return nil, err
}
return "OK", nil
}
}
args = r.rewriteCommandArgsForNamespace(ctx, args)
// Convert to []interface{}
cmdArgs := make([]interface{}, len(args))
for i, arg := range args {
cmdArgs[i] = arg
}
result, err := r.client.Do(ctx, cmdArgs...).Result()
if err != nil {
return nil, err
}
return formatCommandResult(result), nil
}
// formatCommandResult formats the command result for display
func formatCommandResult(result interface{}) interface{} {
switch v := result.(type) {
case []interface{}:
formatted := make([]interface{}, len(v))
for i, item := range v {
formatted[i] = formatCommandResult(item)
}
return formatted
case []byte:
return string(v)
default:
return v
}
}
// GetServerInfo returns server information
func (r *RedisClientImpl) GetServerInfo() (map[string]string, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
info, err := r.client.Info(ctx).Result()
if err != nil {
return nil, err
}
result := make(map[string]string)
lines := strings.Split(info, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.SplitN(line, ":", 2)
if len(parts) == 2 {
result[parts[0]] = parts[1]
}
}
return result, nil
}
// GetDatabases returns information about all databases
func (r *RedisClientImpl) GetDatabases() ([]RedisDBInfo, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if r.isCluster && r.clusterClient != nil {
var totalKeys int64
var mu sync.Mutex
err := r.clusterClient.ForEachMaster(ctx, func(nodeCtx context.Context, node *redis.Client) error {
keys, err := node.DBSize(nodeCtx).Result()
if err != nil {
return err
}
mu.Lock()
totalKeys += keys
mu.Unlock()
return nil
})
if err != nil {
logger.Warnf("Redis 集群获取 key 数量失败,回退为 0: %v", err)
totalKeys = 0
}
result := make([]RedisDBInfo, 16)
for i := 0; i < 16; i++ {
result[i] = RedisDBInfo{Index: i, Keys: 0}
}
result[0].Keys = totalKeys
return result, nil
}
// Get keyspace info
info, err := r.client.Info(ctx, "keyspace").Result()
if err != nil {
return nil, err
}
// Parse keyspace info
dbMap := make(map[int]int64)
lines := strings.Split(info, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "db") {
// Format: db0:keys=123,expires=0,avg_ttl=0
parts := strings.SplitN(line, ":", 2)
if len(parts) != 2 {
continue
}
dbIndex, err := strconv.Atoi(strings.TrimPrefix(parts[0], "db"))
if err != nil {
continue
}
// Parse keys count
kvPairs := strings.Split(parts[1], ",")
for _, kv := range kvPairs {
if strings.HasPrefix(kv, "keys=") {
keys, _ := strconv.ParseInt(strings.TrimPrefix(kv, "keys="), 10, 64)
dbMap[dbIndex] = keys
break
}
}
}
}
// Return all 16 databases (0-15)
result := make([]RedisDBInfo, 16)
for i := 0; i < 16; i++ {
result[i] = RedisDBInfo{
Index: i,
Keys: dbMap[i], // Will be 0 if not in map
}
}
return result, nil
}
// SelectDB selects a database
func (r *RedisClientImpl) SelectDB(index int) error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
if r.isCluster {
if index < 0 || index > 15 {
return fmt.Errorf("数据库索引必须在 0-15 之间")
}
r.currentDB = index
r.config.RedisDB = index
return nil
}
if index < 0 || index > 15 {
return fmt.Errorf("数据库索引必须在 0-15 之间")
}
// Create new client with different DB
addr := ""
if len(r.seedAddrs) > 0 {
addr = r.seedAddrs[0]
}
if r.forwarder != nil {
addr = r.forwarder.LocalAddr
}
if addr == "" {
addr = fmt.Sprintf("%s:%d", r.config.Host, r.config.Port)
}
timeout := normalizeRedisTimeout(r.config.Timeout)
opts := &redis.Options{
Addr: addr,
Username: strings.TrimSpace(r.config.User),
Password: r.config.Password,
DB: index,
DialTimeout: timeout,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
newClient := redis.NewClient(opts)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := newClient.Ping(ctx).Err(); err != nil {
newClient.Close()
return fmt.Errorf("切换数据库失败: %w", err)
}
// Close old client and replace
if r.client != nil {
_ = r.client.Close()
}
r.client = newClient
r.singleClient = newClient
r.clusterClient = nil
r.currentDB = index
r.config.RedisDB = index
logger.Infof("Redis 切换到数据库: db%d", index)
return nil
}
// GetCurrentDB returns the current database index
func (r *RedisClientImpl) GetCurrentDB() int {
return r.currentDB
}
// FlushDB flushes the current database
func (r *RedisClientImpl) FlushDB() error {
if r.client == nil {
return fmt.Errorf("Redis 客户端未连接")
}
if r.isCluster && r.clusterClient != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
namespacePrefix := r.redisNamespacePrefix()
var deletedTotal int64
var deletedMu sync.Mutex
err := r.clusterClient.ForEachMaster(ctx, func(nodeCtx context.Context, node *redis.Client) error {
var cursor uint64
for {
pattern := "*"
if namespacePrefix != "" {
pattern = namespacePrefix + "*"
}
keys, nextCursor, err := node.Scan(nodeCtx, cursor, pattern, 2000).Result()
if err != nil {
return err
}
if namespacePrefix == "" {
filtered := keys[:0]
for _, key := range keys {
// db0 保留兼容:不删除逻辑库前缀 key避免误清理 db1~db15。
if strings.HasPrefix(key, "__gonavi_db_") {
continue
}
filtered = append(filtered, key)
}
keys = filtered
}
if len(keys) > 0 {
deleted, err := node.Del(nodeCtx, keys...).Result()
if err != nil {
return err
}
deletedMu.Lock()
deletedTotal += deleted
deletedMu.Unlock()
}
cursor = nextCursor
if cursor == 0 {
break
}
}
return nil
})
if err != nil {
return err
}
logger.Infof("Redis 集群逻辑库清空完成: db%d deleted=%d", r.currentDB, deletedTotal)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return r.client.FlushDB(ctx).Err()
}