mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-06-28 04:51:26 +08:00
Compare commits
2 Commits
feat/full-
...
fix/upload
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37ad6b1db1 | ||
|
|
d9e0609089 |
@@ -363,33 +363,46 @@ func (s *BackupExecutionService) executeTask(ctx context.Context, task *model.Ba
|
|||||||
logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr)
|
logger.Warnf("存储目标 %s 创建客户端失败:%v", targetName, resolveErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
artifact, openErr := os.Open(finalPath)
|
|
||||||
if openErr != nil {
|
|
||||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()}
|
|
||||||
logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer artifact.Close()
|
|
||||||
logger.Infof("开始上传备份到存储目标:%s", targetName)
|
logger.Infof("开始上传备份到存储目标:%s", targetName)
|
||||||
// hashingReader: 上传过程中同步计算字节数 + SHA-256,单次读取零额外 I/O
|
// 上传级重试:最多 3 次,指数退避(10s, 30s, 90s)
|
||||||
hr := newHashingReader(artifact)
|
maxAttempts := 3
|
||||||
// progressReader: 包装 hashingReader,通过 LogHub 推送实时上传进度
|
var lastUploadErr error
|
||||||
pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) {
|
var hr *hashingReader
|
||||||
percent := float64(0)
|
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||||
if fileSize > 0 {
|
if attempt > 1 {
|
||||||
percent = float64(bytesRead) / float64(fileSize) * 100
|
backoff := time.Duration(attempt*attempt) * 10 * time.Second
|
||||||
|
logger.Warnf("存储目标 %s 第 %d 次重试(等待 %v):%v", targetName, attempt, backoff, lastUploadErr)
|
||||||
|
time.Sleep(backoff)
|
||||||
}
|
}
|
||||||
s.logHub.AppendProgress(recordID, backup.ProgressInfo{
|
artifact, openErr := os.Open(finalPath)
|
||||||
BytesSent: bytesRead,
|
if openErr != nil {
|
||||||
TotalBytes: fileSize,
|
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: openErr.Error()}
|
||||||
Percent: percent,
|
logger.Warnf("存储目标 %s 打开备份文件失败:%v", targetName, openErr)
|
||||||
SpeedBps: speedBps,
|
return
|
||||||
TargetName: targetName,
|
}
|
||||||
|
hr = newHashingReader(artifact)
|
||||||
|
pr := newProgressReader(hr, fileSize, func(bytesRead int64, speedBps float64) {
|
||||||
|
percent := float64(0)
|
||||||
|
if fileSize > 0 {
|
||||||
|
percent = float64(bytesRead) / float64(fileSize) * 100
|
||||||
|
}
|
||||||
|
s.logHub.AppendProgress(recordID, backup.ProgressInfo{
|
||||||
|
BytesSent: bytesRead,
|
||||||
|
TotalBytes: fileSize,
|
||||||
|
Percent: percent,
|
||||||
|
SpeedBps: speedBps,
|
||||||
|
TargetName: targetName,
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
lastUploadErr = provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)})
|
||||||
if uploadErr := provider.Upload(ctx, storagePath, pr, fileSize, map[string]string{"taskId": fmt.Sprintf("%d", task.ID), "recordId": fmt.Sprintf("%d", recordID)}); uploadErr != nil {
|
artifact.Close()
|
||||||
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: uploadErr.Error()}
|
if lastUploadErr == nil {
|
||||||
logger.Warnf("存储目标 %s 上传失败:%v", targetName, uploadErr)
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastUploadErr != nil {
|
||||||
|
uploadResults[index] = StorageUploadResultItem{StorageTargetID: targetID, StorageTargetName: targetName, Status: "failed", Error: lastUploadErr.Error()}
|
||||||
|
logger.Warnf("存储目标 %s 上传失败(已重试 %d 次):%v", targetName, maxAttempts, lastUploadErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 完整性校验:对比实际传输字节数
|
// 完整性校验:对比实际传输字节数
|
||||||
|
|||||||
Reference in New Issue
Block a user