From d64ecac37c5f2343096ef754c909bd3aa5c64d5f Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Wed, 29 Oct 2025 16:58:10 +0800 Subject: [PATCH 1/9] A&B operation history --- cmd/s3-balance/main.go | 11 ++ internal/api/stats_handler.go | 193 +++++++++++++++++++++++++ internal/database/database.go | 1 + internal/scheduler/monthly_archiver.go | 92 ++++++++++++ internal/storage/models.go | 17 +++ internal/storage/service.go | 110 ++++++++++++++ 6 files changed, 424 insertions(+) create mode 100644 internal/api/stats_handler.go create mode 100644 internal/scheduler/monthly_archiver.go diff --git a/cmd/s3-balance/main.go b/cmd/s3-balance/main.go index 7839bfc..dfeac0f 100644 --- a/cmd/s3-balance/main.go +++ b/cmd/s3-balance/main.go @@ -17,6 +17,7 @@ import ( "github.com/DullJZ/s3-balance/internal/config" "github.com/DullJZ/s3-balance/internal/database" "github.com/DullJZ/s3-balance/internal/metrics" + "github.com/DullJZ/s3-balance/internal/scheduler" "github.com/DullJZ/s3-balance/internal/storage" "github.com/DullJZ/s3-balance/pkg/presigner" "github.com/gorilla/mux" @@ -80,6 +81,11 @@ func main() { // 启动定期清理过期上传会话的任务 startSessionCleaner(ctx, storageService) + // 启动月度统计归档任务(每小时检查一次) + monthlyArchiver := scheduler.NewMonthlyArchiver(storageService, 1*time.Hour) + monthlyArchiver.Start() + defer monthlyArchiver.Stop() + // 创建S3兼容API处理器 s3Handler := api.NewS3Handler( bucketManager, @@ -124,6 +130,11 @@ func main() { log.Printf("Metrics server enabled at %s", cfg.Metrics.Path) } + // 添加统计API端点 + statsHandler := api.NewStatsHandler(storageService) + statsHandler.RegisterRoutes(router) + log.Println("Statistics API endpoints registered at /api/stats/*") + // 运行在S3兼容模式 log.Println("Running in S3-compatible mode") s3Handler.RegisterS3Routes(router) diff --git a/internal/api/stats_handler.go b/internal/api/stats_handler.go new file mode 100644 index 0000000..3436dc1 --- /dev/null +++ b/internal/api/stats_handler.go @@ -0,0 +1,193 @@ +package api + +import ( + "encoding/json" + "log" + "net/http" + "strconv" + "time" + + "github.com/DullJZ/s3-balance/internal/storage" + "github.com/gorilla/mux" +) + +// StatsHandler 统计数据处理器 +type StatsHandler struct { + storage *storage.Service +} + +// NewStatsHandler 创建统计处理器 +func NewStatsHandler(storage *storage.Service) *StatsHandler { + return &StatsHandler{ + storage: storage, + } +} + +// RegisterRoutes 注册统计API路由 +func (h *StatsHandler) RegisterRoutes(router *mux.Router) { + router.HandleFunc("/api/stats/monthly", h.GetCurrentMonthStats).Methods("GET") + router.HandleFunc("/api/stats/monthly/{year}/{month}", h.GetMonthlyStats).Methods("GET") + router.HandleFunc("/api/stats/monthly/range", h.GetMonthlyStatsRange).Methods("GET") + router.HandleFunc("/api/stats/bucket/{bucket}/history", h.GetBucketHistory).Methods("GET") +} + +// MonthlyStatsResponse 月度统计响应 +type MonthlyStatsResponse struct { + Year int `json:"year"` + Month int `json:"month"` + Bucket string `json:"bucket"` + Stats BucketOperationCounts `json:"stats"` +} + +// BucketOperationCounts 存储桶操作计数 +type BucketOperationCounts struct { + OperationCountA int64 `json:"operation_count_a"` + OperationCountB int64 `json:"operation_count_b"` + Total int64 `json:"total"` +} + +// GetCurrentMonthStats 获取当前月份的统计 +func (h *StatsHandler) GetCurrentMonthStats(w http.ResponseWriter, r *http.Request) { + stats, err := h.storage.GetCurrentMonthStats() + if err != nil { + log.Printf("Failed to get current month stats: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetMonthlyStats 获取指定月份的统计 +func (h *StatsHandler) GetMonthlyStats(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + year, err := strconv.Atoi(vars["year"]) + if err != nil { + http.Error(w, "Invalid year", http.StatusBadRequest) + return + } + + month, err := strconv.Atoi(vars["month"]) + if err != nil || month < 1 || month > 12 { + http.Error(w, "Invalid month", http.StatusBadRequest) + return + } + + stats, err := h.storage.GetMonthlyStats(year, month) + if err != nil { + log.Printf("Failed to get monthly stats: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetMonthlyStatsRange 获取时间范围内的统计 +func (h *StatsHandler) GetMonthlyStatsRange(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + + startYear, err := strconv.Atoi(query.Get("start_year")) + if err != nil { + http.Error(w, "Invalid start_year", http.StatusBadRequest) + return + } + + startMonth, err := strconv.Atoi(query.Get("start_month")) + if err != nil || startMonth < 1 || startMonth > 12 { + http.Error(w, "Invalid start_month", http.StatusBadRequest) + return + } + + endYear, err := strconv.Atoi(query.Get("end_year")) + if err != nil { + http.Error(w, "Invalid end_year", http.StatusBadRequest) + return + } + + endMonth, err := strconv.Atoi(query.Get("end_month")) + if err != nil || endMonth < 1 || endMonth > 12 { + http.Error(w, "Invalid end_month", http.StatusBadRequest) + return + } + + stats, err := h.storage.GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth) + if err != nil { + log.Printf("Failed to get monthly stats range: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// GetBucketHistory 获取指定存储桶的历史统计 +func (h *StatsHandler) GetBucketHistory(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + bucket := vars["bucket"] + + // 获取查询参数中的月份数,默认12个月 + months := 12 + if monthsStr := r.URL.Query().Get("months"); monthsStr != "" { + if m, err := strconv.Atoi(monthsStr); err == nil && m > 0 { + months = m + } + } + + stats, err := h.storage.GetBucketMonthlyHistory(bucket, months) + if err != nil { + log.Printf("Failed to get bucket history: %v", err) + http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError) + return + } + + response := h.formatMonthlyStats(stats) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// formatMonthlyStats 格式化月度统计数据 +func (h *StatsHandler) formatMonthlyStats(stats []storage.BucketMonthlyStats) []MonthlyStatsResponse { + result := make([]MonthlyStatsResponse, 0, len(stats)) + + for _, stat := range stats { + result = append(result, MonthlyStatsResponse{ + Year: stat.Year, + Month: stat.Month, + Bucket: stat.BucketName, + Stats: BucketOperationCounts{ + OperationCountA: stat.OperationCountA, + OperationCountB: stat.OperationCountB, + Total: stat.OperationCountA + stat.OperationCountB, + }, + }) + } + + return result +} + +// ArchiveCurrentMonth 手动触发归档当前月份(管理API) +func (h *StatsHandler) ArchiveCurrentMonth(w http.ResponseWriter, r *http.Request) { + now := time.Now() + year, month := now.Year(), int(now.Month()) + + if err := h.storage.ArchiveMonthlyStats(year, month); err != nil { + log.Printf("Failed to archive monthly stats: %v", err) + http.Error(w, "Failed to archive statistics", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{ + "status": "success", + "message": "Monthly statistics archived successfully", + "year": strconv.Itoa(year), + "month": strconv.Itoa(month), + }) +} diff --git a/internal/database/database.go b/internal/database/database.go index 34459b8..9d1ff4f 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -181,6 +181,7 @@ func AutoMigrate() error { models := []interface{}{ &storage.Object{}, &storage.BucketStats{}, + &storage.BucketMonthlyStats{}, &storage.UploadSession{}, &storage.AccessLog{}, &storage.VirtualBucketMapping{}, diff --git a/internal/scheduler/monthly_archiver.go b/internal/scheduler/monthly_archiver.go new file mode 100644 index 0000000..2302b9c --- /dev/null +++ b/internal/scheduler/monthly_archiver.go @@ -0,0 +1,92 @@ +package scheduler + +import ( + "log" + "time" + + "github.com/DullJZ/s3-balance/internal/storage" +) + +// MonthlyArchiver 月度统计归档器 +type MonthlyArchiver struct { + storage *storage.Service + ticker *time.Ticker + stopChan chan struct{} +} + +// NewMonthlyArchiver 创建月度归档器 +func NewMonthlyArchiver(storage *storage.Service, checkInterval time.Duration) *MonthlyArchiver { + return &MonthlyArchiver{ + storage: storage, + ticker: time.NewTicker(checkInterval), + stopChan: make(chan struct{}), + } +} + +// Start 启动月度归档定期任务 +func (m *MonthlyArchiver) Start() { + log.Println("Starting monthly statistics archiver...") + + // 启动时立即归档上个月的数据(如果还没有归档) + m.archiveLastMonth() + + go func() { + for { + select { + case <-m.ticker.C: + m.checkAndArchive() + case <-m.stopChan: + log.Println("Monthly statistics archiver stopped") + return + } + } + }() +} + +// Stop 停止归档任务 +func (m *MonthlyArchiver) Stop() { + close(m.stopChan) + m.ticker.Stop() +} + +// checkAndArchive 检查并归档统计数据 +func (m *MonthlyArchiver) checkAndArchive() { + now := time.Now() + + // 如果是每月的第一天凌晨,归档上个月的数据 + if now.Day() == 1 && now.Hour() < 1 { + m.archiveLastMonth() + } + + // 每天都归档当前月份(实时更新) + m.archiveCurrentMonth() +} + +// archiveLastMonth 归档上个月的数据 +func (m *MonthlyArchiver) archiveLastMonth() { + now := time.Now() + lastMonth := now.AddDate(0, -1, 0) + year, month := lastMonth.Year(), int(lastMonth.Month()) + + log.Printf("Archiving monthly stats for %d-%02d...", year, month) + + if err := m.storage.ArchiveMonthlyStats(year, month); err != nil { + log.Printf("Failed to archive monthly stats for %d-%02d: %v", year, month, err) + return + } + + log.Printf("Successfully archived monthly stats for %d-%02d", year, month) +} + +// archiveCurrentMonth 归档当前月份(实时更新) +func (m *MonthlyArchiver) archiveCurrentMonth() { + now := time.Now() + year, month := now.Year(), int(now.Month()) + + if err := m.storage.ArchiveMonthlyStats(year, month); err != nil { + log.Printf("Failed to update current month stats for %d-%02d: %v", year, month, err) + return + } + + log.Printf("Updated current month stats for %d-%02d", year, month) +} diff --git a/internal/storage/models.go b/internal/storage/models.go index a88837f..25e7549 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -45,6 +45,23 @@ func (BucketStats) TableName() string { return "bucket_stats" } +// BucketMonthlyStats 存储桶月度统计信息模型 +type BucketMonthlyStats struct { + ID uint `gorm:"primaryKey" json:"id"` + BucketName string `gorm:"index:idx_bucket_month;size:255;not null" json:"bucket_name"` + Year int `gorm:"index:idx_bucket_month;not null" json:"year"` + Month int `gorm:"index:idx_bucket_month;not null" json:"month"` + OperationCountA int64 `gorm:"not null;default:0" json:"operation_count_a"` + OperationCountB int64 `gorm:"not null;default:0" json:"operation_count_b"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// TableName 指定表名 +func (BucketMonthlyStats) TableName() string { + return "bucket_monthly_stats" +} + // VirtualBucketMapping 虚拟存储桶文件级映射模型 type VirtualBucketMapping struct { ID uint `gorm:"primaryKey" json:"id"` diff --git a/internal/storage/service.go b/internal/storage/service.go index 9ab629e..0d8a0c1 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -630,3 +630,113 @@ func (s *Service) DeleteVirtualBucketFileMapping(virtualBucketName, objectKey st } return nil } + +// ArchiveMonthlyStats 归档指定月份的统计数据 +// 如果该月份的记录已存在,则更新;否则创建新记录 +func (s *Service) ArchiveMonthlyStats(year, month int) error { + var stats []BucketStats + if err := s.db.Find(&stats).Error; err != nil { + return fmt.Errorf("failed to fetch bucket stats: %w", err) + } + + for _, stat := range stats { + monthlyStats := BucketMonthlyStats{ + BucketName: stat.BucketName, + Year: year, + Month: month, + OperationCountA: stat.OperationCountA, + OperationCountB: stat.OperationCountB, + } + + // 使用 UPSERT 逻辑:如果存在则更新,否则创建 + if err := s.db.Where("bucket_name = ? AND year = ? AND month = ?", + stat.BucketName, year, month). + Assign(BucketMonthlyStats{ + OperationCountA: stat.OperationCountA, + OperationCountB: stat.OperationCountB, + }). + FirstOrCreate(&monthlyStats).Error; err != nil { + return fmt.Errorf("failed to archive monthly stats for bucket %s: %w", stat.BucketName, err) + } + } + + return nil +} + +// GetMonthlyStats 获取指定月份的统计数据 +func (s *Service) GetMonthlyStats(year, month int) ([]BucketMonthlyStats, error) { + var stats []BucketMonthlyStats + if err := s.db.Where("year = ? AND month = ?", year, month). + Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch monthly stats: %w", err) + } + return stats, nil +} + +// GetMonthlyStatsRange 获取指定时间范围的统计数据 +func (s *Service) GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth int) ([]BucketMonthlyStats, error) { + var stats []BucketMonthlyStats + if err := s.db.Where("(year > ? OR (year = ? AND month >= ?)) AND (year < ? OR (year = ? AND month <= ?))", + startYear, startYear, startMonth, endYear, endYear, endMonth). + Order("year, month, bucket_name"). + Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch monthly stats range: %w", err) + } + return stats, nil +} + +// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算) +func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { + now := time.Now() + year, month := now.Year(), int(now.Month()) + + // 获取上个月的归档数据 + var lastMonthStats []BucketMonthlyStats + lastYear, lastMonth := year, month-1 + if lastMonth == 0 { + lastMonth = 12 + lastYear-- + } + + lastMonthMap := make(map[string]BucketMonthlyStats) + if err := s.db.Where("year = ? AND month = ?", lastYear, lastMonth). + Find(&lastMonthStats).Error; err == nil { + for _, stat := range lastMonthStats { + lastMonthMap[stat.BucketName] = stat + } + } + + // 获取当前累计数据 + var currentStats []BucketStats + if err := s.db.Find(¤tStats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch current bucket stats: %w", err) + } + + // 计算当前月份的增量 + result := make([]BucketMonthlyStats, 0, len(currentStats)) + for _, current := range currentStats { + lastMonth := lastMonthMap[current.BucketName] + result = append(result, BucketMonthlyStats{ + BucketName: current.BucketName, + Year: year, + Month: month, + OperationCountA: current.OperationCountA - lastMonth.OperationCountA, + OperationCountB: current.OperationCountB - lastMonth.OperationCountB, + UpdatedAt: time.Now(), + }) + } + + return result, nil +} + +// GetBucketMonthlyHistory 获取指定存储桶的月度历史统计 +func (s *Service) GetBucketMonthlyHistory(bucketName string, months int) ([]BucketMonthlyStats, error) { + var stats []BucketMonthlyStats + if err := s.db.Where("bucket_name = ?", bucketName). + Order("year DESC, month DESC"). + Limit(months). + Find(&stats).Error; err != nil { + return nil, fmt.Errorf("failed to fetch bucket monthly history: %w", err) + } + return stats, nil +} From a36b6a13f43521d21d45b35f7f30310323815239 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 20:03:25 +0800 Subject: [PATCH 2/9] Fix slash-at-end bug --- VERSION | 2 +- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/VERSION b/VERSION index a1c2c6a..f4caccc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.1.1 \ No newline at end of file +v0.1.2 \ No newline at end of file diff --git a/go.mod b/go.mod index 4d635e6..9790aac 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/DullJZ/s3-balance go 1.24.5 require ( - github.com/DullJZ/s3-validate v0.0.0-20251004111253-b3ec227d3796 + github.com/DullJZ/s3-validate v0.0.0-20251103105435-c25eac6b580b github.com/aws/aws-sdk-go-v2 v1.39.2 github.com/aws/aws-sdk-go-v2/config v1.31.1 github.com/aws/aws-sdk-go-v2/credentials v1.18.5 diff --git a/go.sum b/go.sum index 7895e0d..5909d66 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -github.com/DullJZ/s3-validate v0.0.0-20251004111253-b3ec227d3796 h1:0Lipgc3EHF2QOKpCziXApbVocdyzZ/3a52xluuWraXg= -github.com/DullJZ/s3-validate v0.0.0-20251004111253-b3ec227d3796/go.mod h1:OEx+/bRlDdI0oj/Bb1Plsq+1+qU1qal3/g9phixhU6Y= +github.com/DullJZ/s3-validate v0.0.0-20251103105435-c25eac6b580b h1:BHue7N77inSdaDUUZSO/gMmc3+4ZGdQA3ORdcLHnxtg= +github.com/DullJZ/s3-validate v0.0.0-20251103105435-c25eac6b580b/go.mod h1:OEx+/bRlDdI0oj/Bb1Plsq+1+qU1qal3/g9phixhU6Y= github.com/aws/aws-sdk-go-v2 v1.39.2 h1:EJLg8IdbzgeD7xgvZ+I8M1e0fL0ptn/M47lianzth0I= github.com/aws/aws-sdk-go-v2 v1.39.2/go.mod h1:sDioUELIUO9Znk23YVmIk86/9DOpkbyyVb1i/gUNFXY= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.0 h1:6GMWV6CNpA/6fbFHnoAjrv4+LGfyTqZz2LtCHnspgDg= From 4ae2a54da322538d6cb538f1a1dc4f3b9a2ce12f Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 20:33:05 +0800 Subject: [PATCH 3/9] No strict slash for WinSCP --- internal/api/s3_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/api/s3_handler.go b/internal/api/s3_handler.go index 8a8c7b0..b12ea4a 100644 --- a/internal/api/s3_handler.go +++ b/internal/api/s3_handler.go @@ -76,7 +76,7 @@ func (h *S3Handler) RegisterS3Routes(router *mux.Router) { // 带认证/虚拟主机的路由 protected := router.NewRoute().PathPrefix("/{bucket}").Subrouter() - protected.StrictSlash(true) + // 注意:不使用 StrictSlash(true) 以避免 301 重定向,兼容WinSCP // Bucket operations protected.HandleFunc("", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE") From dc6916f3df67e73d7fb8ceedaae7311269342757 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:00:35 +0800 Subject: [PATCH 4/9] Fix: Add unique index to prevent duplicate monthly stats records --- internal/storage/models.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/storage/models.go b/internal/storage/models.go index 25e7549..b87eef6 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -48,9 +48,9 @@ func (BucketStats) TableName() string { // BucketMonthlyStats 存储桶月度统计信息模型 type BucketMonthlyStats struct { ID uint `gorm:"primaryKey" json:"id"` - BucketName string `gorm:"index:idx_bucket_month;size:255;not null" json:"bucket_name"` - Year int `gorm:"index:idx_bucket_month;not null" json:"year"` - Month int `gorm:"index:idx_bucket_month;not null" json:"month"` + BucketName string `gorm:"uniqueIndex:idx_bucket_month;size:255;not null" json:"bucket_name"` + Year int `gorm:"uniqueIndex:idx_bucket_month;not null" json:"year"` + Month int `gorm:"uniqueIndex:idx_bucket_month;not null" json:"month"` OperationCountA int64 `gorm:"not null;default:0" json:"operation_count_a"` OperationCountB int64 `gorm:"not null;default:0" json:"operation_count_b"` CreatedAt time.Time `json:"created_at"` From 953feb196574217ba06a493fbc80659249de5bb1 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:23:12 +0800 Subject: [PATCH 5/9] Fix: Store monthly increments instead of cumulative snapshots --- internal/storage/service.go | 84 ++++++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 19 deletions(-) diff --git a/internal/storage/service.go b/internal/storage/service.go index 0d8a0c1..9eae811 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -631,29 +631,69 @@ func (s *Service) DeleteVirtualBucketFileMapping(virtualBucketName, objectKey st return nil } -// ArchiveMonthlyStats 归档指定月份的统计数据 +// ArchiveMonthlyStats 归档指定月份的统计数据(存储增量值,非累计值) // 如果该月份的记录已存在,则更新;否则创建新记录 func (s *Service) ArchiveMonthlyStats(year, month int) error { - var stats []BucketStats - if err := s.db.Find(&stats).Error; err != nil { + // 获取当前所有bucket的累计统计 + var currentStats []BucketStats + if err := s.db.Find(¤tStats).Error; err != nil { return fmt.Errorf("failed to fetch bucket stats: %w", err) } - for _, stat := range stats { + // 获取上个月的累计值(从上月归档数据推算) + lastYear, lastMonth := year, month-1 + if lastMonth == 0 { + lastMonth = 12 + lastYear-- + } + + // 查询上个月及之前的所有归档数据,用于推算上月末的累计值 + var lastMonthArchived []BucketMonthlyStats + lastMonthMap := make(map[string]int64) // bucket_name -> last_month_cumulative_a + lastMonthMapB := make(map[string]int64) // bucket_name -> last_month_cumulative_b + + if err := s.db.Where("year < ? OR (year = ? AND month <= ?)", lastYear, lastYear, lastMonth). + Order("year ASC, month ASC"). + Find(&lastMonthArchived).Error; err == nil { + + // 累加历史增量得到上月末累计值 + cumulativeA := make(map[string]int64) + cumulativeB := make(map[string]int64) + + for _, archived := range lastMonthArchived { + cumulativeA[archived.BucketName] += archived.OperationCountA + cumulativeB[archived.BucketName] += archived.OperationCountB + } + + lastMonthMap = cumulativeA + lastMonthMapB = cumulativeB + } + + // 对每个bucket,计算本月增量并存储 + for _, stat := range currentStats { + lastCumulativeA := lastMonthMap[stat.BucketName] + lastCumulativeB := lastMonthMapB[stat.BucketName] + + incrementA := stat.OperationCountA - lastCumulativeA + incrementB := stat.OperationCountB - lastCumulativeB + + // 如果是首次运行(没有历史数据),incrementA/B 可能等于累计值 + // 这是预期行为:首月记录的就是从0到当前的增量 + monthlyStats := BucketMonthlyStats{ BucketName: stat.BucketName, Year: year, Month: month, - OperationCountA: stat.OperationCountA, - OperationCountB: stat.OperationCountB, + OperationCountA: incrementA, + OperationCountB: incrementB, } // 使用 UPSERT 逻辑:如果存在则更新,否则创建 if err := s.db.Where("bucket_name = ? AND year = ? AND month = ?", stat.BucketName, year, month). Assign(BucketMonthlyStats{ - OperationCountA: stat.OperationCountA, - OperationCountB: stat.OperationCountB, + OperationCountA: incrementA, + OperationCountB: incrementB, }). FirstOrCreate(&monthlyStats).Error; err != nil { return fmt.Errorf("failed to archive monthly stats for bucket %s: %w", stat.BucketName, err) @@ -685,24 +725,28 @@ func (s *Service) GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth return stats, nil } -// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算) +// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算增量) func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { now := time.Now() year, month := now.Year(), int(now.Month()) - // 获取上个月的归档数据 - var lastMonthStats []BucketMonthlyStats + // 获取上个月末的累计值(通过累加所有历史增量) lastYear, lastMonth := year, month-1 if lastMonth == 0 { lastMonth = 12 lastYear-- } - lastMonthMap := make(map[string]BucketMonthlyStats) - if err := s.db.Where("year = ? AND month = ?", lastYear, lastMonth). - Find(&lastMonthStats).Error; err == nil { - for _, stat := range lastMonthStats { - lastMonthMap[stat.BucketName] = stat + var historicalStats []BucketMonthlyStats + lastMonthCumulativeA := make(map[string]int64) + lastMonthCumulativeB := make(map[string]int64) + + if err := s.db.Where("year < ? OR (year = ? AND month <= ?)", lastYear, lastYear, lastMonth). + Find(&historicalStats).Error; err == nil { + // 累加所有历史增量得到上月末累计值 + for _, stat := range historicalStats { + lastMonthCumulativeA[stat.BucketName] += stat.OperationCountA + lastMonthCumulativeB[stat.BucketName] += stat.OperationCountB } } @@ -715,13 +759,15 @@ func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { // 计算当前月份的增量 result := make([]BucketMonthlyStats, 0, len(currentStats)) for _, current := range currentStats { - lastMonth := lastMonthMap[current.BucketName] + incrementA := current.OperationCountA - lastMonthCumulativeA[current.BucketName] + incrementB := current.OperationCountB - lastMonthCumulativeB[current.BucketName] + result = append(result, BucketMonthlyStats{ BucketName: current.BucketName, Year: year, Month: month, - OperationCountA: current.OperationCountA - lastMonth.OperationCountA, - OperationCountB: current.OperationCountB - lastMonth.OperationCountB, + OperationCountA: incrementA, + OperationCountB: incrementB, UpdatedAt: time.Now(), }) } From c35ff96397bae30c46197a915c9e7d50368b2caf Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:38:09 +0800 Subject: [PATCH 6/9] Fix: Prevent duplicate archiving with idempotency tracking --- internal/scheduler/monthly_archiver.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/scheduler/monthly_archiver.go b/internal/scheduler/monthly_archiver.go index 2302b9c..82c2698 100644 --- a/internal/scheduler/monthly_archiver.go +++ b/internal/scheduler/monthly_archiver.go @@ -9,9 +9,10 @@ import ( // MonthlyArchiver 月度统计归档器 type MonthlyArchiver struct { - storage *storage.Service - ticker *time.Ticker - stopChan chan struct{} + storage *storage.Service + ticker *time.Ticker + stopChan chan struct{} + lastArchivedDate string // 格式: "2025-01" - 记录上次归档的月份 } // NewMonthlyArchiver 创建月度归档器 @@ -52,10 +53,13 @@ func (m *MonthlyArchiver) Stop() { // checkAndArchive 检查并归档统计数据 func (m *MonthlyArchiver) checkAndArchive() { now := time.Now() + lastMonth := now.AddDate(0, -1, 0) + lastMonthKey := lastMonth.Format("2006-01") - // 如果是每月的第一天凌晨,归档上个月的数据 - if now.Day() == 1 && now.Hour() < 1 { + // 如果是每月的第一天,且上个月还未归档,则归档上个月的数据 + if now.Day() == 1 && m.lastArchivedDate != lastMonthKey { m.archiveLastMonth() + m.lastArchivedDate = lastMonthKey } // 每天都归档当前月份(实时更新) From 86d2bc1f3e4d0ac705cb964e445820305b6643ad Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 22:50:39 +0800 Subject: [PATCH 7/9] Fix: Add boundary checks for negative increment values --- internal/storage/service.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/storage/service.go b/internal/storage/service.go index 9eae811..fcc1b31 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -680,6 +680,14 @@ func (s *Service) ArchiveMonthlyStats(year, month int) error { // 如果是首次运行(没有历史数据),incrementA/B 可能等于累计值 // 这是预期行为:首月记录的就是从0到当前的增量 + // 边界情况:如果计算出负值,说明数据不一致,设置为0 + if incrementA < 0 { + incrementA = 0 + } + if incrementB < 0 { + incrementB = 0 + } + monthlyStats := BucketMonthlyStats{ BucketName: stat.BucketName, Year: year, @@ -762,6 +770,14 @@ func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) { incrementA := current.OperationCountA - lastMonthCumulativeA[current.BucketName] incrementB := current.OperationCountB - lastMonthCumulativeB[current.BucketName] + // 边界情况:如果计算出负值,说明数据不一致,设置为0 + if incrementA < 0 { + incrementA = 0 + } + if incrementB < 0 { + incrementB = 0 + } + result = append(result, BucketMonthlyStats{ BucketName: current.BucketName, Year: year, From de019d56d8f2c05764053e8e89405266b3885660 Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 23:03:09 +0800 Subject: [PATCH 8/9] Fix: Use transaction for atomic increment and read operation --- internal/storage/service.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/internal/storage/service.go b/internal/storage/service.go index fcc1b31..e7ba263 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -304,18 +304,29 @@ func (s *Service) IncrementBucketOperation(bucketName, category string) (int64, return 0, fmt.Errorf("unknown operation category: %s", category) } - if err := s.db.Model(&BucketStats{}). - Where("bucket_name = ?", bucketName). - UpdateColumn(field, gorm.Expr(field+" + ?", 1)).Error; err != nil { - return 0, fmt.Errorf("failed to increment %s for bucket %s: %w", field, bucketName, err) - } - + // 使用事务确保原子性 var count int64 - if err := s.db.Model(&BucketStats{}). - Where("bucket_name = ?", bucketName). - Select(field). - Scan(&count).Error; err != nil { - return 0, fmt.Errorf("failed to fetch updated %s for bucket %s: %w", field, bucketName, err) + err := s.db.Transaction(func(tx *gorm.DB) error { + // 原子递增 + if err := tx.Model(&BucketStats{}). + Where("bucket_name = ?", bucketName). + UpdateColumn(field, gorm.Expr(field+" + ?", 1)).Error; err != nil { + return fmt.Errorf("failed to increment %s for bucket %s: %w", field, bucketName, err) + } + + // 在同一事务中读取最新值 + if err := tx.Model(&BucketStats{}). + Where("bucket_name = ?", bucketName). + Select(field). + Scan(&count).Error; err != nil { + return fmt.Errorf("failed to fetch updated %s for bucket %s: %w", field, bucketName, err) + } + + return nil + }) + + if err != nil { + return 0, err } return count, nil From bf5db75d26d4125edaa78ee29939ceb009b5e04a Mon Sep 17 00:00:00 2001 From: DullJZ <79080562+DullJZ@users.noreply.github.com> Date: Mon, 3 Nov 2025 23:44:09 +0800 Subject: [PATCH 9/9] Update config.example.yaml --- config/config.example.yaml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/config/config.example.yaml b/config/config.example.yaml index d305fe7..7680fa2 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -141,11 +141,12 @@ s3api: virtual_host: false # 工作模式: - # false:预签名重定向模式,客户端直接与后端存储交互 - # true (默认):代理模式,数据通过S3 Balance服务器传输 + # false:预签名重定向模式,客户端下载直接重定向到与后端存储 + # true (默认):代理模式,数据通过S3 Balance服务器中转传输 + # 该选项仅适用于下载,上传操作始终为全代理模式 proxy_mode: true - # 是否需要认证(开启后使用 Basic Auth,凭据来自 access_key/secret_key) + # 是否需要认证(使用配置的 access_key/secret_key) auth_required: true # 用于签名验证的Host(可选)