mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-05-06 20:02:41 +08:00
修复: 上传操作级重试,解决 Google Drive 等远端临时故障导致自动备份连续失败
问题:rclone 底层重试只覆盖单个 HTTP 请求,但 Google API 的 502/timeout 等临时故障会导致整个上传操作失败,自动触发的备份任务连续失败。 修复:在 provider.Upload 外层增加操作级重试(最多 3 次,指数退避:第 2、3 次尝试前分别等待 40s、90s,首次尝试不等待), 每次重试重新打开文件并重建 reader 链。重试过程通过日志流实时反馈。
This commit is contained in:
@@ -363,33 +363,46 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
||||
logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr)
|
||||
return
|
||||
}
|
||||
artifact, openErr := os.Open(finalPath)
|
||||
if openErr != nil {
|
||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()}
|
||||
logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr)
|
||||
return
|
||||
}
|
||||
defer artifact.Close()
|
||||
logger.Infof("开始上传备份到存储目标:%s", targetName)
|
||||
// hashingReader: 上传过程中同步计算字节数 + SHA-256,单次读取零额外 I/O
|
||||
hr := newHashingReader(artifact)
|
||||
// progressReader: 包装 hashingReader,通过 LogHub 推送实时上传进度
|
||||
pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) {
|
||||
percent := float64(0)
|
||||
if fileSize > 0 {
|
||||
percent = float64(bytesRead) / float64(fileSize) * 100
|
||||
// 上传级重试:最多 3 次,指数退避 attempt²×10s(第 2、3 次尝试前分别等待 40s、90s)
|
||||
maxAttempts := 3
|
||||
var lastUploadErr error
|
||||
var hr *hashingReader
|
||||
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||
if attempt > 1 {
|
||||
backoff := time.Duration(attempt*attempt) * 10 * time.Second
|
||||
logger.Warnf("存储目标 %s 第 %d 次重试(等待 %v):%v", targetName, attempt, backoff, lastUploadErr)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
s.logHub.AppendProgress(recordID, backup.ProgressInfo{
|
||||
BytesSent: bytesRead,
|
||||
TotalBytes: fileSize,
|
||||
Percent: percent,
|
||||
SpeedBps: speedBps,
|
||||
TargetName: targetName,
|
||||
artifact, openErr := os.Open(finalPath)
|
||||
if openErr != nil {
|
||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()}
|
||||
logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr)
|
||||
return
|
||||
}
|
||||
hr = newHashingReader(artifact)
|
||||
pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) {
|
||||
percent := float64(0)
|
||||
if fileSize > 0 {
|
||||
percent = float64(bytesRead) / float64(fileSize) * 100
|
||||
}
|
||||
s.logHub.AppendProgress(recordID, backup.ProgressInfo{
|
||||
BytesSent: bytesRead,
|
||||
TotalBytes: fileSize,
|
||||
Percent: percent,
|
||||
SpeedBps: speedBps,
|
||||
TargetName: targetName,
|
||||
})
|
||||
})
|
||||
})
|
||||
if uploadErr := provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
|
||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()}
|
||||
logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr)
|
||||
lastUploadErr = provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)})
|
||||
artifact.Close()
|
||||
if lastUploadErr == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if lastUploadErr != nil {
|
||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: lastUploadErr.Error()}
|
||||
logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, maxAttempts, lastUploadErr)
|
||||
return
|
||||
}
|
||||
// 完整性校验:对比实际传输字节数
|
||||
|
||||
Reference in New Issue
Block a user