health check to metrics

This commit is contained in:
DullJZ
2025-09-22 20:31:36 +08:00
parent 2b7861ccbc
commit a1b5037174
6 changed files with 748 additions and 145 deletions

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/DullJZ/s3-balance/internal/config"
"github.com/DullJZ/s3-balance/internal/health"
"github.com/DullJZ/s3-balance/internal/metrics"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
@@ -19,18 +20,20 @@ type BucketInfo struct {
Config config.BucketConfig
Client *s3.Client
UsedSize int64 // 已使用容量(字节)
Available bool // 是否可用
LastChecked time.Time // 最后检查时间
Available bool // 是否可用由health监控更新
LastChecked time.Time // 最后检查时间由health监控更新
mu sync.RWMutex
}
// Manager 存储桶管理器
type Manager struct {
buckets map[string]*BucketInfo
mu sync.RWMutex
config *config.Config
stopChan chan struct{}
metrics *metrics.Metrics
buckets map[string]*BucketInfo
mu sync.RWMutex
config *config.Config
stopChan chan struct{}
metrics *metrics.Metrics
healthMonitor *health.Monitor
statsMonitor *health.StatsMonitor
}
// NewManager 创建新的存储桶管理器
@@ -63,6 +66,9 @@ func NewManager(cfg *config.Config, metrics *metrics.Metrics) (*Manager, error)
m.buckets[bucketCfg.Name] = info
}
// 初始化健康监控
m.initHealthMonitoring()
return m, nil
}
@@ -105,152 +111,78 @@ func createS3Client(bucketCfg config.BucketConfig) (*s3.Client, error) {
return client, nil
}
// initHealthMonitoring builds the health monitor and the statistics monitor
// for this manager and registers every non-virtual bucket with both.
func (m *Manager) initHealthMonitoring() {
	// One reporter instance receives both health and stats callbacks.
	rep := NewMetricsReporter(m.metrics, m)

	// Health probing: simple strategy, a single attempt with a 5s timeout,
	// repeated at the configured health-check period.
	checker := health.NewS3Checker(health.Config{
		Strategy: health.StrategySimple,
		Interval: m.config.Balancer.HealthCheckPeriod,
		Timeout:  5 * time.Second,
		Retries:  1,
	})
	m.healthMonitor = health.NewMonitor(checker, rep)

	// Usage statistics: each collection is bounded by 30s and repeated at the
	// configured stats-update period.
	collector := health.NewS3StatsCollector(30 * time.Second)
	m.statsMonitor = health.NewStatsMonitor(collector, m.config.Balancer.UpdateStatsPeriod, rep)

	// Virtual buckets are not registered with either monitor.
	for _, info := range m.buckets {
		if !info.Config.Virtual {
			tgt := &health.S3Target{
				ID:       info.Config.Name,
				Bucket:   info.Config.Name,
				Endpoint: info.Config.Endpoint,
				Client:   info.Client,
			}
			m.healthMonitor.RegisterTarget(tgt)
			m.statsMonitor.RegisterTarget(tgt)
		}
	}
}
// Start 启动管理器(健康检查和统计更新)
func (m *Manager) Start(ctx context.Context) {
// 启动健康检查
go m.healthCheckLoop(ctx)
// 启动统计更新
go m.statsUpdateLoop(ctx)
// 启动健康监控
if m.healthMonitor != nil {
m.healthMonitor.Start(ctx)
}
// 启动统计监控
if m.statsMonitor != nil {
m.statsMonitor.Start(ctx)
}
}
// Stop 停止管理器
func (m *Manager) Stop() {
close(m.stopChan)
}
// healthCheckLoop 健康检查循环
func (m *Manager) healthCheckLoop(ctx context.Context) {
ticker := time.NewTicker(m.config.Balancer.HealthCheckPeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-m.stopChan:
return
case <-ticker.C:
m.checkAllBuckets(ctx)
}
// 停止健康监控
if m.healthMonitor != nil {
m.healthMonitor.Stop()
}
}
// statsUpdateLoop 统计更新循环
func (m *Manager) statsUpdateLoop(ctx context.Context) {
ticker := time.NewTicker(m.config.Balancer.UpdateStatsPeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-m.stopChan:
return
case <-ticker.C:
m.updateAllStats(ctx)
}
}
}
// checkAllBuckets 检查所有存储桶的健康状态
func (m *Manager) checkAllBuckets(ctx context.Context) {
m.mu.RLock()
buckets := make([]*BucketInfo, 0, len(m.buckets))
for _, b := range m.buckets {
buckets = append(buckets, b)
}
m.mu.RUnlock()
var wg sync.WaitGroup
for _, bucket := range buckets {
wg.Add(1)
go func(b *BucketInfo) {
defer wg.Done()
m.checkBucket(ctx, b)
}(bucket)
}
wg.Wait()
}
// checkBucket 检查单个存储桶
func (m *Manager) checkBucket(ctx context.Context, bucket *BucketInfo) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// 尝试列出存储桶(用于健康检查)
_, err := bucket.Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(bucket.Config.Name),
MaxKeys: aws.Int32(1),
})
bucket.mu.Lock()
bucket.Available = err == nil
bucket.LastChecked = time.Now()
bucket.mu.Unlock()
// 更新指标
if m.metrics != nil {
m.metrics.SetBucketHealthy(bucket.Config.Name, bucket.Config.Endpoint, bucket.Available)
}
}
// updateAllStats 更新所有存储桶的统计信息
func (m *Manager) updateAllStats(ctx context.Context) {
m.mu.RLock()
buckets := make([]*BucketInfo, 0, len(m.buckets))
for _, b := range m.buckets {
buckets = append(buckets, b)
}
m.mu.RUnlock()
var wg sync.WaitGroup
for _, bucket := range buckets {
wg.Add(1)
go func(b *BucketInfo) {
defer wg.Done()
m.updateBucketStats(ctx, b)
}(bucket)
}
wg.Wait()
}
// updateBucketStats 更新单个存储桶的统计信息
func (m *Manager) updateBucketStats(ctx context.Context, bucket *BucketInfo) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
var totalSize int64
var continuationToken *string
for {
output, err := bucket.Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(bucket.Config.Name),
ContinuationToken: continuationToken,
})
if err != nil {
break
}
for _, obj := range output.Contents {
if obj.Size != nil {
totalSize += *obj.Size
}
}
if output.IsTruncated == nil || !*output.IsTruncated {
break
}
continuationToken = output.NextContinuationToken
}
bucket.mu.Lock()
bucket.UsedSize = totalSize
bucket.mu.Unlock()
// 更新指标
if m.metrics != nil {
m.metrics.SetBucketUsage(bucket.Config.Name, totalSize, bucket.Config.MaxSizeBytes)
// 停止统计监控
if m.statsMonitor != nil {
m.statsMonitor.Stop()
}
}

