mirror of
https://github.com/krau/SaveAny-Bot.git
synced 2026-05-11 16:59:40 +08:00
refactor: update storage interface to use io.Reader for Save method and remove stream implementations
This commit is contained in:
@@ -3,6 +3,7 @@ package core
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
"github.com/krau/SaveAny-Bot/types"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func processPendingTask(task *types.Task) error {
|
||||
@@ -40,10 +42,6 @@ func processPendingTask(task *types.Task) error {
|
||||
}
|
||||
task.StoragePath = taskStorage.JoinStoragePath(*task)
|
||||
|
||||
if task.File.FileSize == 0 {
|
||||
return processPhoto(task, taskStorage, cacheDestPath)
|
||||
}
|
||||
|
||||
ctx, ok := task.Ctx.(*ext.Context)
|
||||
if !ok {
|
||||
return fmt.Errorf("context is not *ext.Context: %T", task.Ctx)
|
||||
@@ -52,38 +50,47 @@ func processPendingTask(task *types.Task) error {
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
task.Cancel = cancel
|
||||
|
||||
downloadBuider := Downloader.Download(bot.Client.API(), task.File.Location).WithThreads(getTaskThreads(task.File.FileSize))
|
||||
if task.File.FileSize == 0 {
|
||||
return processPhoto(task, taskStorage)
|
||||
}
|
||||
|
||||
downloadBuilder := Downloader.Download(bot.Client.API(), task.File.Location).WithThreads(getTaskThreads(task.File.FileSize))
|
||||
|
||||
taskStreamStorage, isStreamStorage := taskStorage.(storage.StreamStorage)
|
||||
if config.Cfg.Stream {
|
||||
if !isStreamStorage {
|
||||
common.Log.Warnf("存储 %s 不支持流式上传", taskStorage.Name())
|
||||
} else {
|
||||
text, entities := buildProgressMessageEntity(task, 0, task.StartTime, 0)
|
||||
ctx.EditMessage(task.ReplyChatID, &tg.MessagesEditMessageRequest{
|
||||
Message: text,
|
||||
Entities: entities,
|
||||
ID: task.ReplyMessageID,
|
||||
ReplyMarkup: getCancelTaskMarkup(task),
|
||||
})
|
||||
uploadStream, err := taskStreamStorage.NewUploadStream(cancelCtx, task.StoragePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("创建上传流失败: %w", err)
|
||||
|
||||
text, entities := buildProgressMessageEntity(task, 0, task.StartTime, 0)
|
||||
ctx.EditMessage(task.ReplyChatID, &tg.MessagesEditMessageRequest{
|
||||
Message: text,
|
||||
Entities: entities,
|
||||
ID: task.ReplyMessageID,
|
||||
ReplyMarkup: getCancelTaskMarkup(task),
|
||||
})
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
defer pr.Close()
|
||||
|
||||
task.StartTime = time.Now()
|
||||
progressCallback := buildProgressCallback(ctx, task, getProgressUpdateCount(task.File.FileSize))
|
||||
|
||||
progressStream := NewProgressStream(pw, task.File.FileSize, progressCallback)
|
||||
|
||||
eg, uploadCtx := errgroup.WithContext(cancelCtx)
|
||||
|
||||
eg.Go(func() error {
|
||||
return taskStorage.Save(uploadCtx, pr, task.StoragePath)
|
||||
})
|
||||
eg.Go(func() error {
|
||||
_, err := downloadBuilder.Stream(uploadCtx, progressStream)
|
||||
if closeErr := pw.CloseWithError(err); closeErr != nil {
|
||||
common.Log.Errorf("Failed to close pipe writer: %v", closeErr)
|
||||
}
|
||||
defer uploadStream.Close()
|
||||
|
||||
task.StartTime = time.Now()
|
||||
progressCallback := buildProgressCallback(ctx, task, getProgressUpdateCount(task.File.FileSize))
|
||||
|
||||
progressStream := NewProgressStream(uploadStream, task.File.FileSize, progressCallback)
|
||||
|
||||
_, err = downloadBuider.Stream(cancelCtx, progressStream)
|
||||
if err != nil {
|
||||
return fmt.Errorf("下载文件失败: %w", err)
|
||||
}
|
||||
common.Log.Infof("Uploaded file: %s", task.StoragePath)
|
||||
return nil
|
||||
return err
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
text, entities := buildProgressMessageEntity(task, 0, task.StartTime, 0)
|
||||
@@ -101,7 +108,7 @@ func processPendingTask(task *types.Task) error {
|
||||
}
|
||||
defer dest.Close()
|
||||
task.StartTime = time.Now()
|
||||
_, err = downloadBuider.Parallel(cancelCtx, dest)
|
||||
_, err = downloadBuilder.Parallel(cancelCtx, dest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("下载文件失败: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -20,12 +21,16 @@ import (
|
||||
"github.com/krau/SaveAny-Bot/types"
|
||||
)
|
||||
|
||||
func saveFileWithRetry(ctx context.Context, task *types.Task, taskStorage storage.Storage, localFilePath string) error {
|
||||
func saveFileWithRetry(ctx context.Context, task *types.Task, taskStorage storage.Storage, cacheFilePath string) error {
|
||||
for i := 0; i <= config.Cfg.Retry; i++ {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return fmt.Errorf("context canceled while saving file: %w", err)
|
||||
}
|
||||
if err := taskStorage.Save(ctx, localFilePath, task.StoragePath); err != nil {
|
||||
file, err := os.Open(cacheFilePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open cache file: %w", err)
|
||||
}
|
||||
if err := taskStorage.Save(ctx, file, task.StoragePath); err != nil {
|
||||
if i == config.Cfg.Retry {
|
||||
return fmt.Errorf("failed to save file: %w", err)
|
||||
}
|
||||
@@ -42,7 +47,7 @@ func saveFileWithRetry(ctx context.Context, task *types.Task, taskStorage storag
|
||||
return nil
|
||||
}
|
||||
|
||||
func processPhoto(task *types.Task, taskStorage storage.Storage, cachePath string) error {
|
||||
func processPhoto(task *types.Task, taskStorage storage.Storage) error {
|
||||
res, err := bot.Client.API().UploadGetFile(task.Ctx, &tg.UploadGetFileRequest{
|
||||
Location: task.File.Location,
|
||||
Offset: 0,
|
||||
@@ -57,15 +62,9 @@ func processPhoto(task *types.Task, taskStorage storage.Storage, cachePath strin
|
||||
return fmt.Errorf("unexpected type %T", res)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(cachePath, result.Bytes, os.ModePerm); err != nil {
|
||||
return fmt.Errorf("failed to write file: %w", err)
|
||||
}
|
||||
common.Log.Infof("Downloaded photo: %s", task.FileName())
|
||||
|
||||
defer cleanCacheFile(cachePath)
|
||||
|
||||
common.Log.Infof("Downloaded file: %s", cachePath)
|
||||
|
||||
return saveFileWithRetry(task.Ctx, task, taskStorage, cachePath)
|
||||
return taskStorage.Save(task.Ctx, bytes.NewReader(result.Bytes), task.StoragePath)
|
||||
}
|
||||
|
||||
func cleanCacheFile(destPath string) {
|
||||
|
||||
Reference in New Issue
Block a user