This commit is contained in:
DullJZ
2025-09-13 22:29:48 +08:00
parent 61eeba03ac
commit 4268f6238d
7 changed files with 141 additions and 2 deletions

View File

@@ -16,9 +16,11 @@ import (
"github.com/DullJZ/s3-balance/internal/bucket"
"github.com/DullJZ/s3-balance/internal/config"
"github.com/DullJZ/s3-balance/internal/database"
"github.com/DullJZ/s3-balance/internal/metrics"
"github.com/DullJZ/s3-balance/internal/storage"
"github.com/DullJZ/s3-balance/pkg/presigner"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func main() {
@@ -39,8 +41,11 @@ func main() {
}
defer database.Close()
// 创建指标服务
metricsService := metrics.New()
// 创建存储桶管理器
bucketManager, err := bucket.NewManager(cfg)
bucketManager, err := bucket.NewManager(cfg, metricsService)
if err != nil {
log.Fatalf("Failed to create bucket manager: %v", err)
}
@@ -55,6 +60,9 @@ func main() {
if err != nil {
log.Fatalf("Failed to create balancer: %v", err)
}
// 设置指标服务
lb.SetMetrics(metricsService)
// 创建预签名URL生成器
signer := presigner.NewPresigner(
@@ -76,11 +84,18 @@ func main() {
storageService,
cfg.S3API.AccessKey,
cfg.S3API.SecretKey,
metricsService,
)
// 设置路由
router := mux.NewRouter()
// 添加指标端点
if cfg.Metrics.Enabled {
router.Path(cfg.Metrics.Path).Handler(promhttp.Handler())
log.Printf("Metrics server enabled at %s", cfg.Metrics.Path)
}
// 运行在S3兼容模式
log.Println("Running in S3-compatible mode")
s3Handler.RegisterS3Routes(router)

View File

@@ -43,6 +43,18 @@ func (h *S3Handler) handleBucketOperations(w http.ResponseWriter, r *http.Reques
vars := mux.Vars(r)
bucketName := vars["bucket"]
// 记录操作指标
start := time.Now()
method := r.Method
var status = "success"
defer func() {
if h.metrics != nil {
duration := time.Since(start).Seconds()
h.metrics.RecordS3Operation(method, bucketName, status)
h.metrics.RecordS3OperationDuration(method, bucketName, duration)
}
}()
switch r.Method {
case "GET":
h.handleListObjects(w, r, bucketName)

View File

@@ -18,6 +18,18 @@ func (h *S3Handler) handleObjectOperations(w http.ResponseWriter, r *http.Reques
bucketName := vars["bucket"]
key := vars["key"]
// 记录操作指标
start := time.Now()
method := r.Method
var status = "success"
defer func() {
if h.metrics != nil {
duration := time.Since(start).Seconds()
h.metrics.RecordS3Operation(method, bucketName, status)
h.metrics.RecordS3OperationDuration(method, bucketName, duration)
}
}()
switch r.Method {
case "GET":
h.handleGetObject(w, r, bucketName, key)

View File

@@ -3,6 +3,7 @@ package api
import (
"github.com/DullJZ/s3-balance/internal/balancer"
"github.com/DullJZ/s3-balance/internal/bucket"
"github.com/DullJZ/s3-balance/internal/metrics"
"github.com/DullJZ/s3-balance/internal/storage"
"github.com/DullJZ/s3-balance/pkg/presigner"
"github.com/gorilla/mux"
@@ -16,6 +17,7 @@ type S3Handler struct {
storage *storage.Service
accessKey string
secretKey string
metrics *metrics.Metrics
}
// NewS3Handler 创建新的S3兼容API处理器
@@ -26,6 +28,7 @@ func NewS3Handler(
storage *storage.Service,
accessKey string,
secretKey string,
metrics *metrics.Metrics,
) *S3Handler {
return &S3Handler{
bucketManager: bucketManager,
@@ -34,6 +37,7 @@ func NewS3Handler(
storage: storage,
accessKey: accessKey,
secretKey: secretKey,
metrics: metrics,
}
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/DullJZ/s3-balance/internal/bucket"
"github.com/DullJZ/s3-balance/internal/config"
"github.com/DullJZ/s3-balance/internal/metrics"
)
// Balancer 负载均衡器
@@ -13,6 +14,7 @@ type Balancer struct {
manager *bucket.Manager
strategy Strategy
config *config.BalancerConfig
metrics *metrics.Metrics
}
// NewBalancer 创建新的负载均衡器
@@ -37,6 +39,7 @@ func NewBalancer(manager *bucket.Manager, cfg *config.BalancerConfig) (*Balancer
manager: manager,
strategy: strategy,
config: cfg,
metrics: nil, // 将在main.go中设置
}, nil
}
@@ -67,6 +70,11 @@ func (b *Balancer) SelectBucket(key string, size int64) (*bucket.BucketInfo, err
return nil, err
}
// 记录指标
if b.metrics != nil && selected != nil {
b.metrics.RecordBalancerDecision(b.strategy.Name(), selected.Config.Name)
}
return selected, nil
}
@@ -97,6 +105,11 @@ func (b *Balancer) SetStrategy(strategyName string) error {
return nil
}
// SetMetrics 设置指标服务
func (b *Balancer) SetMetrics(metrics *metrics.Metrics) {
b.metrics = metrics
}
// GetAvailableBuckets 获取所有可用的存储桶
// 方便外部直接查询可用存储桶列表
func (b *Balancer) GetAvailableBuckets() []*bucket.BucketInfo {

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/DullJZ/s3-balance/internal/config"
"github.com/DullJZ/s3-balance/internal/metrics"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
@@ -29,14 +30,16 @@ type Manager struct {
mu sync.RWMutex
config *config.Config
stopChan chan struct{}
metrics *metrics.Metrics
}
// NewManager 创建新的存储桶管理器
func NewManager(cfg *config.Config) (*Manager, error) {
func NewManager(cfg *config.Config, metrics *metrics.Metrics) (*Manager, error) {
m := &Manager{
buckets: make(map[string]*BucketInfo),
config: cfg,
stopChan: make(chan struct{}),
metrics: metrics,
}
// 初始化所有存储桶客户端
@@ -185,6 +188,11 @@ func (m *Manager) checkBucket(ctx context.Context, bucket *BucketInfo) {
bucket.Available = err == nil
bucket.LastChecked = time.Now()
bucket.mu.Unlock()
// 更新指标
if m.metrics != nil {
m.metrics.SetBucketHealthy(bucket.Config.Name, bucket.Config.Endpoint, bucket.Available)
}
}
// updateAllStats 更新所有存储桶的统计信息
@@ -239,6 +247,11 @@ func (m *Manager) updateBucketStats(ctx context.Context, bucket *BucketInfo) {
bucket.mu.Lock()
bucket.UsedSize = totalSize
bucket.mu.Unlock()
// 更新指标
if m.metrics != nil {
m.metrics.SetBucketUsage(bucket.Config.Name, totalSize, bucket.Config.MaxSizeBytes)
}
}
// GetBucket 获取指定名称的存储桶

View File

@@ -0,0 +1,70 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
bucketHealthy = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "s3_balance_bucket_healthy",
Help: "Health status of S3 bucket (1 = healthy, 0 = unhealthy)",
}, []string{"bucket", "endpoint"})
bucketUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "s3_balance_bucket_usage_bytes",
Help: "Current usage of S3 bucket in bytes",
}, []string{"bucket"})
bucketCapacity = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "s3_balance_bucket_capacity_bytes",
Help: "Maximum capacity of S3 bucket in bytes",
}, []string{"bucket"})
s3OperationsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "s3_balance_s3_operations_total",
Help: "Total number of S3 operations",
}, []string{"operation", "bucket", "status"})
s3OperationDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "s3_balance_s3_operation_duration_seconds",
Help: "Duration of S3 operations in seconds",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "bucket"})
balancerDecisions = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "s3_balance_balancer_decisions_total",
Help: "Total number of load balancing decisions",
}, []string{"strategy", "bucket"})
)
type Metrics struct{}
func New() *Metrics {
return &Metrics{}
}
func (m *Metrics) SetBucketHealthy(bucket, endpoint string, healthy bool) {
value := 0.0
if healthy {
value = 1.0
}
bucketHealthy.WithLabelValues(bucket, endpoint).Set(value)
}
func (m *Metrics) SetBucketUsage(bucket string, usage, capacity int64) {
bucketUsage.WithLabelValues(bucket).Set(float64(usage))
bucketCapacity.WithLabelValues(bucket).Set(float64(capacity))
}
func (m *Metrics) RecordS3Operation(operation, bucket, status string) {
s3OperationsTotal.WithLabelValues(operation, bucket, status).Inc()
}
func (m *Metrics) RecordS3OperationDuration(operation, bucket string, duration float64) {
s3OperationDuration.WithLabelValues(operation, bucket).Observe(duration)
}
func (m *Metrics) RecordBalancerDecision(strategy, bucket string) {
balancerDecisions.WithLabelValues(strategy, bucket).Inc()
}