From b2dc311f01de876b81c66f466608ac37961903d1 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 21 Oct 2025 18:29:24 +0800 Subject: [PATCH 01/20] Count operation type A & B --- config/config.example.yaml | 9 ++++ internal/api/multipart_handler.go | 32 +++++++++----- internal/api/object_handler.go | 12 ++++-- internal/api/operation_tracker.go | 17 ++++++++ internal/bucket/manager.go | 69 ++++++++++++++++++++++++++++--- internal/bucket/reporter.go | 24 ++++++----- internal/config/config.go | 29 ++++++++----- 7 files changed, 151 insertions(+), 41 deletions(-) create mode 100644 internal/api/operation_tracker.go diff --git a/config/config.example.yaml b/config/config.example.yaml index 0b79d91..d305fe7 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -48,6 +48,9 @@ buckets: enabled: true path_style: false # AWS S3使用虚拟主机风格 virtual: false # 这是真实存储桶 + operation_limits: + type_a: 0 # 类型A操作(写入类)上限,0表示不限制 + type_b: 0 # 类型B操作(读取类)上限,0表示不限制 # 真实存储桶 - MinIO(用于存储数据,对客户端隐藏) - name: "my-bucket-2" @@ -60,6 +63,9 @@ buckets: enabled: true path_style: true # MinIO通常使用路径风格 virtual: false # 这是真实存储桶 + operation_limits: + type_a: 0 + type_b: 0 # 虚拟存储桶 - user-bucket-1(对客户端可见的唯一存储桶) - name: "user-bucket-1" @@ -96,6 +102,9 @@ buckets: enabled: true path_style: false virtual: false # 这是真实存储桶 + operation_limits: + type_a: 0 + type_b: 0 # 负载均衡配置 balancer: diff --git a/internal/api/multipart_handler.go b/internal/api/multipart_handler.go index b3389cf..4aca3f3 100644 --- a/internal/api/multipart_handler.go +++ b/internal/api/multipart_handler.go @@ -76,6 +76,8 @@ func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) { return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 检查当前已上传大小 + 本次分片大小是否超过bucket剩余空间 currentSize, err := h.storage.GetUploadSessionSize(uploadID) if err != nil { @@ -213,6 +215,8 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 初始化分片上传 ctx := context.Background() createResp, err := targetBucket.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ @@ -280,14 +284,16 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re // 降级到遍历所有存储桶的方式 ctx := context.Background() allBuckets := h.bucketManager.GetAllBuckets() - for _, bucket := range allBuckets { - if bucket.IsVirtual() { + for _, realBucket := range allBuckets { + if realBucket.IsVirtual() { continue } + h.recordBackendOperation(realBucket, bucket.OperationTypeB) + // 列出每个真实存储桶的分片上传 - listResp, err := bucket.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{ - Bucket: aws.String(bucket.Config.Name), + listResp, err := realBucket.Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{ + Bucket: aws.String(realBucket.Config.Name), KeyMarker: aws.String(keyMarker), UploadIdMarker: aws.String(uploadIdMarker), Prefix: aws.String(prefix), @@ -295,7 +301,7 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re MaxUploads: aws.Int32(int32(maxUploads)), }) if err != nil { - log.Printf("Failed to list multipart uploads for bucket %s: %v", bucket.Config.Name, err) + log.Printf("Failed to list multipart uploads for bucket %s: %v", realBucket.Config.Name, err) continue } @@ -409,21 +415,22 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ if err != nil { // 如果没有找到映射,尝试查询所有真实存储桶 allBuckets := h.bucketManager.GetAllBuckets() - for _, bucket := range allBuckets { - if bucket.IsVirtual() { + for _, realBucket := range allBuckets { + if realBucket.IsVirtual() { continue } // 尝试列出分片,如果成功则说明上传在这个桶中 ctx := context.Background() - _, err := bucket.Client.ListParts(ctx, &s3.ListPartsInput{ - Bucket: aws.String(bucket.Config.Name), + h.recordBackendOperation(realBucket, bucket.OperationTypeB) + _, err := realBucket.Client.ListParts(ctx, &s3.ListPartsInput{ + Bucket: aws.String(realBucket.Config.Name), Key: aws.String(key), UploadId: aws.String(uploadID), PartNumberMarker: aws.String(strconv.Itoa(partNumberMarker)), MaxParts: aws.Int32(1), // 只检查是否存在 }) if err == nil { - targetBucket = bucket + targetBucket = realBucket break } } @@ -446,6 +453,7 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ } // 列出分片 + h.recordBackendOperation(targetBucket, bucket.OperationTypeB) ctx := context.Background() listResp, err := targetBucket.Client.ListParts(ctx, &s3.ListPartsInput{ Bucket: aws.String(targetBucket.Config.Name), @@ -570,6 +578,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http // 完成分片上传 ctx := context.Background() + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) sort.SliceStable(completeReq.Parts, func(i, j int) bool { return completeReq.Parts[i].PartNumber < completeReq.Parts[j].PartNumber }) @@ -611,6 +620,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http // 获取完成上传后的对象大小 var objectSize int64 + h.recordBackendOperation(targetBucket, bucket.OperationTypeB) headResp, err := targetBucket.Client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(targetBucket.Config.Name), Key: aws.String(key), @@ -658,6 +668,7 @@ func getAPIError(err error) (smithy.APIError, bool) { // abortMultipartUploadInternal 内部方法:向后端S3发送中止分片上传请求 func (h *S3Handler) abortMultipartUploadInternal(targetBucket *bucket.BucketInfo, key, uploadID string) error { + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) ctx := context.Background() _, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(targetBucket.Config.Name), @@ -718,6 +729,7 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re // 中止分片上传 ctx := context.Background() + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) _, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: aws.String(targetBucket.Config.Name), Key: aws.String(key), diff --git a/internal/api/object_handler.go b/internal/api/object_handler.go index 9b8a2ff..0655b19 100644 --- a/internal/api/object_handler.go +++ b/internal/api/object_handler.go @@ -70,6 +70,8 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) return } + + h.recordBackendOperation(bucket1, bucket.OperationTypeB) } // 生成预签名下载URL @@ -228,6 +230,8 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 生成预签名上传URL uploadInfo, err := h.presigner.GenerateUploadURL( context.Background(), @@ -295,7 +299,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b return } - var bucket *bucket.BucketInfo + var targetBucket *bucket.BucketInfo var err error if requestedBucket.IsVirtual() { @@ -308,7 +312,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b } // 获取映射到的真实存储桶 - bucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) + targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) if !ok { h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) return @@ -319,10 +323,12 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b return } + h.recordBackendOperation(targetBucket, bucket.OperationTypeA) + // 生成预签名删除URL deleteInfo, err := h.presigner.GenerateDeleteURL( context.Background(), - bucket, + targetBucket, key, ) if err != nil { diff --git a/internal/api/operation_tracker.go b/internal/api/operation_tracker.go new file mode 100644 index 0000000..d4b17c1 --- /dev/null +++ b/internal/api/operation_tracker.go @@ -0,0 +1,17 @@ +package api + +import ( + "log" + + "github.com/DullJZ/s3-balance/internal/bucket" +) + +// recordBackendOperation increments backend operation counters and disables the bucket if limits are exceeded. +func (h *S3Handler) recordBackendOperation(b *bucket.BucketInfo, category bucket.OperationCategory) { + if b == nil { + return + } + if disabled := b.RecordOperation(category); disabled { + log.Printf("Bucket %s disabled after exceeding %s-type operation limit", b.Config.Name, category) + } +} diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 48571f0..ca3b6ff 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -16,14 +16,27 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" ) +// OperationCategory 表示后端操作分类 +type OperationCategory string + +const ( + // OperationTypeA 表示写入类操作 + OperationTypeA OperationCategory = "A" + // OperationTypeB 表示读取类操作 + OperationTypeB OperationCategory = "B" +) + // BucketInfo 存储桶信息 type BucketInfo struct { - Config config.BucketConfig - Client *s3.Client - UsedSize int64 // 已使用容量(字节) - Available bool // 是否可用(由health监控更新) - LastChecked time.Time // 最后检查时间(由health监控更新) - mu sync.RWMutex + Config config.BucketConfig + Client *s3.Client + UsedSize int64 // 已使用容量(字节) + Available bool // 是否可用(由health监控更新) + LastChecked time.Time // 最后检查时间(由health监控更新) + mu sync.RWMutex + operationCountA int64 + operationCountB int64 + operationLimitReached bool } // Manager 存储桶管理器 @@ -267,6 +280,50 @@ func (b *BucketInfo) UpdateUsedSize(delta int64) { b.UsedSize += delta } +// RecordOperation 记录一次后端操作并根据配置判断是否需要禁用存储桶 +func (b *BucketInfo) RecordOperation(category OperationCategory) bool { + if b == nil { + return false + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.Config.Virtual { + return false + } + + var ( + limit int64 + count *int64 + ) + + switch category { + case OperationTypeA: + b.operationCountA++ + count = &b.operationCountA + limit = int64(b.Config.OperationLimits.TypeA) + case OperationTypeB: + b.operationCountB++ + count = &b.operationCountB + limit = int64(b.Config.OperationLimits.TypeB) + default: + return false + } + + if limit <= 0 || count == nil { + return false + } + + if !b.operationLimitReached && *count >= limit { + b.Available = false + b.operationLimitReached = true + return true + } + + return false +} + // IsVirtual 检查是否为虚拟存储桶 func (b *BucketInfo) IsVirtual() bool { b.mu.RLock() diff --git a/internal/bucket/reporter.go b/internal/bucket/reporter.go index 230d2b0..9826a7f 100644 --- a/internal/bucket/reporter.go +++ b/internal/bucket/reporter.go @@ -7,9 +7,9 @@ import ( // MetricsReporter 实现 health.HealthReporter 和 health.StatsReporter 接口 type MetricsReporter struct { - metrics *metrics.Metrics - buckets map[string]*BucketInfo - manager *Manager + metrics *metrics.Metrics + buckets map[string]*BucketInfo + manager *Manager } // NewMetricsReporter 创建指标报告器 @@ -25,18 +25,20 @@ func (r *MetricsReporter) ReportHealth(targetID string, status health.Status) { if r.metrics == nil { return } - + // 更新存储桶可用性状态 r.manager.mu.RLock() bucket, exists := r.manager.buckets[targetID] r.manager.mu.RUnlock() - + if exists { bucket.mu.Lock() - bucket.Available = status.Healthy + if !bucket.operationLimitReached { + bucket.Available = status.Healthy + } bucket.LastChecked = status.LastChecked bucket.mu.Unlock() - + // 更新 Prometheus 指标 r.metrics.SetBucketHealthy(targetID, bucket.Config.Endpoint, status.Healthy) } @@ -47,18 +49,18 @@ func (r *MetricsReporter) ReportStats(stats *health.Stats) { if r.metrics == nil { return } - + // 更新存储桶使用统计 r.manager.mu.RLock() bucket, exists := r.manager.buckets[stats.TargetID] r.manager.mu.RUnlock() - + if exists { bucket.mu.Lock() bucket.UsedSize = stats.UsedSize bucket.mu.Unlock() - + // 更新 Prometheus 指标 r.metrics.SetBucketUsage(stats.TargetID, stats.UsedSize, bucket.Config.MaxSizeBytes) } -} \ No newline at end of file +} diff --git a/internal/config/config.go b/internal/config/config.go index 8224518..5953a38 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,17 +29,24 @@ type ServerConfig struct { // BucketConfig S3存储桶配置 type BucketConfig struct { - Name string `yaml:"name"` // 桶名称 - Endpoint string `yaml:"endpoint"` // S3端点 - Region string `yaml:"region"` // 区域 - AccessKeyID string `yaml:"access_key_id"` // 访问密钥ID - SecretAccessKey string `yaml:"secret_access_key"` // 访问密钥 - MaxSize string `yaml:"max_size"` // 最大容量 (例如: "10GB") - MaxSizeBytes int64 `yaml:"-"` // 内部使用,字节为单位 - Weight int `yaml:"weight"` // 权重 (用于负载均衡) - Enabled bool `yaml:"enabled"` // 是否启用 - PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问 - Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶(仅S3 API中可见) + Name string `yaml:"name"` // 桶名称 + Endpoint string `yaml:"endpoint"` // S3端点 + Region string `yaml:"region"` // 区域 + AccessKeyID string `yaml:"access_key_id"` // 访问密钥ID + SecretAccessKey string `yaml:"secret_access_key"` // 访问密钥 + MaxSize string `yaml:"max_size"` // 最大容量 (例如: "10GB") + MaxSizeBytes int64 `yaml:"-"` // 内部使用,字节为单位 + Weight int `yaml:"weight"` // 权重 (用于负载均衡) + Enabled bool `yaml:"enabled"` // 是否启用 + PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问 + Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶(仅S3 API中可见) + OperationLimits OperationLimitConfig `yaml:"operation_limits"` +} + +// OperationLimitConfig 后端操作次数限制配置 +type OperationLimitConfig struct { + TypeA int `yaml:"type_a"` // 类型A操作上限(0表示不限制) + TypeB int `yaml:"type_b"` // 类型B操作上限(0表示不限制) } // BalancerConfig 负载均衡配置 From 0630cb09c7ab50aaf8e164e319be92b5048df7a0 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 21 Oct 2025 19:59:28 +0800 Subject: [PATCH 02/20] grafana display --- .../dashboards/s3-balance-dashboard.json | 63 ++++++++++++++++++- internal/api/operation_tracker.go | 4 ++ internal/metrics/service.go | 11 +++- 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/deploy/monitoring/grafana/dashboards/s3-balance-dashboard.json b/deploy/monitoring/grafana/dashboards/s3-balance-dashboard.json index c23c237..c8a1ec4 100644 --- a/deploy/monitoring/grafana/dashboards/s3-balance-dashboard.json +++ b/deploy/monitoring/grafana/dashboards/s3-balance-dashboard.json @@ -99,6 +99,67 @@ "title": "QPS (操作/秒)", "type": "timeseries" }, + { + "datasource": "Prometheus", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "displayMode": "auto" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 12, + "options": { + "footer": { + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "targets": [ + { + "expr": "sum by (bucket, category)(increase(s3_balance_backend_operations_total[$__range]))", + "instant": true, + "legendFormat": "", + "refId": "A" + } + ], + "timeFrom": "now/M", + "title": "后端操作次数 (本自然月)", + "transformations": [ + { + "id": "labelsToFields", + "options": { + "valueLabel": "操作次数" + } + } + ], + "type": "table" + }, { "datasource": "Prometheus", "fieldConfig": { @@ -334,5 +395,5 @@ "timezone": "", "title": "S3 Balance 监控面板", "uid": "s3-balance-monitoring", - "version": 1 + "version": 2 } diff --git a/internal/api/operation_tracker.go b/internal/api/operation_tracker.go index d4b17c1..e894469 100644 --- a/internal/api/operation_tracker.go +++ b/internal/api/operation_tracker.go @@ -11,6 +11,10 @@ func (h *S3Handler) recordBackendOperation(b *bucket.BucketInfo, category bucket if b == nil { return } + + if h.metrics != nil { + h.metrics.RecordBackendOperation(b.Config.Name, string(category)) + } if disabled := b.RecordOperation(category); disabled { log.Printf("Bucket %s disabled after exceeding %s-type operation limit", b.Config.Name, category) } diff --git a/internal/metrics/service.go b/internal/metrics/service.go index e799b44..e7febb0 100644 --- a/internal/metrics/service.go +++ b/internal/metrics/service.go @@ -36,6 +36,11 @@ var ( Name: "s3_balance_balancer_decisions_total", Help: "Total number of load balancing decisions", }, []string{"strategy", "bucket"}) + + backendOperationsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "s3_balance_backend_operations_total", + Help: "Total number of backend bucket operations by category", + }, []string{"bucket", "category"}) ) type Metrics struct{} @@ -67,4 +72,8 @@ func (m *Metrics) RecordS3OperationDuration(operation, bucket string, duration f func (m *Metrics) RecordBalancerDecision(strategy, bucket string) { balancerDecisions.WithLabelValues(strategy, bucket).Inc() -} \ No newline at end of file +} + +func (m *Metrics) RecordBackendOperation(bucket, category string) { + backendOperationsTotal.WithLabelValues(bucket, category).Inc() +} From ebe2b36c7744c08e0265e6df3d4893966f07c817 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 21 Oct 2025 19:59:45 +0800 Subject: [PATCH 03/20] Add gitignore --- .gitignore | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.gitignore b/.gitignore index a35a948..38261b6 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,10 @@ logs/ # Environment variables .env .env.local + +# AI doc +AGENTS.md +CLAUDE.md + +# Generated files +s3-balance \ No newline at end of file From 529ee8fe558aeb36e37deb495cdb9754261c4d92 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 21 Oct 2025 20:06:37 +0800 Subject: [PATCH 04/20] persistant A&B record --- cmd/s3-balance/main.go | 8 +-- internal/api/operation_tracker.go | 17 +++++- internal/bucket/manager.go | 86 ++++++++++++++++++++++++++++++- internal/storage/models.go | 16 +++--- internal/storage/service.go | 85 ++++++++++++++++++++++++++++++ 5 files changed, 199 insertions(+), 13 deletions(-) diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 5d811e6..7839bfc 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -45,11 +45,14 @@ func main() { } defer database.Close() + // 创建存储服务 + storageService := storage.NewService(database.GetDB()) + // 创建指标服务 metricsService := metrics.New() // 创建存储桶管理器 - bucketManager, err := bucket.NewManager(cfg, metricsService) + bucketManager, err := bucket.NewManager(cfg, metricsService, storageService) if err != nil { log.Fatalf("Failed to create bucket manager: %v", err) } @@ -74,9 +77,6 @@ func main() { 60*time.Minute, // 下载URL有效期 ) - // 创建存储服务 - storageService := storage.NewService(database.GetDB()) - // 启动定期清理过期上传会话的任务 startSessionCleaner(ctx, storageService) diff --git a/internal/api/operation_tracker.go b/internal/api/operation_tracker.go index e894469..822a4e5 100644 --- a/internal/api/operation_tracker.go +++ b/internal/api/operation_tracker.go @@ -15,7 +15,22 @@ func (h *S3Handler) recordBackendOperation(b *bucket.BucketInfo, category bucket if h.metrics != nil { h.metrics.RecordBackendOperation(b.Config.Name, string(category)) } - if disabled := b.RecordOperation(category); disabled { + + var disabled bool + + if h.storage != nil { + newCount, err := h.storage.IncrementBucketOperation(b.Config.Name, string(category)) + if err != nil { + log.Printf("failed to persist backend operation count for bucket %s: %v", b.Config.Name, err) + disabled = b.RecordOperation(category) + } else { + disabled = b.SetOperationCount(category, newCount) + } + } else { + disabled = b.RecordOperation(category) + } + + if disabled { log.Printf("Bucket %s disabled after exceeding %s-type operation limit", b.Config.Name, category) } } diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index ca3b6ff..41c86ee 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -10,6 +10,7 @@ import ( "github.com/DullJZ/s3-balance/internal/config" "github.com/DullJZ/s3-balance/internal/health" "github.com/DullJZ/s3-balance/internal/metrics" + "github.com/DullJZ/s3-balance/internal/storage" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" @@ -49,15 +50,17 @@ type Manager struct { healthMonitor *health.Monitor statsMonitor *health.StatsMonitor monitorCtx context.Context + storage *storage.Service } // NewManager 创建新的存储桶管理器 -func NewManager(cfg *config.Config, metrics *metrics.Metrics) (*Manager, error) { +func NewManager(cfg *config.Config, metrics *metrics.Metrics, storageService *storage.Service) (*Manager, error) { m := &Manager{ buckets: make(map[string]*BucketInfo), config: cfg, stopChan: make(chan struct{}), metrics: metrics, + storage: storageService, } // 初始化所有存储桶客户端 @@ -84,9 +87,49 @@ func NewManager(cfg *config.Config, metrics *metrics.Metrics) (*Manager, error) // 初始化健康监控 m.initHealthMonitoring() + // 加载持久化的操作计数 + m.loadOperationCounts() + return m, nil } +func (m *Manager) loadOperationCounts() { + if m.storage == nil { + return + } + + counts, err := m.storage.GetBucketOperationCounts() + if err != nil { + log.Printf("Failed to load bucket operation counts: %v", err) + return + } + + m.mu.RLock() + buckets := make(map[string]*BucketInfo, len(m.buckets)) + for name, info := range m.buckets { + buckets[name] = info + } + m.mu.RUnlock() + + for name, info := range buckets { + if info == nil { + continue + } + + oc, ok := counts[name] + if !ok { + continue + } + + if info.SetOperationCount(OperationTypeA, oc.CountA) { + log.Printf("Bucket %s disabled after exceeding A-type operation limit (persisted)", name) + } + if info.SetOperationCount(OperationTypeB, oc.CountB) { + log.Printf("Bucket %s disabled after exceeding B-type operation limit (persisted)", name) + } + } +} + // createS3Client 创建S3客户端 func createS3Client(bucketCfg config.BucketConfig) (*s3.Client, error) { // 创建自定义端点解析器 @@ -324,6 +367,45 @@ func (b *BucketInfo) RecordOperation(category OperationCategory) bool { return false } +// SetOperationCount 设置指定类别的操作计数并检查上限 +func (b *BucketInfo) SetOperationCount(category OperationCategory, value int64) bool { + if b == nil { + return false + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.Config.Virtual { + return false + } + + var limit int64 + + switch category { + case OperationTypeA: + b.operationCountA = value + limit = int64(b.Config.OperationLimits.TypeA) + case OperationTypeB: + b.operationCountB = value + limit = int64(b.Config.OperationLimits.TypeB) + default: + return false + } + + if limit <= 0 { + return false + } + + if !b.operationLimitReached && value >= limit { + b.Available = false + b.operationLimitReached = true + return true + } + + return false +} + // IsVirtual 检查是否为虚拟存储桶 func (b *BucketInfo) IsVirtual() bool { b.mu.RLock() @@ -428,6 +510,8 @@ func (m *Manager) UpdateConfig(newConfig *config.Config) error { m.mu.Unlock() + m.loadOperationCounts() + if restartMonitors { m.startMonitors() } diff --git a/internal/storage/models.go b/internal/storage/models.go index 08a114f..a88837f 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -29,13 +29,15 @@ func (Object) TableName() string { // BucketStats 存储桶统计信息模型 type BucketStats struct { - ID uint `gorm:"primaryKey" json:"id"` - BucketName string `gorm:"uniqueIndex;size:255;not null" json:"bucket_name"` - ObjectCount int64 `gorm:"not null;default:0" json:"object_count"` - TotalSize int64 `gorm:"not null;default:0" json:"total_size"` - LastCheckedAt time.Time `json:"last_checked_at"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID uint `gorm:"primaryKey" json:"id"` + BucketName string `gorm:"uniqueIndex;size:255;not null" json:"bucket_name"` + ObjectCount int64 `gorm:"not null;default:0" json:"object_count"` + TotalSize int64 `gorm:"not null;default:0" json:"total_size"` + OperationCountA int64 `gorm:"not null;default:0" json:"operation_count_a"` + OperationCountB int64 `gorm:"not null;default:0" json:"operation_count_b"` + LastCheckedAt time.Time `json:"last_checked_at"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // TableName 指定表名 diff --git a/internal/storage/service.go b/internal/storage/service.go index 20ad733..9ab629e 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -1,6 +1,7 @@ package storage import ( + "errors" "fmt" "time" @@ -12,6 +13,12 @@ type Service struct { db *gorm.DB } +// OperationCounts 后端操作计数 +type OperationCounts struct { + CountA int64 + CountB int64 +} + // NewService 创建新的存储服务 func NewService(db *gorm.DB) *Service { return &Service{ @@ -254,6 +261,84 @@ func (s *Service) updateBucketStats(bucketName string) error { return nil } +func (s *Service) ensureBucketStats(bucketName string) (*BucketStats, error) { + if bucketName == "" { + return nil, fmt.Errorf("bucket name cannot be empty") + } + + stats := &BucketStats{} + err := s.db.Where("bucket_name = ?", bucketName).First(stats).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + stats = &BucketStats{BucketName: bucketName, LastCheckedAt: time.Now()} + if createErr := s.db.Create(stats).Error; createErr != nil { + // 如果在并发创建下出现重复键,忽略并再次查询 + if errors.Is(createErr, gorm.ErrDuplicatedKey) { + if retryErr := s.db.Where("bucket_name = ?", bucketName).First(stats).Error; retryErr != nil { + return nil, fmt.Errorf("failed to fetch bucket stats after duplicate: %w", retryErr) + } + } else { + return nil, fmt.Errorf("failed to create bucket stats: %w", createErr) + } + } + return stats, nil + } else if err != nil { + return nil, fmt.Errorf("failed to fetch bucket stats: %w", err) + } + + return stats, nil +} + +// IncrementBucketOperation 增加指定存储桶的操作计数 +func (s *Service) IncrementBucketOperation(bucketName, category string) (int64, error) { + if _, err := s.ensureBucketStats(bucketName); err != nil { + return 0, err + } + + var field string + switch category { + case "A": + field = "operation_count_a" + case "B": + field = "operation_count_b" + default: + return 0, fmt.Errorf("unknown operation category: %s", category) + } + + if err := s.db.Model(&BucketStats{}). + Where("bucket_name = ?", bucketName). + UpdateColumn(field, gorm.Expr(field+" + ?", 1)).Error; err != nil { + return 0, fmt.Errorf("failed to increment %s for bucket %s: %w", field, bucketName, err) + } + + var count int64 + if err := s.db.Model(&BucketStats{}). + Where("bucket_name = ?", bucketName). + Select(field). + Scan(&count).Error; err != nil { + return 0, fmt.Errorf("failed to fetch updated %s for bucket %s: %w", field, bucketName, err) + } + + return count, nil +} + +// GetBucketOperationCounts 获取所有存储桶的操作计数 +func (s *Service) GetBucketOperationCounts() (map[string]OperationCounts, error) { + var stats []BucketStats + if err := s.db.Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to list bucket stats: %w", err) + } + + result := make(map[string]OperationCounts, len(stats)) + for _, st := range stats { + result[st.BucketName] = OperationCounts{ + CountA: st.OperationCountA, + CountB: st.OperationCountB, + } + } + + return result, nil +} + // RecordUploadSession 记录上传会话 func (s *Service) RecordUploadSession(uploadID, key, bucketName string, size int64) error { session := &UploadSession{ From d64ecac37c5f2343096ef754c909bd3aa5c64d5f Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Wed, 29 Oct 2025 16:58:10 +0800 Subject: [PATCH 05/20] A&B operation history --- cmd/s3-balance/main.go | 11 ++ internal/api/stats_handler.go | 193 +++++++++++++++++++++++++ internal/database/database.go | 1 + internal/scheduler/monthly_archiver.go | 92 ++++++++++++ internal/storage/models.go | 17 +++ internal/storage/service.go | 110 ++++++++++++++ 6 files changed, 424 insertions(+) create mode 100644 internal/api/stats_handler.go create mode 100644 internal/scheduler/monthly_archiver.go diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 7839bfc..dfeac0f 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -17,6 +17,7 @@ import ( "github.com/DullJZ/s3-balance/internal/config" "github.com/DullJZ/s3-balance/internal/database" "github.com/DullJZ/s3-balance/internal/metrics" + "github.com/DullJZ/s3-balance/internal/scheduler" "github.com/DullJZ/s3-balance/internal/storage" "github.com/DullJZ/s3-balance/pkg/presigner" "github.com/gorilla/mux" @@ -80,6 +81,11 @@ func main() { // 启动定期清理过期上传会话的任务 startSessionCleaner(ctx, storageService) + // 启动月度统计归档任务(每小时检查一次) + monthlyArchiver := scheduler.NewMonthlyArchiver(storageService, 1*time.Hour) + monthlyArchiver.Start() + defer monthlyArchiver.Stop() + // 创建S3兼容API处理器 s3Handler := api.NewS3Handler( bucketManager, @@ -124,6 +130,11 @@ func main() { log.Printf("Metrics server enabled at %s", cfg.Metrics.Path) } + // 添加统计API端点 + statsHandler := api.NewStatsHandler(storageService) + statsHandler.RegisterRoutes(router) + log.Println("Statistics API endpoints registered at /api/stats/*") + // 运行在S3兼容模式 log.Println("Running in S3-compatible mode") s3Handler.RegisterS3Routes(router) diff --git a/internal/api/stats_handler.go b/internal/api/stats_handler.go new file mode 100644 index 0000000..3436dc1 --- /dev/null +++ b/internal/api/stats_handler.go @@ -0,0 +1,193 @@ +package api + +import ( + "encoding/json" + "log" + "net/http" + "strconv" + "time" + + "github.com/DullJZ/s3-balance/internal/storage" + "github.com/gorilla/mux" +) + +// StatsHandler 统计数据处理器 +type StatsHandler struct { + storage *storage.Service +} + +// NewStatsHandler 创建统计处理器 +func NewStatsHandler(storage *storage.Service) *StatsHandler { + return &StatsHandler{ + storage: storage, + } +} + +// RegisterRoutes 注册统计API路由 +func (h *StatsHandler) RegisterRoutes(router *mux.Router) { + router.HandleFunc("/api/stats/monthly", h.GetCurrentMonthStats).Methods("GET") + router.HandleFunc("/api/stats/monthly/{year}/{month}", h.GetMonthlyStats).Methods("GET") + router.HandleFunc("/api/stats/monthly/range", h.GetMonthlyStatsRange).Methods("GET") + router.HandleFunc("/api/stats/bucket/{bucket}/history", h.GetBucketHistory).Methods("GET") +} + +// MonthlyStatsResponse 月度统计响应 +type MonthlyStatsResponse struct { + Year int `json:"year"` + Month int `json:"month"` + Bucket string `json:"bucket"` + Stats BucketOperationCounts `json:"stats"` +} + +// BucketOperationCounts 存储桶操作计数 +type BucketOperationCounts struct { + OperationCountA int64 `json:"operation_count_a"` + OperationCountB int64 `json:"operation_count_b"` + Total int64 `json:"total"` +} + +// GetCurrentMonthStats 获取当前月份的统计 +func (h *StatsHandler) GetCurrentMonthStats(w http.ResponseWriter, r *http.Request) { + stats, err := h.storage.GetCurrentMonthStats() + if err != nil { + log.Printf("Failed to get current month stats: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetMonthlyStats 获取指定月份的统计 +func (h *StatsHandler) GetMonthlyStats(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + year, err := strconv.Atoi(vars["year"]) + if err != nil { + http.Error(w, "Invalid year", http.StatusBadRequest) + return + } + + month, err := strconv.Atoi(vars["month"]) + if err != nil || month < 1 || month > 12 { + http.Error(w, "Invalid month", http.StatusBadRequest) + return + } + + stats, err := h.storage.GetMonthlyStats(year, month) + if err != nil { + log.Printf("Failed to get monthly stats: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetMonthlyStatsRange 获取时间范围内的统计 +func (h *StatsHandler) GetMonthlyStatsRange(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + + startYear, err := strconv.Atoi(query.Get("start_year")) + if err != nil { + http.Error(w, "Invalid start_year", http.StatusBadRequest) + return + } + + startMonth, err := strconv.Atoi(query.Get("start_month")) + if err != nil || startMonth < 1 || startMonth > 12 { + http.Error(w, "Invalid start_month", http.StatusBadRequest) + return + } + + endYear, err := strconv.Atoi(query.Get("end_year")) + if err != nil { + http.Error(w, "Invalid end_year", http.StatusBadRequest) + return + } + + endMonth, err := strconv.Atoi(query.Get("end_month")) + if err != nil || endMonth < 1 || endMonth > 12 { + http.Error(w, "Invalid end_month", http.StatusBadRequest) + return + } + + stats, err := h.storage.GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth) + if err != nil { + log.Printf("Failed to get monthly stats range: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetBucketHistory 获取指定存储桶的历史统计 +func (h *StatsHandler) GetBucketHistory(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + bucket := vars["bucket"] + + // 获取查询参数中的月份数,默认12个月 + months := 12 + if monthsStr := r.URL.Query().Get("months"); monthsStr != "" { + if m, err := strconv.Atoi(monthsStr); err == nil && m > 0 { + months = m + } + } + + stats, err := h.storage.GetBucketMonthlyHistory(bucket, months) + if err != nil { + log.Printf("Failed to get bucket history: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// formatMonthlyStats 格式化月度统计数据 +func (h *StatsHandler) formatMonthlyStats(stats []storage.BucketMonthlyStats) []MonthlyStatsResponse { + result := make([]MonthlyStatsResponse, 0, len(stats)) + + for _, stat := range stats { + result = append(result, MonthlyStatsResponse{ + Year: stat.Year, + Month: stat.Month, + Bucket: stat.BucketName, + Stats: BucketOperationCounts{ + OperationCountA: stat.OperationCountA, + OperationCountB: stat.OperationCountB, + Total: stat.OperationCountA + stat.OperationCountB, + }, + }) + } + + return result +} + +// ArchiveCurrentMonth 手动触发归档当前月份(管理API) +func (h *StatsHandler) ArchiveCurrentMonth(w http.ResponseWriter, r *http.Request) { + now := time.Now() + year, month := now.Year(), int(now.Month()) + + if err := h.storage.ArchiveMonthlyStats(year, month); err != nil { + log.Printf("Failed to archive monthly stats: %v", err) + http.Error(w, "Failed to archive statistics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "status": "success", + "message": "Monthly statistics archived successfully", + "year": strconv.Itoa(year), + "month": strconv.Itoa(month), + }) +} diff --git a/internal/database/database.go b/internal/database/database.go index 34459b8..9d1ff4f 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -181,6 +181,7 @@ func AutoMigrate() error { models := []interface{}{ &storage.Object{}, &storage.BucketStats{}, + &storage.BucketMonthlyStats{}, &storage.UploadSession{}, &storage.AccessLog{}, &storage.VirtualBucketMapping{}, diff --git a/internal/scheduler/monthly_archiver.go b/internal/scheduler/monthly_archiver.go new file mode 100644 index 0000000..2302b9c --- /dev/null +++ b/internal/scheduler/monthly_archiver.go @@ -0,0 +1,92 @@ +package scheduler + +import ( + "log" + "time" + + "github.com/DullJZ/s3-balance/internal/storage" +) + +// MonthlyArchiver 月度统计归档器 +type MonthlyArchiver struct { + storage *storage.Service + ticker *time.Ticker + stopChan chan struct{} +} + +// NewMonthlyArchiver 创建月度归档器 +func NewMonthlyArchiver(storage *storage.Service, checkInterval time.Duration) *MonthlyArchiver { + return &MonthlyArchiver{ + storage: storage, + ticker: time.NewTicker(checkInterval), + stopChan: make(chan struct{}), + } +} + +// Start 启动月度归档定期任务 +func (m *MonthlyArchiver) Start() { + log.Println("Starting monthly statistics archiver...") + + // 启动时立即归档上个月的数据(如果还没有归档) + m.archiveLastMonth() + + go func() { + for { + select { + case <-m.ticker.C: + m.checkAndArchive() + case <-m.stopChan: + log.Println("Monthly statistics archiver stopped") + return + } + } + }() +} + +// Stop 停止归档任务 +func (m *MonthlyArchiver) Stop() { + close(m.stopChan) + m.ticker.Stop() +} + +// checkAndArchive 检查并归档统计数据 +func (m *MonthlyArchiver) checkAndArchive() { + now := time.Now() + + // 如果是每月的第一天凌晨,归档上个月的数据 + if now.Day() == 1 && now.Hour() < 1 { + m.archiveLastMonth() + } + + // 每天都归档当前月份(实时更新) + m.archiveCurrentMonth() +} + +// archiveLastMonth 归档上个月的数据 +func (m *MonthlyArchiver) archiveLastMonth() { + now := time.Now() + lastMonth := now.AddDate(0, -1, 0) + year, month := lastMonth.Year(), int(lastMonth.Month()) + + log.Printf("Archiving monthly stats for %d-%02d...", year, month) + + if err := m.storage.ArchiveMonthlyStats(year, month); err != nil { + log.Printf("Failed to archive monthly stats for %d-%02d: %v", year, month, err) + return + } + + log.Printf("Successfully archived monthly stats for %d-%02d", year, month) +} + +// archiveCurrentMonth 归档当前月份(实时更新) +func (m *MonthlyArchiver) archiveCurrentMonth() { + now := time.Now() + year, month := now.Year(), int(now.Month()) + + if err := m.storage.ArchiveMonthlyStats(year, month); err != nil { + log.Printf("Failed to update current month stats for %d-%02d: %v", year, month, err) + return + } + + log.Printf("Updated current month stats for %d-%02d", year, month) +} diff --git a/internal/storage/models.go b/internal/storage/models.go index a88837f..25e7549 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -45,6 +45,23 @@ func (BucketStats) TableName() string { return "bucket_stats" } +// BucketMonthlyStats 存储桶月度统计信息模型 +type BucketMonthlyStats struct { + ID uint `gorm:"primaryKey" json:"id"` + BucketName string `gorm:"index:idx_bucket_month;size:255;not null" json:"bucket_name"` + Year int `gorm:"index:idx_bucket_month;not null" json:"year"` + Month int `gorm:"index:idx_bucket_month;not null" json:"month"` + OperationCountA int64 `gorm:"not null;default:0" json:"operation_count_a"` + OperationCountB int64 `gorm:"not null;default:0" json:"operation_count_b"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// TableName 指定表名 +func (BucketMonthlyStats) TableName() string { + return "bucket_monthly_stats" +} + // VirtualBucketMapping 虚拟存储桶文件级映射模型 type VirtualBucketMapping struct { ID uint `gorm:"primaryKey" json:"id"` diff --git a/internal/storage/service.go b/internal/storage/service.go index 9ab629e..0d8a0c1 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -630,3 +630,113 @@ func (s *Service) DeleteVirtualBucketFileMapping(virtualBucketName, objectKey st } return nil } + +// ArchiveMonthlyStats 归档指定月份的统计数据 +// 如果该月份的记录已存在,则更新;否则创建新记录 +func (s *Service) ArchiveMonthlyStats(year, month int) error { + var stats []BucketStats + if err := s.db.Find(&stats).Error; err != nil { + return fmt.Errorf("failed to fetch bucket stats: %w", err) + } + + for _, stat := range stats { + monthlyStats := BucketMonthlyStats{ + BucketName: stat.BucketName, + Year: year, + Month: month, + OperationCountA: stat.OperationCountA, + OperationCountB: stat.OperationCountB, + } + + // 使用 UPSERT 逻辑:如果存在则更新,否则创建 + if err := s.db.Where("bucket_name = ? AND year = ? AND month = ?", + stat.BucketName, year, month). + Assign(BucketMonthlyStats{ + OperationCountA: stat.OperationCountA, + OperationCountB: stat.OperationCountB, + }). + FirstOrCreate(&monthlyStats).Error; err != nil { + return fmt.Errorf("failed to archive monthly stats for bucket %s: %w", stat.BucketName, err) + } + } + + return nil +} + +// GetMonthlyStats 获取指定月份的统计数据 +func (s *Service) GetMonthlyStats(year, month int) ([]BucketMonthlyStats, error) { + var stats []BucketMonthlyStats + if err := s.db.Where("year = ? AND month = ?", year, month). + Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch monthly stats: %w", err) + } + return stats, nil +} + +// GetMonthlyStatsRange 获取指定时间范围的统计数据 +func (s *Service) GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth int) ([]BucketMonthlyStats, error) { + var stats []BucketMonthlyStats + if err := s.db.Where("(year > ? OR (year = ? AND month >= ?)) AND (year < ? OR (year = ? AND month <= ?))", + startYear, startYear, startMonth, endYear, endYear, endMonth). + Order("year, month, bucket_name"). + Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch monthly stats range: %w", err) + } + return stats, nil +} + +// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算) +func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { + now := time.Now() + year, month := now.Year(), int(now.Month()) + + // 获取上个月的归档数据 + var lastMonthStats []BucketMonthlyStats + lastYear, lastMonth := year, month-1 + if lastMonth == 0 { + lastMonth = 12 + lastYear-- + } + + lastMonthMap := make(map[string]BucketMonthlyStats) + if err := s.db.Where("year = ? AND month = ?", lastYear, lastMonth). + Find(&lastMonthStats).Error; err == nil { + for _, stat := range lastMonthStats { + lastMonthMap[stat.BucketName] = stat + } + } + + // 获取当前累计数据 + var currentStats []BucketStats + if err := s.db.Find(¤tStats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch current bucket stats: %w", err) + } + + // 计算当前月份的增量 + result := make([]BucketMonthlyStats, 0, len(currentStats)) + for _, current := range currentStats { + lastMonth := lastMonthMap[current.BucketName] + result = append(result, BucketMonthlyStats{ + BucketName: current.BucketName, + Year: year, + Month: month, + OperationCountA: current.OperationCountA - lastMonth.OperationCountA, + OperationCountB: current.OperationCountB - lastMonth.OperationCountB, + UpdatedAt: time.Now(), + }) + } + + return result, nil +} + +// GetBucketMonthlyHistory 获取指定存储桶的月度历史统计 +func (s *Service) GetBucketMonthlyHistory(bucketName string, months int) ([]BucketMonthlyStats, error) { + var stats []BucketMonthlyStats + if err := s.db.Where("bucket_name = ?", bucketName). + Order("year DESC, month DESC"). + Limit(months). + Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch bucket monthly history: %w", err) + } + return stats, nil +} From dc6916f3df67e73d7fb8ceedaae7311269342757 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:00:35 +0800 Subject: [PATCH 06/20] Fix: Add unique index to prevent duplicate monthly stats records --- internal/storage/models.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/storage/models.go b/internal/storage/models.go index 25e7549..b87eef6 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -48,9 +48,9 @@ func (BucketStats) TableName() string { // BucketMonthlyStats 存储桶月度统计信息模型 type BucketMonthlyStats struct { ID uint `gorm:"primaryKey" json:"id"` - BucketName string `gorm:"index:idx_bucket_month;size:255;not null" json:"bucket_name"` - Year int `gorm:"index:idx_bucket_month;not null" json:"year"` - Month int `gorm:"index:idx_bucket_month;not null" json:"month"` + BucketName string `gorm:"uniqueIndex:idx_bucket_month;size:255;not null" json:"bucket_name"` + Year int `gorm:"uniqueIndex:idx_bucket_month;not null" json:"year"` + Month int `gorm:"uniqueIndex:idx_bucket_month;not null" json:"month"` OperationCountA int64 `gorm:"not null;default:0" json:"operation_count_a"` OperationCountB int64 `gorm:"not null;default:0" json:"operation_count_b"` CreatedAt time.Time `json:"created_at"` From 953feb196574217ba06a493fbc80659249de5bb1 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:23:12 +0800 Subject: [PATCH 07/20] Fix: Store monthly increments instead of cumulative snapshots --- internal/storage/service.go | 84 ++++++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 19 deletions(-) diff --git a/internal/storage/service.go b/internal/storage/service.go index 0d8a0c1..9eae811 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -631,29 +631,69 @@ func (s *Service) DeleteVirtualBucketFileMapping(virtualBucketName, objectKey st return nil } -// ArchiveMonthlyStats 归档指定月份的统计数据 +// ArchiveMonthlyStats 归档指定月份的统计数据(存储增量值,非累计值) // 如果该月份的记录已存在,则更新;否则创建新记录 func (s *Service) ArchiveMonthlyStats(year, month int) error { - var stats []BucketStats - if err := s.db.Find(&stats).Error; err != nil { + // 获取当前所有bucket的累计统计 + var currentStats []BucketStats + if err := s.db.Find(¤tStats).Error; err != nil { return fmt.Errorf("failed to fetch bucket stats: %w", err) } - for _, stat := range stats { + // 获取上个月的累计值(从上月归档数据推算) + lastYear, lastMonth := year, month-1 + if lastMonth == 0 { + lastMonth = 12 + lastYear-- + } + + // 查询上个月及之前的所有归档数据,用于推算上月末的累计值 + var lastMonthArchived []BucketMonthlyStats + lastMonthMap := make(map[string]int64) // bucket_name -> last_month_cumulative_a + lastMonthMapB := make(map[string]int64) // bucket_name -> last_month_cumulative_b + + if err := s.db.Where("year < ? OR (year = ? AND month <= ?)", lastYear, lastYear, lastMonth). + Order("year ASC, month ASC"). + Find(&lastMonthArchived).Error; err == nil { + + // 累加历史增量得到上月末累计值 + cumulativeA := make(map[string]int64) + cumulativeB := make(map[string]int64) + + for _, archived := range lastMonthArchived { + cumulativeA[archived.BucketName] += archived.OperationCountA + cumulativeB[archived.BucketName] += archived.OperationCountB + } + + lastMonthMap = cumulativeA + lastMonthMapB = cumulativeB + } + + // 对每个bucket,计算本月增量并存储 + for _, stat := range currentStats { + lastCumulativeA := lastMonthMap[stat.BucketName] + lastCumulativeB := lastMonthMapB[stat.BucketName] + + incrementA := stat.OperationCountA - lastCumulativeA + incrementB := stat.OperationCountB - lastCumulativeB + + // 如果是首次运行(没有历史数据),incrementA/B 可能等于累计值 + // 这是预期行为:首月记录的就是从0到当前的增量 + monthlyStats := BucketMonthlyStats{ BucketName: stat.BucketName, Year: year, Month: month, - OperationCountA: stat.OperationCountA, - OperationCountB: stat.OperationCountB, + OperationCountA: incrementA, + OperationCountB: incrementB, } // 使用 UPSERT 逻辑:如果存在则更新,否则创建 if err := s.db.Where("bucket_name = ? AND year = ? AND month = ?", stat.BucketName, year, month). Assign(BucketMonthlyStats{ - OperationCountA: stat.OperationCountA, - OperationCountB: stat.OperationCountB, + OperationCountA: incrementA, + OperationCountB: incrementB, }). FirstOrCreate(&monthlyStats).Error; err != nil { return fmt.Errorf("failed to archive monthly stats for bucket %s: %w", stat.BucketName, err) @@ -685,24 +725,28 @@ func (s *Service) GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth return stats, nil } -// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算) +// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算增量) func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { now := time.Now() year, month := now.Year(), int(now.Month()) - // 获取上个月的归档数据 - var lastMonthStats []BucketMonthlyStats + // 获取上个月末的累计值(通过累加所有历史增量) lastYear, lastMonth := year, month-1 if lastMonth == 0 { lastMonth = 12 lastYear-- } - lastMonthMap := make(map[string]BucketMonthlyStats) - if err := s.db.Where("year = ? AND month = ?", lastYear, lastMonth). - Find(&lastMonthStats).Error; err == nil { - for _, stat := range lastMonthStats { - lastMonthMap[stat.BucketName] = stat + var historicalStats []BucketMonthlyStats + lastMonthCumulativeA := make(map[string]int64) + lastMonthCumulativeB := make(map[string]int64) + + if err := s.db.Where("year < ? OR (year = ? AND month <= ?)", lastYear, lastYear, lastMonth). + Find(&historicalStats).Error; err == nil { + // 累加所有历史增量得到上月末累计值 + for _, stat := range historicalStats { + lastMonthCumulativeA[stat.BucketName] += stat.OperationCountA + lastMonthCumulativeB[stat.BucketName] += stat.OperationCountB } } @@ -715,13 +759,15 @@ func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { // 计算当前月份的增量 result := make([]BucketMonthlyStats, 0, len(currentStats)) for _, current := range currentStats { - lastMonth := lastMonthMap[current.BucketName] + incrementA := current.OperationCountA - lastMonthCumulativeA[current.BucketName] + incrementB := current.OperationCountB - lastMonthCumulativeB[current.BucketName] + result = append(result, BucketMonthlyStats{ BucketName: current.BucketName, Year: year, Month: month, - OperationCountA: current.OperationCountA - lastMonth.OperationCountA, - OperationCountB: current.OperationCountB - lastMonth.OperationCountB, + OperationCountA: incrementA, + OperationCountB: incrementB, UpdatedAt: time.Now(), }) } From c35ff96397bae30c46197a915c9e7d50368b2caf Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:38:09 +0800 Subject: [PATCH 08/20] Fix: Prevent duplicate archiving with idempotency tracking --- internal/scheduler/monthly_archiver.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/scheduler/monthly_archiver.go b/internal/scheduler/monthly_archiver.go index 2302b9c..82c2698 100644 --- a/internal/scheduler/monthly_archiver.go +++ b/internal/scheduler/monthly_archiver.go @@ -9,9 +9,10 @@ import ( // MonthlyArchiver 月度统计归档器 type MonthlyArchiver struct { - storage *storage.Service - ticker *time.Ticker - stopChan chan struct{} + storage *storage.Service + ticker *time.Ticker + stopChan chan struct{} + lastArchivedDate string // 格式: "2025-01" - 记录上次归档的月份 } // NewMonthlyArchiver 创建月度归档器 @@ -52,10 +53,13 @@ func (m *MonthlyArchiver) Stop() { // checkAndArchive 检查并归档统计数据 func (m *MonthlyArchiver) checkAndArchive() { now := time.Now() + lastMonth := now.AddDate(0, -1, 0) + lastMonthKey := lastMonth.Format("2006-01") - // 如果是每月的第一天凌晨,归档上个月的数据 - if now.Day() == 1 && now.Hour() < 1 { + // 如果是每月的第一天,且上个月还未归档,则归档上个月的数据 + if now.Day() == 1 && m.lastArchivedDate != lastMonthKey { m.archiveLastMonth() + m.lastArchivedDate = lastMonthKey } // 每天都归档当前月份(实时更新) From 86d2bc1f3e4d0ac705cb964e445820305b6643ad Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:50:39 +0800 Subject: [PATCH 09/20] Fix: Add boundary checks for negative increment values --- internal/storage/service.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/storage/service.go b/internal/storage/service.go index 9eae811..fcc1b31 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -680,6 +680,14 @@ func (s *Service) ArchiveMonthlyStats(year, month int) error { // 如果是首次运行(没有历史数据),incrementA/B 可能等于累计值 // 这是预期行为:首月记录的就是从0到当前的增量 + // 边界情况:如果计算出负值,说明数据不一致,设置为0 + if incrementA < 0 { + incrementA = 0 + } + if incrementB < 0 { + incrementB = 0 + } + monthlyStats := BucketMonthlyStats{ BucketName: stat.BucketName, Year: year, @@ -762,6 +770,14 @@ func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { incrementA := current.OperationCountA - lastMonthCumulativeA[current.BucketName] incrementB := current.OperationCountB - lastMonthCumulativeB[current.BucketName] + // 边界情况:如果计算出负值,说明数据不一致,设置为0 + if incrementA < 0 { + incrementA = 0 + } + if incrementB < 0 { + incrementB = 0 + } + result = append(result, BucketMonthlyStats{ BucketName: current.BucketName, Year: year, From de019d56d8f2c05764053e8e89405266b3885660 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 23:03:09 +0800 Subject: [PATCH 10/20] Fix: Use transaction for atomic increment and read operation --- internal/storage/service.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/internal/storage/service.go b/internal/storage/service.go index fcc1b31..e7ba263 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -304,18 +304,29 @@ func (s *Service) IncrementBucketOperation(bucketName, category string) (int64, return 0, fmt.Errorf("unknown operation category: %s", category) } - if err := s.db.Model(&BucketStats{}). - Where("bucket_name = ?", bucketName). - UpdateColumn(field, gorm.Expr(field+" + ?", 1)).Error; err != nil { - return 0, fmt.Errorf("failed to increment %s for bucket %s: %w", field, bucketName, err) - } - + // 使用事务确保原子性 var count int64 - if err := s.db.Model(&BucketStats{}). - Where("bucket_name = ?", bucketName). - Select(field). - Scan(&count).Error; err != nil { - return 0, fmt.Errorf("failed to fetch updated %s for bucket %s: %w", field, bucketName, err) + err := s.db.Transaction(func(tx *gorm.DB) error { + // 原子递增 + if err := tx.Model(&BucketStats{}). + Where("bucket_name = ?", bucketName). + UpdateColumn(field, gorm.Expr(field+" + ?", 1)).Error; err != nil { + return fmt.Errorf("failed to increment %s for bucket %s: %w", field, bucketName, err) + } + + // 在同一事务中读取最新值 + if err := tx.Model(&BucketStats{}). + Where("bucket_name = ?", bucketName). + Select(field). + Scan(&count).Error; err != nil { + return fmt.Errorf("failed to fetch updated %s for bucket %s: %w", field, bucketName, err) + } + + return nil + }) + + if err != nil { + return 0, err } return count, nil From bf5db75d26d4125edaa78ee29939ceb009b5e04a Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 23:44:09 +0800 Subject: [PATCH 11/20] Update config.example.yaml --- config/config.example.yaml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/config/config.example.yaml b/config/config.example.yaml index d305fe7..7680fa2 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -141,11 +141,12 @@ s3api: virtual_host: false # 工作模式: - # false:预签名重定向模式,客户端直接与后端存储交互 - # true (默认):代理模式,数据通过S3 Balance服务器传输 + # false:预签名重定向模式,客户端下载直接重定向到与后端存储 + # true (默认):代理模式,数据通过S3 Balance服务器中转传输 + # 该选项仅适用于下载,上传操作始终为全代理模式 proxy_mode: true - # 是否需要认证(开启后使用 Basic Auth,凭据来自 access_key/secret_key) + # 是否需要认证(使用配置的 access_key/secret_key) auth_required: true # 用于签名验证的Host(可选) From ffc21aa49cbfbe5b1dcfe3f9662becfb4aa4a006 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 4 Nov 2025 19:57:02 +0800 Subject: [PATCH 12/20] Add management API endpoints --- cmd/s3-balance/main.go | 15 +++ config/config.example.yaml | 17 +++- internal/api/admin_handler.go | 175 ++++++++++++++++++++++++++++++++++ internal/bucket/manager.go | 19 ++++ internal/config/config.go | 12 +++ internal/middleware/auth.go | 34 +++++++ 6 files changed, 268 insertions(+), 4 deletions(-) create mode 100644 internal/api/admin_handler.go diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 7839bfc..856106e 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -17,6 +17,7 @@ import ( "github.com/DullJZ/s3-balance/internal/config" "github.com/DullJZ/s3-balance/internal/database" "github.com/DullJZ/s3-balance/internal/metrics" + "github.com/DullJZ/s3-balance/internal/middleware" "github.com/DullJZ/s3-balance/internal/storage" "github.com/DullJZ/s3-balance/pkg/presigner" "github.com/gorilla/mux" @@ -124,6 +125,20 @@ func main() { log.Printf("Metrics server enabled at %s", cfg.Metrics.Path) } + // 注册管理API路由(如果启用) + // 必须在S3路由之前注册,因为S3路由使用 /{bucket} 通配符会匹配所有路径 + if cfg.API.Enabled { + log.Println("Management API enabled") + adminHandler := api.NewAdminHandler(bucketManager, lb, cfg) + + // 创建子路由器并应用Token认证中间件 + apiRouter := router.PathPrefix("/api").Subrouter() + apiRouter.Use(middleware.TokenAuthMiddleware(cfg.API.Token)) + adminHandler.RegisterRoutes(apiRouter) + + log.Printf("Management API endpoints available at /api/*") + } + // 运行在S3兼容模式 log.Println("Running in S3-compatible mode") s3Handler.RegisterS3Routes(router) diff --git a/config/config.example.yaml b/config/config.example.yaml index d305fe7..9fa58b2 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -133,18 +133,18 @@ metrics: s3api: # 客户端连接用的Access Key access_key: "AKIAIOSFODNN7EXAMPLE" - + # 客户端连接用的Secret Key secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" - + # 是否使用虚拟主机模式 virtual_host: false - + # 工作模式: # false:预签名重定向模式,客户端直接与后端存储交互 # true (默认):代理模式,数据通过S3 Balance服务器传输 proxy_mode: true - + # 是否需要认证(开启后使用 Basic Auth,凭据来自 access_key/secret_key) auth_required: true @@ -153,3 +153,12 @@ s3api: # 留空则使用请求中的 Host 头 # 示例: "s3.example.com" 或 "s3.example.com:8080" host: "" + +# 管理API配置 +api: + # 是否启用管理API + enabled: true + + # API访问令牌(用于管理接口的身份验证) + # 请修改为强密码,建议使用随机生成的长字符串 + token: "your-secure-api-token-change-this" diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go new file mode 100644 index 0000000..dcf0f3f --- /dev/null +++ b/internal/api/admin_handler.go @@ -0,0 +1,175 @@ +package api + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/DullJZ/s3-balance/internal/balancer" + "github.com/DullJZ/s3-balance/internal/bucket" + "github.com/DullJZ/s3-balance/internal/config" + "github.com/gorilla/mux" +) + +// AdminHandler 管理API处理器 +type AdminHandler struct { + bucketManager *bucket.Manager + balancer *balancer.Balancer + config *config.Config +} + +// NewAdminHandler 创建新的管理API处理器 +func NewAdminHandler( + bucketManager *bucket.Manager, + balancer *balancer.Balancer, + cfg *config.Config, +) *AdminHandler { + return &AdminHandler{ + bucketManager: bucketManager, + balancer: balancer, + config: cfg, + } +} + +// BucketResponse 存储桶响应结构 +type BucketResponse struct { + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Region string `json:"region"` + MaxSize string `json:"max_size"` + MaxSizeBytes int64 `json:"max_size_bytes"` + UsedSize int64 `json:"used_size"` + AvailableSize int64 `json:"available_size"` + UsagePercent float64 `json:"usage_percent"` + Weight int `json:"weight"` + Enabled bool `json:"enabled"` + Available bool `json:"available"` + Virtual bool `json:"virtual"` + LastChecked time.Time `json:"last_checked"` + OperationCountA int64 `json:"operation_count_a"` + OperationCountB int64 `json:"operation_count_b"` + OperationLimits struct { + TypeA int `json:"type_a"` + TypeB int `json:"type_b"` + } `json:"operation_limits"` +} + +// BucketsListResponse 存储桶列表响应结构 +type BucketsListResponse struct { + Total int `json:"total"` + Buckets []BucketResponse `json:"buckets"` +} + +// HealthResponse 健康状态响应结构 +type HealthResponse struct { + Status string `json:"status"` + Timestamp time.Time `json:"timestamp"` + LoadBalancer string `json:"load_balancer_strategy"` + TotalBuckets int `json:"total_buckets"` + AvailableBuckets int `json:"available_buckets"` + Database string `json:"database_type"` +} + +// RegisterRoutes 注册管理API路由 +func (h *AdminHandler) RegisterRoutes(router *mux.Router) { + router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet) + router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) + router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet) +} + +// ListBuckets 获取存储桶列表 +func (h *AdminHandler) ListBuckets(w http.ResponseWriter, r *http.Request) { + buckets := h.bucketManager.GetAllBuckets() + + response := BucketsListResponse{ + Total: len(buckets), + Buckets: make([]BucketResponse, 0, len(buckets)), + } + + for _, b := range buckets { + bucketResp := h.convertBucketInfo(b) + response.Buckets = append(response.Buckets, bucketResp) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetBucketDetail 获取存储桶详情 +func (h *AdminHandler) GetBucketDetail(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + name := vars["name"] + + bucketInfo, exists := h.bucketManager.GetBucket(name) + if !exists { + http.Error(w, `{"error": "bucket not found"}`, http.StatusNotFound) + return + } + + response := h.convertBucketInfo(bucketInfo) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetHealth 获取系统健康状态 +func (h *AdminHandler) GetHealth(w http.ResponseWriter, r *http.Request) { + buckets := h.bucketManager.GetAllBuckets() + availableBuckets := h.bucketManager.GetAvailableBuckets() + + status := "healthy" + if len(availableBuckets) == 0 { + status = "unhealthy" + } else if len(availableBuckets) < len(buckets)/2 { + status = "degraded" + } + + response := HealthResponse{ + Status: status, + Timestamp: time.Now(), + LoadBalancer: h.config.Balancer.Strategy, + TotalBuckets: len(buckets), + AvailableBuckets: len(availableBuckets), + Database: h.config.Database.Type, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// convertBucketInfo 转换BucketInfo为BucketResponse +func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse { + resp := BucketResponse{ + Name: b.Config.Name, + Endpoint: b.Config.Endpoint, + Region: b.Config.Region, + MaxSize: b.Config.MaxSize, + MaxSizeBytes: b.Config.MaxSizeBytes, + UsedSize: b.UsedSize, + Weight: b.Config.Weight, + Enabled: b.Config.Enabled, + Available: b.Available, + Virtual: b.Config.Virtual, + LastChecked: b.LastChecked, + OperationCountA: b.GetOperationCount(bucket.OperationTypeA), + OperationCountB: b.GetOperationCount(bucket.OperationTypeB), + } + + resp.OperationLimits.TypeA = b.Config.OperationLimits.TypeA + resp.OperationLimits.TypeB = b.Config.OperationLimits.TypeB + + // 计算可用空间 + if b.Config.MaxSizeBytes > 0 { + resp.AvailableSize = b.Config.MaxSizeBytes - b.UsedSize + if resp.AvailableSize < 0 { + resp.AvailableSize = 0 + } + // 计算使用百分比 + resp.UsagePercent = float64(b.UsedSize) / float64(b.Config.MaxSizeBytes) * 100 + } else { + resp.AvailableSize = -1 // -1 表示无限制 + resp.UsagePercent = 0 + } + + return resp +} diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 41c86ee..8f5e0d8 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -413,6 +413,25 @@ func (b *BucketInfo) IsVirtual() bool { return b.Config.Virtual } +// GetOperationCount 获取指定类型的操作计数 +func (b *BucketInfo) GetOperationCount(category OperationCategory) int64 { + if b == nil { + return 0 + } + + b.mu.RLock() + defer b.mu.RUnlock() + + switch category { + case OperationTypeA: + return b.operationCountA + case OperationTypeB: + return b.operationCountB + default: + return 0 + } +} + // GetVirtualBuckets 获取所有虚拟存储桶 func (m *Manager) GetVirtualBuckets() []*BucketInfo { m.mu.RLock() diff --git a/internal/config/config.go b/internal/config/config.go index 5953a38..2f3d8b7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ type Config struct { Balancer BalancerConfig `yaml:"balancer"` Metrics MetricsConfig `yaml:"metrics"` S3API S3APIConfig `yaml:"s3api"` + API APIConfig `yaml:"api"` } // ServerConfig 服务器配置 @@ -75,6 +76,12 @@ type S3APIConfig struct { Host string `yaml:"host"` // 用于签名验证的Host(为空则使用请求的Host) } +// APIConfig 管理API配置 +type APIConfig struct { + Enabled bool `yaml:"enabled"` // 是否启用管理API + Token string `yaml:"token"` // API访问令牌 +} + // DatabaseConfig 数据库配置 type DatabaseConfig struct { Type string `yaml:"type"` // 数据库类型: sqlite, mysql, postgres @@ -179,6 +186,11 @@ func (c *Config) SetDefaults() { if c.S3API.SecretKey == "" { c.S3API.SecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" } + + // 管理API默认值 + if c.API.Token == "" { + c.API.Token = "your-secure-api-token-here" + } } // ParseMaxSize 解析最大容量字符串为字节 diff --git a/internal/middleware/auth.go b/internal/middleware/auth.go index c29ffc6..e95bcc4 100644 --- a/internal/middleware/auth.go +++ b/internal/middleware/auth.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strings" "github.com/DullJZ/s3-validate/pkg/s3validate" ) @@ -76,3 +77,36 @@ func invokeOnError(w http.ResponseWriter, r *http.Request, cfg S3SignatureConfig } http.Error(w, message, http.StatusForbidden) } + +// TokenAuthMiddleware 创建Token认证中间件,用于管理API +func TokenAuthMiddleware(validToken string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // 从Authorization头中提取token + authHeader := r.Header.Get("Authorization") + if authHeader == "" { + w.Header().Set("Content-Type", "application/json") + http.Error(w, `{"error": "missing authorization header"}`, http.StatusUnauthorized) + return + } + + // 支持两种格式: + // 1. Bearer + // 2. + token := authHeader + if strings.HasPrefix(authHeader, "Bearer ") { + token = strings.TrimPrefix(authHeader, "Bearer ") + } + + // 验证token + if token != validToken { + w.Header().Set("Content-Type", "application/json") + http.Error(w, `{"error": "invalid token"}`, http.StatusUnauthorized) + return + } + + // 继续处理请求 + next.ServeHTTP(w, r) + }) + } +} From dbd566059bd325d093007cbf1896d1366bde47d2 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 4 Nov 2025 21:35:08 +0800 Subject: [PATCH 13/20] Fix: Remove duplicate /api prefix in admin routes --- internal/api/admin_handler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index dcf0f3f..5b28a5d 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -71,10 +71,11 @@ type HealthResponse struct { } // RegisterRoutes 注册管理API路由 +// 注意: router 参数应该是已经带有 /api 前缀的子路由器 func (h *AdminHandler) RegisterRoutes(router *mux.Router) { - router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet) - router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) - router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet) + router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet) + router.HandleFunc("/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) + router.HandleFunc("/health", h.GetHealth).Methods(http.MethodGet) } // ListBuckets 获取存储桶列表 From c7e984aac23ed0b3ce760b182e84d103f9e4dedd Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Wed, 5 Nov 2025 01:20:49 +0800 Subject: [PATCH 14/20] Fix: Add CORS support for management API OPTIONS requests --- cmd/s3-balance/main.go | 3 ++- internal/api/admin_handler.go | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 6413a8b..ad04355 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -137,8 +137,9 @@ func main() { log.Println("Management API enabled") adminHandler := api.NewAdminHandler(bucketManager, lb, cfg) - // 创建子路由器并应用Token认证中间件 + // 创建子路由器并应用中间件 apiRouter := router.PathPrefix("/api").Subrouter() + apiRouter.Use(corsMiddleware) // 先应用 CORS 中间件,处理 OPTIONS 预检请求 apiRouter.Use(middleware.TokenAuthMiddleware(cfg.API.Token)) adminHandler.RegisterRoutes(apiRouter) diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index 5b28a5d..38f17e1 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -73,9 +73,10 @@ type HealthResponse struct { // RegisterRoutes 注册管理API路由 // 注意: router 参数应该是已经带有 /api 前缀的子路由器 func (h *AdminHandler) RegisterRoutes(router *mux.Router) { - router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet) - router.HandleFunc("/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) - router.HandleFunc("/health", h.GetHealth).Methods(http.MethodGet) + // 注册路由,同时支持 OPTIONS 方法用于 CORS 预检 + router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/health", h.GetHealth).Methods(http.MethodGet, http.MethodOptions) } // ListBuckets 获取存储桶列表 From 291c327477ccecc96f1388de3ecdfb7f89847ea6 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Thu, 6 Nov 2025 01:14:09 +0800 Subject: [PATCH 15/20] Feat: Config Manager API --- .gitignore | 1 + cmd/s3-balance/main.go | 2 +- internal/api/admin_handler.go | 64 +++++++++++- internal/config/manager.go | 177 ++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 38261b6..2e472cb 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ logs/ # AI doc AGENTS.md CLAUDE.md +docs/ # Generated files s3-balance \ No newline at end of file diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 6413a8b..fe006fb 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -135,7 +135,7 @@ func main() { // 必须在S3路由之前注册,因为S3路由使用 /{bucket} 通配符会匹配所有路径 if cfg.API.Enabled { log.Println("Management API enabled") - adminHandler := api.NewAdminHandler(bucketManager, lb, cfg) + adminHandler := api.NewAdminHandler(bucketManager, lb, cfg, configManager) // 创建子路由器并应用Token认证中间件 apiRouter := router.PathPrefix("/api").Subrouter() diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index dcf0f3f..c51e461 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -16,6 +16,7 @@ type AdminHandler struct { bucketManager *bucket.Manager balancer *balancer.Balancer config *config.Config + configManager *config.Manager } // NewAdminHandler 创建新的管理API处理器 @@ -23,11 +24,13 @@ func NewAdminHandler( bucketManager *bucket.Manager, balancer *balancer.Balancer, cfg *config.Config, + configManager *config.Manager, ) *AdminHandler { return &AdminHandler{ bucketManager: bucketManager, balancer: balancer, config: cfg, + configManager: configManager, } } @@ -72,9 +75,12 @@ type HealthResponse struct { // RegisterRoutes 注册管理API路由 func (h *AdminHandler) RegisterRoutes(router *mux.Router) { - router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet) - router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) - router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet) + // 注册路由,同时支持 OPTIONS 方法用于 CORS 预检 + router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/health", h.GetHealth).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/config", h.GetConfig).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost, http.MethodOptions) } // ListBuckets 获取存储桶列表 @@ -137,6 +143,58 @@ func (h *AdminHandler) GetHealth(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) } +// GetConfig 获取当前配置 +func (h *AdminHandler) GetConfig(w http.ResponseWriter, r *http.Request) { + if h.configManager == nil { + http.Error(w, `{"error": "config manager not available"}`, http.StatusInternalServerError) + return + } + + currentConfig := h.configManager.GetConfig() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(currentConfig) +} + +// UpdateConfig 更新配置 +func (h *AdminHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + if h.configManager == nil { + http.Error(w, `{"error": "config manager not available"}`, http.StatusInternalServerError) + return + } + + // 解析请求体 + var newConfig config.Config + if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil { + http.Error(w, `{"error": "invalid JSON format: `+err.Error()+`"}`, http.StatusBadRequest) + return + } + + // 设置默认值 + newConfig.SetDefaults() + + // 更新配置 + if err := h.configManager.UpdateConfig(&newConfig); err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{ + "error": "validation failed", + "message": err.Error(), + }) + return + } + + // 返回成功响应 + response := map[string]interface{}{ + "success": true, + "message": "Configuration updated successfully. Changes will take effect automatically.", + "config": &newConfig, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + // convertBucketInfo 转换BucketInfo为BucketResponse func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse { resp := BucketResponse{ diff --git a/internal/config/manager.go b/internal/config/manager.go index 3c46547..601904a 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -1,12 +1,14 @@ package config import ( + "fmt" "log" "os" "sync" "time" "github.com/fsnotify/fsnotify" + "gopkg.in/yaml.v3" ) // Manager 配置管理器,支持热更新 @@ -227,6 +229,181 @@ func (m *Manager) logConfigChanges(oldConfig, newConfig *Config) { } } +// UpdateConfig 通过 API 更新配置文件 +// 返回错误如果验证失败或写入失败 +func (m *Manager) UpdateConfig(newConfig *Config) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + // 1. 验证新配置 + if err := m.validateConfig(newConfig); err != nil { + return err + } + + // 2. 备份当前配置文件 + if err := m.backupConfigFile(); err != nil { + log.Printf("Failed to backup config file: %v", err) + // 继续执行,备份失败不应阻止更新 + } + + // 3. 将新配置写入文件 + if err := m.writeConfigFile(newConfig); err != nil { + return err + } + + // 4. 更新内存中的配置 + oldConfig := m.config + m.config = newConfig + + // 5. 更新最后修改时间,避免文件监听重复触发 + if fileInfo, err := os.Stat(m.configFile); err == nil { + m.lastModTime = fileInfo.ModTime() + } + + log.Printf("Configuration updated successfully via API") + + // 6. 触发配置变更回调(在锁外执行) + callbacks := make([]func(*Config), len(m.callbacks)) + copy(callbacks, m.callbacks) + + go func() { + for _, callback := range callbacks { + func() { + defer func() { + if r := recover(); r != nil { + log.Printf("Config change callback panic: %v", r) + } + }() + callback(newConfig) + }() + } + }() + + // 7. 记录配置变更 + m.logConfigChanges(oldConfig, newConfig) + + return nil +} + +// validateConfig 验证配置的有效性 +func (m *Manager) validateConfig(cfg *Config) error { + // 基本验证 + if cfg.Server.Port <= 0 || cfg.Server.Port > 65535 { + return fmt.Errorf("invalid server port: %d", cfg.Server.Port) + } + + if len(cfg.Buckets) == 0 { + return fmt.Errorf("at least one bucket is required") + } + + // 验证存储桶配置 + for i, bucket := range cfg.Buckets { + if bucket.Name == "" { + return fmt.Errorf("bucket[%d]: name is required", i) + } + + // 虚拟存储桶不需要端点和凭据 + if !bucket.Virtual { + if bucket.Endpoint == "" { + return fmt.Errorf("bucket[%d] (%s): endpoint is required for non-virtual bucket", i, bucket.Name) + } + if bucket.AccessKeyID == "" { + return fmt.Errorf("bucket[%d] (%s): access_key_id is required for non-virtual bucket", i, bucket.Name) + } + if bucket.SecretAccessKey == "" { + return fmt.Errorf("bucket[%d] (%s): secret_access_key is required for non-virtual bucket", i, bucket.Name) + } + } + + // 解析并验证容量大小 + if err := cfg.Buckets[i].ParseMaxSize(); err != nil { + return fmt.Errorf("bucket[%d] (%s): invalid max_size: %w", i, bucket.Name, err) + } + } + + // 验证负载均衡策略 + validStrategies := map[string]bool{ + "round-robin": true, + "least-space": true, + "weighted": true, + } + if !validStrategies[cfg.Balancer.Strategy] { + return fmt.Errorf("invalid balancer strategy: %s (must be one of: round-robin, least-space, weighted)", cfg.Balancer.Strategy) + } + + // 验证数据库配置 + if cfg.Database.Type == "" { + return fmt.Errorf("database type is required") + } + validDBTypes := map[string]bool{ + "sqlite": true, + "mysql": true, + "postgres": true, + } + if !validDBTypes[cfg.Database.Type] { + return fmt.Errorf("invalid database type: %s (must be one of: sqlite, mysql, postgres)", cfg.Database.Type) + } + + return nil +} + +// backupConfigFile 备份当前配置文件 +func (m *Manager) backupConfigFile() error { + backupPath := m.configFile + ".backup." + time.Now().Format("20060102-150405") + + sourceData, err := os.ReadFile(m.configFile) + if err != nil { + return fmt.Errorf("failed to read config file: %w", err) + } + + if err := os.WriteFile(backupPath, sourceData, 0644); err != nil { + return fmt.Errorf("failed to write backup file: %w", err) + } + + log.Printf("Config file backed up to: %s", backupPath) + return nil +} + +// writeConfigFile 将配置写入 YAML 文件 +func (m *Manager) writeConfigFile(cfg *Config) error { + // 临时文件,确保原子性 + tmpFile := m.configFile + ".tmp" + + file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to create temp config file: %w", err) + } + defer file.Close() + + encoder := yaml.NewEncoder(file) + encoder.SetIndent(2) + + if err := encoder.Encode(cfg); err != nil { + file.Close() + os.Remove(tmpFile) + return fmt.Errorf("failed to encode config: %w", err) + } + + if err := encoder.Close(); err != nil { + file.Close() + os.Remove(tmpFile) + return fmt.Errorf("failed to close encoder: %w", err) + } + + if err := file.Close(); err != nil { + os.Remove(tmpFile) + return fmt.Errorf("failed to close temp file: %w", err) + } + + // 原子性替换原文件 + if err := os.Rename(tmpFile, m.configFile); err != nil { + os.Remove(tmpFile) + return fmt.Errorf("failed to replace config file: %w", err) + } + + return nil +} + // Close 关闭配置管理器 func (m *Manager) Close() error { // 停止监听协程 From 1402d40c052d26cef3b9b9bb9be7cec6c62262c2 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Thu, 6 Nov 2025 01:41:51 +0800 Subject: [PATCH 16/20] Track health check ListObjects as Class A operations --- .gitignore | 1 + internal/bucket/manager.go | 4 +++ internal/bucket/reporter.go | 52 +++++++++++++++++++++++++++++++++++ internal/health/s3_checker.go | 14 +++++++++- internal/health/stats.go | 14 +++++++++- internal/health/types.go | 16 +++++++++++ 6 files changed, 99 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 38261b6..2e472cb 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ logs/ # AI doc AGENTS.md CLAUDE.md +docs/ # Generated files s3-balance \ No newline at end of file diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 8f5e0d8..7afeef5 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -184,12 +184,16 @@ func (m *Manager) initHealthMonitoring() { // 创建S3健康检查器 healthChecker := health.NewS3Checker(healthConfig) + // 设置操作记录器以统计健康检查的 ListObjects 操作 + healthChecker.SetOperationRecorder(reporter) // 创建健康监控器 m.healthMonitor = health.NewMonitor(healthChecker, reporter) // 创建统计收集器 statsCollector := health.NewS3StatsCollector(30 * time.Second) + // 设置操作记录器以统计 Stats 收集的 ListObjects 操作 + statsCollector.SetOperationRecorder(reporter) // 创建统计监控器 m.statsMonitor = health.NewStatsMonitor( diff --git a/internal/bucket/reporter.go b/internal/bucket/reporter.go index 9826a7f..a5bd3cb 100644 --- a/internal/bucket/reporter.go +++ b/internal/bucket/reporter.go @@ -1,6 +1,8 @@ package bucket import ( + "log" + "github.com/DullJZ/s3-balance/internal/health" "github.com/DullJZ/s3-balance/internal/metrics" ) @@ -64,3 +66,53 @@ func (r *MetricsReporter) ReportStats(stats *health.Stats) { r.metrics.SetBucketUsage(stats.TargetID, stats.UsedSize, bucket.Config.MaxSizeBytes) } } + +// RecordOperation 实现 health.OperationRecorder 接口 +func (r *MetricsReporter) RecordOperation(targetID string, category health.OperationCategory) { + r.manager.mu.RLock() + bucket, exists := r.manager.buckets[targetID] + storage := r.manager.storage + r.manager.mu.RUnlock() + + if !exists { + return + } + + // 转换 health.OperationCategory 到 bucket.OperationCategory + var bucketCategory OperationCategory + switch category { + case health.OperationTypeA: + bucketCategory = OperationTypeA + case health.OperationTypeB: + bucketCategory = OperationTypeB + default: + return + } + + // 更新 Prometheus 指标 + if r.metrics != nil { + r.metrics.RecordBackendOperation(targetID, string(bucketCategory)) + } + + // 持久化操作计数到数据库并更新内存计数 + var disabled bool + if storage != nil { + // 先持久化到数据库 + newCount, err := storage.IncrementBucketOperation(targetID, string(bucketCategory)) + if err != nil { + log.Printf("Failed to persist health check operation count for bucket %s: %v", targetID, err) + // 如果数据库更新失败,仍然更新内存计数 + disabled = bucket.RecordOperation(bucketCategory) + } else { + // 使用数据库返回的最新计数更新内存 + disabled = bucket.SetOperationCount(bucketCategory, newCount) + } + } else { + // 没有 storage service,只更新内存 + disabled = bucket.RecordOperation(bucketCategory) + } + + if disabled { + log.Printf("Bucket %s disabled after exceeding %s-type operation limit (detected by health check)", targetID, bucketCategory) + } +} diff --git a/internal/health/s3_checker.go b/internal/health/s3_checker.go index f759deb..9e6229e 100644 --- a/internal/health/s3_checker.go +++ b/internal/health/s3_checker.go @@ -34,7 +34,8 @@ func (t *S3Target) GetEndpoint() string { // S3Checker S3健康检查器 type S3Checker struct { - config Config + config Config + opRecorder OperationRecorder } // NewS3Checker 创建S3健康检查器 @@ -54,6 +55,11 @@ func NewS3Checker(config Config) *S3Checker { } } +// SetOperationRecorder 设置操作记录器 +func (c *S3Checker) SetOperationRecorder(recorder OperationRecorder) { + c.opRecorder = recorder +} + // Check 执行S3健康检查 func (c *S3Checker) Check(ctx context.Context, target Target) Status { s3Target, ok := target.(*S3Target) @@ -121,6 +127,12 @@ func (c *S3Checker) performSimpleCheck(ctx context.Context, target *S3Target) er Bucket: aws.String(target.Bucket), MaxKeys: aws.Int32(1), }) + + // 记录操作(ListObjectsV2 是 Class A 操作) + if c.opRecorder != nil { + c.opRecorder.RecordOperation(target.GetID(), OperationTypeA) + } + return err } diff --git a/internal/health/stats.go b/internal/health/stats.go index f91bfc3..76b1dfd 100644 --- a/internal/health/stats.go +++ b/internal/health/stats.go @@ -26,7 +26,8 @@ type Stats struct { // S3StatsCollector S3统计信息收集器 type S3StatsCollector struct { - timeout time.Duration + timeout time.Duration + opRecorder OperationRecorder } // NewS3StatsCollector 创建S3统计信息收集器 @@ -39,6 +40,11 @@ func NewS3StatsCollector(timeout time.Duration) *S3StatsCollector { } } +// SetOperationRecorder 设置操作记录器 +func (c *S3StatsCollector) SetOperationRecorder(recorder OperationRecorder) { + c.opRecorder = recorder +} + // CollectStats 收集S3存储桶统计信息 func (c *S3StatsCollector) CollectStats(ctx context.Context, target Target) (*Stats, error) { s3Target, ok := target.(*S3Target) @@ -58,6 +64,12 @@ func (c *S3StatsCollector) CollectStats(ctx context.Context, target Target) (*St Bucket: aws.String(s3Target.Bucket), ContinuationToken: continuationToken, }) + + // 记录操作(每次 ListObjectsV2 调用都是 Class A 操作) + if c.opRecorder != nil { + c.opRecorder.RecordOperation(s3Target.GetID(), OperationTypeA) + } + if err != nil { return nil, err } diff --git a/internal/health/types.go b/internal/health/types.go index bbdaadd..f00508a 100644 --- a/internal/health/types.go +++ b/internal/health/types.go @@ -66,3 +66,19 @@ type HealthReporter interface { // ReportHealth 报告健康状态 ReportHealth(targetID string, status Status) } + +// OperationCategory 操作分类 +type OperationCategory string + +const ( + // OperationTypeA 写入类操作 (ListObjects, PutObject, etc.) + OperationTypeA OperationCategory = "A" + // OperationTypeB 读取类操作 (GetObject) + OperationTypeB OperationCategory = "B" +) + +// OperationRecorder 操作记录器接口 +type OperationRecorder interface { + // RecordOperation 记录一次后端操作 + RecordOperation(targetID string, category OperationCategory) +} From dfd38e592e65a9a1430b115cde7996ce80091c5e Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Thu, 6 Nov 2025 18:21:28 +0800 Subject: [PATCH 17/20] Add stats API handler and fix CORS support --- cmd/s3-balance/main.go | 5 ++++- internal/api/stats_handler.go | 9 +++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index fe006fb..faddeca 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -136,11 +136,14 @@ func main() { if cfg.API.Enabled { log.Println("Management API enabled") adminHandler := api.NewAdminHandler(bucketManager, lb, cfg, configManager) + statsHandler := api.NewStatsHandler(storageService) - // 创建子路由器并应用Token认证中间件 + // 创建子路由器并应用中间件 apiRouter := router.PathPrefix("/api").Subrouter() + apiRouter.Use(corsMiddleware) // 先应用 CORS 中间件,处理 OPTIONS 预检请求 apiRouter.Use(middleware.TokenAuthMiddleware(cfg.API.Token)) adminHandler.RegisterRoutes(apiRouter) + statsHandler.RegisterRoutes(apiRouter) log.Printf("Management API endpoints available at /api/*") } diff --git a/internal/api/stats_handler.go b/internal/api/stats_handler.go index 3436dc1..4d07b92 100644 --- a/internal/api/stats_handler.go +++ b/internal/api/stats_handler.go @@ -25,10 +25,11 @@ func NewStatsHandler(storage *storage.Service) *StatsHandler { // RegisterRoutes 注册统计API路由 func (h *StatsHandler) RegisterRoutes(router *mux.Router) { - router.HandleFunc("/api/stats/monthly", h.GetCurrentMonthStats).Methods("GET") - router.HandleFunc("/api/stats/monthly/{year}/{month}", h.GetMonthlyStats).Methods("GET") - router.HandleFunc("/api/stats/monthly/range", h.GetMonthlyStatsRange).Methods("GET") - router.HandleFunc("/api/stats/bucket/{bucket}/history", h.GetBucketHistory).Methods("GET") + // 注意: router 参数应该是已经带有 /api 前缀的子路由器 + router.HandleFunc("/stats/monthly", h.GetCurrentMonthStats).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/stats/monthly/{year}/{month}", h.GetMonthlyStats).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/stats/monthly/range", h.GetMonthlyStatsRange).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/stats/bucket/{bucket}/history", h.GetBucketHistory).Methods(http.MethodGet, http.MethodOptions) } // MonthlyStatsResponse 月度统计响应 From af8c8e579a3313e3d450fa7221df7b2d8af1dc17 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Thu, 6 Nov 2025 19:41:53 +0800 Subject: [PATCH 18/20] Fix: Modify config fails in Docker --- internal/config/manager.go | 44 ++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/internal/config/manager.go b/internal/config/manager.go index 601904a..10a46a8 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -1,6 +1,7 @@ package config import ( + "bytes" "fmt" "log" "os" @@ -128,7 +129,7 @@ func (m *Manager) watchConfig() { // 只处理修改和重命名事件 if event.Op&fsnotify.Write == fsnotify.Write || - event.Op&fsnotify.Rename == fsnotify.Rename { + event.Op&fsnotify.Rename == fsnotify.Rename { log.Printf("Config file %s modified (detected by fsnotify), reloading...", m.configFile) // 更新最后修改时间以避免轮询重复触发 @@ -366,39 +367,36 @@ func (m *Manager) backupConfigFile() error { // writeConfigFile 将配置写入 YAML 文件 func (m *Manager) writeConfigFile(cfg *Config) error { - // 临时文件,确保原子性 - tmpFile := m.configFile + ".tmp" - - file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return fmt.Errorf("failed to create temp config file: %w", err) - } - defer file.Close() - - encoder := yaml.NewEncoder(file) + // 先编码到缓冲区,避免在写入过程中损坏原文件 + var buf bytes.Buffer + encoder := yaml.NewEncoder(&buf) encoder.SetIndent(2) if err := encoder.Encode(cfg); err != nil { - file.Close() - os.Remove(tmpFile) return fmt.Errorf("failed to encode config: %w", err) } if err := encoder.Close(); err != nil { - file.Close() - os.Remove(tmpFile) return fmt.Errorf("failed to close encoder: %w", err) } - if err := file.Close(); err != nil { - os.Remove(tmpFile) - return fmt.Errorf("failed to close temp file: %w", err) + file, err := os.OpenFile(m.configFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to open config file: %w", err) } - // 原子性替换原文件 - if err := os.Rename(tmpFile, m.configFile); err != nil { - os.Remove(tmpFile) - return fmt.Errorf("failed to replace config file: %w", err) + if _, err := file.Write(buf.Bytes()); err != nil { + file.Close() + return fmt.Errorf("failed to write config file: %w", err) + } + + if err := file.Sync(); err != nil { + file.Close() + return fmt.Errorf("failed to sync config file: %w", err) + } + + if err := file.Close(); err != nil { + return fmt.Errorf("failed to close config file: %w", err) } return nil @@ -420,4 +418,4 @@ func (m *Manager) Close() error { } return nil -} \ No newline at end of file +} From 998e8769ee8df814a4982d0710884198201ed0a8 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Fri, 14 Nov 2025 20:44:26 +0800 Subject: [PATCH 19/20] Feat: Support Web frontend --- .gitignore | 4 +- cmd/s3-balance/main.go | 93 +++++++++++++++++++++++++++++++++++++++++ internal/web/handler.go | 87 ++++++++++++++++++++++++++++++++++++++ internal/webui/embed.go | 14 +++++++ 4 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 internal/web/handler.go create mode 100644 internal/webui/embed.go diff --git a/.gitignore b/.gitignore index 2e472cb..8cafa19 100644 --- a/.gitignore +++ b/.gitignore @@ -53,4 +53,6 @@ CLAUDE.md docs/ # Generated files -s3-balance \ No newline at end of file +s3-balance + +dist/ \ No newline at end of file diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index faddeca..e1de95a 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -20,6 +20,8 @@ import ( "github.com/DullJZ/s3-balance/internal/middleware" "github.com/DullJZ/s3-balance/internal/scheduler" "github.com/DullJZ/s3-balance/internal/storage" + "github.com/DullJZ/s3-balance/internal/web" + "github.com/DullJZ/s3-balance/internal/webui" "github.com/DullJZ/s3-balance/pkg/presigner" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -28,9 +30,17 @@ import ( func main() { // 解析命令行参数 var configFile string + var onlyWeb bool flag.StringVar(&configFile, "config", "config/config.yaml", "Path to configuration file") + flag.BoolVar(&onlyWeb, "only-web", false, "Only serve web UI, no backend services") flag.Parse() + // 如果是只提供Web前端模式 + if onlyWeb { + startWebOnlyMode(configFile) + return + } + // 创建配置管理器 configManager, err := config.NewManager(configFile) if err != nil { @@ -148,6 +158,15 @@ func main() { log.Printf("Management API endpoints available at /api/*") } + // 注册Web管理界面 + distSubFS, err := webui.GetDistFS() + if err != nil { + log.Fatalf("Failed to load embedded web UI: %v", err) + } + webHandler := web.NewHandler(distSubFS) + router.PathPrefix("/web").Handler(http.StripPrefix("/web", webHandler)) + log.Println("Web UI available at /web") + // 运行在S3兼容模式 log.Println("Running in S3-compatible mode") s3Handler.RegisterS3Routes(router) @@ -298,3 +317,77 @@ func cleanupS3MultipartUploads(_ context.Context, storageService *storage.Servic } } } + +// startWebOnlyMode 只启动Web前端服务,不启动后端服务 +func startWebOnlyMode(configFile string) { + log.Println("Starting in web-only mode (no backend services)") + + // 加载配置文件以获取端口等信息 + configManager, err := config.NewManager(configFile) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + defer configManager.Close() + + cfg := configManager.GetConfig() + + // 创建路由器 + router := mux.NewRouter() + + // 加载嵌入的前端资源 + distSubFS, err := webui.GetDistFS() + if err != nil { + log.Fatalf("Failed to load embedded web UI: %v", err) + } + + // 注册Web前端路由 + webHandler := web.NewHandler(distSubFS) + + // 根路径重定向到 /web + router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/web/", http.StatusMovedPermanently) + }) + + // Web UI 路由 + router.PathPrefix("/web").Handler(http.StripPrefix("/web", webHandler)) + + // 添加 CORS 和日志中间件 + router.Use(corsMiddleware) + router.Use(loggingMiddleware) + + // 使用配置文件中的端口 + addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port) + srv := &http.Server{ + Addr: addr, + Handler: router, + ReadTimeout: cfg.Server.ReadTimeout, + WriteTimeout: cfg.Server.WriteTimeout, + IdleTimeout: cfg.Server.IdleTimeout, + } + + log.Println("Web UI available at /web") + log.Printf("Starting web server on %s", srv.Addr) + + // 启动服务器 + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Failed to start server: %v", err) + } + }() + + // 等待中断信号 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + <-sigChan + + // 优雅关闭 + log.Println("Shutting down web server...") + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("Server shutdown error: %v", err) + } + + log.Println("Web server stopped") +} diff --git a/internal/web/handler.go b/internal/web/handler.go new file mode 100644 index 0000000..92f836b --- /dev/null +++ b/internal/web/handler.go @@ -0,0 +1,87 @@ +package web + +import ( + "io" + "io/fs" + "net/http" + "path" + "strings" +) + +// Handler Web管理界面处理器 +type Handler struct { + fileSystem http.FileSystem +} + +// NewHandler 创建Web处理器 +// distFS 应该是通过 embed.FS 嵌入的 dist 目录 +func NewHandler(distFS fs.FS) *Handler { + return &Handler{ + fileSystem: http.FS(distFS), + } +} + +// ServeHTTP 实现 http.Handler 接口 +// 处理单页应用的路由,将所有未找到的路径重定向到 index.html +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // 清理路径 + p := r.URL.Path + if !strings.HasPrefix(p, "/") { + p = "/" + p + } + + // 尝试打开文件 + f, err := h.fileSystem.Open(path.Clean(p)) + if err != nil { + // 文件不存在,返回 index.html (用于支持前端路由) + indexFile, err := h.fileSystem.Open("index.html") + if err != nil { + http.Error(w, "File not found", http.StatusNotFound) + return + } + defer indexFile.Close() + + // 读取 index.html 内容 + stat, err := indexFile.Stat() + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + http.ServeContent(w, r, "index.html", stat.ModTime(), indexFile.(io.ReadSeeker)) + return + } + defer f.Close() + + // 文件存在,检查是否为目录 + stat, err := f.Stat() + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + if stat.IsDir() { + // 如果是目录,尝试返回 index.html + indexPath := path.Join(p, "index.html") + indexFile, err := h.fileSystem.Open(indexPath) + if err != nil { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + defer indexFile.Close() + + indexStat, err := indexFile.Stat() + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + http.ServeContent(w, r, "index.html", indexStat.ModTime(), indexFile.(io.ReadSeeker)) + return + } + + // 返回文件内容 + http.ServeContent(w, r, stat.Name(), stat.ModTime(), f.(io.ReadSeeker)) +} diff --git a/internal/webui/embed.go b/internal/webui/embed.go new file mode 100644 index 0000000..bbc56f0 --- /dev/null +++ b/internal/webui/embed.go @@ -0,0 +1,14 @@ +package webui + +import ( + "embed" + "io/fs" +) + +//go:embed dist +var distFS embed.FS + +// GetDistFS 获取嵌入的前端静态文件系统 +func GetDistFS() (fs.FS, error) { + return fs.Sub(distFS, "dist") +} From 3753b50cd96ac56114bbd078a68e30fecd11ca2e Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Sat, 15 Nov 2025 12:31:16 +0800 Subject: [PATCH 20/20] Feat: Add frontend web download in gh action --- .github/workflows/build.yml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index eaac851..5d90a5f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,6 +19,21 @@ jobs: run: | echo "VERSION=$(cat VERSION | tr -d '\n')" >> $GITHUB_ENV + - name: Download frontend assets + run: | + echo "Downloading latest frontend build from s3-balance-web..." + LATEST_RELEASE=$(curl -s https://api.github.com/repos/DullJZ/s3-balance-web/releases/latest | grep '"tag_name":' | sed -E 's/.*"([^"]+)".*/\1/') + echo "Latest frontend release: $LATEST_RELEASE" + + curl -L -o dist.tar.gz "https://github.com/DullJZ/s3-balance-web/releases/download/$LATEST_RELEASE/dist.tar.gz" + + mkdir -p internal/webui/dist + tar -xzf dist.tar.gz -C internal/webui/ + rm dist.tar.gz + + echo "Frontend assets downloaded and extracted to internal/webui/dist" + ls -la internal/webui/dist/ + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 with: @@ -62,6 +77,21 @@ jobs: run: | echo "VERSION=$(cat VERSION | tr -d '\n')" >> $GITHUB_ENV + - name: Download frontend assets + run: | + echo "Downloading latest frontend build from s3-balance-web..." + LATEST_RELEASE=$(curl -s https://api.github.com/repos/DullJZ/s3-balance-web/releases/latest | grep '"tag_name":' | sed -E 's/.*"([^"]+)".*/\1/') + echo "Latest frontend release: $LATEST_RELEASE" + + curl -L -o dist.tar.gz "https://github.com/DullJZ/s3-balance-web/releases/download/$LATEST_RELEASE/dist.tar.gz" + + mkdir -p internal/webui/dist + tar -xzf dist.tar.gz -C internal/webui/ + rm dist.tar.gz + + echo "Frontend assets downloaded and extracted to internal/webui/dist" + ls -la internal/webui/dist/ + - name: Setup Go uses: actions/setup-go@v2 with: