feat: add support for handling unsupported stream storage in download process

This commit is contained in:
krau
2025-03-26 10:35:40 +08:00
parent 32519b8c08
commit 491ba55f1e
3 changed files with 46 additions and 28 deletions

View File

@@ -59,41 +59,50 @@ func processPendingTask(task *types.Task) error {
downloadBuilder := Downloader.Download(bot.Client.API(), task.File.Location).WithThreads(getTaskThreads(task.File.FileSize))
notsupportStreamStorage, notsupportStream := taskStorage.(storage.StorageNotSupportStream)
cancelMarkUp := getCancelTaskMarkup(task)
if config.Cfg.Stream {
if !notsupportStream {
text, entities := buildProgressMessageEntity(task, 0, task.StartTime, 0)
ctx.EditMessage(task.ReplyChatID, &tg.MessagesEditMessageRequest{
Message: text,
Entities: entities,
ID: task.ReplyMessageID,
ReplyMarkup: cancelMarkUp,
})
text, entities := buildProgressMessageEntity(task, 0, task.StartTime, 0)
ctx.EditMessage(task.ReplyChatID, &tg.MessagesEditMessageRequest{
Message: text,
Entities: entities,
ID: task.ReplyMessageID,
ReplyMarkup: getCancelTaskMarkup(task),
})
pr, pw := io.Pipe()
defer pr.Close()
pr, pw := io.Pipe()
defer pr.Close()
task.StartTime = time.Now()
progressCallback := buildProgressCallback(ctx, task, getProgressUpdateCount(task.File.FileSize))
task.StartTime = time.Now()
progressCallback := buildProgressCallback(ctx, task, getProgressUpdateCount(task.File.FileSize))
progressStream := NewProgressStream(pw, task.File.FileSize, progressCallback)
progressStream := NewProgressStream(pw, task.File.FileSize, progressCallback)
eg, uploadCtx := errgroup.WithContext(cancelCtx)
eg, uploadCtx := errgroup.WithContext(cancelCtx)
eg.Go(func() error {
return taskStorage.Save(uploadCtx, pr, task.StoragePath)
})
eg.Go(func() error {
_, err := downloadBuilder.Stream(uploadCtx, progressStream)
if closeErr := pw.CloseWithError(err); closeErr != nil {
common.Log.Errorf("Failed to close pipe writer: %v", closeErr)
eg.Go(func() error {
return taskStorage.Save(uploadCtx, pr, task.StoragePath)
})
eg.Go(func() error {
_, err := downloadBuilder.Stream(uploadCtx, progressStream)
if closeErr := pw.CloseWithError(err); closeErr != nil {
common.Log.Errorf("Failed to close pipe writer: %v", closeErr)
}
return err
})
if err := eg.Wait(); err != nil {
return err
}
return err
})
if err := eg.Wait(); err != nil {
return err
}
return nil
return nil
}
common.Log.Warnf("存储 %s 不支持流式传输: %s", task.StorageName, notsupportStreamStorage.NotSupportStream())
ctx.EditMessage(task.ReplyChatID, &tg.MessagesEditMessageRequest{
Message: fmt.Sprintf("存储 %s 不支持流式传输: %s\n正在使用普通下载...", task.StorageName, notsupportStreamStorage.NotSupportStream()),
ID: task.ReplyMessageID,
ReplyMarkup: cancelMarkUp,
})
}
cacheDestPath := filepath.Join(config.Cfg.Temp.BasePath, task.FileName())
@@ -110,7 +119,7 @@ func processPendingTask(task *types.Task) error {
Message: text,
Entities: entities,
ID: task.ReplyMessageID,
ReplyMarkup: getCancelTaskMarkup(task),
ReplyMarkup: cancelMarkUp,
})
progressCallback := buildProgressCallback(ctx, task, getProgressUpdateCount(task.File.FileSize))

View File

@@ -140,6 +140,10 @@ func (a *Alist) Save(ctx context.Context, reader io.Reader, storagePath string)
return nil
}
func (a *Alist) NotSupportStream() string {
return "Alist does not support chunked transfer encoding"
}
func (a *Alist) JoinStoragePath(task types.Task) string {
return path.Join(a.config.BasePath, task.StoragePath)
}

View File

@@ -23,6 +23,11 @@ type Storage interface {
Save(ctx context.Context, reader io.Reader, storagePath string) error
}
type StorageNotSupportStream interface {
Storage
NotSupportStream() string
}
var Storages = make(map[string]Storage)
var UserStorages = make(map[int64][]Storage)