mirror of
https://github.com/DullJZ/s3-balance.git
synced 2026-07-01 16:11:40 +08:00
Feat: Config Manager API
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -50,6 +50,7 @@ logs/
|
||||
# AI doc
|
||||
AGENTS.md
|
||||
CLAUDE.md
|
||||
docs/
|
||||
|
||||
# Generated files
|
||||
s3-balance
|
||||
@@ -135,7 +135,7 @@ func main() {
|
||||
// 必须在S3路由之前注册,因为S3路由使用 /{bucket} 通配符会匹配所有路径
|
||||
if cfg.API.Enabled {
|
||||
log.Println("Management API enabled")
|
||||
adminHandler := api.NewAdminHandler(bucketManager, lb, cfg)
|
||||
adminHandler := api.NewAdminHandler(bucketManager, lb, cfg, configManager)
|
||||
|
||||
// 创建子路由器并应用Token认证中间件
|
||||
apiRouter := router.PathPrefix("/api").Subrouter()
|
||||
|
||||
@@ -16,6 +16,7 @@ type AdminHandler struct {
|
||||
bucketManager *bucket.Manager
|
||||
balancer *balancer.Balancer
|
||||
config *config.Config
|
||||
configManager *config.Manager
|
||||
}
|
||||
|
||||
// NewAdminHandler 创建新的管理API处理器
|
||||
@@ -23,11 +24,13 @@ func NewAdminHandler(
|
||||
bucketManager *bucket.Manager,
|
||||
balancer *balancer.Balancer,
|
||||
cfg *config.Config,
|
||||
configManager *config.Manager,
|
||||
) *AdminHandler {
|
||||
return &AdminHandler{
|
||||
bucketManager: bucketManager,
|
||||
balancer: balancer,
|
||||
config: cfg,
|
||||
configManager: configManager,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,9 +75,12 @@ type HealthResponse struct {
|
||||
|
||||
// RegisterRoutes 注册管理API路由
|
||||
func (h *AdminHandler) RegisterRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/buckets", h.ListBuckets).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/health", h.GetHealth).Methods(http.MethodGet)
|
||||
// 注册路由,同时支持 OPTIONS 方法用于 CORS 预检
|
||||
router.HandleFunc("/buckets", h.ListBuckets).Methods(http.MethodGet, http.MethodOptions)
|
||||
router.HandleFunc("/buckets/{name}", h.GetBucketDetail).Methods(http.MethodGet, http.MethodOptions)
|
||||
router.HandleFunc("/health", h.GetHealth).Methods(http.MethodGet, http.MethodOptions)
|
||||
router.HandleFunc("/config", h.GetConfig).Methods(http.MethodGet, http.MethodOptions)
|
||||
router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost, http.MethodOptions)
|
||||
}
|
||||
|
||||
// ListBuckets 获取存储桶列表
|
||||
@@ -137,6 +143,58 @@ func (h *AdminHandler) GetHealth(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// GetConfig 获取当前配置
|
||||
func (h *AdminHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
|
||||
if h.configManager == nil {
|
||||
http.Error(w, `{"error": "config manager not available"}`, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
currentConfig := h.configManager.GetConfig()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(currentConfig)
|
||||
}
|
||||
|
||||
// UpdateConfig 更新配置
|
||||
func (h *AdminHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
|
||||
if h.configManager == nil {
|
||||
http.Error(w, `{"error": "config manager not available"}`, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// 解析请求体
|
||||
var newConfig config.Config
|
||||
if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil {
|
||||
http.Error(w, `{"error": "invalid JSON format: `+err.Error()+`"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// 设置默认值
|
||||
newConfig.SetDefaults()
|
||||
|
||||
// 更新配置
|
||||
if err := h.configManager.UpdateConfig(&newConfig); err != nil {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(map[string]string{
|
||||
"error": "validation failed",
|
||||
"message": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// 返回成功响应
|
||||
response := map[string]interface{}{
|
||||
"success": true,
|
||||
"message": "Configuration updated successfully. Changes will take effect automatically.",
|
||||
"config": &newConfig,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
// convertBucketInfo 转换BucketInfo为BucketResponse
|
||||
func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse {
|
||||
resp := BucketResponse{
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Manager 配置管理器,支持热更新
|
||||
@@ -227,6 +229,181 @@ func (m *Manager) logConfigChanges(oldConfig, newConfig *Config) {
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateConfig 通过 API 更新配置文件
|
||||
// 返回错误如果验证失败或写入失败
|
||||
func (m *Manager) UpdateConfig(newConfig *Config) error {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
// 1. 验证新配置
|
||||
if err := m.validateConfig(newConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. 备份当前配置文件
|
||||
if err := m.backupConfigFile(); err != nil {
|
||||
log.Printf("Failed to backup config file: %v", err)
|
||||
// 继续执行,备份失败不应阻止更新
|
||||
}
|
||||
|
||||
// 3. 将新配置写入文件
|
||||
if err := m.writeConfigFile(newConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 4. 更新内存中的配置
|
||||
oldConfig := m.config
|
||||
m.config = newConfig
|
||||
|
||||
// 5. 更新最后修改时间,避免文件监听重复触发
|
||||
if fileInfo, err := os.Stat(m.configFile); err == nil {
|
||||
m.lastModTime = fileInfo.ModTime()
|
||||
}
|
||||
|
||||
log.Printf("Configuration updated successfully via API")
|
||||
|
||||
// 6. 触发配置变更回调(在锁外执行)
|
||||
callbacks := make([]func(*Config), len(m.callbacks))
|
||||
copy(callbacks, m.callbacks)
|
||||
|
||||
go func() {
|
||||
for _, callback := range callbacks {
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Config change callback panic: %v", r)
|
||||
}
|
||||
}()
|
||||
callback(newConfig)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
// 7. 记录配置变更
|
||||
m.logConfigChanges(oldConfig, newConfig)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateConfig 验证配置的有效性
|
||||
func (m *Manager) validateConfig(cfg *Config) error {
|
||||
// 基本验证
|
||||
if cfg.Server.Port <= 0 || cfg.Server.Port > 65535 {
|
||||
return fmt.Errorf("invalid server port: %d", cfg.Server.Port)
|
||||
}
|
||||
|
||||
if len(cfg.Buckets) == 0 {
|
||||
return fmt.Errorf("at least one bucket is required")
|
||||
}
|
||||
|
||||
// 验证存储桶配置
|
||||
for i, bucket := range cfg.Buckets {
|
||||
if bucket.Name == "" {
|
||||
return fmt.Errorf("bucket[%d]: name is required", i)
|
||||
}
|
||||
|
||||
// 虚拟存储桶不需要端点和凭据
|
||||
if !bucket.Virtual {
|
||||
if bucket.Endpoint == "" {
|
||||
return fmt.Errorf("bucket[%d] (%s): endpoint is required for non-virtual bucket", i, bucket.Name)
|
||||
}
|
||||
if bucket.AccessKeyID == "" {
|
||||
return fmt.Errorf("bucket[%d] (%s): access_key_id is required for non-virtual bucket", i, bucket.Name)
|
||||
}
|
||||
if bucket.SecretAccessKey == "" {
|
||||
return fmt.Errorf("bucket[%d] (%s): secret_access_key is required for non-virtual bucket", i, bucket.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// 解析并验证容量大小
|
||||
if err := cfg.Buckets[i].ParseMaxSize(); err != nil {
|
||||
return fmt.Errorf("bucket[%d] (%s): invalid max_size: %w", i, bucket.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 验证负载均衡策略
|
||||
validStrategies := map[string]bool{
|
||||
"round-robin": true,
|
||||
"least-space": true,
|
||||
"weighted": true,
|
||||
}
|
||||
if !validStrategies[cfg.Balancer.Strategy] {
|
||||
return fmt.Errorf("invalid balancer strategy: %s (must be one of: round-robin, least-space, weighted)", cfg.Balancer.Strategy)
|
||||
}
|
||||
|
||||
// 验证数据库配置
|
||||
if cfg.Database.Type == "" {
|
||||
return fmt.Errorf("database type is required")
|
||||
}
|
||||
validDBTypes := map[string]bool{
|
||||
"sqlite": true,
|
||||
"mysql": true,
|
||||
"postgres": true,
|
||||
}
|
||||
if !validDBTypes[cfg.Database.Type] {
|
||||
return fmt.Errorf("invalid database type: %s (must be one of: sqlite, mysql, postgres)", cfg.Database.Type)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// backupConfigFile 备份当前配置文件
|
||||
func (m *Manager) backupConfigFile() error {
|
||||
backupPath := m.configFile + ".backup." + time.Now().Format("20060102-150405")
|
||||
|
||||
sourceData, err := os.ReadFile(m.configFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read config file: %w", err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(backupPath, sourceData, 0644); err != nil {
|
||||
return fmt.Errorf("failed to write backup file: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("Config file backed up to: %s", backupPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeConfigFile 将配置写入 YAML 文件
|
||||
func (m *Manager) writeConfigFile(cfg *Config) error {
|
||||
// 临时文件,确保原子性
|
||||
tmpFile := m.configFile + ".tmp"
|
||||
|
||||
file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temp config file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
encoder := yaml.NewEncoder(file)
|
||||
encoder.SetIndent(2)
|
||||
|
||||
if err := encoder.Encode(cfg); err != nil {
|
||||
file.Close()
|
||||
os.Remove(tmpFile)
|
||||
return fmt.Errorf("failed to encode config: %w", err)
|
||||
}
|
||||
|
||||
if err := encoder.Close(); err != nil {
|
||||
file.Close()
|
||||
os.Remove(tmpFile)
|
||||
return fmt.Errorf("failed to close encoder: %w", err)
|
||||
}
|
||||
|
||||
if err := file.Close(); err != nil {
|
||||
os.Remove(tmpFile)
|
||||
return fmt.Errorf("failed to close temp file: %w", err)
|
||||
}
|
||||
|
||||
// 原子性替换原文件
|
||||
if err := os.Rename(tmpFile, m.configFile); err != nil {
|
||||
os.Remove(tmpFile)
|
||||
return fmt.Errorf("failed to replace config file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭配置管理器
|
||||
func (m *Manager) Close() error {
|
||||
// 停止监听协程
|
||||
|
||||
Reference in New Issue
Block a user