feat(redis-stream): 支持 Redis Stream 类型查看与消息增删

- 后端扩展 RedisClient 接口,新增 StreamEntry 与 Stream 操作定义
- Redis 实现新增 XADD/XDEL/XRANGE 封装并接入 RedisGetValue 的 stream 分支
- App 层新增 RedisStreamAdd 与 RedisStreamDelete 方法并返回操作结果
- 前端新增 stream 类型视图,支持消息新增、删除与字段复制
- refs #92
This commit is contained in:
Syngnat
2026-02-11 10:41:22 +08:00
parent ab92e94bf8
commit e31dc4e7f1
7 changed files with 378 additions and 3 deletions

View File

@@ -450,6 +450,40 @@ func (a *App) RedisZSetRemove(config connection.ConnectionConfig, key string, me
return connection.QueryResult{Success: true, Message: "删除成功"}
}
// RedisStreamAdd adds an entry to a stream
func (a *App) RedisStreamAdd(config connection.ConnectionConfig, key string, fields map[string]string, id string) connection.QueryResult {
config.Type = "redis"
client, err := a.getRedisClient(config)
if err != nil {
return connection.QueryResult{Success: false, Message: err.Error()}
}
newID, err := client.StreamAdd(key, fields, id)
if err != nil {
logger.Error(err, "RedisStreamAdd 添加失败key=%s id=%s", key, id)
return connection.QueryResult{Success: false, Message: err.Error()}
}
return connection.QueryResult{Success: true, Message: "添加成功", Data: map[string]string{"id": newID}}
}
// RedisStreamDelete deletes stream entries by IDs
func (a *App) RedisStreamDelete(config connection.ConnectionConfig, key string, ids []string) connection.QueryResult {
config.Type = "redis"
client, err := a.getRedisClient(config)
if err != nil {
return connection.QueryResult{Success: false, Message: err.Error()}
}
deleted, err := client.StreamDelete(key, ids...)
if err != nil {
logger.Error(err, "RedisStreamDelete 删除失败key=%s ids=%v", key, ids)
return connection.QueryResult{Success: false, Message: err.Error()}
}
return connection.QueryResult{Success: true, Message: "删除成功", Data: map[string]int64{"deleted": deleted}}
}
// RedisFlushDB flushes the current database
func (a *App) RedisFlushDB(config connection.ConnectionConfig) connection.QueryResult {
config.Type = "redis"

View File

@@ -4,7 +4,7 @@ import "GoNavi-Wails/internal/connection"
// RedisValue represents a Redis value with its type and metadata
type RedisValue struct {
Type string `json:"type"` // string, hash, list, set, zset
Type string `json:"type"` // string, hash, list, set, zset, stream
TTL int64 `json:"ttl"` // TTL in seconds, -1 means no expiry, -2 means key doesn't exist
Value interface{} `json:"value"` // The actual value
Length int64 `json:"length"` // Length/size of the value
@@ -72,6 +72,11 @@ type RedisClient interface {
ZSetAdd(key string, members ...ZSetMember) error
ZSetRemove(key string, members ...string) error
// Stream operations
GetStream(key, start, stop string, count int64) ([]StreamEntry, error)
StreamAdd(key string, fields map[string]string, id string) (string, error)
StreamDelete(key string, ids ...string) (int64, error)
// Command execution
ExecuteCommand(args []string) (interface{}, error)
@@ -88,3 +93,9 @@ type ZSetMember struct {
Member string `json:"member"`
Score float64 `json:"score"`
}
// StreamEntry represents a single stream message
type StreamEntry struct {
ID string `json:"id"`
Fields map[string]string `json:"fields"`
}

View File

@@ -334,6 +334,26 @@ func (r *RedisClientImpl) GetValue(key string) (*RedisValue, error) {
result.Value = members
result.Length = length
case "stream":
length, err := r.client.XLen(ctx, key).Result()
if err != nil {
return nil, err
}
result.Length = length
if length == 0 {
result.Value = []StreamEntry{}
break
}
limit := int64(1000)
if length < limit {
limit = length
}
val, err := r.client.XRangeN(ctx, key, "-", "+", limit).Result()
if err != nil {
return nil, err
}
result.Value = toStreamEntries(val)
default:
return nil, fmt.Errorf("不支持的 Redis 数据类型: %s", keyType)
}
@@ -523,6 +543,91 @@ func (r *RedisClientImpl) ZSetRemove(key string, members ...string) error {
return r.client.ZRem(ctx, key, args...).Err()
}
// GetStream gets stream entries in a range
func (r *RedisClientImpl) GetStream(key, start, stop string, count int64) ([]StreamEntry, error) {
if r.client == nil {
return nil, fmt.Errorf("Redis 客户端未连接")
}
if start == "" {
start = "-"
}
if stop == "" {
stop = "+"
}
if count <= 0 {
count = 1000
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
val, err := r.client.XRangeN(ctx, key, start, stop, count).Result()
if err != nil {
return nil, err
}
return toStreamEntries(val), nil
}
// StreamAdd adds an entry to a stream
func (r *RedisClientImpl) StreamAdd(key string, fields map[string]string, id string) (string, error) {
if r.client == nil {
return "", fmt.Errorf("Redis 客户端未连接")
}
if len(fields) == 0 {
return "", fmt.Errorf("Stream 字段不能为空")
}
if id == "" {
id = "*"
}
values := make(map[string]interface{}, len(fields))
for field, value := range fields {
values[field] = value
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
newID, err := r.client.XAdd(ctx, &redis.XAddArgs{
Stream: key,
ID: id,
Values: values,
}).Result()
if err != nil {
return "", err
}
return newID, nil
}
// StreamDelete deletes entries from a stream by IDs
func (r *RedisClientImpl) StreamDelete(key string, ids ...string) (int64, error) {
if r.client == nil {
return 0, fmt.Errorf("Redis 客户端未连接")
}
if len(ids) == 0 {
return 0, fmt.Errorf("Stream ID 不能为空")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.client.XDel(ctx, key, ids...).Result()
}
func toStreamEntries(messages []redis.XMessage) []StreamEntry {
entries := make([]StreamEntry, 0, len(messages))
for _, msg := range messages {
fields := make(map[string]string, len(msg.Values))
for field, value := range msg.Values {
fields[field] = fmt.Sprint(value)
}
entries = append(entries, StreamEntry{
ID: msg.ID,
Fields: fields,
})
}
return entries
}
// ExecuteCommand executes a raw Redis command
func (r *RedisClientImpl) ExecuteCommand(args []string) (interface{}, error) {
if r.client == nil {