From 321bce3220cdb8182cd2bb8dfb60d71b62a0b4c8 Mon Sep 17 00:00:00 2001 From: krau <71133316+krau@users.noreply.github.com> Date: Fri, 30 Jan 2026 13:30:14 +0800 Subject: [PATCH] feat: add Rclone storage support with configuration and file operations --- config/storage/factory.go | 1 + config/storage/rclone.go | 33 ++++ pkg/enums/storage/storages.go | 2 +- pkg/enums/storage/storages_enum.go | 5 + storage/rclone/errs.go | 14 ++ storage/rclone/rclone.go | 289 +++++++++++++++++++++++++++++ storage/storage.go | 2 + 7 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 config/storage/rclone.go create mode 100644 storage/rclone/errs.go create mode 100644 storage/rclone/rclone.go diff --git a/config/storage/factory.go b/config/storage/factory.go index f0f6b99..8deeab4 100644 --- a/config/storage/factory.go +++ b/config/storage/factory.go @@ -16,6 +16,7 @@ var storageFactories = map[storenum.StorageType]func(cfg *BaseConfig) (StorageCo storenum.Minio: createStorageConfig(&MinioStorageConfig{}), storenum.S3: createStorageConfig(&S3StorageConfig{}), storenum.Telegram: createStorageConfig(&TelegramStorageConfig{}), + storenum.Rclone: createStorageConfig(&RcloneStorageConfig{}), } func createStorageConfig(configType StorageConfig) func(cfg *BaseConfig) (StorageConfig, error) { diff --git a/config/storage/rclone.go b/config/storage/rclone.go new file mode 100644 index 0000000..be17193 --- /dev/null +++ b/config/storage/rclone.go @@ -0,0 +1,33 @@ +package storage + +import ( + "fmt" + + storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage" +) + +type RcloneStorageConfig struct { + BaseConfig + // The name of the remote as defined in rclone config + Remote string `toml:"remote" mapstructure:"remote" json:"remote"` + BasePath string `toml:"base_path" mapstructure:"base_path" json:"base_path"` + // The path to the rclone config file, if not using the default + ConfigPath string `toml:"config_path" mapstructure:"config_path" json:"config_path"` + // Additional flags to pass to rclone commands + Flags []string `toml:"flags" mapstructure:"flags" json:"flags"` +} + +func (r *RcloneStorageConfig) Validate() error { + if r.Remote == "" { + return fmt.Errorf("remote is required for rclone storage") + } + return nil +} + +func (r *RcloneStorageConfig) GetType() storenum.StorageType { + return storenum.Rclone +} + +func (r *RcloneStorageConfig) GetName() string { + return r.Name +} diff --git a/pkg/enums/storage/storages.go b/pkg/enums/storage/storages.go index 648d3f7..8d0f699 100644 --- a/pkg/enums/storage/storages.go +++ b/pkg/enums/storage/storages.go @@ -4,6 +4,6 @@ package storage // StorageType /* ENUM( -local, webdav, alist, minio, telegram, s3 +local, webdav, alist, minio, telegram, s3, rclone ) */ type StorageType string diff --git a/pkg/enums/storage/storages_enum.go b/pkg/enums/storage/storages_enum.go index d60db81..4e578fc 100644 --- a/pkg/enums/storage/storages_enum.go +++ b/pkg/enums/storage/storages_enum.go @@ -24,6 +24,8 @@ const ( Telegram StorageType = "telegram" // S3 is a StorageType of type s3. S3 StorageType = "s3" + // Rclone is a StorageType of type rclone. + Rclone StorageType = "rclone" ) var ErrInvalidStorageType = fmt.Errorf("not a valid StorageType, try [%s]", strings.Join(_StorageTypeNames, ", ")) @@ -35,6 +37,7 @@ var _StorageTypeNames = []string{ string(Minio), string(Telegram), string(S3), + string(Rclone), } // StorageTypeNames returns a list of possible string values of StorageType. @@ -53,6 +56,7 @@ func StorageTypeValues() []StorageType { Minio, Telegram, S3, + Rclone, } } @@ -75,6 +79,7 @@ var _StorageTypeValue = map[string]StorageType{ "minio": Minio, "telegram": Telegram, "s3": S3, + "rclone": Rclone, } // ParseStorageType attempts to convert a string to a StorageType. diff --git a/storage/rclone/errs.go b/storage/rclone/errs.go new file mode 100644 index 0000000..c430d3a --- /dev/null +++ b/storage/rclone/errs.go @@ -0,0 +1,14 @@ +package rclone + +import "errors" + +var ( + ErrRcloneNotFound = errors.New("rclone: rclone command not found in PATH") + ErrRemoteNotFound = errors.New("rclone: remote not found") + ErrFailedToSaveFile = errors.New("rclone: failed to save file") + ErrFailedToListFiles = errors.New("rclone: failed to list files") + ErrFailedToOpenFile = errors.New("rclone: failed to open file") + ErrFailedToCheckFile = errors.New("rclone: failed to check file exists") + ErrFailedToCreateDir = errors.New("rclone: failed to create directory") + ErrCommandFailed = errors.New("rclone: command execution failed") +) diff --git a/storage/rclone/rclone.go b/storage/rclone/rclone.go new file mode 100644 index 0000000..e705ea7 --- /dev/null +++ b/storage/rclone/rclone.go @@ -0,0 +1,289 @@ +package rclone + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os/exec" + "path" + "strings" + "time" + + "github.com/charmbracelet/log" + config "github.com/krau/SaveAny-Bot/config/storage" + storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage" + "github.com/krau/SaveAny-Bot/pkg/storagetypes" + "github.com/rs/xid" +) + +type Rclone struct { + config config.RcloneStorageConfig + logger *log.Logger +} + +func (r *Rclone) Init(ctx context.Context, cfg config.StorageConfig) error { + rcloneConfig, ok := cfg.(*config.RcloneStorageConfig) + if !ok { + return fmt.Errorf("failed to cast rclone config") + } + if err := rcloneConfig.Validate(); err != nil { + return err + } + r.config = *rcloneConfig + r.logger = log.FromContext(ctx).WithPrefix(fmt.Sprintf("rclone[%s]", r.config.Name)) + + // 检查 rclone 是否安装 + if _, err := exec.LookPath("rclone"); err != nil { + return ErrRcloneNotFound + } + + args := r.buildBaseArgs() + args = append(args, "listremotes") + cmd := exec.CommandContext(ctx, "rclone", args...) + output, err := cmd.Output() + if err != nil { + r.logger.Errorf("Failed to list remotes: %v", err) + return fmt.Errorf("failed to verify rclone: %w", err) + } + + remoteName := strings.TrimSuffix(r.config.Remote, ":") + if !strings.HasSuffix(r.config.Remote, ":") { + remoteName = r.config.Remote + } + + found := false + scanner := bufio.NewScanner(bytes.NewReader(output)) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + line = strings.TrimSuffix(line, ":") + if line == remoteName { + found = true + break + } + } + + if !found { + r.logger.Errorf("Remote %s not found in rclone config", r.config.Remote) + return ErrRemoteNotFound + } + + r.logger.Infof("Initialized rclone storage with remote: %s", r.config.Remote) + return nil +} + +func (r *Rclone) Type() storenum.StorageType { + return storenum.Rclone +} + +func (r *Rclone) Name() string { + return r.config.Name +} + +func (r *Rclone) buildBaseArgs() []string { + var args []string + if r.config.ConfigPath != "" { + args = append(args, "--config", r.config.ConfigPath) + } + args = append(args, r.config.Flags...) + return args +} + +func (r *Rclone) getRemotePath(storagePath string) string { + remote := r.config.Remote + if !strings.HasSuffix(remote, ":") { + remote += ":" + } + basePath := strings.TrimPrefix(r.config.BasePath, "/") + fullPath := path.Join(basePath, storagePath) + return remote + fullPath +} + +func (r *Rclone) Save(ctx context.Context, reader io.Reader, storagePath string) error { + r.logger.Infof("Saving file to %s", storagePath) + + ext := path.Ext(storagePath) + base := strings.TrimSuffix(storagePath, ext) + candidate := storagePath + for i := 1; r.Exists(ctx, candidate); i++ { + candidate = fmt.Sprintf("%s_%d%s", base, i, ext) + if i > 100 { + r.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath) + candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext) + break + } + } + + remotePath := r.getRemotePath(candidate) + r.logger.Debugf("Remote path: %s", remotePath) + + // Use rclone rcat to read from stdin and upload + args := r.buildBaseArgs() + args = append(args, "rcat", remotePath) + + cmd := exec.CommandContext(ctx, "rclone", args...) + cmd.Stdin = reader + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + r.logger.Errorf("Failed to save file: %v, stderr: %s", err, stderr.String()) + return fmt.Errorf("%w: %s", ErrFailedToSaveFile, stderr.String()) + } + + r.logger.Infof("Successfully saved file to %s", candidate) + return nil +} + +func (r *Rclone) Exists(ctx context.Context, storagePath string) bool { + remotePath := r.getRemotePath(storagePath) + + args := r.buildBaseArgs() + args = append(args, "lsf", remotePath) + + cmd := exec.CommandContext(ctx, "rclone", args...) + err := cmd.Run() + return err == nil +} + +// lsjsonItem represents a single entry in the output of `rclone lsjson` +type lsjsonItem struct { + Path string `json:"Path"` + Name string `json:"Name"` + Size int64 `json:"Size"` + MimeType string `json:"MimeType"` + ModTime string `json:"ModTime"` + IsDir bool `json:"IsDir"` +} + +// ListFiles implements storage.StorageListable +func (r *Rclone) ListFiles(ctx context.Context, dirPath string) ([]storagetypes.FileInfo, error) { + r.logger.Infof("Listing files in %s", dirPath) + + remotePath := r.getRemotePath(dirPath) + + args := r.buildBaseArgs() + args = append(args, "lsjson", remotePath) + + cmd := exec.CommandContext(ctx, "rclone", args...) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + r.logger.Errorf("Failed to list files: %v, stderr: %s", err, stderr.String()) + return nil, fmt.Errorf("%w: %s", ErrFailedToListFiles, stderr.String()) + } + + var items []lsjsonItem + if err := json.Unmarshal(stdout.Bytes(), &items); err != nil { + r.logger.Errorf("Failed to parse lsjson output: %v", err) + return nil, fmt.Errorf("failed to parse lsjson output: %w", err) + } + + files := make([]storagetypes.FileInfo, 0, len(items)) + for _, item := range items { + var modTime time.Time + if item.ModTime != "" { + parsedTime, err := time.Parse(time.RFC3339Nano, item.ModTime) + if err != nil { + r.logger.Warnf("Failed to parse mod time %q for %s: %v", item.ModTime, item.Name, err) + } else { + modTime = parsedTime + } + } + + files = append(files, storagetypes.FileInfo{ + Name: item.Name, + Path: path.Join(dirPath, item.Name), + Size: item.Size, + IsDir: item.IsDir, + ModTime: modTime, + }) + } + + r.logger.Debugf("Found %d files/directories in %s", len(files), dirPath) + return files, nil +} + +// OpenFile implements storage.StorageReadable +func (r *Rclone) OpenFile(ctx context.Context, filePath string) (io.ReadCloser, int64, error) { + r.logger.Infof("Opening file %s", filePath) + + remotePath := r.getRemotePath(filePath) + + size, err := r.getFileSize(ctx, remotePath) + if err != nil { + r.logger.Errorf("Failed to get file size: %v", err) + return nil, 0, fmt.Errorf("%w: %v", ErrFailedToOpenFile, err) + } + + args := r.buildBaseArgs() + args = append(args, "cat", remotePath) + + cmd := exec.CommandContext(ctx, "rclone", args...) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, 0, fmt.Errorf("failed to create stdout pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return nil, 0, fmt.Errorf("failed to start rclone cat: %w", err) + } + + reader := &rcloneCatReader{ + reader: stdout, + cmd: cmd, + logger: r.logger, + } + + r.logger.Debugf("Opened file %s (size: %d bytes)", filePath, size) + return reader, size, nil +} + +func (r *Rclone) getFileSize(ctx context.Context, remotePath string) (int64, error) { + args := r.buildBaseArgs() + args = append(args, "lsjson", remotePath) + + cmd := exec.CommandContext(ctx, "rclone", args...) + var stdout bytes.Buffer + cmd.Stdout = &stdout + + if err := cmd.Run(); err != nil { + return 0, err + } + + var items []lsjsonItem + if err := json.Unmarshal(stdout.Bytes(), &items); err != nil { + return 0, err + } + + if len(items) > 0 { + return items[0].Size, nil + } + return 0, nil +} + +type rcloneCatReader struct { + reader io.ReadCloser + cmd *exec.Cmd + logger *log.Logger +} + +func (r *rcloneCatReader) Read(p []byte) (n int, err error) { + return r.reader.Read(p) +} + +func (r *rcloneCatReader) Close() error { + if err := r.reader.Close(); err != nil { + r.logger.Warnf("Failed to close reader: %v", err) + } + if err := r.cmd.Wait(); err != nil { + r.logger.Warnf("rclone cat process exited with error: %v", err) + } + return nil +} diff --git a/storage/storage.go b/storage/storage.go index e0cf09e..1d14cc1 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -11,6 +11,7 @@ import ( "github.com/krau/SaveAny-Bot/storage/alist" "github.com/krau/SaveAny-Bot/storage/local" "github.com/krau/SaveAny-Bot/storage/minio" + "github.com/krau/SaveAny-Bot/storage/rclone" "github.com/krau/SaveAny-Bot/storage/s3" "github.com/krau/SaveAny-Bot/storage/telegram" "github.com/krau/SaveAny-Bot/storage/webdav" @@ -53,6 +54,7 @@ var storageConstructors = map[storenum.StorageType]StorageConstructor{ storenum.Minio: func() Storage { return new(minio.Minio) }, storenum.S3: func() Storage { return new(s3.S3) }, storenum.Telegram: func() Storage { return new(telegram.Telegram) }, + storenum.Rclone: func() Storage { return new(rclone.Rclone) }, } // NewStorage creates a new storage instance based on the provided config and initializes it