View File

@@ -0,0 +1,64 @@
package bucket
import (
"github.com/DullJZ/s3-balance/internal/health"
"github.com/DullJZ/s3-balance/internal/metrics"
)
// MetricsReporter implements the health.HealthReporter and
// health.StatsReporter interfaces. It copies reported results onto the
// manager's BucketInfo entries and forwards them to the metrics sink.
type MetricsReporter struct {
	// metrics is the sink that receives health and usage updates.
	metrics *metrics.Metrics
	// NOTE(review): buckets is not set by NewMetricsReporter and is not read
	// by ReportHealth or ReportStats; lookups go through manager.buckets.
	buckets map[string]*BucketInfo
	// manager owns the bucket map consulted on every report.
	manager *Manager
}
// NewMetricsReporter returns a reporter bound to the given metrics sink and
// manager. The reporter's buckets field is left at its zero value.
func NewMetricsReporter(metrics *metrics.Metrics, manager *Manager) *MetricsReporter {
	reporter := new(MetricsReporter)
	reporter.metrics = metrics
	reporter.manager = manager
	return reporter
}
// ReportHealth implements health.HealthReporter.
//
// It looks up the bucket registered under targetID and records the reported
// availability and check time on it. Unknown target IDs are ignored. The
// bucket state is updated even when no metrics sink is configured; only the
// metrics export itself is skipped in that case, so disabling metrics does not
// leave buckets with stale availability.
func (r *MetricsReporter) ReportHealth(targetID string, status health.Status) {
	// Hold the manager lock only for the map lookup.
	r.manager.mu.RLock()
	bucket, exists := r.manager.buckets[targetID]
	r.manager.mu.RUnlock()
	if !exists {
		return
	}

	// Record the latest availability on the bucket itself.
	bucket.mu.Lock()
	bucket.Available = status.Healthy
	bucket.LastChecked = status.LastChecked
	bucket.mu.Unlock()

	// Exporting metrics is optional.
	if r.metrics != nil {
		r.metrics.SetBucketHealthy(targetID, bucket.Config.Endpoint, status.Healthy)
	}
}
// ReportStats implements health.StatsReporter.
//
// It stores the reported used size on the bucket registered under
// stats.TargetID. A nil stats value and unknown target IDs are ignored. The
// bucket state is updated even when no metrics sink is configured; only the
// metrics export itself is skipped in that case.
func (r *MetricsReporter) ReportStats(stats *health.Stats) {
	if stats == nil {
		return
	}

	// Hold the manager lock only for the map lookup.
	r.manager.mu.RLock()
	bucket, exists := r.manager.buckets[stats.TargetID]
	r.manager.mu.RUnlock()
	if !exists {
		return
	}

	// Record the latest usage on the bucket itself.
	bucket.mu.Lock()
	bucket.UsedSize = stats.UsedSize
	bucket.mu.Unlock()

	// Exporting metrics is optional.
	if r.metrics != nil {
		r.metrics.SetBucketUsage(stats.TargetID, stats.UsedSize, bucket.Config.MaxSizeBytes)
	}
}

