diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index c51e461..d1d769b 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -74,6 +74,7 @@ type HealthResponse struct { } // RegisterRoutes 注册管理API路由 +// 注意: router 参数应该是已经带有 /api 前缀的子路由器 func (h *AdminHandler) RegisterRoutes(router *mux.Router) { // 注册路由,同时支持 OPTIONS 方法用于 CORS 预检 router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet, http.MethodOptions) diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 8f5e0d8..7afeef5 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -184,12 +184,16 @@ func (m *Manager) initHealthMonitoring() { // 创建S3健康检查器 healthChecker := health.NewS3Checker(healthConfig) + // 设置操作记录器以统计健康检查的 ListObjects 操作 + healthChecker.SetOperationRecorder(reporter) // 创建健康监控器 m.healthMonitor = health.NewMonitor(healthChecker, reporter) // 创建统计收集器 statsCollector := health.NewS3StatsCollector(30 * time.Second) + // 设置操作记录器以统计 Stats 收集的 ListObjects 操作 + statsCollector.SetOperationRecorder(reporter) // 创建统计监控器 m.statsMonitor = health.NewStatsMonitor( diff --git a/internal/bucket/reporter.go b/internal/bucket/reporter.go index 9826a7f..a5bd3cb 100644 --- a/internal/bucket/reporter.go +++ b/internal/bucket/reporter.go @@ -1,6 +1,8 @@ package bucket import ( + "log" + "github.com/DullJZ/s3-balance/internal/health" "github.com/DullJZ/s3-balance/internal/metrics" ) @@ -64,3 +66,53 @@ func (r *MetricsReporter) ReportStats(stats *health.Stats) { r.metrics.SetBucketUsage(stats.TargetID, stats.UsedSize, bucket.Config.MaxSizeBytes) } } + +// RecordOperation 实现 health.OperationRecorder 接口 +func (r *MetricsReporter) RecordOperation(targetID string, category health.OperationCategory) { + r.manager.mu.RLock() + bucket, exists := r.manager.buckets[targetID] + storage := r.manager.storage + r.manager.mu.RUnlock() + + if !exists { + return + } + + // 转换 health.OperationCategory 到 bucket.OperationCategory + var bucketCategory OperationCategory + switch category { + case health.OperationTypeA: + bucketCategory = OperationTypeA + case health.OperationTypeB: + bucketCategory = OperationTypeB + default: + return + } + + // 更新 Prometheus 指标 + if r.metrics != nil { + r.metrics.RecordBackendOperation(targetID, string(bucketCategory)) + } + + // 持久化操作计数到数据库并更新内存计数 + var disabled bool + if storage != nil { + // 先持久化到数据库 + newCount, err := storage.IncrementBucketOperation(targetID, string(bucketCategory)) + if err != nil { + log.Printf("Failed to persist health check operation count for bucket %s: %v", targetID, err) + // 如果数据库更新失败,仍然更新内存计数 + disabled = bucket.RecordOperation(bucketCategory) + } else { + // 使用数据库返回的最新计数更新内存 + disabled = bucket.SetOperationCount(bucketCategory, newCount) + } + } else { + // 没有 storage service,只更新内存 + disabled = bucket.RecordOperation(bucketCategory) + } + + if disabled { + log.Printf("Bucket %s disabled after exceeding %s-type operation limit (detected by health check)", targetID, bucketCategory) + } +} diff --git a/internal/health/s3_checker.go b/internal/health/s3_checker.go index f759deb..9e6229e 100644 --- a/internal/health/s3_checker.go +++ b/internal/health/s3_checker.go @@ -34,7 +34,8 @@ func (t *S3Target) GetEndpoint() string { // S3Checker S3健康检查器 type S3Checker struct { - config Config + config Config + opRecorder OperationRecorder } // NewS3Checker 创建S3健康检查器 @@ -54,6 +55,11 @@ func NewS3Checker(config Config) *S3Checker { } } +// SetOperationRecorder 设置操作记录器 +func (c *S3Checker) SetOperationRecorder(recorder OperationRecorder) { + c.opRecorder = recorder +} + // Check 执行S3健康检查 func (c *S3Checker) Check(ctx context.Context, target Target) Status { s3Target, ok := target.(*S3Target) @@ -121,6 +127,12 @@ func (c *S3Checker) performSimpleCheck(ctx context.Context, target *S3Target) er Bucket: aws.String(target.Bucket), MaxKeys: aws.Int32(1), }) + + // 记录操作(ListObjectsV2 是 Class A 操作) + if c.opRecorder != nil { + c.opRecorder.RecordOperation(target.GetID(), OperationTypeA) + } + return err } diff --git a/internal/health/stats.go b/internal/health/stats.go index f91bfc3..76b1dfd 100644 --- a/internal/health/stats.go +++ b/internal/health/stats.go @@ -26,7 +26,8 @@ type Stats struct { // S3StatsCollector S3统计信息收集器 type S3StatsCollector struct { - timeout time.Duration + timeout time.Duration + opRecorder OperationRecorder } // NewS3StatsCollector 创建S3统计信息收集器 @@ -39,6 +40,11 @@ func NewS3StatsCollector(timeout time.Duration) *S3StatsCollector { } } +// SetOperationRecorder 设置操作记录器 +func (c *S3StatsCollector) SetOperationRecorder(recorder OperationRecorder) { + c.opRecorder = recorder +} + // CollectStats 收集S3存储桶统计信息 func (c *S3StatsCollector) CollectStats(ctx context.Context, target Target) (*Stats, error) { s3Target, ok := target.(*S3Target) @@ -58,6 +64,12 @@ func (c *S3StatsCollector) CollectStats(ctx context.Context, target Target) (*St Bucket: aws.String(s3Target.Bucket), ContinuationToken: continuationToken, }) + + // 记录操作(每次 ListObjectsV2 调用都是 Class A 操作) + if c.opRecorder != nil { + c.opRecorder.RecordOperation(s3Target.GetID(), OperationTypeA) + } + if err != nil { return nil, err } diff --git a/internal/health/types.go b/internal/health/types.go index bbdaadd..f00508a 100644 --- a/internal/health/types.go +++ b/internal/health/types.go @@ -66,3 +66,19 @@ type HealthReporter interface { // ReportHealth 报告健康状态 ReportHealth(targetID string, status Status) } + +// OperationCategory 操作分类 +type OperationCategory string + +const ( + // OperationTypeA 写入类操作 (ListObjects, PutObject, etc.) + OperationTypeA OperationCategory = "A" + // OperationTypeB 读取类操作 (GetObject) + OperationTypeB OperationCategory = "B" +) + +// OperationRecorder 操作记录器接口 +type OperationRecorder interface { + // RecordOperation 记录一次后端操作 + RecordOperation(targetID string, category OperationCategory) +}