refactor: use CountingReader for upload integrity instead of List API

List()-based size check depends on the storage backend returning accurate
file sizes, which is not guaranteed (some WebDAV/Google Drive implementations
may return 0 or omit the size field entirely).

New approach: wrap the upload io.Reader with a CountingReader that counts
bytes as they flow through during upload. After upload completes, compare
counter.n against the expected fileSize. This is:
- Zero extra network calls (no List, no Download)
- Zero extra CPU/memory overhead (just an int64 increment per Read)
- Storage-backend agnostic (works with any provider)

If bytes transmitted != expected size → mark failed + auto-delete remote.
This commit is contained in:
Awuqing
2026-03-31 12:40:12 +08:00
parent e5a4aaadb2
commit 7568d8a2a2

View File

@@ -371,39 +371,22 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
}
defer artifact.Close()
logger.Infof("开始上传备份到存储目标:%s", targetName)
if uploadErr := provider.Upload(ctx, storagePath, artifact, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
// 用 CountingReader 包装,上传过程中统计实际传输字节数(零额外开销)
counter := &countingReader{reader: artifact}
if uploadErr := provider.Upload(ctx, storagePath, counter, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()}
logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr)
return
}
// 上传后轻量级完整性校验:通过 List 检查远端文件大小
remoteObjects, listErr := provider.List(ctx, storagePath)
if listErr != nil {
// List 失败不阻断,仅警告(文件可能已上传成功)
logger.Warnf("存储目标 %s 上传后大小校验跳过List 失败):%v", targetName, listErr)
} else {
remoteSize := int64(0)
for _, obj := range remoteObjects {
if obj.Key == storagePath {
remoteSize = obj.Size
break
}
}
if remoteSize == 0 {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: "完整性校验失败: 远端文件大小为 0上传损坏"}
logger.Errorf("存储目标 %s 完整性校验失败:远端文件大小为 0", targetName)
_ = provider.Delete(ctx, storagePath)
return
}
if remoteSize != fileSize {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("完整性校验失败: 本地=%d bytes, 远端=%d bytes", fileSize, remoteSize)}
logger.Errorf("存储目标 %s 完整性校验失败:本地 %d bytes, 远端 %d bytes", targetName, fileSize, remoteSize)
_ = provider.Delete(ctx, storagePath)
return
}
// 完整性校验:对比实际传输字节数与本地文件大小
if counter.n != fileSize {
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: fmt.Sprintf("完整性校验失败: 预期 %d bytes, 实际传输 %d bytes", fileSize, counter.n)}
logger.Errorf("存储目标 %s 完整性校验失败:预期 %d bytes, 实际传输 %d bytes", targetName, fileSize, counter.n)
_ = provider.Delete(ctx, storagePath)
return
}
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "success", StoragePath: storagePath, FileSize: fileSize}
logger.Infof("存储目标 %s 上传成功,大小校验通过 (%d bytes, SHA-256=%s)", targetName, fileSize, localChecksum)
logger.Infof("存储目标 %s 上传成功 (%d bytes, SHA-256=%s)", targetName, fileSize, localChecksum)
// 每个成功目标独立执行保留策略
if s.retention != nil {
cleanupResult, cleanupErr := s.retention.Cleanup(ctx, task, provider)
@@ -617,3 +600,15 @@ func buildStorageProviderFromRepos(ctx context.Context, storageTargetID uint, st
}
return provider, target, nil
}
// countingReader 包装 io.Reader统计实际读取字节数
type countingReader struct {
reader io.Reader
n int64
}
func (r *countingReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
r.n += int64(n)
return n, err
}