166
internal/health/monitor.go Normal file
View File

@@ -0,0 +1,166 @@
package health
import (
"context"
"sync"
"time"
)
// Monitor periodically runs a Checker against a set of registered targets,
// remembers the latest Status per target and forwards it to a HealthReporter.
type Monitor struct {
	// checker performs the actual health check for a target.
	checker Checker
	// targets holds the registered targets keyed by their ID.
	targets map[string]Target
	// statuses holds the most recent check result per target ID.
	statuses map[string]Status
	// reporter, when non-nil, is notified after every check.
	reporter HealthReporter
	// mu guards targets and statuses.
	mu sync.RWMutex
	// stopChan is closed by Stop to end the background loop.
	stopChan chan struct{}
	// interval is the period between check rounds.
	interval time.Duration
}
// NewMonitor creates a health monitor that runs checker against its
// registered targets and hands every result to reporter (which may be nil).
//
// The check period is taken from checker.GetInterval(). A non-positive value
// is replaced by 30 seconds, because time.NewTicker panics when given a
// non-positive duration.
func NewMonitor(checker Checker, reporter HealthReporter) *Monitor {
	interval := checker.GetInterval()
	if interval <= 0 {
		interval = 30 * time.Second
	}
	return &Monitor{
		checker:  checker,
		targets:  make(map[string]Target),
		statuses: make(map[string]Status),
		reporter: reporter,
		stopChan: make(chan struct{}),
		interval: interval,
	}
}
// RegisterTarget adds target to the monitored set, replacing any target
// already registered under the same ID.
func (m *Monitor) RegisterTarget(target Target) {
	m.mu.Lock()
	id := target.GetID()
	m.targets[id] = target
	m.mu.Unlock()
}
// UnregisterTarget removes the target with the given ID together with its
// last recorded status.
func (m *Monitor) UnregisterTarget(targetID string) {
	m.mu.Lock()
	delete(m.statuses, targetID)
	delete(m.targets, targetID)
	m.mu.Unlock()
}
// Start performs one check round synchronously and then launches the
// periodic check loop in a new goroutine.
func (m *Monitor) Start(ctx context.Context) {
	// Initial round: the caller blocks until it has finished.
	m.checkAll(ctx)

	// Subsequent rounds run in the background.
	go func() {
		m.run(ctx)
	}()
}
// Stop signals the background loop started by Start to exit.
//
// Stop may be called more than once: only the first call closes the stop
// channel, later calls are no-ops. Closing an already closed channel would
// panic, so the close is guarded under the monitor's mutex.
func (m *Monitor) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	select {
	case <-m.stopChan:
		// Already stopped.
	default:
		close(m.stopChan)
	}
}
// run is the periodic check loop. It triggers a check round on every tick and
// returns when ctx is cancelled or the stop channel is closed.
func (m *Monitor) run(ctx context.Context) {
	tick := time.NewTicker(m.interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			m.checkAll(ctx)
		case <-m.stopChan:
			return
		case <-ctx.Done():
			return
		}
	}
}
// checkAll checks every registered target concurrently and waits for all
// checks to finish.
func (m *Monitor) checkAll(ctx context.Context) {
	// Snapshot the targets so the lock is not held while checks run.
	m.mu.RLock()
	snapshot := make([]Target, 0, len(m.targets))
	for _, t := range m.targets {
		snapshot = append(snapshot, t)
	}
	m.mu.RUnlock()

	// One goroutine per target.
	var pending sync.WaitGroup
	pending.Add(len(snapshot))
	for i := range snapshot {
		go func(t Target) {
			defer pending.Done()
			m.checkTarget(ctx, t)
		}(snapshot[i])
	}
	pending.Wait()
}
// checkTarget runs the checker against a single target, stores the result as
// that target's latest status and passes it to the reporter, if one is set.
func (m *Monitor) checkTarget(ctx context.Context, target Target) {
	result := m.checker.Check(ctx, target)

	// Remember the latest result.
	m.mu.Lock()
	m.statuses[target.GetID()] = result
	m.mu.Unlock()

	// Reporting is optional.
	if m.reporter == nil {
		return
	}
	m.reporter.ReportHealth(target.GetID(), result)
}
// GetStatus returns the most recent status recorded for targetID. The boolean
// is false when no status has been recorded for that ID.
func (m *Monitor) GetStatus(targetID string) (Status, bool) {
	m.mu.RLock()
	st, found := m.statuses[targetID]
	m.mu.RUnlock()
	return st, found
}
// GetAllStatuses returns a copy of the latest status of every target, keyed
// by target ID. Modifying the returned map does not affect the monitor.
func (m *Monitor) GetAllStatuses() map[string]Status {
	m.mu.RLock()
	defer m.mu.RUnlock()

	snapshot := make(map[string]Status, len(m.statuses))
	for targetID := range m.statuses {
		snapshot[targetID] = m.statuses[targetID]
	}
	return snapshot
}
// IsHealthy reports whether the latest recorded status of targetID is
// healthy. Targets without a recorded status count as not healthy.
func (m *Monitor) IsHealthy(targetID string) bool {
	if st, found := m.GetStatus(targetID); found {
		return st.Healthy
	}
	return false
}
// GetHealthyTargets returns the IDs of all targets whose latest recorded
// status is healthy. The result is nil when there are none.
func (m *Monitor) GetHealthyTargets() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var ids []string
	for targetID, st := range m.statuses {
		if !st.Healthy {
			continue
		}
		ids = append(ids, targetID)
	}
	return ids
}
// GetUnhealthyTargets returns the IDs of all targets whose latest recorded
// status is not healthy. The result is nil when there are none.
func (m *Monitor) GetUnhealthyTargets() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var ids []string
	for targetID, st := range m.statuses {
		if st.Healthy {
			continue
		}
		ids = append(ids, targetID)
	}
	return ids
}

