From f388b98943aba2d76c32a3084f420897dc2aeb07 Mon Sep 17 00:00:00 2001 From: Awuqing <3184394176@qq.com> Date: Tue, 31 Mar 2026 13:08:10 +0800 Subject: [PATCH] refactor: single-pass hashing during upload via TeeReader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous approach read the file in a separate pass to compute SHA-256 before any upload started, so every backup paid one extra full read of the artifact (double the disk I/O for a single target), and all target uploads waited on that serial pass. New design — hashingReader wraps io.TeeReader + sha256.Hash: file.Read() → TeeReader → sha256.Write() (hash) + provider (upload) A single read pass yields both the byte count and the SHA-256 digest. Each upload goroutine independently opens the file and computes its own hash; because every target reads the same file, the digests are identical, and the first successful target writes the checksum to the record via sync.Once. Zero extra disk I/O and zero extra memory copies beyond the upload read itself, and the shared checksum is written exactly once, so the change stays concurrency-safe. The trade-off is that SHA-256 is now computed once per target instead of once per backup. --- server/internal/backup/helpers.go | 17 ------ .../service/backup_execution_service.go | 53 +++++++++++-------- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/server/internal/backup/helpers.go b/server/internal/backup/helpers.go index 5ae055c..c8d7a93 100644 --- a/server/internal/backup/helpers.go +++ b/server/internal/backup/helpers.go @@ -1,10 +1,7 @@ package backup import ( - "crypto/sha256" - "encoding/hex" "fmt" - "io" "os" "path/filepath" "strings" @@ -24,20 +21,6 @@ func createTempArtifact(baseDir, taskName string, extension string) (string, str return tempDir, filepath.Join(tempDir, fileName), nil } -// SHA256File 计算文件的 SHA-256 哈希值,返回十六进制字符串 -func SHA256File(path string) (string, error) { - file, err := os.Open(path) - if err != nil { - return "", fmt.Errorf("open file for checksum: %w", err) - } - defer file.Close() - hash := sha256.New() - if _, err := io.Copy(hash, file); err != nil { - return "", fmt.Errorf("compute checksum: %w", err) - } - return hex.EncodeToString(hash.Sum(nil)), nil -} - func sanitizeFileName(value string) string { builder := strings.Builder{} 
for _, char := range strings.TrimSpace(value) { diff --git a/server/internal/service/backup_execution_service.go b/server/internal/service/backup_execution_service.go index 296394e..1cf575a 100644 --- a/server/internal/service/backup_execution_service.go +++ b/server/internal/service/backup_execution_service.go @@ -2,8 +2,11 @@ package service import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" + "hash" "io" "os" "path/filepath" @@ -326,17 +329,6 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba fileName = filepath.Base(finalPath) storagePath = backup.BuildStorageKey(task.Type, startedAt, fileName) - // 计算文件 SHA-256 哈希(上传前) - logger.Infof("计算备份文件校验和...") - localChecksum, checksumErr := backup.SHA256File(finalPath) - if checksumErr != nil { - errMessage = checksumErr.Error() - logger.Errorf("计算文件校验和失败:%v", checksumErr) - return - } - checksum = localChecksum - logger.Infof("文件校验和: SHA-256=%s, 大小=%d bytes", checksum, fileSize) - // 收集所有存储目标 targetIDs := collectTargetIDs(task) if len(targetIDs) == 0 { @@ -347,6 +339,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba // 并行上传到所有目标 uploadResults = make([]StorageUploadResultItem, len(targetIDs)) + var checksumOnce sync.Once var wg sync.WaitGroup for i, tid := range targetIDs { wg.Add(1) @@ -371,22 +364,25 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba } defer artifact.Close() logger.Infof("开始上传备份到存储目标:%s", targetName) - // 用 CountingReader 包装,上传过程中统计实际传输字节数(零额外开销) - counter := &countingReader{reader: artifact} - if uploadErr := provider.Upload(ctx, storagePath, counter, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil { + // hashingReader: 上传过程中同步计算字节数 + SHA-256,单次读取零额外 I/O + hr := newHashingReader(artifact) + if uploadErr := provider.Upload(ctx, storagePath, hr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", 
task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil { uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()} logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr) return } - // 完整性校验:对比实际传输字节数与本地文件大小 - if counter.n != fileSize { - uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("完整性校验失败: 预期 %d bytes, 实际传输 %d bytes", fileSize, counter.n)} - logger.Errorf("存储目标 %s 完整性校验失败:预期 %d bytes, 实际传输 %d bytes", targetName, fileSize, counter.n) + // 完整性校验:对比实际传输字节数 + if hr.n != fileSize { + uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("完整性校验失败: 预期 %d bytes, 实际传输 %d bytes", fileSize, hr.n)} + logger.Errorf("存储目标 %s 完整性校验失败:预期 %d bytes, 实际传输 %d bytes", targetName, fileSize, hr.n) _ = provider.Delete(ctx, storagePath) return } + // 取第一个成功目标的哈希写入 record(所有目标读同一文件,哈希一定相同) + targetChecksum := hr.Sum() + checksumOnce.Do(func() { checksum = targetChecksum }) uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "success", StoragePath: storagePath, FileSize: fileSize} - logger.Infof("存储目标 %s 上传成功 (%d bytes, SHA-256=%s)", targetName, fileSize, localChecksum) + logger.Infof("存储目标 %s 上传成功 (%d bytes, SHA-256=%s)", targetName, fileSize, targetChecksum) // 每个成功目标独立执行保留策略 if s.retention != nil { cleanupResult, cleanupErr := s.retention.Cleanup(ctx, task, provider) @@ -601,14 +597,27 @@ func buildStorageProviderFromRepos(ctx context.Context, storageTargetID uint, st return provider, target, nil } -// countingReader 包装 io.Reader,统计实际读取字节数 -type countingReader struct { +// hashingReader 在上传过程中同步计算字节数和 SHA-256,零额外 I/O +type hashingReader struct { reader io.Reader + hash hash.Hash n int64 } -func (r *countingReader) Read(p []byte) (int, error) 
{ +func newHashingReader(reader io.Reader) *hashingReader { + h := sha256.New() + return &hashingReader{ + reader: io.TeeReader(reader, h), + hash: h, + } +} + +func (r *hashingReader) Read(p []byte) (int, error) { n, err := r.reader.Read(p) r.n += int64(n) return n, err } + +func (r *hashingReader) Sum() string { + return hex.EncodeToString(r.hash.Sum(nil)) +}