A&B operation history

This commit is contained in:
DullJZ
2025-10-29 16:58:10 +08:00
parent 529ee8fe55
commit d64ecac37c
6 changed files with 424 additions and 0 deletions

View File

@@ -17,6 +17,7 @@ import (
"github.com/DullJZ/s3-balance/internal/config"
"github.com/DullJZ/s3-balance/internal/database"
"github.com/DullJZ/s3-balance/internal/metrics"
"github.com/DullJZ/s3-balance/internal/scheduler"
"github.com/DullJZ/s3-balance/internal/storage"
"github.com/DullJZ/s3-balance/pkg/presigner"
"github.com/gorilla/mux"
@@ -80,6 +81,11 @@ func main() {
// 启动定期清理过期上传会话的任务
startSessionCleaner(ctx, storageService)
// 启动月度统计归档任务(每小时检查一次)
monthlyArchiver := scheduler.NewMonthlyArchiver(storageService, 1*time.Hour)
monthlyArchiver.Start()
defer monthlyArchiver.Stop()
// 创建S3兼容API处理器
s3Handler := api.NewS3Handler(
bucketManager,
@@ -124,6 +130,11 @@ func main() {
log.Printf("Metrics server enabled at %s", cfg.Metrics.Path)
}
// 添加统计API端点
statsHandler := api.NewStatsHandler(storageService)
statsHandler.RegisterRoutes(router)
log.Println("Statistics API endpoints registered at /api/stats/*")
// 运行在S3兼容模式
log.Println("Running in S3-compatible mode")
s3Handler.RegisterS3Routes(router)

View File

@@ -0,0 +1,193 @@
package api
import (
"encoding/json"
"log"
"net/http"
"strconv"
"time"
"github.com/DullJZ/s3-balance/internal/storage"
"github.com/gorilla/mux"
)
// StatsHandler 统计数据处理器
type StatsHandler struct {
storage *storage.Service
}
// NewStatsHandler 创建统计处理器
func NewStatsHandler(storage *storage.Service) *StatsHandler {
return &StatsHandler{
storage: storage,
}
}
// RegisterRoutes 注册统计API路由
func (h *StatsHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/stats/monthly", h.GetCurrentMonthStats).Methods("GET")
router.HandleFunc("/api/stats/monthly/{year}/{month}", h.GetMonthlyStats).Methods("GET")
router.HandleFunc("/api/stats/monthly/range", h.GetMonthlyStatsRange).Methods("GET")
router.HandleFunc("/api/stats/bucket/{bucket}/history", h.GetBucketHistory).Methods("GET")
}
// MonthlyStatsResponse 月度统计响应
type MonthlyStatsResponse struct {
Year int `json:"year"`
Month int `json:"month"`
Bucket string `json:"bucket"`
Stats BucketOperationCounts `json:"stats"`
}
// BucketOperationCounts 存储桶操作计数
type BucketOperationCounts struct {
OperationCountA int64 `json:"operation_count_a"`
OperationCountB int64 `json:"operation_count_b"`
Total int64 `json:"total"`
}
// GetCurrentMonthStats 获取当前月份的统计
func (h *StatsHandler) GetCurrentMonthStats(w http.ResponseWriter, r *http.Request) {
stats, err := h.storage.GetCurrentMonthStats()
if err != nil {
log.Printf("Failed to get current month stats: %v", err)
http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError)
return
}
response := h.formatMonthlyStats(stats)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetMonthlyStats 获取指定月份的统计
func (h *StatsHandler) GetMonthlyStats(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
year, err := strconv.Atoi(vars["year"])
if err != nil {
http.Error(w, "Invalid year", http.StatusBadRequest)
return
}
month, err := strconv.Atoi(vars["month"])
if err != nil || month < 1 || month > 12 {
http.Error(w, "Invalid month", http.StatusBadRequest)
return
}
stats, err := h.storage.GetMonthlyStats(year, month)
if err != nil {
log.Printf("Failed to get monthly stats: %v", err)
http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError)
return
}
response := h.formatMonthlyStats(stats)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetMonthlyStatsRange 获取时间范围内的统计
func (h *StatsHandler) GetMonthlyStatsRange(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
startYear, err := strconv.Atoi(query.Get("start_year"))
if err != nil {
http.Error(w, "Invalid start_year", http.StatusBadRequest)
return
}
startMonth, err := strconv.Atoi(query.Get("start_month"))
if err != nil || startMonth < 1 || startMonth > 12 {
http.Error(w, "Invalid start_month", http.StatusBadRequest)
return
}
endYear, err := strconv.Atoi(query.Get("end_year"))
if err != nil {
http.Error(w, "Invalid end_year", http.StatusBadRequest)
return
}
endMonth, err := strconv.Atoi(query.Get("end_month"))
if err != nil || endMonth < 1 || endMonth > 12 {
http.Error(w, "Invalid end_month", http.StatusBadRequest)
return
}
stats, err := h.storage.GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth)
if err != nil {
log.Printf("Failed to get monthly stats range: %v", err)
http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError)
return
}
response := h.formatMonthlyStats(stats)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetBucketHistory 获取指定存储桶的历史统计
func (h *StatsHandler) GetBucketHistory(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucket := vars["bucket"]
// 获取查询参数中的月份数默认12个月
months := 12
if monthsStr := r.URL.Query().Get("months"); monthsStr != "" {
if m, err := strconv.Atoi(monthsStr); err == nil && m > 0 {
months = m
}
}
stats, err := h.storage.GetBucketMonthlyHistory(bucket, months)
if err != nil {
log.Printf("Failed to get bucket history: %v", err)
http.Error(w, "Failed to fetch statistics", http.StatusInternalServerError)
return
}
response := h.formatMonthlyStats(stats)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// formatMonthlyStats 格式化月度统计数据
func (h *StatsHandler) formatMonthlyStats(stats []storage.BucketMonthlyStats) []MonthlyStatsResponse {
result := make([]MonthlyStatsResponse, 0, len(stats))
for _, stat := range stats {
result = append(result, MonthlyStatsResponse{
Year: stat.Year,
Month: stat.Month,
Bucket: stat.BucketName,
Stats: BucketOperationCounts{
OperationCountA: stat.OperationCountA,
OperationCountB: stat.OperationCountB,
Total: stat.OperationCountA + stat.OperationCountB,
},
})
}
return result
}
// ArchiveCurrentMonth 手动触发归档当前月份管理API
func (h *StatsHandler) ArchiveCurrentMonth(w http.ResponseWriter, r *http.Request) {
now := time.Now()
year, month := now.Year(), int(now.Month())
if err := h.storage.ArchiveMonthlyStats(year, month); err != nil {
log.Printf("Failed to archive monthly stats: %v", err)
http.Error(w, "Failed to archive statistics", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"status": "success",
"message": "Monthly statistics archived successfully",
"year": strconv.Itoa(year),
"month": strconv.Itoa(month),
})
}

View File

@@ -181,6 +181,7 @@ func AutoMigrate() error {
models := []interface{}{
&storage.Object{},
&storage.BucketStats{},
&storage.BucketMonthlyStats{},
&storage.UploadSession{},
&storage.AccessLog{},
&storage.VirtualBucketMapping{},

View File

@@ -0,0 +1,92 @@
package scheduler
import (
"log"
"time"
"github.com/DullJZ/s3-balance/internal/storage"
)
// MonthlyArchiver 月度统计归档器
type MonthlyArchiver struct {
storage *storage.Service
ticker *time.Ticker
stopChan chan struct{}
}
// NewMonthlyArchiver 创建月度归档器
func NewMonthlyArchiver(storage *storage.Service, checkInterval time.Duration) *MonthlyArchiver {
return &MonthlyArchiver{
storage: storage,
ticker: time.NewTicker(checkInterval),
stopChan: make(chan struct{}),
}
}
// Start 启动月度归档定期任务
func (m *MonthlyArchiver) Start() {
log.Println("Starting monthly statistics archiver...")
// 启动时立即归档上个月的数据(如果还没有归档)
m.archiveLastMonth()
go func() {
for {
select {
case <-m.ticker.C:
m.checkAndArchive()
case <-m.stopChan:
log.Println("Monthly statistics archiver stopped")
return
}
}
}()
}
// Stop 停止归档任务
func (m *MonthlyArchiver) Stop() {
close(m.stopChan)
m.ticker.Stop()
}
// checkAndArchive 检查并归档统计数据
func (m *MonthlyArchiver) checkAndArchive() {
now := time.Now()
// 如果是每月的第一天凌晨,归档上个月的数据
if now.Day() == 1 && now.Hour() < 1 {
m.archiveLastMonth()
}
// 每天都归档当前月份(实时更新)
m.archiveCurrentMonth()
}
// archiveLastMonth 归档上个月的数据
func (m *MonthlyArchiver) archiveLastMonth() {
now := time.Now()
lastMonth := now.AddDate(0, -1, 0)
year, month := lastMonth.Year(), int(lastMonth.Month())
log.Printf("Archiving monthly stats for %d-%02d...", year, month)
if err := m.storage.ArchiveMonthlyStats(year, month); err != nil {
log.Printf("Failed to archive monthly stats for %d-%02d: %v", year, month, err)
return
}
log.Printf("Successfully archived monthly stats for %d-%02d", year, month)
}
// archiveCurrentMonth 归档当前月份(实时更新)
func (m *MonthlyArchiver) archiveCurrentMonth() {
now := time.Now()
year, month := now.Year(), int(now.Month())
if err := m.storage.ArchiveMonthlyStats(year, month); err != nil {
log.Printf("Failed to update current month stats for %d-%02d: %v", year, month, err)
return
}
log.Printf("Updated current month stats for %d-%02d", year, month)
}

View File

@@ -45,6 +45,23 @@ func (BucketStats) TableName() string {
return "bucket_stats"
}
// BucketMonthlyStats 存储桶月度统计信息模型
type BucketMonthlyStats struct {
ID uint `gorm:"primaryKey" json:"id"`
BucketName string `gorm:"index:idx_bucket_month;size:255;not null" json:"bucket_name"`
Year int `gorm:"index:idx_bucket_month;not null" json:"year"`
Month int `gorm:"index:idx_bucket_month;not null" json:"month"`
OperationCountA int64 `gorm:"not null;default:0" json:"operation_count_a"`
OperationCountB int64 `gorm:"not null;default:0" json:"operation_count_b"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// TableName 指定表名
func (BucketMonthlyStats) TableName() string {
return "bucket_monthly_stats"
}
// VirtualBucketMapping 虚拟存储桶文件级映射模型
type VirtualBucketMapping struct {
ID uint `gorm:"primaryKey" json:"id"`

View File

@@ -630,3 +630,113 @@ func (s *Service) DeleteVirtualBucketFileMapping(virtualBucketName, objectKey st
}
return nil
}
// ArchiveMonthlyStats 归档指定月份的统计数据
// 如果该月份的记录已存在,则更新;否则创建新记录
func (s *Service) ArchiveMonthlyStats(year, month int) error {
var stats []BucketStats
if err := s.db.Find(&stats).Error; err != nil {
return fmt.Errorf("failed to fetch bucket stats: %w", err)
}
for _, stat := range stats {
monthlyStats := BucketMonthlyStats{
BucketName: stat.BucketName,
Year: year,
Month: month,
OperationCountA: stat.OperationCountA,
OperationCountB: stat.OperationCountB,
}
// 使用 UPSERT 逻辑:如果存在则更新,否则创建
if err := s.db.Where("bucket_name = ? AND year = ? AND month = ?",
stat.BucketName, year, month).
Assign(BucketMonthlyStats{
OperationCountA: stat.OperationCountA,
OperationCountB: stat.OperationCountB,
}).
FirstOrCreate(&monthlyStats).Error; err != nil {
return fmt.Errorf("failed to archive monthly stats for bucket %s: %w", stat.BucketName, err)
}
}
return nil
}
// GetMonthlyStats 获取指定月份的统计数据
func (s *Service) GetMonthlyStats(year, month int) ([]BucketMonthlyStats, error) {
var stats []BucketMonthlyStats
if err := s.db.Where("year = ? AND month = ?", year, month).
Find(&stats).Error; err != nil {
return nil, fmt.Errorf("failed to fetch monthly stats: %w", err)
}
return stats, nil
}
// GetMonthlyStatsRange 获取指定时间范围的统计数据
func (s *Service) GetMonthlyStatsRange(startYear, startMonth, endYear, endMonth int) ([]BucketMonthlyStats, error) {
var stats []BucketMonthlyStats
if err := s.db.Where("(year > ? OR (year = ? AND month >= ?)) AND (year < ? OR (year = ? AND month <= ?))",
startYear, startYear, startMonth, endYear, endYear, endMonth).
Order("year, month, bucket_name").
Find(&stats).Error; err != nil {
return nil, fmt.Errorf("failed to fetch monthly stats range: %w", err)
}
return stats, nil
}
// GetCurrentMonthStats 获取当前月份的实时统计(从 bucket_stats 计算)
func (s *Service) GetCurrentMonthStats() ([]BucketMonthlyStats, error) {
now := time.Now()
year, month := now.Year(), int(now.Month())
// 获取上个月的归档数据
var lastMonthStats []BucketMonthlyStats
lastYear, lastMonth := year, month-1
if lastMonth == 0 {
lastMonth = 12
lastYear--
}
lastMonthMap := make(map[string]BucketMonthlyStats)
if err := s.db.Where("year = ? AND month = ?", lastYear, lastMonth).
Find(&lastMonthStats).Error; err == nil {
for _, stat := range lastMonthStats {
lastMonthMap[stat.BucketName] = stat
}
}
// 获取当前累计数据
var currentStats []BucketStats
if err := s.db.Find(&currentStats).Error; err != nil {
return nil, fmt.Errorf("failed to fetch current bucket stats: %w", err)
}
// 计算当前月份的增量
result := make([]BucketMonthlyStats, 0, len(currentStats))
for _, current := range currentStats {
lastMonth := lastMonthMap[current.BucketName]
result = append(result, BucketMonthlyStats{
BucketName: current.BucketName,
Year: year,
Month: month,
OperationCountA: current.OperationCountA - lastMonth.OperationCountA,
OperationCountB: current.OperationCountB - lastMonth.OperationCountB,
UpdatedAt: time.Now(),
})
}
return result, nil
}
// GetBucketMonthlyHistory 获取指定存储桶的月度历史统计
func (s *Service) GetBucketMonthlyHistory(bucketName string, months int) ([]BucketMonthlyStats, error) {
var stats []BucketMonthlyStats
if err := s.db.Where("bucket_name = ?", bucketName).
Order("year DESC, month DESC").
Limit(months).
Find(&stats).Error; err != nil {
return nil, fmt.Errorf("failed to fetch bucket monthly history: %w", err)
}
return stats, nil
}