View File

@@ -0,0 +1,151 @@
package health
import (
"context"
"fmt"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// S3Target is a health-check target backed by an S3 bucket.
type S3Target struct {
	// ID is the value returned by GetID.
	ID string
	// Bucket is the bucket name passed to S3 requests.
	Bucket string
	// Endpoint is the value returned by GetEndpoint.
	Endpoint string
	// Client is the S3 client used to issue requests against Bucket.
	Client *s3.Client
}
// GetID implements Target and returns the target's ID field.
func (t *S3Target) GetID() string {
	id := t.ID
	return id
}
// GetType implements Target. S3 targets always report the type "s3".
func (t *S3Target) GetType() string {
	const targetType = "s3"
	return targetType
}
// GetEndpoint implements Target and returns the target's Endpoint field.
func (t *S3Target) GetEndpoint() string {
	endpoint := t.Endpoint
	return endpoint
}
// S3Checker is a Checker that probes S3 buckets.
type S3Checker struct {
	// config holds the strategy, interval, timeout and retry count used by
	// the checker.
	config Config
}
// NewS3Checker creates an S3 health checker from config.
//
// Unset or invalid (non-positive) values are replaced by defaults: a 30s
// interval, a 5s timeout and a single attempt. Negative values are treated
// like zero because a non-positive interval makes time.NewTicker panic, a
// negative timeout yields an already expired context, and a negative retry
// count would mean no check is ever attempted.
func NewS3Checker(config Config) *S3Checker {
	if config.Interval <= 0 {
		config.Interval = 30 * time.Second
	}
	if config.Timeout <= 0 {
		config.Timeout = 5 * time.Second
	}
	if config.Retries <= 0 {
		config.Retries = 1
	}
	return &S3Checker{
		config: config,
	}
}
// Check probes target and reports whether its bucket could be reached.
//
// target must be a non-nil *S3Target with a non-nil Client; anything else
// yields an unhealthy Status describing the problem. The probe is attempted
// up to config.Retries times (at least once), with a one-second pause before
// each retry. Every attempt gets its own config.Timeout budget, so an attempt
// that times out does not leave the following attempts with an already
// expired context. Cancelling ctx aborts the pause between attempts.
func (c *S3Checker) Check(ctx context.Context, target Target) Status {
	s3Target, ok := target.(*S3Target)
	if !ok {
		return Status{
			Healthy:     false,
			LastChecked: time.Now(),
			Message:     "invalid target type for S3 checker",
			Error:       fmt.Errorf("expected *S3Target, got %T", target),
		}
	}
	// A typed nil target or a missing client would otherwise panic inside the
	// probe.
	if s3Target == nil || s3Target.Client == nil {
		return Status{
			Healthy:     false,
			LastChecked: time.Now(),
			Message:     "S3 target has no client",
			Error:       fmt.Errorf("s3 target is nil or has no client"),
		}
	}

	// A checker built without NewS3Checker may carry a non-positive retry
	// count; always probe at least once.
	attempts := c.config.Retries
	if attempts < 1 {
		attempts = 1
	}

	var lastErr error
	for i := 0; i < attempts; i++ {
		if i > 0 {
			// Pause before retrying, unless the caller gives up first.
			select {
			case <-ctx.Done():
				return Status{
					Healthy:     false,
					LastChecked: time.Now(),
					Message:     "health check cancelled",
					Error:       ctx.Err(),
				}
			case <-time.After(time.Second):
			}
		}

		// Fresh deadline per attempt; released as soon as the attempt ends.
		attemptCtx, cancel := context.WithTimeout(ctx, c.config.Timeout)
		err := c.performCheck(attemptCtx, s3Target)
		cancel()

		if err == nil {
			return Status{
				Healthy:     true,
				LastChecked: time.Now(),
				Message:     fmt.Sprintf("S3 bucket %s is healthy", s3Target.Bucket),
			}
		}
		lastErr = err
	}

	return Status{
		Healthy:     false,
		LastChecked: time.Now(),
		Message:     fmt.Sprintf("S3 bucket %s is unhealthy after %d retries", s3Target.Bucket, attempts),
		Error:       lastErr,
	}
}
// performCheck dispatches to the check implementation selected by the
// configured strategy. Every strategy other than StrategyDetailed uses the
// simple check.
func (c *S3Checker) performCheck(ctx context.Context, target *S3Target) error {
	if c.config.Strategy == StrategyDetailed {
		return c.performDetailedCheck(ctx, target)
	}
	return c.performSimpleCheck(ctx, target)
}
// performSimpleCheck runs the lightweight probe: it lists at most one object
// from the target bucket and returns the error of that request, if any.
func (c *S3Checker) performSimpleCheck(ctx context.Context, target *S3Target) error {
	// Listing a single key keeps the probe as cheap as possible.
	probe := &s3.ListObjectsV2Input{
		Bucket:  aws.String(target.Bucket),
		MaxKeys: aws.Int32(1),
	}
	if _, err := target.Client.ListObjectsV2(ctx, probe); err != nil {
		return err
	}
	return nil
}
// performDetailedCheck runs the detailed probe. At present it consists only
// of the simple check; the list below names further validations that could
// be added here.
func (c *S3Checker) performDetailedCheck(ctx context.Context, target *S3Target) error {
	// Possible additional checks:
	// 1. Inspect the bucket policy.
	// 2. Test write permission (create and delete a test object).
	// 3. Check the bucket's versioning state.
	// 4. Measure response time and other performance metrics.
	return c.performSimpleCheck(ctx, target)
}
// GetInterval implements Checker and returns the configured check interval.
func (c *S3Checker) GetInterval() time.Duration {
	interval := c.config.Interval
	return interval
}
// GetTimeout implements Checker and returns the configured check timeout.
func (c *S3Checker) GetTimeout() time.Duration {
	timeout := c.config.Timeout
	return timeout
}

222
internal/health/stats.go Normal file
View File

@@ -0,0 +1,222 @@
package health
import (
"context"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// StatsCollector is the interface implemented by types that gather usage
// statistics for a target.
type StatsCollector interface {
	// CollectStats gathers statistics for target, or returns an error when
	// they cannot be collected.
	CollectStats(ctx context.Context, target Target) (*Stats, error)
}
// Stats is a snapshot of usage statistics for a single target.
type Stats struct {
	// TargetID identifies the target the statistics belong to.
	TargetID string
	UsedSize int64 // used size in bytes
	ObjectCount int64 // number of objects
	LastUpdated time.Time // time of the last update
}
// S3StatsCollector collects usage statistics from S3 buckets.
type S3StatsCollector struct {
	// timeout bounds a single CollectStats call.
	timeout time.Duration
}
// NewS3StatsCollector creates an S3 statistics collector whose collections
// are bounded by timeout.
//
// A non-positive timeout is replaced by 30 seconds; a negative value would
// otherwise produce a context that has already expired when collection starts.
func NewS3StatsCollector(timeout time.Duration) *S3StatsCollector {
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	return &S3StatsCollector{
		timeout: timeout,
	}
}
// CollectStats lists every object in the target's bucket, page by page, and
// returns the total size and object count.
//
// target must be a non-nil *S3Target with a non-nil Client. The whole
// collection is bounded by the collector's timeout. No partial result is
// returned: any listing error aborts the collection and is returned wrapped
// with the bucket name.
func (c *S3StatsCollector) CollectStats(ctx context.Context, target Target) (*Stats, error) {
	s3Target, ok := target.(*S3Target)
	if !ok {
		return nil, fmt.Errorf("expected *S3Target, got %T", target)
	}
	// A typed nil target or a missing client would otherwise panic below.
	if s3Target == nil || s3Target.Client == nil {
		return nil, fmt.Errorf("s3 target is nil or has no client")
	}

	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	var totalSize int64
	var objectCount int64
	var continuationToken *string

	for {
		output, err := s3Target.Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:            aws.String(s3Target.Bucket),
			ContinuationToken: continuationToken,
		})
		if err != nil {
			return nil, fmt.Errorf("listing objects in bucket %q: %w", s3Target.Bucket, err)
		}

		for _, obj := range output.Contents {
			objectCount++
			if obj.Size != nil {
				totalSize += *obj.Size
			}
		}

		if output.IsTruncated == nil || !*output.IsTruncated {
			break
		}
		// A truncated page without a token cannot be continued; requesting
		// again with a nil token would fetch the first page over and over
		// until the timeout fires.
		if output.NextContinuationToken == nil {
			return nil, fmt.Errorf("listing objects in bucket %q: truncated response without continuation token", s3Target.Bucket)
		}
		continuationToken = output.NextContinuationToken
	}

	return &Stats{
		TargetID:    s3Target.GetID(),
		UsedSize:    totalSize,
		ObjectCount: objectCount,
		LastUpdated: time.Now(),
	}, nil
}
// StatsMonitor periodically collects statistics for a set of registered
// targets, remembers the latest Stats per target and forwards them to a
// StatsReporter.
type StatsMonitor struct {
	// collector gathers the statistics for a target.
	collector StatsCollector
	// targets holds the registered targets keyed by their ID.
	targets map[string]Target
	// stats holds the most recent statistics per target ID.
	stats map[string]*Stats
	// reporter, when non-nil, is notified after every successful collection.
	reporter StatsReporter
	// mu guards targets and stats.
	mu sync.RWMutex
	// stopChan is closed by Stop to end the background loop.
	stopChan chan struct{}
	// interval is the period between collection rounds.
	interval time.Duration
}
// StatsReporter is the interface implemented by types that receive collected
// statistics.
type StatsReporter interface {
	// ReportStats is called with the statistics of one target.
	ReportStats(stats *Stats)
}
// NewStatsMonitor creates a statistics monitor that runs collector against
// its registered targets every interval and hands each result to reporter
// (which may be nil).
//
// A non-positive interval is replaced by 60 seconds, because time.NewTicker
// panics when given a non-positive duration.
func NewStatsMonitor(collector StatsCollector, interval time.Duration, reporter StatsReporter) *StatsMonitor {
	if interval <= 0 {
		interval = 60 * time.Second
	}
	return &StatsMonitor{
		collector: collector,
		targets:   make(map[string]Target),
		stats:     make(map[string]*Stats),
		reporter:  reporter,
		stopChan:  make(chan struct{}),
		interval:  interval,
	}
}
// RegisterTarget adds target to the monitored set, replacing any target
// already registered under the same ID.
func (m *StatsMonitor) RegisterTarget(target Target) {
	m.mu.Lock()
	id := target.GetID()
	m.targets[id] = target
	m.mu.Unlock()
}
// UnregisterTarget removes the target with the given ID together with its
// last recorded statistics.
func (m *StatsMonitor) UnregisterTarget(targetID string) {
	m.mu.Lock()
	delete(m.stats, targetID)
	delete(m.targets, targetID)
	m.mu.Unlock()
}
// Start performs one collection round synchronously and then launches the
// periodic collection loop in a new goroutine.
func (m *StatsMonitor) Start(ctx context.Context) {
	// Initial round: the caller blocks until it has finished.
	m.collectAll(ctx)

	// Subsequent rounds run in the background.
	go func() {
		m.run(ctx)
	}()
}
// Stop signals the background loop started by Start to exit.
//
// Stop may be called more than once: only the first call closes the stop
// channel, later calls are no-ops. Closing an already closed channel would
// panic, so the close is guarded under the monitor's mutex.
func (m *StatsMonitor) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	select {
	case <-m.stopChan:
		// Already stopped.
	default:
		close(m.stopChan)
	}
}
// run is the periodic collection loop. It triggers a collection round on
// every tick and returns when ctx is cancelled or the stop channel is closed.
func (m *StatsMonitor) run(ctx context.Context) {
	tick := time.NewTicker(m.interval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			m.collectAll(ctx)
		case <-m.stopChan:
			return
		case <-ctx.Done():
			return
		}
	}
}
// collectAll collects statistics for every registered target concurrently
// and waits for all collections to finish.
func (m *StatsMonitor) collectAll(ctx context.Context) {
	// Snapshot the targets so the lock is not held while collecting.
	m.mu.RLock()
	snapshot := make([]Target, 0, len(m.targets))
	for _, t := range m.targets {
		snapshot = append(snapshot, t)
	}
	m.mu.RUnlock()

	// One goroutine per target.
	var pending sync.WaitGroup
	pending.Add(len(snapshot))
	for i := range snapshot {
		go func(t Target) {
			defer pending.Done()
			m.collectTarget(ctx, t)
		}(snapshot[i])
	}
	pending.Wait()
}
// collectTarget collects statistics for a single target. On success the
// result is stored as that target's latest statistics and passed to the
// reporter, if one is set. On failure nothing is stored or reported; the
// error is discarded here so that other targets are unaffected.
func (m *StatsMonitor) collectTarget(ctx context.Context, target Target) {
	result, err := m.collector.CollectStats(ctx, target)
	if err == nil {
		// Remember the latest result.
		m.mu.Lock()
		m.stats[target.GetID()] = result
		m.mu.Unlock()

		// Reporting is optional.
		if m.reporter != nil {
			m.reporter.ReportStats(result)
		}
	}
}
// GetStats returns the most recent statistics recorded for targetID. The
// boolean is false when nothing has been recorded for that ID.
func (m *StatsMonitor) GetStats(targetID string) (*Stats, bool) {
	m.mu.RLock()
	latest, found := m.stats[targetID]
	m.mu.RUnlock()
	return latest, found
}
// GetAllStats returns the latest statistics of every target, keyed by target
// ID. The map itself is a copy, but its values are the same *Stats pointers
// the monitor holds.
func (m *StatsMonitor) GetAllStats() map[string]*Stats {
	m.mu.RLock()
	defer m.mu.RUnlock()

	snapshot := make(map[string]*Stats, len(m.stats))
	for targetID := range m.stats {
		snapshot[targetID] = m.stats[targetID]
	}
	return snapshot
}

