diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 8b3d508..12706df 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -1,25 +1,14 @@ package balancer import ( - "crypto/md5" - "encoding/binary" "fmt" - "math/rand" - "sort" - "sync" - "sync/atomic" "github.com/DullJZ/s3-balance/internal/bucket" "github.com/DullJZ/s3-balance/internal/config" ) -// Strategy 负载均衡策略接口 -type Strategy interface { - SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) - Name() string -} - // Balancer 负载均衡器 +// 负责根据配置的策略选择合适的存储桶 type Balancer struct { manager *bucket.Manager strategy Strategy @@ -30,6 +19,7 @@ type Balancer struct { func NewBalancer(manager *bucket.Manager, cfg *config.BalancerConfig) (*Balancer, error) { var strategy Strategy + // 根据配置创建对应的策略实例 switch cfg.Strategy { case "round-robin": strategy = NewRoundRobinStrategy() @@ -51,6 +41,7 @@ func NewBalancer(manager *bucket.Manager, cfg *config.BalancerConfig) (*Balancer } // SelectBucket 选择一个存储桶 +// 首先过滤出有足够空间的存储桶,然后使用策略选择 func (b *Balancer) SelectBucket(key string, size int64) (*bucket.BucketInfo, error) { // 获取所有可用的存储桶 buckets := b.manager.GetAvailableBuckets() @@ -84,181 +75,60 @@ func (b *Balancer) GetStrategy() string { return b.strategy.Name() } -// RoundRobinStrategy 轮询策略 -type RoundRobinStrategy struct { - counter uint64 -} - -// NewRoundRobinStrategy 创建轮询策略 -func NewRoundRobinStrategy() *RoundRobinStrategy { - return &RoundRobinStrategy{} -} - -// SelectBucket 选择存储桶(轮询) -func (s *RoundRobinStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { - if len(buckets) == 0 { - return nil, fmt.Errorf("no buckets available") +// SetStrategy 动态切换策略 +// 允许在运行时更改负载均衡策略 +func (b *Balancer) SetStrategy(strategyName string) error { + var strategy Strategy + + switch strategyName { + case "round-robin": + strategy = NewRoundRobinStrategy() + case "least-space": + strategy = NewLeastSpaceStrategy() + case "weighted": + strategy = NewWeightedStrategy() + case "consistent-hash": + strategy = NewConsistentHashStrategy() + default: + return fmt.Errorf("unknown balancer strategy: %s", strategyName) } - index := atomic.AddUint64(&s.counter, 1) % uint64(len(buckets)) - return buckets[index], nil + b.strategy = strategy + return nil } -// Name 返回策略名称 -func (s *RoundRobinStrategy) Name() string { - return "round-robin" +// GetAvailableBuckets 获取所有可用的存储桶 +// 方便外部直接查询可用存储桶列表 +func (b *Balancer) GetAvailableBuckets() []*bucket.BucketInfo { + return b.manager.GetAvailableBuckets() } -// LeastSpaceStrategy 最少使用空间策略 -type LeastSpaceStrategy struct{} - -// NewLeastSpaceStrategy 创建最少使用空间策略 -func NewLeastSpaceStrategy() *LeastSpaceStrategy { - return &LeastSpaceStrategy{} -} - -// SelectBucket 选择存储桶(选择使用空间最少的) -func (s *LeastSpaceStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { - if len(buckets) == 0 { - return nil, fmt.Errorf("no buckets available") - } - - // 按可用空间排序(从大到小) - sort.Slice(buckets, func(i, j int) bool { - return buckets[i].GetAvailableSpace() > buckets[j].GetAvailableSpace() - }) - - return buckets[0], nil -} - -// Name 返回策略名称 -func (s *LeastSpaceStrategy) Name() string { - return "least-space" -} - -// WeightedStrategy 加权策略 -type WeightedStrategy struct { - mu sync.RWMutex -} - -// NewWeightedStrategy 创建加权策略 -func NewWeightedStrategy() *WeightedStrategy { - return &WeightedStrategy{} -} - -// SelectBucket 选择存储桶(基于权重) -func (s *WeightedStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { - if len(buckets) == 0 { - return nil, fmt.Errorf("no buckets available") - } - - // 计算总权重 - totalWeight := 0 - for _, b := range buckets { - totalWeight += b.Config.Weight - } - - if totalWeight == 0 { - // 如果所有权重都是0,则随机选择 - return buckets[rand.Intn(len(buckets))], nil - } - - // 根据权重随机选择 - randomWeight := rand.Intn(totalWeight) - currentWeight := 0 - - for _, b := range buckets { - currentWeight += b.Config.Weight - if randomWeight < currentWeight { - return b, nil +// GetBucketStats 获取存储桶统计信息 +// 返回每个存储桶的使用情况 +func (b *Balancer) GetBucketStats() map[string]BucketStats { + stats := make(map[string]BucketStats) + buckets := b.manager.GetAllBuckets() + + for _, bucket := range buckets { + stats[bucket.Config.Name] = BucketStats{ + Name: bucket.Config.Name, + TotalSpace: bucket.Config.MaxSizeBytes, + UsedSpace: bucket.GetUsedSize(), + AvailableSpace: bucket.GetAvailableSpace(), + IsAvailable: bucket.IsAvailable(), + Weight: bucket.Config.Weight, } } - - // 不应该到达这里,但为了安全返回最后一个 - return buckets[len(buckets)-1], nil + + return stats } -// Name 返回策略名称 -func (s *WeightedStrategy) Name() string { - return "weighted" -} - -// ConsistentHashStrategy 一致性哈希策略 -type ConsistentHashStrategy struct { - replicas int - ring map[uint32]*bucket.BucketInfo - nodes []uint32 - mu sync.RWMutex -} - -// NewConsistentHashStrategy 创建一致性哈希策略 -func NewConsistentHashStrategy() *ConsistentHashStrategy { - return &ConsistentHashStrategy{ - replicas: 100, // 每个节点的虚拟节点数 - ring: make(map[uint32]*bucket.BucketInfo), - } -} - -// SelectBucket 选择存储桶(基于一致性哈希) -func (s *ConsistentHashStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { - if len(buckets) == 0 { - return nil, fmt.Errorf("no buckets available") - } - - // 更新哈希环 - s.updateRing(buckets) - - // 计算key的哈希值 - hash := s.hash(key) - - // 在环上找到第一个大于等于hash的节点 - s.mu.RLock() - defer s.mu.RUnlock() - - idx := sort.Search(len(s.nodes), func(i int) bool { - return s.nodes[i] >= hash - }) - - // 如果没找到,返回第一个节点(环形结构) - if idx == len(s.nodes) { - idx = 0 - } - - return s.ring[s.nodes[idx]], nil -} - -// updateRing 更新哈希环 -func (s *ConsistentHashStrategy) updateRing(buckets []*bucket.BucketInfo) { - s.mu.Lock() - defer s.mu.Unlock() - - // 清空现有环 - s.ring = make(map[uint32]*bucket.BucketInfo) - s.nodes = nil - - // 为每个存储桶添加虚拟节点 - for _, b := range buckets { - for i := 0; i < s.replicas; i++ { - virtualKey := fmt.Sprintf("%s-%d", b.Config.Name, i) - hash := s.hash(virtualKey) - s.ring[hash] = b - s.nodes = append(s.nodes, hash) - } - } - - // 排序节点 - sort.Slice(s.nodes, func(i, j int) bool { - return s.nodes[i] < s.nodes[j] - }) -} - -// hash 计算哈希值 -func (s *ConsistentHashStrategy) hash(key string) uint32 { - h := md5.Sum([]byte(key)) - return binary.BigEndian.Uint32(h[:4]) -} - -// Name 返回策略名称 -func (s *ConsistentHashStrategy) Name() string { - return "consistent-hash" +// BucketStats 存储桶统计信息 +type BucketStats struct { + Name string `json:"name"` + TotalSpace int64 `json:"total_space"` + UsedSpace int64 `json:"used_space"` + AvailableSpace int64 `json:"available_space"` + IsAvailable bool `json:"is_available"` + Weight int `json:"weight"` } diff --git a/internal/balancer/consistent_hash.go b/internal/balancer/consistent_hash.go new file mode 100644 index 0000000..2fd7316 --- /dev/null +++ b/internal/balancer/consistent_hash.go @@ -0,0 +1,112 @@ +package balancer + +import ( + "crypto/md5" + "encoding/binary" + "fmt" + "sort" + "sync" + + "github.com/DullJZ/s3-balance/internal/bucket" +) + +// ConsistentHashStrategy 一致性哈希策略 +// 使用一致性哈希算法,确保相同的key总是映射到相同的存储桶 +// 当存储桶增加或减少时,能够最小化数据迁移 +type ConsistentHashStrategy struct { + replicas int // 每个节点的虚拟节点数 + ring map[uint32]*bucket.BucketInfo // 哈希环 + nodes []uint32 // 排序的哈希值列表 + mu sync.RWMutex // 读写锁,保护并发访问 +} + +// NewConsistentHashStrategy 创建一致性哈希策略 +func NewConsistentHashStrategy() *ConsistentHashStrategy { + return &ConsistentHashStrategy{ + replicas: 100, // 每个节点的虚拟节点数,越大分布越均匀 + ring: make(map[uint32]*bucket.BucketInfo), + } +} + +// SelectBucket 选择存储桶(基于一致性哈希) +// 相同的key总是会映射到相同的存储桶 +func (s *ConsistentHashStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { + if len(buckets) == 0 { + return nil, fmt.Errorf("no buckets available") + } + + // 更新哈希环 + s.updateRing(buckets) + + // 计算key的哈希值 + hash := s.hash(key) + + // 在环上找到第一个大于等于hash的节点 + s.mu.RLock() + defer s.mu.RUnlock() + + // 二分查找第一个大于等于hash的节点 + idx := sort.Search(len(s.nodes), func(i int) bool { + return s.nodes[i] >= hash + }) + + // 如果没找到,返回第一个节点(环形结构) + if idx == len(s.nodes) { + idx = 0 + } + + return s.ring[s.nodes[idx]], nil +} + +// updateRing 更新哈希环 +// 为每个存储桶创建多个虚拟节点,使哈希分布更均匀 +func (s *ConsistentHashStrategy) updateRing(buckets []*bucket.BucketInfo) { + s.mu.Lock() + defer s.mu.Unlock() + + // 清空现有环 + s.ring = make(map[uint32]*bucket.BucketInfo) + s.nodes = nil + + // 为每个存储桶添加虚拟节点 + for _, b := range buckets { + for i := 0; i < s.replicas; i++ { + // 为每个虚拟节点生成唯一的key + virtualKey := fmt.Sprintf("%s-%d", b.Config.Name, i) + hash := s.hash(virtualKey) + s.ring[hash] = b + s.nodes = append(s.nodes, hash) + } + } + + // 排序节点,便于二分查找 + sort.Slice(s.nodes, func(i, j int) bool { + return s.nodes[i] < s.nodes[j] + }) +} + +// hash 计算哈希值 +// 使用MD5算法,取前4个字节作为uint32 +func (s *ConsistentHashStrategy) hash(key string) uint32 { + h := md5.Sum([]byte(key)) + return binary.BigEndian.Uint32(h[:4]) +} + +// Name 返回策略名称 +func (s *ConsistentHashStrategy) Name() string { + return "consistent-hash" +} + +// SetReplicas 设置虚拟节点数 +// 虚拟节点越多,负载分布越均匀,但内存使用也越多 +func (s *ConsistentHashStrategy) SetReplicas(replicas int) { + s.mu.Lock() + defer s.mu.Unlock() + + if replicas > 0 { + s.replicas = replicas + // 清空现有环,下次选择时会重新构建 + s.ring = make(map[uint32]*bucket.BucketInfo) + s.nodes = nil + } +} diff --git a/internal/balancer/least_space.go b/internal/balancer/least_space.go new file mode 100644 index 0000000..31cda34 --- /dev/null +++ b/internal/balancer/least_space.go @@ -0,0 +1,39 @@ +package balancer + +import ( + "fmt" + "sort" + + "github.com/DullJZ/s3-balance/internal/bucket" +) + +// LeastSpaceStrategy 最少使用空间策略 +// 选择当前使用空间最少(可用空间最多)的存储桶 +type LeastSpaceStrategy struct{} + +// NewLeastSpaceStrategy 创建最少使用空间策略 +func NewLeastSpaceStrategy() *LeastSpaceStrategy { + return &LeastSpaceStrategy{} +} + +// SelectBucket 选择存储桶(选择使用空间最少的) +// 优先选择可用空间最大的存储桶,有助于均衡存储使用 +func (s *LeastSpaceStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { + if len(buckets) == 0 { + return nil, fmt.Errorf("no buckets available") + } + + // 按可用空间排序(从大到小) + // 使用稳定排序,保证相同可用空间的存储桶保持原有顺序 + sort.SliceStable(buckets, func(i, j int) bool { + return buckets[i].GetAvailableSpace() > buckets[j].GetAvailableSpace() + }) + + // 返回可用空间最大的存储桶 + return buckets[0], nil +} + +// Name 返回策略名称 +func (s *LeastSpaceStrategy) Name() string { + return "least-space" +} diff --git a/internal/balancer/round_robin.go b/internal/balancer/round_robin.go new file mode 100644 index 0000000..3526db9 --- /dev/null +++ b/internal/balancer/round_robin.go @@ -0,0 +1,36 @@ +package balancer + +import ( + "fmt" + "sync/atomic" + + "github.com/DullJZ/s3-balance/internal/bucket" +) + +// RoundRobinStrategy 轮询策略 +// 按照固定顺序循环选择存储桶 +type RoundRobinStrategy struct { + counter uint64 +} + +// NewRoundRobinStrategy 创建轮询策略 +func NewRoundRobinStrategy() *RoundRobinStrategy { + return &RoundRobinStrategy{} +} + +// SelectBucket 选择存储桶(轮询) +// 使用原子操作确保并发安全 +func (s *RoundRobinStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { + if len(buckets) == 0 { + return nil, fmt.Errorf("no buckets available") + } + + // 原子递增计数器并取模,确保索引在有效范围内 + index := atomic.AddUint64(&s.counter, 1) % uint64(len(buckets)) + return buckets[index], nil +} + +// Name 返回策略名称 +func (s *RoundRobinStrategy) Name() string { + return "round-robin" +} diff --git a/internal/balancer/strategy.go b/internal/balancer/strategy.go new file mode 100644 index 0000000..ea5e1aa --- /dev/null +++ b/internal/balancer/strategy.go @@ -0,0 +1,15 @@ +package balancer + +import "github.com/DullJZ/s3-balance/internal/bucket" + +// Strategy 负载均衡策略接口 +type Strategy interface { + // SelectBucket 根据策略选择一个存储桶 + // buckets: 可用的存储桶列表 + // key: 对象的键 + // size: 对象的大小 + SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) + + // Name 返回策略名称 + Name() string +} diff --git a/internal/balancer/weighted.go b/internal/balancer/weighted.go new file mode 100644 index 0000000..7b857ce --- /dev/null +++ b/internal/balancer/weighted.go @@ -0,0 +1,63 @@ +package balancer + +import ( + "fmt" + "math/rand" + "sync" + + "github.com/DullJZ/s3-balance/internal/bucket" +) + +// WeightedStrategy 加权策略 +// 根据存储桶配置的权重进行选择,权重越高被选中的概率越大 +type WeightedStrategy struct { + mu sync.RWMutex +} + +// NewWeightedStrategy 创建加权策略 +func NewWeightedStrategy() *WeightedStrategy { + return &WeightedStrategy{} +} + +// SelectBucket 选择存储桶(基于权重) +// 权重越高的存储桶被选中的概率越大 +func (s *WeightedStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) { + if len(buckets) == 0 { + return nil, fmt.Errorf("no buckets available") + } + + s.mu.RLock() + defer s.mu.RUnlock() + + // 计算总权重 + totalWeight := 0 + for _, b := range buckets { + totalWeight += b.Config.Weight + } + + if totalWeight == 0 { + // 如果所有权重都是0,则随机选择 + return buckets[rand.Intn(len(buckets))], nil + } + + // 根据权重随机选择 + // 生成一个 [0, totalWeight) 范围内的随机数 + randomWeight := rand.Intn(totalWeight) + currentWeight := 0 + + // 遍历存储桶,累加权重直到超过随机数 + for _, b := range buckets { + currentWeight += b.Config.Weight + if randomWeight < currentWeight { + return b, nil + } + } + + // 不应该到达这里,但为了安全返回最后一个 + return buckets[len(buckets)-1], nil +} + +// Name 返回策略名称 +func (s *WeightedStrategy) Name() string { + return "weighted" +}