diff --git a/internal/bucket/manager.go b/internal/bucket/manager.go index 43a6eb8..48571f0 100644 --- a/internal/bucket/manager.go +++ b/internal/bucket/manager.go @@ -35,6 +35,7 @@ type Manager struct { metrics *metrics.Metrics healthMonitor *health.Monitor statsMonitor *health.StatsMonitor + monitorCtx context.Context } // NewManager 创建新的存储桶管理器 @@ -161,6 +162,16 @@ func (m *Manager) initHealthMonitoring() { // Start 启动管理器(健康检查和统计更新) func (m *Manager) Start(ctx context.Context) { + m.monitorCtx = ctx + m.startMonitors() +} + +func (m *Manager) startMonitors() { + ctx := m.monitorCtx + if ctx == nil { + return + } + // 启动健康监控 if m.healthMonitor != nil { m.healthMonitor.Start(ctx) @@ -175,12 +186,12 @@ func (m *Manager) Start(ctx context.Context) { // Stop 停止管理器 func (m *Manager) Stop() { close(m.stopChan) - + // 停止健康监控 if m.healthMonitor != nil { m.healthMonitor.Stop() } - + // 停止统计监控 if m.statsMonitor != nil { m.statsMonitor.Stop() @@ -199,7 +210,7 @@ func (m *Manager) GetBucket(name string) (*BucketInfo, bool) { func (m *Manager) GetAllBuckets() []*BucketInfo { m.mu.RLock() defer m.mu.RUnlock() - + buckets := make([]*BucketInfo, 0, len(m.buckets)) for _, b := range m.buckets { buckets = append(buckets, b) @@ -211,7 +222,7 @@ func (m *Manager) GetAllBuckets() []*BucketInfo { func (m *Manager) GetAvailableBuckets() []*BucketInfo { m.mu.RLock() defer m.mu.RUnlock() - + var available []*BucketInfo for _, b := range m.buckets { b.mu.RLock() @@ -228,7 +239,7 @@ func (m *Manager) GetAvailableBuckets() []*BucketInfo { func (b *BucketInfo) GetAvailableSpace() int64 { b.mu.RLock() defer b.mu.RUnlock() - + if b.Config.MaxSizeBytes == 0 { return 1 << 62 // 返回一个很大的数表示无限制 } @@ -267,7 +278,7 @@ func (b *BucketInfo) IsVirtual() bool { func (m *Manager) GetVirtualBuckets() []*BucketInfo { m.mu.RLock() defer m.mu.RUnlock() - + var virtual []*BucketInfo for _, b := range m.buckets { if b.IsVirtual() { @@ -293,17 +304,16 @@ func (m *Manager) GetRealBuckets() []*BucketInfo { // UpdateConfig 更新配置(支持热更新) func (m *Manager) UpdateConfig(newConfig *config.Config) error { - m.mu.Lock() - defer m.mu.Unlock() - log.Println("Updating bucket manager configuration...") - // 更新配置引用 + m.mu.Lock() oldConfig := m.config m.config = newConfig // 检查是否需要重新创建存储桶 needsRestart := m.checkIfRestartNeeded(oldConfig, newConfig) + restartMonitors := false + if needsRestart { log.Println("Bucket configuration changed significantly, recreating buckets...") @@ -326,8 +336,9 @@ func (m *Manager) UpdateConfig(newConfig *config.Config) error { client, err := createS3Client(bucketCfg) if err != nil { - // 如果创建失败,恢复旧配置 + // 如果创建失败,恢复旧配置并回滚 m.config = oldConfig + m.mu.Unlock() return fmt.Errorf("failed to create S3 client for bucket %s: %v", bucketCfg.Name, err) } @@ -343,6 +354,7 @@ func (m *Manager) UpdateConfig(newConfig *config.Config) error { // 重新初始化监控 m.initHealthMonitoring() + restartMonitors = true } else { // 只更新监控间隔(需要重启监控来改变间隔) log.Println("Updating monitoring intervals...") @@ -354,6 +366,13 @@ func (m *Manager) UpdateConfig(newConfig *config.Config) error { } // 重新初始化监控以应用新的间隔 m.initHealthMonitoring() + restartMonitors = true + } + + m.mu.Unlock() + + if restartMonitors { + m.startMonitors() } log.Println("Bucket manager configuration updated successfully") @@ -376,12 +395,12 @@ func (m *Manager) checkIfRestartNeeded(oldConfig, newConfig *config.Config) bool // 检查关键配置项 if oldBucket.Name != newBucket.Name || - oldBucket.Endpoint != newBucket.Endpoint || - oldBucket.AccessKeyID != newBucket.AccessKeyID || - oldBucket.SecretAccessKey != newBucket.SecretAccessKey || - oldBucket.Region != newBucket.Region || - oldBucket.Enabled != newBucket.Enabled || - oldBucket.Virtual != newBucket.Virtual { + oldBucket.Endpoint != newBucket.Endpoint || + oldBucket.AccessKeyID != newBucket.AccessKeyID || + oldBucket.SecretAccessKey != newBucket.SecretAccessKey || + oldBucket.Region != newBucket.Region || + oldBucket.Enabled != newBucket.Enabled || + oldBucket.Virtual != newBucket.Virtual { return true } }