Merge pull request #21 from Awuqing/feat/community-enhancements

feat: community enhancements, CI/CD pipeline, and backup integrity verification
This commit is contained in:
Wu Qing
2026-03-31 13:23:11 +08:00
committed by GitHub
6 changed files with 61 additions and 5 deletions

View File

@@ -17,6 +17,7 @@ type BackupRecord struct {
Status string `gorm:"size:20;index;not null" json:"status"`
FileName string `gorm:"column:file_name;size:255" json:"fileName"`
FileSize int64 `gorm:"column:file_size;not null;default:0" json:"fileSize"`
Checksum string `gorm:"column:checksum;size:64" json:"checksum"`
StoragePath string `gorm:"column:storage_path;size:500" json:"storagePath"`
StorageUploadResults string `gorm:"column:storage_upload_results;type:text" json:"-"`
DurationSeconds int `gorm:"column:duration_seconds;not null;default:0" json:"durationSeconds"`

View File

@@ -2,8 +2,11 @@ package service
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io"
"os"
"path/filepath"
@@ -253,10 +256,11 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
errMessage := ""
var fileName string
var fileSize int64
var checksum string
var storagePath string
var uploadResults []StorageUploadResultItem
completeRecord := func() {
if finalizeErr := s.finalizeRecord(ctx, task, recordID, startedAt, status, errMessage, logger.String(), fileName, fileSize, storagePath); finalizeErr != nil {
if finalizeErr := s.finalizeRecord(ctx, task, recordID, startedAt, status, errMessage, logger.String(), fileName, fileSize, checksum, storagePath); finalizeErr != nil {
logger.Errorf("写回备份记录失败:%v", finalizeErr)
}
// 写入多目标上传结果
@@ -335,6 +339,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
// 并行上传到所有目标
uploadResults = make([]StorageUploadResultItem, len(targetIDs))
var checksumOnce sync.Once
var wg sync.WaitGroup
for i, tid := range targetIDs {
wg.Add(1)
@@ -359,13 +364,25 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
}
defer artifact.Close()
logger.Infof("开始上传备份到存储目标:%s", targetName)
if uploadErr := provider.Upload(ctx, storagePath, artifact, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
// hashingReader: 上传过程中同步计算字节数 + SHA-256,单次读取,零额外 I/O
hr := newHashingReader(artifact)
if uploadErr := provider.Upload(ctx, storagePath, hr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()}
logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr)
return
}
// 完整性校验:对比实际传输字节数
if hr.n != fileSize {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("完整性校验失败: 预期 %d bytes, 实际传输 %d bytes", fileSize, hr.n)}
logger.Errorf("存储目标 %s 完整性校验失败:预期 %d bytes, 实际传输 %d bytes", targetName, fileSize, hr.n)
_ = provider.Delete(ctx, storagePath)
return
}
// 取第一个成功目标的哈希写入 record(所有目标读同一文件,哈希一定相同)
targetChecksum := hr.Sum()
checksumOnce.Do(func() { checksum = targetChecksum })
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "success", StoragePath: storagePath, FileSize: fileSize}
logger.Infof("存储目标 %s 上传成功", targetName)
logger.Infof("存储目标 %s 上传成功 (%d bytes, SHA-256=%s)", targetName, fileSize, targetChecksum)
// 每个成功目标独立执行保留策略
if s.retention != nil {
cleanupResult, cleanupErr := s.retention.Cleanup(ctx, task, provider)
@@ -403,7 +420,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
}
}
func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model.BackupTask, recordID uint, startedAt time.Time, status string, errorMessage string, logContent string, fileName string, fileSize int64, storagePath string) error {
func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model.BackupTask, recordID uint, startedAt time.Time, status string, errorMessage string, logContent string, fileName string, fileSize int64, checksum string, storagePath string) error {
record, err := s.records.FindByID(ctx, recordID)
if err != nil {
return err
@@ -415,6 +432,7 @@ func (s *BackupExecutionService) finalizeRecord(ctx context.Context, task *model
record.Status = status
record.FileName = fileName
record.FileSize = fileSize
record.Checksum = checksum
record.StoragePath = storagePath
record.DurationSeconds = int(completedAt.Sub(startedAt).Seconds())
record.ErrorMessage = strings.TrimSpace(errorMessage)
@@ -578,3 +596,28 @@ func buildStorageProviderFromRepos(ctx context.Context, storageTargetID uint, st
}
return provider, target, nil
}
// hashingReader 在上传过程中同步计算字节数和 SHA-256零额外 I/O
type hashingReader struct {
reader io.Reader
hash hash.Hash
n int64
}
func newHashingReader(reader io.Reader) *hashingReader {
h := sha256.New()
return &hashingReader{
reader: io.TeeReader(reader, h),
hash: h,
}
}
func (r *hashingReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
r.n += int64(n)
return n, err
}
func (r *hashingReader) Sum() string {
return hex.EncodeToString(r.hash.Sum(nil))
}

View File

@@ -55,7 +55,11 @@ func newExecutionTestServices(t *testing.T) (*BackupExecutionService, *BackupRec
runnerRegistry := backup.NewRegistry(backup.NewFileRunner(), backup.NewMySQLRunner(nil), backup.NewSQLiteRunner(), backup.NewPostgreSQLRunner(nil))
storageRegistry := storage.NewRegistry(localdisk.NewFactory())
retentionService := backupretention.NewService(records)
executionService := NewBackupExecutionService(tasks, records, targets, storageRegistry, runnerRegistry, logHub, retentionService, cipher, nil, "", 2)
tempDir := filepath.Join(baseDir, "tmp")
if err := os.MkdirAll(tempDir, 0o755); err != nil {
t.Fatalf("MkdirAll tempDir returned error: %v", err)
}
executionService := NewBackupExecutionService(tasks, records, targets, storageRegistry, runnerRegistry, logHub, retentionService, cipher, nil, tempDir, 2)
recordService := NewBackupRecordService(records, executionService, logHub)
return executionService, recordService, tasks, targets, records, sourceDir, storageDir
}

View File

@@ -30,6 +30,7 @@ type BackupRecordSummary struct {
Status string `json:"status"`
FileName string `json:"fileName"`
FileSize int64 `json:"fileSize"`
Checksum string `json:"checksum"`
StoragePath string `json:"storagePath"`
DurationSeconds int `json:"durationSeconds"`
ErrorMessage string `json:"errorMessage"`
@@ -111,6 +112,7 @@ func toBackupRecordSummary(item *model.BackupRecord) BackupRecordSummary {
Status: item.Status,
FileName: item.FileName,
FileSize: item.FileSize,
Checksum: item.Checksum,
StoragePath: item.StoragePath,
DurationSeconds: item.DurationSeconds,
ErrorMessage: item.ErrorMessage,

View File

@@ -98,6 +98,11 @@ export function BackupRecordsPage() {
<Space direction="vertical" size={2}>
<Typography.Text>{record.fileName || '-'}</Typography.Text>
<Typography.Text type="secondary">{formatBytes(record.fileSize)}</Typography.Text>
{record.checksum && (
<Typography.Text type="secondary" copyable style={{ fontSize: 11 }}>
SHA-256: {record.checksum.substring(0, 16)}...
</Typography.Text>
)}
</Space>
),
},

View File

@@ -19,6 +19,7 @@ export interface BackupRecordSummary {
status: BackupRecordStatus
fileName: string
fileSize: number
checksum: string
storagePath: string
durationSeconds: number
errorMessage: string