From 291c327477ccecc96f1388de3ecdfb7f89847ea6 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Thu, 6 Nov 2025 01:14:09 +0800 Subject: [PATCH] Feat: Config Manager API --- .gitignore | 1 + cmd/s3-balance/main.go | 2 +- internal/api/admin_handler.go | 64 +++++++++++- internal/config/manager.go | 177 ++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 38261b6..2e472cb 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ logs/ # AI doc AGENTS.md CLAUDE.md +docs/ # Generated files s3-balance \ No newline at end of file diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 6413a8b..fe006fb 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -135,7 +135,7 @@ func main() { // 必须在S3路由之前注册,因为S3路由使用 /{bucket} 通配符会匹配所有路径 if cfg.API.Enabled { log.Println("Management API enabled") - adminHandler := api.NewAdminHandler(bucketManager, lb, cfg) + adminHandler := api.NewAdminHandler(bucketManager, lb, cfg, configManager) // 创建子路由器并应用Token认证中间件 apiRouter := router.PathPrefix("/api").Subrouter() diff --git a/internal/api/admin_handler.go b/internal/api/admin_handler.go index dcf0f3f..c51e461 100644 --- a/internal/api/admin_handler.go +++ b/internal/api/admin_handler.go @@ -16,6 +16,7 @@ type AdminHandler struct { bucketManager *bucket.Manager balancer *balancer.Balancer config *config.Config + configManager *config.Manager } // NewAdminHandler 创建新的管理API处理器 @@ -23,11 +24,13 @@ func NewAdminHandler( bucketManager *bucket.Manager, balancer *balancer.Balancer, cfg *config.Config, + configManager *config.Manager, ) *AdminHandler { return &AdminHandler{ bucketManager: bucketManager, balancer: balancer, config: cfg, + configManager: configManager, } } @@ -72,9 +75,12 @@ type HealthResponse struct { // RegisterRoutes 注册管理API路由 func (h *AdminHandler) RegisterRoutes(router *mux.Router) { - router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet) - router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet) - router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet) + // 注册路由,同时支持 OPTIONS 方法用于 CORS 预检 + router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/health", h.GetHealth).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/config", h.GetConfig).Methods(http.MethodGet, http.MethodOptions) + router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost, http.MethodOptions) } // ListBuckets 获取存储桶列表 @@ -137,6 +143,58 @@ func (h *AdminHandler) GetHealth(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) } +// GetConfig 获取当前配置 +func (h *AdminHandler) GetConfig(w http.ResponseWriter, r *http.Request) { + if h.configManager == nil { + http.Error(w, `{"error": "config manager not available"}`, http.StatusInternalServerError) + return + } + + currentConfig := h.configManager.GetConfig() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(currentConfig) +} + +// UpdateConfig 更新配置 +func (h *AdminHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { + if h.configManager == nil { + http.Error(w, `{"error": "config manager not available"}`, http.StatusInternalServerError) + return + } + + // 解析请求体 + var newConfig config.Config + if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil { + http.Error(w, `{"error": "invalid JSON format: `+err.Error()+`"}`, http.StatusBadRequest) + return + } + + // 设置默认值 + newConfig.SetDefaults() + + // 更新配置 + if err := h.configManager.UpdateConfig(&newConfig); err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]string{ + "error": "validation failed", + "message": err.Error(), + }) + return + } + + // 返回成功响应 + response := map[string]interface{}{ + "success": true, + "message": "Configuration updated successfully. Changes will take effect automatically.", + "config": &newConfig, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + // convertBucketInfo 转换BucketInfo为BucketResponse func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse { resp := BucketResponse{ diff --git a/internal/config/manager.go b/internal/config/manager.go index 3c46547..601904a 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -1,12 +1,14 @@ package config import ( + "fmt" "log" "os" "sync" "time" "github.com/fsnotify/fsnotify" + "gopkg.in/yaml.v3" ) // Manager 配置管理器,支持热更新 @@ -227,6 +229,181 @@ func (m *Manager) logConfigChanges(oldConfig, newConfig *Config) { } } +// UpdateConfig 通过 API 更新配置文件 +// 返回错误如果验证失败或写入失败 +func (m *Manager) UpdateConfig(newConfig *Config) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + // 1. 验证新配置 + if err := m.validateConfig(newConfig); err != nil { + return err + } + + // 2. 备份当前配置文件 + if err := m.backupConfigFile(); err != nil { + log.Printf("Failed to backup config file: %v", err) + // 继续执行,备份失败不应阻止更新 + } + + // 3. 将新配置写入文件 + if err := m.writeConfigFile(newConfig); err != nil { + return err + } + + // 4. 更新内存中的配置 + oldConfig := m.config + m.config = newConfig + + // 5. 更新最后修改时间,避免文件监听重复触发 + if fileInfo, err := os.Stat(m.configFile); err == nil { + m.lastModTime = fileInfo.ModTime() + } + + log.Printf("Configuration updated successfully via API") + + // 6. 触发配置变更回调(在锁外执行) + callbacks := make([]func(*Config), len(m.callbacks)) + copy(callbacks, m.callbacks) + + go func() { + for _, callback := range callbacks { + func() { + defer func() { + if r := recover(); r != nil { + log.Printf("Config change callback panic: %v", r) + } + }() + callback(newConfig) + }() + } + }() + + // 7. 记录配置变更 + m.logConfigChanges(oldConfig, newConfig) + + return nil +} + +// validateConfig 验证配置的有效性 +func (m *Manager) validateConfig(cfg *Config) error { + // 基本验证 + if cfg.Server.Port <= 0 || cfg.Server.Port > 65535 { + return fmt.Errorf("invalid server port: %d", cfg.Server.Port) + } + + if len(cfg.Buckets) == 0 { + return fmt.Errorf("at least one bucket is required") + } + + // 验证存储桶配置 + for i, bucket := range cfg.Buckets { + if bucket.Name == "" { + return fmt.Errorf("bucket[%d]: name is required", i) + } + + // 虚拟存储桶不需要端点和凭据 + if !bucket.Virtual { + if bucket.Endpoint == "" { + return fmt.Errorf("bucket[%d] (%s): endpoint is required for non-virtual bucket", i, bucket.Name) + } + if bucket.AccessKeyID == "" { + return fmt.Errorf("bucket[%d] (%s): access_key_id is required for non-virtual bucket", i, bucket.Name) + } + if bucket.SecretAccessKey == "" { + return fmt.Errorf("bucket[%d] (%s): secret_access_key is required for non-virtual bucket", i, bucket.Name) + } + } + + // 解析并验证容量大小 + if err := cfg.Buckets[i].ParseMaxSize(); err != nil { + return fmt.Errorf("bucket[%d] (%s): invalid max_size: %w", i, bucket.Name, err) + } + } + + // 验证负载均衡策略 + validStrategies := map[string]bool{ + "round-robin": true, + "least-space": true, + "weighted": true, + } + if !validStrategies[cfg.Balancer.Strategy] { + return fmt.Errorf("invalid balancer strategy: %s (must be one of: round-robin, least-space, weighted)", cfg.Balancer.Strategy) + } + + // 验证数据库配置 + if cfg.Database.Type == "" { + return fmt.Errorf("database type is required") + } + validDBTypes := map[string]bool{ + "sqlite": true, + "mysql": true, + "postgres": true, + } + if !validDBTypes[cfg.Database.Type] { + return fmt.Errorf("invalid database type: %s (must be one of: sqlite, mysql, postgres)", cfg.Database.Type) + } + + return nil +} + +// backupConfigFile 备份当前配置文件 +func (m *Manager) backupConfigFile() error { + backupPath := m.configFile + ".backup." + time.Now().Format("20060102-150405") + + sourceData, err := os.ReadFile(m.configFile) + if err != nil { + return fmt.Errorf("failed to read config file: %w", err) + } + + if err := os.WriteFile(backupPath, sourceData, 0644); err != nil { + return fmt.Errorf("failed to write backup file: %w", err) + } + + log.Printf("Config file backed up to: %s", backupPath) + return nil +} + +// writeConfigFile 将配置写入 YAML 文件 +func (m *Manager) writeConfigFile(cfg *Config) error { + // 临时文件,确保原子性 + tmpFile := m.configFile + ".tmp" + + file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return fmt.Errorf("failed to create temp config file: %w", err) + } + defer file.Close() + + encoder := yaml.NewEncoder(file) + encoder.SetIndent(2) + + if err := encoder.Encode(cfg); err != nil { + file.Close() + os.Remove(tmpFile) + return fmt.Errorf("failed to encode config: %w", err) + } + + if err := encoder.Close(); err != nil { + file.Close() + os.Remove(tmpFile) + return fmt.Errorf("failed to close encoder: %w", err) + } + + if err := file.Close(); err != nil { + os.Remove(tmpFile) + return fmt.Errorf("failed to close temp file: %w", err) + } + + // 原子性替换原文件 + if err := os.Rename(tmpFile, m.configFile); err != nil { + os.Remove(tmpFile) + return fmt.Errorf("failed to replace config file: %w", err) + } + + return nil +} + // Close 关闭配置管理器 func (m *Manager) Close() error { // 停止监听协程