diff --git a/core/batchtftask/execute.go b/core/batchtftask/execute.go index 1b9b1db..22c2212 100644 --- a/core/batchtftask/execute.go +++ b/core/batchtftask/execute.go @@ -61,10 +61,12 @@ func (t *Task) processElement(ctx context.Context, elem TaskElement) error { t.Progress.OnProgress(ctx, t) }) errg.Go(func() error { + defer pw.Close() logger.Info("Starting file download in stream mode") _, err := tfile.NewDownloader(elem.File).Stream(uploadCtx, wr) - if closeErr := pw.CloseWithError(err); closeErr != nil { - logger.Errorf("Failed to close pipe writer: %v", closeErr) + if err != nil { + logger.Errorf("Failed to download file: %v", err) + pw.CloseWithError(err) } return err }) diff --git a/core/tftask/stream.go b/core/tftask/stream.go index 53f14aa..47ed33a 100644 --- a/core/tftask/stream.go +++ b/core/tftask/stream.go @@ -21,10 +21,12 @@ func executeStream(ctx context.Context, task *Task) error { }) wr := newWriter(ctx, pw, task.Progress, task) errg.Go(func() error { + defer pw.Close() logger.Info("Starting file download in stream mode") _, err := tfile.NewDownloader(task.File).Stream(uploadCtx, wr) - if closeErr := pw.CloseWithError(err); closeErr != nil { - logger.Errorf("Failed to close pipe writer: %v", closeErr) + if err != nil { + logger.Errorf("Failed to download file: %v", err) + pw.CloseWithError(err) } return err })