mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-06-28 13:01:22 +08:00
Compare commits
2 Commits
feat/full-
...
fix/upload
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37ad6b1db1 | ||
|
|
d9e0609089 |
@@ -363,17 +363,24 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
|||||||
logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr)
|
logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
logger.Infof("开始上传备份到存储目标:%s", targetName)
|
||||||
|
// 上传级重试:最多 3 次,指数退避(10s, 30s, 90s)
|
||||||
|
maxAttempts := 3
|
||||||
|
var lastUploadErr error
|
||||||
|
var hr *hashingReader
|
||||||
|
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||||
|
if attempt > 1 {
|
||||||
|
backoff := time.Duration(attempt*attempt) * 10 * time.Second
|
||||||
|
logger.Warnf("存储目标 %s 第 %d 次重试(等待 %v):%v", targetName, attempt, backoff, lastUploadErr)
|
||||||
|
time.Sleep(backoff)
|
||||||
|
}
|
||||||
artifact, openErr := os.Open(finalPath)
|
artifact, openErr := os.Open(finalPath)
|
||||||
if openErr != nil {
|
if openErr != nil {
|
||||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()}
|
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()}
|
||||||
logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr)
|
logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer artifact.Close()
|
hr = newHashingReader(artifact)
|
||||||
logger.Infof("开始上传备份到存储目标:%s", targetName)
|
|
||||||
// hashingReader: 上传过程中同步计算字节数 + SHA-256,单次读取零额外 I/O
|
|
||||||
hr := newHashingReader(artifact)
|
|
||||||
// progressReader: 包装 hashingReader,通过 LogHub 推送实时上传进度
|
|
||||||
pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) {
|
pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) {
|
||||||
percent := float64(0)
|
percent := float64(0)
|
||||||
if fileSize > 0 {
|
if fileSize > 0 {
|
||||||
@@ -387,9 +394,15 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
|||||||
TargetName: targetName,
|
TargetName: targetName,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
if uploadErr := provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
|
lastUploadErr = provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)})
|
||||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()}
|
artifact.Close()
|
||||||
logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr)
|
if lastUploadErr == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastUploadErr != nil {
|
||||||
|
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: lastUploadErr.Error()}
|
||||||
|
logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, maxAttempts, lastUploadErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 完整性校验:对比实际传输字节数
|
// 完整性校验:对比实际传输字节数
|
||||||
|
|||||||
Reference in New Issue
Block a user