Count operation type A & B

This commit is contained in:
DullJZ
2025-10-21 18:29:24 +08:00
parent 7e1f4bbee3
commit b2dc311f01
7 changed files with 151 additions and 41 deletions

View File

@@ -48,6 +48,9 @@ buckets:
enabled: true
path_style: false # AWS S3使用虚拟主机风格
virtual: false # 这是真实存储桶
operation_limits:
type_a: 0 # 类型A操作写入类上限0表示不限制
type_b: 0 # 类型B操作读取类上限0表示不限制
# 真实存储桶 - MinIO用于存储数据对客户端隐藏
- name: "my-bucket-2"
@@ -60,6 +63,9 @@ buckets:
enabled: true
path_style: true # MinIO通常使用路径风格
virtual: false # 这是真实存储桶
operation_limits:
type_a: 0
type_b: 0
# 虚拟存储桶 - user-bucket-1对客户端可见的唯一存储桶
- name: "user-bucket-1"
@@ -96,6 +102,9 @@ buckets:
enabled: true
path_style: false
virtual: false # 这是真实存储桶
operation_limits:
type_a: 0
type_b: 0
# 负载均衡配置
balancer:

View File

@@ -76,6 +76,8 @@ func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) {
return
}
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
// 检查当前已上传大小 + 本次分片大小是否超过bucket剩余空间
currentSize, err := h.storage.GetUploadSessionSize(uploadID)
if err != nil {
@@ -213,6 +215,8 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request
return
}
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
// 初始化分片上传
ctx := context.Background()
createResp, err := targetBucket.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
@@ -280,14 +284,16 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re
// 降级到遍历所有存储桶的方式
ctx := context.Background()
allBuckets := h.bucketManager.GetAllBuckets()
for _, bucket := range allBuckets {
if bucket.IsVirtual() {
for _, realBucket := range allBuckets {
if realBucket.IsVirtual() {
continue
}
h.recordBackendOperation(realBucket, bucket.OperationTypeB)
// 列出每个真实存储桶的分片上传
listResp, err := bucket.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
Bucket: aws.String(bucket.Config.Name),
listResp, err := realBucket.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
Bucket: aws.String(realBucket.Config.Name),
KeyMarker: aws.String(keyMarker),
UploadIdMarker: aws.String(uploadIdMarker),
Prefix: aws.String(prefix),
@@ -295,7 +301,7 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re
MaxUploads: aws.Int32(int32(maxUploads)),
})
if err != nil {
log.Printf("Failed to list multipart uploads for bucket %s: %v", bucket.Config.Name, err)
log.Printf("Failed to list multipart uploads for bucket %s: %v", realBucket.Config.Name, err)
continue
}
@@ -409,21 +415,22 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ
if err != nil {
// 如果没有找到映射,尝试查询所有真实存储桶
allBuckets := h.bucketManager.GetAllBuckets()
for _, bucket := range allBuckets {
if bucket.IsVirtual() {
for _, realBucket := range allBuckets {
if realBucket.IsVirtual() {
continue
}
// 尝试列出分片,如果成功则说明上传在这个桶中
ctx := context.Background()
_, err := bucket.Client.ListParts(ctx, &s3.ListPartsInput{
Bucket: aws.String(bucket.Config.Name),
h.recordBackendOperation(realBucket, bucket.OperationTypeB)
_, err := realBucket.Client.ListParts(ctx, &s3.ListPartsInput{
Bucket: aws.String(realBucket.Config.Name),
Key: aws.String(key),
UploadId: aws.String(uploadID),
PartNumberMarker: aws.String(strconv.Itoa(partNumberMarker)),
MaxParts: aws.Int32(1), // 只检查是否存在
})
if err == nil {
targetBucket = bucket
targetBucket = realBucket
break
}
}
@@ -446,6 +453,7 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ
}
// 列出分片
h.recordBackendOperation(targetBucket, bucket.OperationTypeB)
ctx := context.Background()
listResp, err := targetBucket.Client.ListParts(ctx, &s3.ListPartsInput{
Bucket: aws.String(targetBucket.Config.Name),
@@ -570,6 +578,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
// 完成分片上传
ctx := context.Background()
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
sort.SliceStable(completeReq.Parts, func(i, j int) bool {
return completeReq.Parts[i].PartNumber < completeReq.Parts[j].PartNumber
})
@@ -611,6 +620,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
// 获取完成上传后的对象大小
var objectSize int64
h.recordBackendOperation(targetBucket, bucket.OperationTypeB)
headResp, err := targetBucket.Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(targetBucket.Config.Name),
Key: aws.String(key),
@@ -658,6 +668,7 @@ func getAPIError(err error) (smithy.APIError, bool) {
// abortMultipartUploadInternal 内部方法向后端S3发送中止分片上传请求
func (h *S3Handler) abortMultipartUploadInternal(targetBucket *bucket.BucketInfo, key, uploadID string) error {
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
ctx := context.Background()
_, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(targetBucket.Config.Name),
@@ -718,6 +729,7 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re
// 中止分片上传
ctx := context.Background()
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
_, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(targetBucket.Config.Name),
Key: aws.String(key),

View File

@@ -70,6 +70,8 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
return
}
h.recordBackendOperation(bucket1, bucket.OperationTypeB)
}
// 生成预签名下载URL
@@ -228,6 +230,8 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
return
}
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
// 生成预签名上传URL
uploadInfo, err := h.presigner.GenerateUploadURL(
context.Background(),
@@ -295,7 +299,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
return
}
var bucket *bucket.BucketInfo
var targetBucket *bucket.BucketInfo
var err error
if requestedBucket.IsVirtual() {
@@ -308,7 +312,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
}
// 获取映射到的真实存储桶
bucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
return
@@ -319,10 +323,12 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
return
}
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
// 生成预签名删除URL
deleteInfo, err := h.presigner.GenerateDeleteURL(
context.Background(),
bucket,
targetBucket,
key,
)
if err != nil {

View File

@@ -0,0 +1,17 @@
package api
import (
"log"
"github.com/DullJZ/s3-balance/internal/bucket"
)
// recordBackendOperation increments backend operation counters and disables the bucket if limits are exceeded.
func (h *S3Handler) recordBackendOperation(b *bucket.BucketInfo, category bucket.OperationCategory) {
if b == nil {
return
}
if disabled := b.RecordOperation(category); disabled {
log.Printf("Bucket %s disabled after exceeding %s-type operation limit", b.Config.Name, category)
}
}

View File

@@ -16,14 +16,27 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// OperationCategory 表示后端操作分类
type OperationCategory string
const (
// OperationTypeA 表示写入类操作
OperationTypeA OperationCategory = "A"
// OperationTypeB 表示读取类操作
OperationTypeB OperationCategory = "B"
)
// BucketInfo 存储桶信息
type BucketInfo struct {
Config config.BucketConfig
Client *s3.Client
UsedSize int64 // 已使用容量(字节)
Available bool // 是否可用由health监控更新
LastChecked time.Time // 最后检查时间由health监控更新
mu sync.RWMutex
Config config.BucketConfig
Client *s3.Client
UsedSize int64 // 已使用容量(字节)
Available bool // 是否可用由health监控更新
LastChecked time.Time // 最后检查时间由health监控更新
mu sync.RWMutex
operationCountA int64
operationCountB int64
operationLimitReached bool
}
// Manager 存储桶管理器
@@ -267,6 +280,50 @@ func (b *BucketInfo) UpdateUsedSize(delta int64) {
b.UsedSize += delta
}
// RecordOperation 记录一次后端操作并根据配置判断是否需要禁用存储桶
func (b *BucketInfo) RecordOperation(category OperationCategory) bool {
if b == nil {
return false
}
b.mu.Lock()
defer b.mu.Unlock()
if b.Config.Virtual {
return false
}
var (
limit int64
count *int64
)
switch category {
case OperationTypeA:
b.operationCountA++
count = &b.operationCountA
limit = int64(b.Config.OperationLimits.TypeA)
case OperationTypeB:
b.operationCountB++
count = &b.operationCountB
limit = int64(b.Config.OperationLimits.TypeB)
default:
return false
}
if limit <= 0 || count == nil {
return false
}
if !b.operationLimitReached && *count >= limit {
b.Available = false
b.operationLimitReached = true
return true
}
return false
}
// IsVirtual 检查是否为虚拟存储桶
func (b *BucketInfo) IsVirtual() bool {
b.mu.RLock()

View File

@@ -7,9 +7,9 @@ import (
// MetricsReporter 实现 health.HealthReporter 和 health.StatsReporter 接口
type MetricsReporter struct {
metrics *metrics.Metrics
buckets map[string]*BucketInfo
manager *Manager
metrics *metrics.Metrics
buckets map[string]*BucketInfo
manager *Manager
}
// NewMetricsReporter 创建指标报告器
@@ -25,18 +25,20 @@ func (r *MetricsReporter) ReportHealth(targetID string, status health.Status) {
if r.metrics == nil {
return
}
// 更新存储桶可用性状态
r.manager.mu.RLock()
bucket, exists := r.manager.buckets[targetID]
r.manager.mu.RUnlock()
if exists {
bucket.mu.Lock()
bucket.Available = status.Healthy
if !bucket.operationLimitReached {
bucket.Available = status.Healthy
}
bucket.LastChecked = status.LastChecked
bucket.mu.Unlock()
// 更新 Prometheus 指标
r.metrics.SetBucketHealthy(targetID, bucket.Config.Endpoint, status.Healthy)
}
@@ -47,18 +49,18 @@ func (r *MetricsReporter) ReportStats(stats *health.Stats) {
if r.metrics == nil {
return
}
// 更新存储桶使用统计
r.manager.mu.RLock()
bucket, exists := r.manager.buckets[stats.TargetID]
r.manager.mu.RUnlock()
if exists {
bucket.mu.Lock()
bucket.UsedSize = stats.UsedSize
bucket.mu.Unlock()
// 更新 Prometheus 指标
r.metrics.SetBucketUsage(stats.TargetID, stats.UsedSize, bucket.Config.MaxSizeBytes)
}
}
}

View File

@@ -29,17 +29,24 @@ type ServerConfig struct {
// BucketConfig S3存储桶配置
type BucketConfig struct {
Name string `yaml:"name"` // 桶名称
Endpoint string `yaml:"endpoint"` // S3端点
Region string `yaml:"region"` // 区域
AccessKeyID string `yaml:"access_key_id"` // 访问密钥ID
SecretAccessKey string `yaml:"secret_access_key"` // 访问密钥
MaxSize string `yaml:"max_size"` // 最大容量 (例如: "10GB")
MaxSizeBytes int64 `yaml:"-"` // 内部使用,字节为单位
Weight int `yaml:"weight"` // 权重 (用于负载均衡)
Enabled bool `yaml:"enabled"` // 是否启用
PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问
Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶仅S3 API中可见
Name string `yaml:"name"` // 桶名称
Endpoint string `yaml:"endpoint"` // S3端点
Region string `yaml:"region"` // 区域
AccessKeyID string `yaml:"access_key_id"` // 访问密钥ID
SecretAccessKey string `yaml:"secret_access_key"` // 访问密钥
MaxSize string `yaml:"max_size"` // 最大容量 (例如: "10GB")
MaxSizeBytes int64 `yaml:"-"` // 内部使用,字节为单位
Weight int `yaml:"weight"` // 权重 (用于负载均衡)
Enabled bool `yaml:"enabled"` // 是否启用
PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问
Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶仅S3 API中可见
OperationLimits OperationLimitConfig `yaml:"operation_limits"`
}
// OperationLimitConfig 后端操作次数限制配置
type OperationLimitConfig struct {
TypeA int `yaml:"type_a"` // 类型A操作上限0表示不限制
TypeB int `yaml:"type_b"` // 类型B操作上限0表示不限制
}
// BalancerConfig 负载均衡配置