68
internal/health/types.go Normal file
View File

@@ -0,0 +1,68 @@
package health
import (
"context"
"time"
)
// Status is the result of a health check.
type Status struct {
	Healthy bool // whether the target is healthy
	LastChecked time.Time // time of the last check
	Message string // human-readable status message
	Error error // the error, if any
}
// Target is something that can be health-checked.
type Target interface {
	// GetID returns the target's unique identifier.
	GetID() string
	// GetType returns the target's type.
	GetType() string
	// GetEndpoint returns the target's endpoint (for logging and monitoring).
	GetEndpoint() string
}
// Checker is the interface implemented by health checkers.
type Checker interface {
	// Check performs a health check on target and returns the result.
	Check(ctx context.Context, target Target) Status
	// GetInterval returns the interval between checks.
	GetInterval() time.Duration
	// GetTimeout returns the timeout of a check.
	GetTimeout() time.Duration
}
// Strategy names a health-check strategy.
type Strategy string

const (
	// StrategySimple is the simple health check (a quick probe).
	StrategySimple Strategy = "simple"
	// StrategyDetailed is the detailed health check (intended to include
	// performance metrics).
	StrategyDetailed Strategy = "detailed"
)

// Config holds the health-check settings.
type Config struct {
	Strategy Strategy      `yaml:"strategy"`
	Interval time.Duration `yaml:"interval"`
	Timeout  time.Duration `yaml:"timeout"`
	Retries  int           `yaml:"retries"`
}

// DefaultConfig returns the default settings: the simple strategy, a 30s
// interval, a 5s timeout and 3 retries.
func DefaultConfig() Config {
	var cfg Config
	cfg.Strategy = StrategySimple
	cfg.Interval = 30 * time.Second
	cfg.Timeout = 5 * time.Second
	cfg.Retries = 3
	return cfg
}
// HealthReporter is the interface implemented by types that receive health
// check results.
type HealthReporter interface {
	// ReportHealth is called with the status of the target identified by
	// targetID.
	ReportHealth(targetID string, status Status)
}