mirror of
https://github.com/krau/SaveAny-Bot.git
synced 2026-05-18 02:07:36 +08:00
feat: support import files from storages to telegram (#183)
* feat: add import command and batch import functionality - Implemented the `/import` command to allow users to import files from storage to Telegram. - Added support for listing files in storage and filtering based on regex patterns. - Created a batch import task to handle multiple file uploads concurrently. - Introduced progress tracking for batch imports, providing real-time updates to users. - Enhanced storage interfaces to support file listing and reading capabilities. - Updated localization files for the new import command and its usage instructions. - Added utility functions for file size formatting and speed calculation. - Refactored Telegram storage handling to support reading from non-seekable streams. * feat: add i18n for import command * feat: implement ListFiles and OpenFile methods for WebDAV and Alist storage * Update common/i18n/locale/zh-Hans.yaml Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update core/tasks/batchimport/progress.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update core/tasks/batchimport/execute.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update storage/alist/alist.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update common/i18n/locale/en.yaml Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update pkg/storagetypes/fileinfo.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update common/i18n/locale/en.yaml Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update core/tasks/batchimport/execute.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update storage/webdav/webdav.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update storage/telegram/telegram.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update core/tasks/batchimport/execute.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update storage/webdav/webdav.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: missing progress stats i18n * refactor: use strutil to parse args * chore: update generated code files for consistency --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
141
core/tasks/batchimport/execute.go
Normal file
141
core/tasks/batchimport/execute.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package batchimport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// Execute implements core.Executable.
|
||||
func (t *Task) Execute(ctx context.Context) error {
|
||||
logger := log.FromContext(ctx).WithPrefix(fmt.Sprintf("batch_import[%s]", t.ID))
|
||||
logger.Info("Starting batch import task")
|
||||
t.Progress.OnStart(ctx, t)
|
||||
|
||||
workers := config.C().Workers
|
||||
eg, gctx := errgroup.WithContext(ctx)
|
||||
eg.SetLimit(workers)
|
||||
|
||||
for _, elem := range t.elems {
|
||||
eg.Go(func() error {
|
||||
t.processingMu.RLock()
|
||||
if t.processing[elem.ID] != nil {
|
||||
t.processingMu.RUnlock()
|
||||
return fmt.Errorf("element with ID %s is already being processed", elem.ID)
|
||||
}
|
||||
t.processingMu.RUnlock()
|
||||
|
||||
t.processingMu.Lock()
|
||||
t.processing[elem.ID] = &elem
|
||||
t.processingMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
t.processingMu.Lock()
|
||||
delete(t.processing, elem.ID)
|
||||
t.processingMu.Unlock()
|
||||
}()
|
||||
|
||||
err := t.processElement(gctx, elem)
|
||||
if err != nil && !t.IgnoreErrors {
|
||||
return err
|
||||
}
|
||||
if err != nil {
|
||||
t.processingMu.Lock()
|
||||
t.failed[elem.ID] = err
|
||||
t.processingMu.Unlock()
|
||||
logger.Errorf("Failed to process file %s: %v", elem.FileInfo.Name, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
logger.Errorf("Error during batch import processing: %v", err)
|
||||
} else {
|
||||
logger.Info("Batch import task completed successfully")
|
||||
}
|
||||
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *Task) processElement(ctx context.Context, elem TaskElement) error {
|
||||
logger := log.FromContext(ctx).WithPrefix(fmt.Sprintf("file[%s]", elem.FileInfo.Name))
|
||||
|
||||
// Check whether the source storage supports reading
|
||||
readableStorage, ok := elem.SourceStorage.(storage.StorageReadable)
|
||||
if !ok {
|
||||
return fmt.Errorf("source storage %s does not support reading", elem.SourceStorage.Name())
|
||||
}
|
||||
|
||||
logger.Info("Opening file from source storage")
|
||||
reader, size, err := readableStorage.OpenFile(ctx, elem.SourcePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file: %w", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
// Build Telegram storage path: /<chat_id>/<filename>
|
||||
storagePath := fmt.Sprintf("/%d/%s", elem.TargetChatID, elem.FileInfo.Name)
|
||||
|
||||
// 注入文件大小到 context
|
||||
ctx = context.WithValue(ctx, ctxkey.ContentLength, size)
|
||||
|
||||
if config.C().Stream {
|
||||
if err := elem.TargetStorage.Save(ctx, reader, storagePath); err != nil {
|
||||
return fmt.Errorf("failed to upload file to telegram: %w", err)
|
||||
}
|
||||
} else {
|
||||
logger.Info("Downloading to temporary file for ReadSeeker support")
|
||||
tempFile, err := t.downloadToTemp(reader, elem.FileInfo.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download to temp: %w", err)
|
||||
}
|
||||
defer os.Remove(tempFile.Name())
|
||||
defer tempFile.Close()
|
||||
|
||||
if _, err := tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
return fmt.Errorf("failed to seek temp file: %w", err)
|
||||
}
|
||||
|
||||
logger.Infof("Uploading file to Telegram storage (size: %d bytes)", size)
|
||||
if err := elem.TargetStorage.Save(ctx, tempFile, storagePath); err != nil {
|
||||
return fmt.Errorf("failed to upload file to telegram: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
t.uploaded.Add(size)
|
||||
t.Progress.OnProgress(ctx, t)
|
||||
|
||||
logger.Info("File uploaded successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Task) downloadToTemp(reader io.Reader, filename string) (*os.File, error) {
|
||||
tempDir := config.C().Temp.BasePath
|
||||
if tempDir == "" {
|
||||
tempDir = os.TempDir()
|
||||
}
|
||||
|
||||
tempFile, err := os.CreateTemp(tempDir, filepath.Base(filename)+"-*.tmp")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(tempFile, reader); err != nil {
|
||||
tempFile.Close()
|
||||
os.Remove(tempFile.Name())
|
||||
return nil, fmt.Errorf("failed to copy to temp file: %w", err)
|
||||
}
|
||||
|
||||
return tempFile, nil
|
||||
}
|
||||
Reference in New Issue
Block a user