Balancer Refactoring

This commit is contained in:
DullJZ
2025-09-11 21:26:41 +08:00
parent c514ab2a3d
commit 61eeba03ac
6 changed files with 314 additions and 179 deletions

View File

@@ -1,25 +1,14 @@
package balancer
import (
"crypto/md5"
"encoding/binary"
"fmt"
"math/rand"
"sort"
"sync"
"sync/atomic"
"github.com/DullJZ/s3-balance/internal/bucket"
"github.com/DullJZ/s3-balance/internal/config"
)
// Strategy 负载均衡策略接口
type Strategy interface {
SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error)
Name() string
}
// Balancer 负载均衡器
// 负责根据配置的策略选择合适的存储桶
type Balancer struct {
manager *bucket.Manager
strategy Strategy
@@ -30,6 +19,7 @@ type Balancer struct {
func NewBalancer(manager *bucket.Manager, cfg *config.BalancerConfig) (*Balancer, error) {
var strategy Strategy
// 根据配置创建对应的策略实例
switch cfg.Strategy {
case "round-robin":
strategy = NewRoundRobinStrategy()
@@ -51,6 +41,7 @@ func NewBalancer(manager *bucket.Manager, cfg *config.BalancerConfig) (*Balancer
}
// SelectBucket 选择一个存储桶
// 首先过滤出有足够空间的存储桶,然后使用策略选择
func (b *Balancer) SelectBucket(key string, size int64) (*bucket.BucketInfo, error) {
// 获取所有可用的存储桶
buckets := b.manager.GetAvailableBuckets()
@@ -84,181 +75,60 @@ func (b *Balancer) GetStrategy() string {
return b.strategy.Name()
}
// RoundRobinStrategy 轮询策略
type RoundRobinStrategy struct {
counter uint64
}
// NewRoundRobinStrategy 创建轮询策略
func NewRoundRobinStrategy() *RoundRobinStrategy {
return &RoundRobinStrategy{}
}
// SelectBucket 选择存储桶(轮询)
func (s *RoundRobinStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
// SetStrategy 动态切换策略
// 允许在运行时更改负载均衡策略
func (b *Balancer) SetStrategy(strategyName string) error {
var strategy Strategy
switch strategyName {
case "round-robin":
strategy = NewRoundRobinStrategy()
case "least-space":
strategy = NewLeastSpaceStrategy()
case "weighted":
strategy = NewWeightedStrategy()
case "consistent-hash":
strategy = NewConsistentHashStrategy()
default:
return fmt.Errorf("unknown balancer strategy: %s", strategyName)
}
index := atomic.AddUint64(&s.counter, 1) % uint64(len(buckets))
return buckets[index], nil
b.strategy = strategy
return nil
}
// Name 返回策略名称
func (s *RoundRobinStrategy) Name() string {
return "round-robin"
// GetAvailableBuckets 获取所有可用的存储桶
// 方便外部直接查询可用存储桶列表
func (b *Balancer) GetAvailableBuckets() []*bucket.BucketInfo {
return b.manager.GetAvailableBuckets()
}
// LeastSpaceStrategy 最少使用空间策略
type LeastSpaceStrategy struct{}
// NewLeastSpaceStrategy 创建最少使用空间策略
func NewLeastSpaceStrategy() *LeastSpaceStrategy {
return &LeastSpaceStrategy{}
}
// SelectBucket 选择存储桶(选择使用空间最少的)
func (s *LeastSpaceStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
// 按可用空间排序(从大到小)
sort.Slice(buckets, func(i, j int) bool {
return buckets[i].GetAvailableSpace() > buckets[j].GetAvailableSpace()
})
return buckets[0], nil
}
// Name 返回策略名称
func (s *LeastSpaceStrategy) Name() string {
return "least-space"
}
// WeightedStrategy 加权策略
type WeightedStrategy struct {
mu sync.RWMutex
}
// NewWeightedStrategy 创建加权策略
func NewWeightedStrategy() *WeightedStrategy {
return &WeightedStrategy{}
}
// SelectBucket 选择存储桶(基于权重)
func (s *WeightedStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
// 计算总权重
totalWeight := 0
for _, b := range buckets {
totalWeight += b.Config.Weight
}
if totalWeight == 0 {
// 如果所有权重都是0则随机选择
return buckets[rand.Intn(len(buckets))], nil
}
// 根据权重随机选择
randomWeight := rand.Intn(totalWeight)
currentWeight := 0
for _, b := range buckets {
currentWeight += b.Config.Weight
if randomWeight < currentWeight {
return b, nil
// GetBucketStats 获取存储桶统计信息
// 返回每个存储桶的使用情况
func (b *Balancer) GetBucketStats() map[string]BucketStats {
stats := make(map[string]BucketStats)
buckets := b.manager.GetAllBuckets()
for _, bucket := range buckets {
stats[bucket.Config.Name] = BucketStats{
Name: bucket.Config.Name,
TotalSpace: bucket.Config.MaxSizeBytes,
UsedSpace: bucket.GetUsedSize(),
AvailableSpace: bucket.GetAvailableSpace(),
IsAvailable: bucket.IsAvailable(),
Weight: bucket.Config.Weight,
}
}
// 不应该到达这里,但为了安全返回最后一个
return buckets[len(buckets)-1], nil
return stats
}
// Name 返回策略名称
func (s *WeightedStrategy) Name() string {
return "weighted"
}
// ConsistentHashStrategy 一致性哈希策略
type ConsistentHashStrategy struct {
replicas int
ring map[uint32]*bucket.BucketInfo
nodes []uint32
mu sync.RWMutex
}
// NewConsistentHashStrategy 创建一致性哈希策略
func NewConsistentHashStrategy() *ConsistentHashStrategy {
return &ConsistentHashStrategy{
replicas: 100, // 每个节点的虚拟节点数
ring: make(map[uint32]*bucket.BucketInfo),
}
}
// SelectBucket 选择存储桶(基于一致性哈希)
func (s *ConsistentHashStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
// 更新哈希环
s.updateRing(buckets)
// 计算key的哈希值
hash := s.hash(key)
// 在环上找到第一个大于等于hash的节点
s.mu.RLock()
defer s.mu.RUnlock()
idx := sort.Search(len(s.nodes), func(i int) bool {
return s.nodes[i] >= hash
})
// 如果没找到,返回第一个节点(环形结构)
if idx == len(s.nodes) {
idx = 0
}
return s.ring[s.nodes[idx]], nil
}
// updateRing 更新哈希环
func (s *ConsistentHashStrategy) updateRing(buckets []*bucket.BucketInfo) {
s.mu.Lock()
defer s.mu.Unlock()
// 清空现有环
s.ring = make(map[uint32]*bucket.BucketInfo)
s.nodes = nil
// 为每个存储桶添加虚拟节点
for _, b := range buckets {
for i := 0; i < s.replicas; i++ {
virtualKey := fmt.Sprintf("%s-%d", b.Config.Name, i)
hash := s.hash(virtualKey)
s.ring[hash] = b
s.nodes = append(s.nodes, hash)
}
}
// 排序节点
sort.Slice(s.nodes, func(i, j int) bool {
return s.nodes[i] < s.nodes[j]
})
}
// hash 计算哈希值
func (s *ConsistentHashStrategy) hash(key string) uint32 {
h := md5.Sum([]byte(key))
return binary.BigEndian.Uint32(h[:4])
}
// Name 返回策略名称
func (s *ConsistentHashStrategy) Name() string {
return "consistent-hash"
// BucketStats 存储桶统计信息
type BucketStats struct {
Name string `json:"name"`
TotalSpace int64 `json:"total_space"`
UsedSpace int64 `json:"used_space"`
AvailableSpace int64 `json:"available_space"`
IsAvailable bool `json:"is_available"`
Weight int `json:"weight"`
}

View File

@@ -0,0 +1,112 @@
package balancer
import (
"crypto/md5"
"encoding/binary"
"fmt"
"sort"
"sync"
"github.com/DullJZ/s3-balance/internal/bucket"
)
// ConsistentHashStrategy 一致性哈希策略
// 使用一致性哈希算法确保相同的key总是映射到相同的存储桶
// 当存储桶增加或减少时,能够最小化数据迁移
type ConsistentHashStrategy struct {
replicas int // 每个节点的虚拟节点数
ring map[uint32]*bucket.BucketInfo // 哈希环
nodes []uint32 // 排序的哈希值列表
mu sync.RWMutex // 读写锁,保护并发访问
}
// NewConsistentHashStrategy 创建一致性哈希策略
func NewConsistentHashStrategy() *ConsistentHashStrategy {
return &ConsistentHashStrategy{
replicas: 100, // 每个节点的虚拟节点数,越大分布越均匀
ring: make(map[uint32]*bucket.BucketInfo),
}
}
// SelectBucket 选择存储桶(基于一致性哈希)
// 相同的key总是会映射到相同的存储桶
func (s *ConsistentHashStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
// 更新哈希环
s.updateRing(buckets)
// 计算key的哈希值
hash := s.hash(key)
// 在环上找到第一个大于等于hash的节点
s.mu.RLock()
defer s.mu.RUnlock()
// 二分查找第一个大于等于hash的节点
idx := sort.Search(len(s.nodes), func(i int) bool {
return s.nodes[i] >= hash
})
// 如果没找到,返回第一个节点(环形结构)
if idx == len(s.nodes) {
idx = 0
}
return s.ring[s.nodes[idx]], nil
}
// updateRing 更新哈希环
// 为每个存储桶创建多个虚拟节点,使哈希分布更均匀
func (s *ConsistentHashStrategy) updateRing(buckets []*bucket.BucketInfo) {
s.mu.Lock()
defer s.mu.Unlock()
// 清空现有环
s.ring = make(map[uint32]*bucket.BucketInfo)
s.nodes = nil
// 为每个存储桶添加虚拟节点
for _, b := range buckets {
for i := 0; i < s.replicas; i++ {
// 为每个虚拟节点生成唯一的key
virtualKey := fmt.Sprintf("%s-%d", b.Config.Name, i)
hash := s.hash(virtualKey)
s.ring[hash] = b
s.nodes = append(s.nodes, hash)
}
}
// 排序节点,便于二分查找
sort.Slice(s.nodes, func(i, j int) bool {
return s.nodes[i] < s.nodes[j]
})
}
// hash 计算哈希值
// 使用MD5算法取前4个字节作为uint32
func (s *ConsistentHashStrategy) hash(key string) uint32 {
h := md5.Sum([]byte(key))
return binary.BigEndian.Uint32(h[:4])
}
// Name 返回策略名称
func (s *ConsistentHashStrategy) Name() string {
return "consistent-hash"
}
// SetReplicas 设置虚拟节点数
// 虚拟节点越多,负载分布越均匀,但内存使用也越多
func (s *ConsistentHashStrategy) SetReplicas(replicas int) {
s.mu.Lock()
defer s.mu.Unlock()
if replicas > 0 {
s.replicas = replicas
// 清空现有环,下次选择时会重新构建
s.ring = make(map[uint32]*bucket.BucketInfo)
s.nodes = nil
}
}

View File

@@ -0,0 +1,39 @@
package balancer
import (
"fmt"
"sort"
"github.com/DullJZ/s3-balance/internal/bucket"
)
// LeastSpaceStrategy 最少使用空间策略
// 选择当前使用空间最少(可用空间最多)的存储桶
type LeastSpaceStrategy struct{}
// NewLeastSpaceStrategy 创建最少使用空间策略
func NewLeastSpaceStrategy() *LeastSpaceStrategy {
return &LeastSpaceStrategy{}
}
// SelectBucket 选择存储桶(选择使用空间最少的)
// 优先选择可用空间最大的存储桶,有助于均衡存储使用
func (s *LeastSpaceStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
// 按可用空间排序(从大到小)
// 使用稳定排序,保证相同可用空间的存储桶保持原有顺序
sort.SliceStable(buckets, func(i, j int) bool {
return buckets[i].GetAvailableSpace() > buckets[j].GetAvailableSpace()
})
// 返回可用空间最大的存储桶
return buckets[0], nil
}
// Name 返回策略名称
func (s *LeastSpaceStrategy) Name() string {
return "least-space"
}

View File

@@ -0,0 +1,36 @@
package balancer
import (
"fmt"
"sync/atomic"
"github.com/DullJZ/s3-balance/internal/bucket"
)
// RoundRobinStrategy 轮询策略
// 按照固定顺序循环选择存储桶
type RoundRobinStrategy struct {
counter uint64
}
// NewRoundRobinStrategy 创建轮询策略
func NewRoundRobinStrategy() *RoundRobinStrategy {
return &RoundRobinStrategy{}
}
// SelectBucket 选择存储桶(轮询)
// 使用原子操作确保并发安全
func (s *RoundRobinStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
// 原子递增计数器并取模,确保索引在有效范围内
index := atomic.AddUint64(&s.counter, 1) % uint64(len(buckets))
return buckets[index], nil
}
// Name 返回策略名称
func (s *RoundRobinStrategy) Name() string {
return "round-robin"
}

View File

@@ -0,0 +1,15 @@
package balancer
import "github.com/DullJZ/s3-balance/internal/bucket"
// Strategy 负载均衡策略接口
type Strategy interface {
// SelectBucket 根据策略选择一个存储桶
// buckets: 可用的存储桶列表
// key: 对象的键
// size: 对象的大小
SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error)
// Name 返回策略名称
Name() string
}

View File

@@ -0,0 +1,63 @@
package balancer
import (
"fmt"
"math/rand"
"sync"
"github.com/DullJZ/s3-balance/internal/bucket"
)
// WeightedStrategy 加权策略
// 根据存储桶配置的权重进行选择,权重越高被选中的概率越大
type WeightedStrategy struct {
mu sync.RWMutex
}
// NewWeightedStrategy 创建加权策略
func NewWeightedStrategy() *WeightedStrategy {
return &WeightedStrategy{}
}
// SelectBucket 选择存储桶(基于权重)
// 权重越高的存储桶被选中的概率越大
func (s *WeightedStrategy) SelectBucket(buckets []*bucket.BucketInfo, key string, size int64) (*bucket.BucketInfo, error) {
if len(buckets) == 0 {
return nil, fmt.Errorf("no buckets available")
}
s.mu.RLock()
defer s.mu.RUnlock()
// 计算总权重
totalWeight := 0
for _, b := range buckets {
totalWeight += b.Config.Weight
}
if totalWeight == 0 {
// 如果所有权重都是0则随机选择
return buckets[rand.Intn(len(buckets))], nil
}
// 根据权重随机选择
// 生成一个 [0, totalWeight) 范围内的随机数
randomWeight := rand.Intn(totalWeight)
currentWeight := 0
// 遍历存储桶,累加权重直到超过随机数
for _, b := range buckets {
currentWeight += b.Config.Weight
if randomWeight < currentWeight {
return b, nil
}
}
// 不应该到达这里,但为了安全返回最后一个
return buckets[len(buckets)-1], nil
}
// Name 返回策略名称
func (s *WeightedStrategy) Name() string {
return "weighted"
}