Files
SaveAny-Bot/core/tasks/tfile/stream.go
Krau 302db2fe75 feat: parse url with js plugins support (#96)
* feat: WIP. add parser functionality and text message handling

* fix: use json to marshal js result

* feat: add metadata handling and version validation for jsParser

* refactor: rename parser package to parsers and restructure parser handling

* refactor: core code struct and impl parse task handle

* feat: impl parsed download

* fix: seek cache file when processing tph picture

* feat: implement parsed task handling and progress tracking

* feat: enhance task processing with concurrency control and progress tracking

* feat: add resource ID generation and improve resource processing handling

* feat: improve message formatting in parsed text and progress completion

* feat: add example js plugin

* feat: implement Twitter parser

* fix: twitter parse video json decode error

* feat: impl stream mode for parse task
2025-08-21 23:48:17 +08:00

45 lines
1.0 KiB
Go

package tfile
import (
"context"
"fmt"
"io"
"github.com/charmbracelet/log"
"github.com/krau/SaveAny-Bot/pkg/tfile"
"golang.org/x/sync/errgroup"
)
func executeStream(ctx context.Context, task *Task) error {
logger := log.FromContext(ctx).WithPrefix(fmt.Sprintf("file[%s]", task.File.Name()))
pr, pw := io.Pipe()
defer pr.Close()
errg, uploadCtx := errgroup.WithContext(ctx)
errg.Go(func() error {
return task.Storage.Save(uploadCtx, pr, task.Path)
})
wr := newWriter(ctx, pw, task.Progress, task)
errg.Go(func() error {
defer pw.Close()
logger.Info("Starting file download in stream mode")
_, err := tfile.NewDownloader(task.File).Stream(uploadCtx, wr)
if err != nil {
logger.Errorf("Failed to download file: %v", err)
pw.CloseWithError(err)
}
return err
})
var err error
defer func() {
if task.Progress != nil {
task.Progress.OnDone(ctx, task, err)
}
}()
if err = errg.Wait(); err != nil {
return err
}
logger.Info("File downloaded successfully in stream mode")
return nil
}