refactor: single-pass hashing during upload via TeeReader

The previous approach read each backup file twice (once to compute the
SHA-256 checksum, once to upload it), doubling disk I/O. Under concurrent
uploads to multiple storage targets this became a bottleneck.

New design: hashingReader wraps the source file in an io.TeeReader that
mirrors every read into a sha256 hash.Hash:
  file.Read() → TeeReader → hash.Write() (SHA-256) + provider (upload)
A single read pass yields both the byte count and the SHA-256 digest.
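
The mechanism is plain standard library; an illustrative, self-contained
sketch (io.Copy into io.Discard stands in for provider.Upload, and the
payload is made up):

  package main

  import (
      "crypto/sha256"
      "encoding/hex"
      "fmt"
      "io"
      "strings"
  )

  func main() {
      src := strings.NewReader("backup artifact bytes")

      h := sha256.New()
      tee := io.TeeReader(src, h) // every Read from tee also writes the same bytes into h

      // The "upload" consumes tee exactly once; the hash fills in as a side effect.
      n, err := io.Copy(io.Discard, tee)
      if err != nil {
          panic(err)
      }
      fmt.Printf("uploaded %d bytes, SHA-256=%s\n", n, hex.EncodeToString(h.Sum(nil)))
  }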

Each upload goroutine independently opens the file and computes its own
hash. The first target to succeed writes its checksum to the record via
sync.Once. No disk I/O beyond the reads the uploads already perform, no
extra in-memory copies, and the whole path is concurrent-safe.
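
The first-writer-wins publication is plain sync.Once; a minimal sketch
(the target count and hashed payload are placeholders, not this commit's
code):

  package main

  import (
      "crypto/sha256"
      "encoding/hex"
      "fmt"
      "sync"
  )

  func main() {
      var (
          once     sync.Once
          checksum string
          wg       sync.WaitGroup
      )

      for i := 0; i < 3; i++ { // three storage targets uploading in parallel
          wg.Add(1)
          go func() {
              defer wg.Done()
              // Each goroutine hashes the same bytes independently, standing
              // in for hr.Sum() after its own successful upload.
              sum := sha256.Sum256([]byte("same artifact bytes"))
              once.Do(func() { checksum = hex.EncodeToString(sum[:]) })
          }()
      }
      wg.Wait()
      fmt.Println("record checksum:", checksum)
  }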
Author: Awuqing
Date:   2026-03-31 13:08:10 +08:00
parent 7631cca01d
commit f388b98943
2 changed files with 31 additions and 39 deletions


@@ -1,10 +1,7 @@
 package backup
 
 import (
-    "crypto/sha256"
-    "encoding/hex"
     "fmt"
-    "io"
     "os"
     "path/filepath"
     "strings"
@@ -24,20 +21,6 @@ func createTempArtifact(baseDir, taskName string, extension string) (string, str
     return tempDir, filepath.Join(tempDir, fileName), nil
 }
 
-// SHA256File computes the SHA-256 hash of a file and returns it as a hex string
-func SHA256File(path string) (string, error) {
-    file, err := os.Open(path)
-    if err != nil {
-        return "", fmt.Errorf("open file for checksum: %w", err)
-    }
-    defer file.Close()
-    hash := sha256.New()
-    if _, err := io.Copy(hash, file); err != nil {
-        return "", fmt.Errorf("compute checksum: %w", err)
-    }
-    return hex.EncodeToString(hash.Sum(nil)), nil
-}
-
 func sanitizeFileName(value string) string {
     builder := strings.Builder{}
     for _, char := range strings.TrimSpace(value) {


@@ -2,8 +2,11 @@ package service
 
 import (
     "context"
+    "crypto/sha256"
+    "encoding/hex"
     "encoding/json"
     "fmt"
+    "hash"
     "io"
     "os"
     "path/filepath"
@@ -326,17 +329,6 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
     fileName = filepath.Base(finalPath)
     storagePath = backup.BuildStorageKey(task.Type, startedAt, fileName)
 
-    // Compute the file's SHA-256 hash (before uploading)
-    logger.Infof("computing backup file checksum...")
-    localChecksum, checksumErr := backup.SHA256File(finalPath)
-    if checksumErr != nil {
-        errMessage = checksumErr.Error()
-        logger.Errorf("failed to compute file checksum: %v", checksumErr)
-        return
-    }
-    checksum = localChecksum
-    logger.Infof("file checksum: SHA-256=%s, size=%d bytes", checksum, fileSize)
-
     // Collect all storage targets
     targetIDs := collectTargetIDs(task)
     if len(targetIDs) == 0 {
@@ -347,6 +339,7 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
 
     // Upload to all targets in parallel
     uploadResults = make([]StorageUploadResultItem, len(targetIDs))
+    var checksumOnce sync.Once
     var wg sync.WaitGroup
     for i, tid := range targetIDs {
         wg.Add(1)
@@ -371,22 +364,25 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
     }
     defer artifact.Close()
     logger.Infof("starting backup upload to storage target: %s", targetName)
-    // Wrap in a countingReader to count the bytes actually transferred during upload (zero extra overhead)
-    counter := &countingReader{reader: artifact}
-    if uploadErr := provider.Upload(ctx, storagePath, counter, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
+    // hashingReader: count bytes and compute the SHA-256 during the upload; a single read pass, zero extra I/O
+    hr := newHashingReader(artifact)
+    if uploadErr := provider.Upload(ctx, storagePath, hr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
         uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()}
         logger.Warnf("upload to storage target %s failed: %v", targetName, uploadErr)
         return
     }
-    // Integrity check: compare the bytes actually transferred against the local file size
-    if counter.n != fileSize {
-        uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("integrity check failed: expected %d bytes, transferred %d bytes", fileSize, counter.n)}
-        logger.Errorf("integrity check failed for storage target %s: expected %d bytes, transferred %d bytes", targetName, fileSize, counter.n)
+    // Integrity check: compare the bytes actually transferred
+    if hr.n != fileSize {
+        uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("integrity check failed: expected %d bytes, transferred %d bytes", fileSize, hr.n)}
+        logger.Errorf("integrity check failed for storage target %s: expected %d bytes, transferred %d bytes", targetName, fileSize, hr.n)
         _ = provider.Delete(ctx, storagePath)
         return
     }
+    // Take the first successful target's hash for the record; every target reads the same file, so the hashes are identical
+    targetChecksum := hr.Sum()
+    checksumOnce.Do(func() { checksum = targetChecksum })
     uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "success", StoragePath: storagePath, FileSize: fileSize}
-    logger.Infof("upload to storage target %s succeeded (%d bytes, SHA-256=%s)", targetName, fileSize, localChecksum)
+    logger.Infof("upload to storage target %s succeeded (%d bytes, SHA-256=%s)", targetName, fileSize, targetChecksum)
     // Each successful target runs the retention policy independently
     if s.retention != nil {
         cleanupResult, cleanupErr := s.retention.Cleanup(ctx, task, provider)
@@ -601,14 +597,27 @@ func buildStorageProviderFromRepos(ctx context.Context, storageTargetID uint, st
     return provider, target, nil
 }
 
-// countingReader wraps an io.Reader and counts the bytes actually read
-type countingReader struct {
+// hashingReader counts bytes and computes the SHA-256 while the upload reads; zero extra I/O
+type hashingReader struct {
     reader io.Reader
+    hash   hash.Hash
     n      int64
 }
 
-func (r *countingReader) Read(p []byte) (int, error) {
+func newHashingReader(reader io.Reader) *hashingReader {
+    h := sha256.New()
+    return &hashingReader{
+        reader: io.TeeReader(reader, h),
+        hash:   h,
+    }
+}
+
+func (r *hashingReader) Read(p []byte) (int, error) {
     n, err := r.reader.Read(p)
     r.n += int64(n)
     return n, err
 }
+
+func (r *hashingReader) Sum() string {
+    return hex.EncodeToString(r.hash.Sum(nil))
+}
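
A hypothetical test (not part of this commit) that exercises the new reader
the way provider.Upload would:

  package service

  import (
      "crypto/sha256"
      "encoding/hex"
      "io"
      "strings"
      "testing"
  )

  func TestHashingReaderSinglePass(t *testing.T) {
      payload := "hello backup"
      hr := newHashingReader(strings.NewReader(payload))

      // Drain the reader in one pass, as the upload does.
      if _, err := io.Copy(io.Discard, hr); err != nil {
          t.Fatal(err)
      }
      if hr.n != int64(len(payload)) {
          t.Fatalf("expected %d bytes, counted %d", len(payload), hr.n)
      }

      want := sha256.Sum256([]byte(payload))
      if hr.Sum() != hex.EncodeToString(want[:]) {
          t.Fatal("digest mismatch")
      }
  }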