Add management API endpoints

This commit is contained in:
DullJZ
2025-11-04 19:57:02 +08:00
parent 529ee8fe55
commit ffc21aa49c
6 changed files with 268 additions and 4 deletions

View File

@@ -17,6 +17,7 @@ import (
"github.com/DullJZ/s3-balance/internal/config"
"github.com/DullJZ/s3-balance/internal/database"
"github.com/DullJZ/s3-balance/internal/metrics"
"github.com/DullJZ/s3-balance/internal/middleware"
"github.com/DullJZ/s3-balance/internal/storage"
"github.com/DullJZ/s3-balance/pkg/presigner"
"github.com/gorilla/mux"
@@ -124,6 +125,20 @@ func main() {
log.Printf("Metrics server enabled at %s", cfg.Metrics.Path)
}
// 注册管理API路由如果启用
// 必须在S3路由之前注册因为S3路由使用 /{bucket} 通配符会匹配所有路径
if cfg.API.Enabled {
log.Println("Management API enabled")
adminHandler := api.NewAdminHandler(bucketManager, lb, cfg)
// 创建子路由器并应用Token认证中间件
apiRouter := router.PathPrefix("/api").Subrouter()
apiRouter.Use(middleware.TokenAuthMiddleware(cfg.API.Token))
adminHandler.RegisterRoutes(apiRouter)
log.Printf("Management API endpoints available at /api/*")
}
// 运行在S3兼容模式
log.Println("Running in S3-compatible mode")
s3Handler.RegisterS3Routes(router)

View File

@@ -133,18 +133,18 @@ metrics:
s3api:
# 客户端连接用的Access Key
access_key: "AKIAIOSFODNN7EXAMPLE"
# 客户端连接用的Secret Key
secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
# 是否使用虚拟主机模式
virtual_host: false
# 工作模式:
# false预签名重定向模式客户端直接与后端存储交互
# true (默认)代理模式数据通过S3 Balance服务器传输
proxy_mode: true
# 是否需要认证(开启后使用 Basic Auth凭据来自 access_key/secret_key
auth_required: true
@@ -153,3 +153,12 @@ s3api:
# 留空则使用请求中的 Host 头
# 示例: "s3.example.com" 或 "s3.example.com:8080"
host: ""
# 管理API配置
api:
# 是否启用管理API
enabled: true
# API访问令牌用于管理接口的身份验证
# 请修改为强密码,建议使用随机生成的长字符串
token: "your-secure-api-token-change-this"

View File

@@ -0,0 +1,175 @@
package api
import (
"encoding/json"
"net/http"
"time"
"github.com/DullJZ/s3-balance/internal/balancer"
"github.com/DullJZ/s3-balance/internal/bucket"
"github.com/DullJZ/s3-balance/internal/config"
"github.com/gorilla/mux"
)
// AdminHandler 管理API处理器
type AdminHandler struct {
bucketManager *bucket.Manager
balancer *balancer.Balancer
config *config.Config
}
// NewAdminHandler 创建新的管理API处理器
func NewAdminHandler(
bucketManager *bucket.Manager,
balancer *balancer.Balancer,
cfg *config.Config,
) *AdminHandler {
return &AdminHandler{
bucketManager: bucketManager,
balancer: balancer,
config: cfg,
}
}
// BucketResponse 存储桶响应结构
type BucketResponse struct {
Name string `json:"name"`
Endpoint string `json:"endpoint"`
Region string `json:"region"`
MaxSize string `json:"max_size"`
MaxSizeBytes int64 `json:"max_size_bytes"`
UsedSize int64 `json:"used_size"`
AvailableSize int64 `json:"available_size"`
UsagePercent float64 `json:"usage_percent"`
Weight int `json:"weight"`
Enabled bool `json:"enabled"`
Available bool `json:"available"`
Virtual bool `json:"virtual"`
LastChecked time.Time `json:"last_checked"`
OperationCountA int64 `json:"operation_count_a"`
OperationCountB int64 `json:"operation_count_b"`
OperationLimits struct {
TypeA int `json:"type_a"`
TypeB int `json:"type_b"`
} `json:"operation_limits"`
}
// BucketsListResponse 存储桶列表响应结构
type BucketsListResponse struct {
Total int `json:"total"`
Buckets []BucketResponse `json:"buckets"`
}
// HealthResponse 健康状态响应结构
type HealthResponse struct {
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
LoadBalancer string `json:"load_balancer_strategy"`
TotalBuckets int `json:"total_buckets"`
AvailableBuckets int `json:"available_buckets"`
Database string `json:"database_type"`
}
// RegisterRoutes 注册管理API路由
func (h *AdminHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet)
router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet)
router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet)
}
// ListBuckets 获取存储桶列表
func (h *AdminHandler) ListBuckets(w http.ResponseWriter, r *http.Request) {
buckets := h.bucketManager.GetAllBuckets()
response := BucketsListResponse{
Total: len(buckets),
Buckets: make([]BucketResponse, 0, len(buckets)),
}
for _, b := range buckets {
bucketResp := h.convertBucketInfo(b)
response.Buckets = append(response.Buckets, bucketResp)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetBucketDetail 获取存储桶详情
func (h *AdminHandler) GetBucketDetail(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
name := vars["name"]
bucketInfo, exists := h.bucketManager.GetBucket(name)
if !exists {
http.Error(w, `{"error": "bucket not found"}`, http.StatusNotFound)
return
}
response := h.convertBucketInfo(bucketInfo)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetHealth 获取系统健康状态
func (h *AdminHandler) GetHealth(w http.ResponseWriter, r *http.Request) {
buckets := h.bucketManager.GetAllBuckets()
availableBuckets := h.bucketManager.GetAvailableBuckets()
status := "healthy"
if len(availableBuckets) == 0 {
status = "unhealthy"
} else if len(availableBuckets) < len(buckets)/2 {
status = "degraded"
}
response := HealthResponse{
Status: status,
Timestamp: time.Now(),
LoadBalancer: h.config.Balancer.Strategy,
TotalBuckets: len(buckets),
AvailableBuckets: len(availableBuckets),
Database: h.config.Database.Type,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// convertBucketInfo 转换BucketInfo为BucketResponse
func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse {
resp := BucketResponse{
Name: b.Config.Name,
Endpoint: b.Config.Endpoint,
Region: b.Config.Region,
MaxSize: b.Config.MaxSize,
MaxSizeBytes: b.Config.MaxSizeBytes,
UsedSize: b.UsedSize,
Weight: b.Config.Weight,
Enabled: b.Config.Enabled,
Available: b.Available,
Virtual: b.Config.Virtual,
LastChecked: b.LastChecked,
OperationCountA: b.GetOperationCount(bucket.OperationTypeA),
OperationCountB: b.GetOperationCount(bucket.OperationTypeB),
}
resp.OperationLimits.TypeA = b.Config.OperationLimits.TypeA
resp.OperationLimits.TypeB = b.Config.OperationLimits.TypeB
// 计算可用空间
if b.Config.MaxSizeBytes > 0 {
resp.AvailableSize = b.Config.MaxSizeBytes - b.UsedSize
if resp.AvailableSize < 0 {
resp.AvailableSize = 0
}
// 计算使用百分比
resp.UsagePercent = float64(b.UsedSize) / float64(b.Config.MaxSizeBytes) * 100
} else {
resp.AvailableSize = -1 // -1 表示无限制
resp.UsagePercent = 0
}
return resp
}

View File

@@ -413,6 +413,25 @@ func (b *BucketInfo) IsVirtual() bool {
return b.Config.Virtual
}
// GetOperationCount 获取指定类型的操作计数
func (b *BucketInfo) GetOperationCount(category OperationCategory) int64 {
if b == nil {
return 0
}
b.mu.RLock()
defer b.mu.RUnlock()
switch category {
case OperationTypeA:
return b.operationCountA
case OperationTypeB:
return b.operationCountB
default:
return 0
}
}
// GetVirtualBuckets 获取所有虚拟存储桶
func (m *Manager) GetVirtualBuckets() []*BucketInfo {
m.mu.RLock()

View File

@@ -16,6 +16,7 @@ type Config struct {
Balancer BalancerConfig `yaml:"balancer"`
Metrics MetricsConfig `yaml:"metrics"`
S3API S3APIConfig `yaml:"s3api"`
API APIConfig `yaml:"api"`
}
// ServerConfig 服务器配置
@@ -75,6 +76,12 @@ type S3APIConfig struct {
Host string `yaml:"host"` // 用于签名验证的Host为空则使用请求的Host
}
// APIConfig 管理API配置
type APIConfig struct {
Enabled bool `yaml:"enabled"` // 是否启用管理API
Token string `yaml:"token"` // API访问令牌
}
// DatabaseConfig 数据库配置
type DatabaseConfig struct {
Type string `yaml:"type"` // 数据库类型: sqlite, mysql, postgres
@@ -179,6 +186,11 @@ func (c *Config) SetDefaults() {
if c.S3API.SecretKey == "" {
c.S3API.SecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}
// 管理API默认值
if c.API.Token == "" {
c.API.Token = "your-secure-api-token-here"
}
}
// ParseMaxSize 解析最大容量字符串为字节

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strings"
"github.com/DullJZ/s3-validate/pkg/s3validate"
)
@@ -76,3 +77,36 @@ func invokeOnError(w http.ResponseWriter, r *http.Request, cfg S3SignatureConfig
}
http.Error(w, message, http.StatusForbidden)
}
// TokenAuthMiddleware 创建Token认证中间件用于管理API
func TokenAuthMiddleware(validToken string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 从Authorization头中提取token
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
w.Header().Set("Content-Type", "application/json")
http.Error(w, `{"error": "missing authorization header"}`, http.StatusUnauthorized)
return
}
// 支持两种格式:
// 1. Bearer <token>
// 2. <token>
token := authHeader
if strings.HasPrefix(authHeader, "Bearer ") {
token = strings.TrimPrefix(authHeader, "Bearer ")
}
// 验证token
if token != validToken {
w.Header().Set("Content-Type", "application/json")
http.Error(w, `{"error": "invalid token"}`, http.StatusUnauthorized)
return
}
// 继续处理请求
next.ServeHTTP(w, r)
})
}
}