- Implemented the `/import` command to allow users to import files from storage to Telegram. - Added support for listing files in storage and filtering based on regex patterns. - Created a batch import task to handle multiple file uploads concurrently. - Introduced progress tracking for batch imports, providing real-time updates to users. - Enhanced storage interfaces to support file listing and reading capabilities. - Updated localization files for the new import command and its usage instructions. - Added utility functions for file size formatting and speed calculation. - Refactored Telegram storage handling to support reading from non-seekable streams.
98 lines
2.0 KiB
Go
98 lines
2.0 KiB
Go
package batchimport
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/krau/SaveAny-Bot/core"
|
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
|
"github.com/krau/SaveAny-Bot/pkg/storagetypes"
|
|
"github.com/krau/SaveAny-Bot/storage"
|
|
"github.com/rs/xid"
|
|
)
|
|
|
|
// Compile-time assertion that *Task satisfies the core.Executable interface.
var _ core.Executable = (*Task)(nil)
|
|
|
|
type TaskElement struct {
|
|
ID string
|
|
SourceStorage storage.Storage
|
|
SourcePath string
|
|
FileInfo storagetypes.FileInfo
|
|
TargetStorage storage.Storage
|
|
TargetChatID int64
|
|
}
|
|
|
|
type Task struct {
|
|
ID string
|
|
ctx context.Context
|
|
elems []TaskElement
|
|
Progress ProgressTracker
|
|
IgnoreErrors bool
|
|
uploaded atomic.Int64
|
|
totalSize int64
|
|
processing map[string]TaskElementInfo
|
|
processingMu sync.RWMutex
|
|
failed map[string]error
|
|
}
|
|
|
|
// Title implements core.Executable.
|
|
func (t *Task) Title() string {
|
|
return fmt.Sprintf("[%s](%d files/%.2fMB)", t.Type(), len(t.elems), float64(t.totalSize)/(1024*1024))
|
|
}
|
|
|
|
// Type implements core.Executable.
|
|
func (t *Task) Type() tasktype.TaskType {
|
|
return tasktype.TaskTypeBatchimport
|
|
}
|
|
|
|
// TaskID implements core.Executable.
|
|
func (t *Task) TaskID() string {
|
|
return t.ID
|
|
}
|
|
|
|
func NewTaskElement(
|
|
sourceStorage storage.Storage,
|
|
fileInfo storagetypes.FileInfo,
|
|
targetStorage storage.Storage,
|
|
targetChatID int64,
|
|
) *TaskElement {
|
|
id := xid.New().String()
|
|
return &TaskElement{
|
|
ID: id,
|
|
SourceStorage: sourceStorage,
|
|
SourcePath: fileInfo.Path,
|
|
FileInfo: fileInfo,
|
|
TargetStorage: targetStorage,
|
|
TargetChatID: targetChatID,
|
|
}
|
|
}
|
|
|
|
func NewBatchImportTask(
|
|
id string,
|
|
ctx context.Context,
|
|
elems []TaskElement,
|
|
progress ProgressTracker,
|
|
ignoreErrors bool,
|
|
) *Task {
|
|
task := &Task{
|
|
ID: id,
|
|
ctx: ctx,
|
|
elems: elems,
|
|
Progress: progress,
|
|
uploaded: atomic.Int64{},
|
|
totalSize: func() int64 {
|
|
var total int64
|
|
for _, elem := range elems {
|
|
total += elem.FileInfo.Size
|
|
}
|
|
return total
|
|
}(),
|
|
processing: make(map[string]TaskElementInfo),
|
|
IgnoreErrors: ignoreErrors,
|
|
failed: make(map[string]error),
|
|
}
|
|
return task
|
|
}
|