From ffc21aa49cbfbe5b1dcfe3f9662becfb4aa4a006 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Tue, 4 Nov 2025 19:57:02 +0800 Subject: [PATCH] Add management API endpoints --- cmd/s3-balance/main.go | 15 +++ config/config.example.yaml | 17 +++- internal/api/admin_handler.go | 175 ++++++++++++++++++++++++++++++++++ internal/bucket/manager.go | 19 ++++ internal/config/config.go | 12 +++ internal/middleware/auth.go | 34 +++++++ 6 files changed, 268 insertions(+), 4 deletions(-) create mode 100644 internal/api/admin_handler.go diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 7839bfc..856106e 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -17,6 +17,7 @@ import ( "github.com/DullJZ/s3-balance/internal/config" "github.com/DullJZ/s3-balance/internal/database" "github.com/DullJZ/s3-balance/internal/metrics" + "github.com/DullJZ/s3-balance/internal/middleware" "github.com/DullJZ/s3-balance/internal/storage" "github.com/DullJZ/s3-balance/pkg/presigner" "github.com/gorilla/mux" @@ -124,6 +125,20 @@ func main() { log.Printf("Metrics server enabled at %s", cfg.Metrics.Path) } + // 注册管理API路由(如果启用) + // 必须在S3路由之前注册,因为S3路由使用 /{bucket} 通配符会匹配所有路径 + if cfg.API.Enabled { + log.Println("Management API enabled") + adminHandler := api.NewAdminHandler(bucketManager, lb, cfg) + + // 创建子路由器并应用Token认证中间件 + apiRouter := router.PathPrefix("/api").Subrouter() + apiRouter.Use(middleware.TokenAuthMiddleware(cfg.API.Token)) + adminHandler.RegisterRoutes(apiRouter) + + log.Printf("Management API endpoints available at /api/*") + } + // 运行在S3兼容模式 log.Println("Running in S3-compatible mode") s3Handler.RegisterS3Routes(router) diff --git a/config/config.example.yaml b/config/config.example.yaml index d305fe7..9fa58b2 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -133,18 +133,18 @@ metrics: s3api: # 客户端连接用的Access Key access_key: "AKIAIOSFODNN7EXAMPLE" - + # 客户端连接用的Secret Key secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" - + # 是否使用虚拟主机模式 virtual_host: false - + # 工作模式: # false:预签名重定向模式,客户端直接与后端存储交互 # true (默认):代理模式,数据通过S3 Balance服务器传输 proxy_mode: true - + # 是否需要认证(开启后使用 Basic Auth,凭据来自 access_key/secret_key) auth_required: true @@ -153,3 +153,12 @@ s3api: # 留空则使用请求中的 Host 头 # 示例: "s3.example.com" 或 "s3.example.com:8080" host: "" + +# 管理API配置 +api: + # 是否启用管理API + enabled: true + + # API访问令牌(用于管理接口的身份验证) + # 请修改为强密码,建议使用随机生成的长字符串 + token: "your-secure-api-token-change-this" diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go new file mode 100644 index 0000000..dcf0f3f --- /dev/null +++ b/internal/api/admin_handler.go @@ -0,0 +1,175 @@ +package api + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/DullJZ/s3-balance/internal/balancer" + "github.com/DullJZ/s3-balance/internal/bucket" + "github.com/DullJZ/s3-balance/internal/config" + "github.com/gorilla/mux" +) + +// AdminHandler 管理API处理器 +type AdminHandler struct { + bucketManager *bucket.Manager + balancer *balancer.Balancer + config *config.Config +} + +// NewAdminHandler 创建新的管理API处理器 +func NewAdminHandler( + bucketManager *bucket.Manager, + balancer *balancer.Balancer, + cfg *config.Config, +) *AdminHandler { + return &AdminHandler{ + bucketManager: bucketManager, + balancer: balancer, + config: cfg, + } +} + +// BucketResponse 存储桶响应结构 +type BucketResponse struct { + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Region string `json:"region"` + MaxSize string `json:"max_size"` + MaxSizeBytes int64 `json:"max_size_bytes"` + UsedSize int64 `json:"used_size"` + AvailableSize int64 `json:"available_size"` + UsagePercent float64 `json:"usage_percent"` + Weight int `json:"weight"` + Enabled bool `json:"enabled"` + Available bool `json:"available"` + Virtual bool `json:"virtual"` + LastChecked time.Time `json:"last_checked"` + OperationCountA int64 `json:"operation_count_a"` + OperationCountB int64 `json:"operation_count_b"` + OperationLimits struct { + TypeA int `json:"type_a"` + TypeB int `json:"type_b"` + } `json:"operation_limits"` +} + +// BucketsListResponse 存储桶列表响应结构 +type BucketsListResponse struct { + Total int `json:"total"` + Buckets []BucketResponse `json:"buckets"` +} + +// HealthResponse 健康状态响应结构 +type HealthResponse struct { + Status string `json:"status"` + Timestamp time.Time `json:"timestamp"` + LoadBalancer string `json:"load_balancer_strategy"` + TotalBuckets int `json:"total_buckets"` + AvailableBuckets int `json:"available_buckets"` + Database string `json:"database_type"` +} + +// RegisterRoutes 注册管理API路由 +func (h *AdminHandler) RegisterRoutes(router *mux.Router) { + router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet) + router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) + router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet) +} + +// ListBuckets 获取存储桶列表 +func (h *AdminHandler) ListBuckets(w http.ResponseWriter, r *http.Request) { + buckets := h.bucketManager.GetAllBuckets() + + response := BucketsListResponse{ + Total: len(buckets), + Buckets: make([]BucketResponse, 0, len(buckets)), + } + + for _, b := range buckets { + bucketResp := h.convertBucketInfo(b) + response.Buckets = append(response.Buckets, bucketResp) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetBucketDetail 获取存储桶详情 +func (h *AdminHandler) GetBucketDetail(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + name := vars["name"] + + bucketInfo, exists := h.bucketManager.GetBucket(name) + if !exists { + http.Error(w, `{"error": "bucket not found"}`, http.StatusNotFound) + return + } + + response := h.convertBucketInfo(bucketInfo) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetHealth 获取系统健康状态 +func (h *AdminHandler) GetHealth(w http.ResponseWriter, r *http.Request) { + buckets := h.bucketManager.GetAllBuckets() + availableBuckets := h.bucketManager.GetAvailableBuckets() + + status := "healthy" + if len(availableBuckets) == 0 { + status = "unhealthy" + } else if len(availableBuckets) < len(buckets)/2 { + status = "degraded" + } + + response := HealthResponse{ + Status: status, + Timestamp: time.Now(), + LoadBalancer: h.config.Balancer.Strategy, + TotalBuckets: len(buckets), + AvailableBuckets: len(availableBuckets), + Database: h.config.Database.Type, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// convertBucketInfo 转换BucketInfo为BucketResponse +func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse { + resp := BucketResponse{ + Name: b.Config.Name, + Endpoint: b.Config.Endpoint, + Region: b.Config.Region, + MaxSize: b.Config.MaxSize, + MaxSizeBytes: b.Config.MaxSizeBytes, + UsedSize: b.UsedSize, + Weight: b.Config.Weight, + Enabled: b.Config.Enabled, + Available: b.Available, + Virtual: b.Config.Virtual, + LastChecked: b.LastChecked, + OperationCountA: b.GetOperationCount(bucket.OperationTypeA), + OperationCountB: b.GetOperationCount(bucket.OperationTypeB), + } + + resp.OperationLimits.TypeA = b.Config.OperationLimits.TypeA + resp.OperationLimits.TypeB = b.Config.OperationLimits.TypeB + + // 计算可用空间 + if b.Config.MaxSizeBytes > 0 { + resp.AvailableSize = b.Config.MaxSizeBytes - b.UsedSize + if resp.AvailableSize < 0 { + resp.AvailableSize = 0 + } + // 计算使用百分比 + resp.UsagePercent = float64(b.UsedSize) / float64(b.Config.MaxSizeBytes) * 100 + } else { + resp.AvailableSize = -1 // -1 表示无限制 + resp.UsagePercent = 0 + } + + return resp +} diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 41c86ee..8f5e0d8 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -413,6 +413,25 @@ func (b *BucketInfo) IsVirtual() bool { return b.Config.Virtual } +// GetOperationCount 获取指定类型的操作计数 +func (b *BucketInfo) GetOperationCount(category OperationCategory) int64 { + if b == nil { + return 0 + } + + b.mu.RLock() + defer b.mu.RUnlock() + + switch category { + case OperationTypeA: + return b.operationCountA + case OperationTypeB: + return b.operationCountB + default: + return 0 + } +} + // GetVirtualBuckets 获取所有虚拟存储桶 func (m *Manager) GetVirtualBuckets() []*BucketInfo { m.mu.RLock() diff --git a/internal/config/config.go b/internal/config/config.go index 5953a38..2f3d8b7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,7 @@ type Config struct { Balancer BalancerConfig `yaml:"balancer"` Metrics MetricsConfig `yaml:"metrics"` S3API S3APIConfig `yaml:"s3api"` + API APIConfig `yaml:"api"` } // ServerConfig 服务器配置 @@ -75,6 +76,12 @@ type S3APIConfig struct { Host string `yaml:"host"` // 用于签名验证的Host(为空则使用请求的Host) } +// APIConfig 管理API配置 +type APIConfig struct { + Enabled bool `yaml:"enabled"` // 是否启用管理API + Token string `yaml:"token"` // API访问令牌 +} + // DatabaseConfig 数据库配置 type DatabaseConfig struct { Type string `yaml:"type"` // 数据库类型: sqlite, mysql, postgres @@ -179,6 +186,11 @@ func (c *Config) SetDefaults() { if c.S3API.SecretKey == "" { c.S3API.SecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" } + + // 管理API默认值 + if c.API.Token == "" { + c.API.Token = "your-secure-api-token-here" + } } // ParseMaxSize 解析最大容量字符串为字节 diff --git a/internal/middleware/auth.go b/internal/middleware/auth.go index c29ffc6..e95bcc4 100644 --- a/internal/middleware/auth.go +++ b/internal/middleware/auth.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strings" "github.com/DullJZ/s3-validate/pkg/s3validate" ) @@ -76,3 +77,36 @@ func invokeOnError(w http.ResponseWriter, r *http.Request, cfg S3SignatureConfig } http.Error(w, message, http.StatusForbidden) } + +// TokenAuthMiddleware 创建Token认证中间件,用于管理API +func TokenAuthMiddleware(validToken string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // 从Authorization头中提取token + authHeader := r.Header.Get("Authorization") + if authHeader == "" { + w.Header().Set("Content-Type", "application/json") + http.Error(w, `{"error": "missing authorization header"}`, http.StatusUnauthorized) + return + } + + // 支持两种格式: + // 1. Bearer + // 2. + token := authHeader + if strings.HasPrefix(authHeader, "Bearer ") { + token = strings.TrimPrefix(authHeader, "Bearer ") + } + + // 验证token + if token != validToken { + w.Header().Set("Content-Type", "application/json") + http.Error(w, `{"error": "invalid token"}`, http.StatusUnauthorized) + return + } + + // 继续处理请求 + next.ServeHTTP(w, r) + }) + } +}