From b2dc311f01de876b81c66f466608ac37961903d1 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 21 Oct 2025 18:29:24 +0800 Subject: [PATCH] Count operation type A & B --- config/config.example.yaml | 9 ++++ internal/api/multipart_handler.go | 32 +++++++++----- internal/api/object_handler.go | 12 ++++-- internal/api/operation_tracker.go | 17 ++++++++ internal/bucket/manager.go | 69 ++++++++++++++++++++++++++++--- internal/bucket/reporter.go | 24 ++++++----- internal/config/config.go | 29 ++++++++----- 7 files changed, 151 insertions(+), 41 deletions(-) create mode 100644 internal/api/operation_tracker.go diff --git a/config/config.example.yaml b/config/config.example.yaml index 0b79d91..d305fe7 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -48,6 +48,9 @@ buckets: enabled: true path_style: false # AWS S3使用虚拟主机风格 virtual: false # 这是真实存储桶 + operation_limits: + type_a: 0 # 类型A操作(写入类)上限,0表示不限制 + type_b: 0 # 类型B操作(读取类)上限,0表示不限制 # 真实存储桶 - MinIO(用于存储数据,对客户端隐藏) - name: "my-bucket-2" @@ -60,6 +63,9 @@ buckets: enabled: true path_style: true # MinIO通常使用路径风格 virtual: false # 这是真实存储桶 + operation_limits: + type_a: 0 + type_b: 0 # 虚拟存储桶 - user-bucket-1(对客户端可见的唯一存储桶) - name: "user-bucket-1" @@ -96,6 +102,9 @@ buckets: enabled: true path_style: false virtual: false # 这是真实存储桶 + operation_limits: + type_a: 0 + type_b: 0 # 负载均衡配置 balancer: diff --git a/internal/api/multipart_handler.go b/internal/api/multipart_handler.go index b3389cf..4aca3f3 100644 --- a/internal/api/multipart_handler.go +++ b/internal/api/multipart_handler.go @@ -76,6 +76,8 @@ func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) { return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 检查当前已上传大小 + 本次分片大小是否超过bucket剩余空间 currentSize, err := h.storage.GetUploadSessionSize(uploadID) if err != nil { @@ -213,6 +215,8 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 初始化分片上传 ctx := context.Background() createResp, err := targetBucket.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ @@ -280,14 +284,16 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re // 降级到遍历所有存储桶的方式 ctx := context.Background() allBuckets := h.bucketManager.GetAllBuckets() - for _, bucket := range allBuckets { - if bucket.IsVirtual() { + for _, realBucket := range allBuckets { + if realBucket.IsVirtual() { continue } + h.recordBackendOperation(realBucket, bucket.OperationTypeB) + // 列出每个真实存储桶的分片上传 - listResp, err := bucket.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{ - Bucket: aws.String(bucket.Config.Name), + listResp, err := realBucket.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{ + Bucket: aws.String(realBucket.Config.Name), KeyMarker: aws.String(keyMarker), UploadIdMarker: aws.String(uploadIdMarker), Prefix: aws.String(prefix), @@ -295,7 +301,7 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re MaxUploads: aws.Int32(int32(maxUploads)), }) if err != nil { - log.Printf("Failed to list multipart uploads for bucket %s: %v", bucket.Config.Name, err) + log.Printf("Failed to list multipart uploads for bucket %s: %v", realBucket.Config.Name, err) continue } @@ -409,21 +415,22 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ if err != nil { // 如果没有找到映射,尝试查询所有真实存储桶 allBuckets := h.bucketManager.GetAllBuckets() - for _, bucket := range allBuckets { - if bucket.IsVirtual() { + for _, realBucket := range allBuckets { + if realBucket.IsVirtual() { continue } // 尝试列出分片,如果成功则说明上传在这个桶中 ctx := context.Background() - _, err := bucket.Client.ListParts(ctx, &s3.ListPartsInput{ - Bucket: aws.String(bucket.Config.Name), + h.recordBackendOperation(realBucket, bucket.OperationTypeB) + _, err := realBucket.Client.ListParts(ctx, &s3.ListPartsInput{ + Bucket: aws.String(realBucket.Config.Name), Key: aws.String(key), UploadId: aws.String(uploadID), PartNumberMarker: aws.String(strconv.Itoa(partNumberMarker)), MaxParts: aws.Int32(1), // 只检查是否存在 }) if err == nil { - targetBucket = bucket + targetBucket = realBucket break } } @@ -446,6 +453,7 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ } // 列出分片 + h.recordBackendOperation(targetBucket, bucket.OperationTypeB) ctx := context.Background() listResp, err := targetBucket.Client.ListParts(ctx, &s3.ListPartsInput{ Bucket: aws.String(targetBucket.Config.Name), @@ -570,6 +578,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http // 完成分片上传 ctx := context.Background() + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) sort.SliceStable(completeReq.Parts, func(i, j int) bool { return completeReq.Parts[i].PartNumber < completeReq.Parts[j].PartNumber }) @@ -611,6 +620,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http // 获取完成上传后的对象大小 var objectSize int64 + h.recordBackendOperation(targetBucket, bucket.OperationTypeB) headResp, err := targetBucket.Client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(targetBucket.Config.Name), Key: aws.String(key), @@ -658,6 +668,7 @@ func getAPIError(err error) (smithy.APIError, bool) { // abortMultipartUploadInternal 内部方法:向后端S3发送中止分片上传请求 func (h *S3Handler) abortMultipartUploadInternal(targetBucket *bucket.BucketInfo, key, uploadID string) error { + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) ctx := context.Background() _, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(targetBucket.Config.Name), @@ -718,6 +729,7 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re // 中止分片上传 ctx := context.Background() + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) _, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(targetBucket.Config.Name), Key: aws.String(key), diff --git a/internal/api/object_handler.go b/internal/api/object_handler.go index 9b8a2ff..0655b19 100644 --- a/internal/api/object_handler.go +++ b/internal/api/object_handler.go @@ -70,6 +70,8 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) return } + + h.recordBackendOperation(bucket1, bucket.OperationTypeB) } // 生成预签名下载URL @@ -228,6 +230,8 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 生成预签名上传URL uploadInfo, err := h.presigner.GenerateUploadURL( context.Background(), @@ -295,7 +299,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b return } - var bucket *bucket.BucketInfo + var targetBucket *bucket.BucketInfo var err error if requestedBucket.IsVirtual() { @@ -308,7 +312,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b } // 获取映射到的真实存储桶 - bucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) + targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) if !ok { h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) return @@ -319,10 +323,12 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 生成预签名删除URL deleteInfo, err := h.presigner.GenerateDeleteURL( context.Background(), - bucket, + targetBucket, key, ) if err != nil { diff --git a/internal/api/operation_tracker.go b/internal/api/operation_tracker.go new file mode 100644 index 0000000..d4b17c1 --- /dev/null +++ b/internal/api/operation_tracker.go @@ -0,0 +1,17 @@ +package api + +import ( + "log" + + "github.com/DullJZ/s3-balance/internal/bucket" +) + +// recordBackendOperation increments backend operation counters and disables the bucket if limits are exceeded. +func (h *S3Handler) recordBackendOperation(b *bucket.BucketInfo, category bucket.OperationCategory) { + if b == nil { + return + } + if disabled := b.RecordOperation(category); disabled { + log.Printf("Bucket %s disabled after exceeding %s-type operation limit", b.Config.Name, category) + } +} diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 48571f0..ca3b6ff 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -16,14 +16,27 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" ) +// OperationCategory 表示后端操作分类 +type OperationCategory string + +const ( + // OperationTypeA 表示写入类操作 + OperationTypeA OperationCategory = "A" + // OperationTypeB 表示读取类操作 + OperationTypeB OperationCategory = "B" +) + // BucketInfo 存储桶信息 type BucketInfo struct { - Config config.BucketConfig - Client *s3.Client - UsedSize int64 // 已使用容量(字节) - Available bool // 是否可用(由health监控更新) - LastChecked time.Time // 最后检查时间(由health监控更新) - mu sync.RWMutex + Config config.BucketConfig + Client *s3.Client + UsedSize int64 // 已使用容量(字节) + Available bool // 是否可用(由health监控更新) + LastChecked time.Time // 最后检查时间(由health监控更新) + mu sync.RWMutex + operationCountA int64 + operationCountB int64 + operationLimitReached bool } // Manager 存储桶管理器 @@ -267,6 +280,50 @@ func (b *BucketInfo) UpdateUsedSize(delta int64) { b.UsedSize += delta } +// RecordOperation 记录一次后端操作并根据配置判断是否需要禁用存储桶 +func (b *BucketInfo) RecordOperation(category OperationCategory) bool { + if b == nil { + return false + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.Config.Virtual { + return false + } + + var ( + limit int64 + count *int64 + ) + + switch category { + case OperationTypeA: + b.operationCountA++ + count = &b.operationCountA + limit = int64(b.Config.OperationLimits.TypeA) + case OperationTypeB: + b.operationCountB++ + count = &b.operationCountB + limit = int64(b.Config.OperationLimits.TypeB) + default: + return false + } + + if limit <= 0 || count == nil { + return false + } + + if !b.operationLimitReached && *count >= limit { + b.Available = false + b.operationLimitReached = true + return true + } + + return false +} + // IsVirtual 检查是否为虚拟存储桶 func (b *BucketInfo) IsVirtual() bool { b.mu.RLock() diff --git a/internal/bucket/reporter.go b/internal/bucket/reporter.go index 230d2b0..9826a7f 100644 --- a/internal/bucket/reporter.go +++ b/internal/bucket/reporter.go @@ -7,9 +7,9 @@ import ( // MetricsReporter 实现 health.HealthReporter 和 health.StatsReporter 接口 type MetricsReporter struct { - metrics *metrics.Metrics - buckets map[string]*BucketInfo - manager *Manager + metrics *metrics.Metrics + buckets map[string]*BucketInfo + manager *Manager } // NewMetricsReporter 创建指标报告器 @@ -25,18 +25,20 @@ func (r *MetricsReporter) ReportHealth(targetID string, status health.Status) { if r.metrics == nil { return } - + // 更新存储桶可用性状态 r.manager.mu.RLock() bucket, exists := r.manager.buckets[targetID] r.manager.mu.RUnlock() - + if exists { bucket.mu.Lock() - bucket.Available = status.Healthy + if !bucket.operationLimitReached { + bucket.Available = status.Healthy + } bucket.LastChecked = status.LastChecked bucket.mu.Unlock() - + // 更新 Prometheus 指标 r.metrics.SetBucketHealthy(targetID, bucket.Config.Endpoint, status.Healthy) } @@ -47,18 +49,18 @@ func (r *MetricsReporter) ReportStats(stats *health.Stats) { if r.metrics == nil { return } - + // 更新存储桶使用统计 r.manager.mu.RLock() bucket, exists := r.manager.buckets[stats.TargetID] r.manager.mu.RUnlock() - + if exists { bucket.mu.Lock() bucket.UsedSize = stats.UsedSize bucket.mu.Unlock() - + // 更新 Prometheus 指标 r.metrics.SetBucketUsage(stats.TargetID, stats.UsedSize, bucket.Config.MaxSizeBytes) } -} \ No newline at end of file +} diff --git a/internal/config/config.go b/internal/config/config.go index 8224518..5953a38 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,17 +29,24 @@ type ServerConfig struct { // BucketConfig S3存储桶配置 type BucketConfig struct { - Name string `yaml:"name"` // 桶名称 - Endpoint string `yaml:"endpoint"` // S3端点 - Region string `yaml:"region"` // 区域 - AccessKeyID string `yaml:"access_key_id"` // 访问密钥ID - SecretAccessKey string `yaml:"secret_access_key"` // 访问密钥 - MaxSize string `yaml:"max_size"` // 最大容量 (例如: "10GB") - MaxSizeBytes int64 `yaml:"-"` // 内部使用,字节为单位 - Weight int `yaml:"weight"` // 权重 (用于负载均衡) - Enabled bool `yaml:"enabled"` // 是否启用 - PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问 - Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶(仅S3 API中可见) + Name string `yaml:"name"` // 桶名称 + Endpoint string `yaml:"endpoint"` // S3端点 + Region string `yaml:"region"` // 区域 + AccessKeyID string `yaml:"access_key_id"` // 访问密钥ID + SecretAccessKey string `yaml:"secret_access_key"` // 访问密钥 + MaxSize string `yaml:"max_size"` // 最大容量 (例如: "10GB") + MaxSizeBytes int64 `yaml:"-"` // 内部使用,字节为单位 + Weight int `yaml:"weight"` // 权重 (用于负载均衡) + Enabled bool `yaml:"enabled"` // 是否启用 + PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问 + Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶(仅S3 API中可见) + OperationLimits OperationLimitConfig `yaml:"operation_limits"` +} + +// OperationLimitConfig 后端操作次数限制配置 +type OperationLimitConfig struct { + TypeA int `yaml:"type_a"` // 类型A操作上限(0表示不限制) + TypeB int `yaml:"type_b"` // 类型B操作上限(0表示不限制) } // BalancerConfig 负载均衡配置