mirror of
https://github.com/Awuqing/BackupX.git
synced 2026-06-08 03:09:34 +08:00
重构: 存储传输层集成 rclone 替代自研实现
将 8 种存储后端(本地磁盘、S3、WebDAV、Google Drive、FTP、阿里云 OSS、 腾讯云 COS、七牛 Kodo)的底层传输从 4 个独立 SDK 自研实现替换为 rclone fs 接口统一驱动。 - 新建 storage/rclone/ 包(~410 行胶水代码),包含通用 Provider 和 8 种 配置映射 Factory - 删除 10 个旧 provider 包(~1000 行),净减少约 1000 行代码 - StorageProvider 接口、前端 UI、数据库模型、备份执行引擎全部零改动 - 获得 rclone 工业级传输能力(分片上传、断点续传、自动重试)
This commit is contained in:
@@ -20,14 +20,7 @@ import (
|
||||
"backupx/server/internal/service"
|
||||
"backupx/server/internal/storage"
|
||||
"backupx/server/internal/storage/codec"
|
||||
"backupx/server/internal/storage/googledrive"
|
||||
"backupx/server/internal/storage/localdisk"
|
||||
storageAliyun "backupx/server/internal/storage/aliyun"
|
||||
storageFTP "backupx/server/internal/storage/ftp"
|
||||
storageTencent "backupx/server/internal/storage/tencent"
|
||||
storageQiniu "backupx/server/internal/storage/qiniu"
|
||||
storageS3 "backupx/server/internal/storage/s3"
|
||||
storageWebDAV "backupx/server/internal/storage/webdav"
|
||||
storageRclone "backupx/server/internal/storage/rclone"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
@@ -70,14 +63,14 @@ func New(ctx context.Context, cfg config.Config, version string) (*Application,
|
||||
systemService := service.NewSystemService(cfg, version, time.Now().UTC())
|
||||
configCipher := codec.NewConfigCipher(resolvedSecurity.EncryptionKey)
|
||||
storageRegistry := storage.NewRegistry(
|
||||
localdisk.NewFactory(),
|
||||
storageS3.NewFactory(),
|
||||
storageWebDAV.NewFactory(),
|
||||
googledrive.NewFactory(),
|
||||
storageAliyun.NewFactory(),
|
||||
storageTencent.NewFactory(),
|
||||
storageQiniu.NewFactory(),
|
||||
storageFTP.NewFactory(),
|
||||
storageRclone.NewLocalDiskFactory(),
|
||||
storageRclone.NewS3Factory(),
|
||||
storageRclone.NewWebDAVFactory(),
|
||||
storageRclone.NewGoogleDriveFactory(),
|
||||
storageRclone.NewAliyunOSSFactory(),
|
||||
storageRclone.NewTencentCOSFactory(),
|
||||
storageRclone.NewQiniuKodoFactory(),
|
||||
storageRclone.NewFTPFactory(),
|
||||
)
|
||||
storageTargetService := service.NewStorageTargetService(storageTargetRepo, oauthSessionRepo, storageRegistry, configCipher)
|
||||
storageTargetService.SetBackupTaskRepository(backupTaskRepo)
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"backupx/server/internal/repository"
|
||||
"backupx/server/internal/storage"
|
||||
"backupx/server/internal/storage/codec"
|
||||
"backupx/server/internal/storage/localdisk"
|
||||
storageRclone "backupx/server/internal/storage/rclone"
|
||||
)
|
||||
|
||||
func newExecutionTestServices(t *testing.T) (*BackupExecutionService, *BackupRecordService, repository.BackupTaskRepository, repository.StorageTargetRepository, repository.BackupRecordRepository, string, string) {
|
||||
@@ -53,7 +53,7 @@ func newExecutionTestServices(t *testing.T) (*BackupExecutionService, *BackupRec
|
||||
}
|
||||
logHub := backup.NewLogHub()
|
||||
runnerRegistry := backup.NewRegistry(backup.NewFileRunner(), backup.NewMySQLRunner(nil), backup.NewSQLiteRunner(), backup.NewPostgreSQLRunner(nil))
|
||||
storageRegistry := storage.NewRegistry(localdisk.NewFactory())
|
||||
storageRegistry := storage.NewRegistry(storageRclone.NewLocalDiskFactory())
|
||||
retentionService := backupretention.NewService(records)
|
||||
tempDir := filepath.Join(baseDir, "tmp")
|
||||
if err := os.MkdirAll(tempDir, 0o755); err != nil {
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
// Package aliyun provides an Aliyun OSS storage factory that delegates to the S3-compatible engine.
|
||||
// Aliyun OSS is fully S3-compatible; we auto-assemble the endpoint from the user-provided region.
|
||||
package aliyun
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
"backupx/server/internal/storage/s3"
|
||||
)
|
||||
|
||||
// Config is the user-facing configuration for Aliyun OSS.
|
||||
type Config struct {
|
||||
Region string `json:"region"`
|
||||
Bucket string `json:"bucket"`
|
||||
AccessKeyID string `json:"accessKeyId"`
|
||||
SecretAccessKey string `json:"secretAccessKey"`
|
||||
Endpoint string `json:"endpoint"` // optional override
|
||||
InternalNetwork bool `json:"internalNetwork"` // use -internal endpoint
|
||||
}
|
||||
|
||||
// Factory creates Aliyun OSS providers by composing the S3 engine.
|
||||
type Factory struct {
|
||||
s3Factory s3.Factory
|
||||
}
|
||||
|
||||
func NewFactory() Factory {
|
||||
return Factory{s3Factory: s3.NewFactory()}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeAliyunOSS }
|
||||
func (Factory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
func (f Factory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[Config](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endpoint := strings.TrimSpace(cfg.Endpoint)
|
||||
if endpoint == "" {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("aliyun oss region is required")
|
||||
}
|
||||
suffix := "aliyuncs.com"
|
||||
if cfg.InternalNetwork {
|
||||
endpoint = fmt.Sprintf("https://oss-%s-internal.%s", region, suffix)
|
||||
} else {
|
||||
endpoint = fmt.Sprintf("https://oss-%s.%s", region, suffix)
|
||||
}
|
||||
}
|
||||
|
||||
// Delegate to S3 engine with assembled endpoint.
|
||||
s3Config := map[string]any{
|
||||
"endpoint": endpoint,
|
||||
"region": cfg.Region,
|
||||
"bucket": cfg.Bucket,
|
||||
"accessKeyId": cfg.AccessKeyID,
|
||||
"secretAccessKey": cfg.SecretAccessKey,
|
||||
"forcePathStyle": false, // Aliyun OSS uses virtual-hosted style
|
||||
}
|
||||
return f.s3Factory.New(ctx, s3Config)
|
||||
}
|
||||
@@ -1,226 +0,0 @@
|
||||
package ftp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
|
||||
"github.com/jlaffaye/ftp"
|
||||
)
|
||||
|
||||
// Provider implements storage.StorageProvider for FTP.
|
||||
type Provider struct {
|
||||
config storage.FTPConfig
|
||||
}
|
||||
|
||||
// Factory creates FTP storage providers.
|
||||
type Factory struct{}
|
||||
|
||||
// NewFactory returns a new FTP Factory.
|
||||
func NewFactory() Factory {
|
||||
return Factory{}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeFTP }
|
||||
func (Factory) SensitiveFields() []string { return []string{"username", "password"} }
|
||||
|
||||
func (f Factory) New(_ context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.FTPConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Host) == "" {
|
||||
return nil, fmt.Errorf("FTP host is required")
|
||||
}
|
||||
if cfg.Port == 0 {
|
||||
cfg.Port = 21
|
||||
}
|
||||
return &Provider{config: cfg}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Type() storage.ProviderType { return storage.ProviderTypeFTP }
|
||||
|
||||
// dial establishes a connection to the FTP server and logs in.
|
||||
func (p *Provider) dial() (*ftp.ServerConn, error) {
|
||||
addr := fmt.Sprintf("%s:%d", p.config.Host, p.config.Port)
|
||||
|
||||
var opts []ftp.DialOption
|
||||
opts = append(opts, ftp.DialWithTimeout(30*time.Second))
|
||||
if p.config.UseTLS {
|
||||
opts = append(opts, ftp.DialWithExplicitTLS(nil))
|
||||
}
|
||||
|
||||
conn, err := ftp.Dial(addr, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect to FTP server %s: %w", addr, err)
|
||||
}
|
||||
|
||||
username := p.config.Username
|
||||
if username == "" {
|
||||
username = "anonymous"
|
||||
}
|
||||
if err := conn.Login(username, p.config.Password); err != nil {
|
||||
conn.Quit()
|
||||
return nil, fmt.Errorf("FTP login: %w", err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (p *Provider) TestConnection(_ context.Context) error {
|
||||
conn, err := p.dial()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Quit()
|
||||
|
||||
basePath := p.normalizeBasePath()
|
||||
if err := p.ensureDir(conn, basePath); err != nil {
|
||||
return fmt.Errorf("ensure FTP base path: %w", err)
|
||||
}
|
||||
_, err = conn.List(basePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list FTP base path: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error {
|
||||
conn, err := p.dial()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Quit()
|
||||
|
||||
objectPath := p.resolvePath(objectKey)
|
||||
dir := path.Dir(objectPath)
|
||||
if err := p.ensureDir(conn, dir); err != nil {
|
||||
return fmt.Errorf("create FTP directories: %w", err)
|
||||
}
|
||||
|
||||
// Read all data into buffer since FTP STOR needs the full stream
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read upload data: %w", err)
|
||||
}
|
||||
|
||||
if err := conn.Stor(objectPath, bytes.NewReader(data)); err != nil {
|
||||
return fmt.Errorf("FTP upload: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Download(_ context.Context, objectKey string) (io.ReadCloser, error) {
|
||||
conn, err := p.dial()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objectPath := p.resolvePath(objectKey)
|
||||
resp, err := conn.Retr(objectPath)
|
||||
if err != nil {
|
||||
conn.Quit()
|
||||
return nil, fmt.Errorf("FTP download: %w", err)
|
||||
}
|
||||
|
||||
// Wrap the response to also close the FTP connection when done
|
||||
return &ftpReadCloser{ReadCloser: resp, conn: conn}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Delete(_ context.Context, objectKey string) error {
|
||||
conn, err := p.dial()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Quit()
|
||||
|
||||
objectPath := p.resolvePath(objectKey)
|
||||
if err := conn.Delete(objectPath); err != nil {
|
||||
return fmt.Errorf("FTP delete: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) List(_ context.Context, prefix string) ([]storage.ObjectInfo, error) {
|
||||
conn, err := p.dial()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Quit()
|
||||
|
||||
basePath := p.normalizeBasePath()
|
||||
entries, err := conn.List(basePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("FTP list: %w", err)
|
||||
}
|
||||
|
||||
items := make([]storage.ObjectInfo, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if entry.Type == ftp.EntryTypeFolder {
|
||||
continue
|
||||
}
|
||||
key := strings.TrimPrefix(path.Join(strings.TrimPrefix(basePath, "/"), entry.Name), "/")
|
||||
if prefix != "" && !strings.HasPrefix(key, prefix) {
|
||||
continue
|
||||
}
|
||||
items = append(items, storage.ObjectInfo{
|
||||
Key: key,
|
||||
Size: int64(entry.Size),
|
||||
UpdatedAt: entry.Time.UTC(),
|
||||
})
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// normalizeBasePath returns a cleaned base path with leading slash.
|
||||
func (p *Provider) normalizeBasePath() string {
|
||||
clean := path.Clean("/" + strings.TrimSpace(p.config.BasePath))
|
||||
if clean == "." {
|
||||
return "/"
|
||||
}
|
||||
return clean
|
||||
}
|
||||
|
||||
// resolvePath returns the full FTP path for the given object key.
|
||||
func (p *Provider) resolvePath(objectKey string) string {
|
||||
cleanKey := path.Clean("/" + strings.TrimSpace(objectKey))
|
||||
return path.Clean(path.Join(p.normalizeBasePath(), cleanKey))
|
||||
}
|
||||
|
||||
// ensureDir creates all directories in the path recursively.
|
||||
func (p *Provider) ensureDir(conn *ftp.ServerConn, dirPath string) error {
|
||||
parts := strings.Split(strings.Trim(dirPath, "/"), "/")
|
||||
current := ""
|
||||
for _, part := range parts {
|
||||
if part == "" {
|
||||
continue
|
||||
}
|
||||
current = current + "/" + part
|
||||
if err := conn.MakeDir(current); err != nil {
|
||||
// Ignore errors if directory already exists
|
||||
// FTP doesn't have a standard "mkdir if not exists"
|
||||
_ = err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ftpReadCloser wraps an io.ReadCloser from FTP and closes the connection when done.
|
||||
type ftpReadCloser struct {
|
||||
io.ReadCloser
|
||||
conn *ftp.ServerConn
|
||||
}
|
||||
|
||||
func (f *ftpReadCloser) Close() error {
|
||||
err := f.ReadCloser.Close()
|
||||
if f.conn != nil {
|
||||
f.conn.Quit()
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -1,299 +0,0 @@
|
||||
package googledrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
"golang.org/x/oauth2"
|
||||
googleoauth "golang.org/x/oauth2/google"
|
||||
"google.golang.org/api/drive/v3"
|
||||
"google.golang.org/api/option"
|
||||
)
|
||||
|
||||
|
||||
type fileInfo struct {
|
||||
ID string
|
||||
Name string
|
||||
Size int64
|
||||
ModifiedTime time.Time
|
||||
}
|
||||
|
||||
type client interface {
|
||||
TestConnection(context.Context, string) error
|
||||
Upload(context.Context, string, string, io.Reader) error
|
||||
Download(context.Context, string, string) (io.ReadCloser, error)
|
||||
Delete(context.Context, string, string) error
|
||||
List(context.Context, string, string) ([]storage.ObjectInfo, error)
|
||||
EnsureFolder(ctx context.Context, parentID, name string) (string, error)
|
||||
}
|
||||
|
||||
type Provider struct {
|
||||
client client
|
||||
rootFolder string // user-configured folderId, empty means Drive root
|
||||
folderCache map[string]string // cache: path -> folderID
|
||||
}
|
||||
|
||||
type Factory struct {
|
||||
newClient func(context.Context, storage.GoogleDriveConfig) (client, error)
|
||||
}
|
||||
|
||||
func NewFactory() Factory {
|
||||
return Factory{newClient: newDriveClient}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeGoogleDrive }
|
||||
func (Factory) SensitiveFields() []string {
|
||||
return []string{"clientId", "clientSecret", "refreshToken"}
|
||||
}
|
||||
|
||||
func (f Factory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.GoogleDriveConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg = cfg.Normalize()
|
||||
if strings.TrimSpace(cfg.ClientID) == "" || strings.TrimSpace(cfg.ClientSecret) == "" {
|
||||
return nil, fmt.Errorf("google drive client credentials are required")
|
||||
}
|
||||
if strings.TrimSpace(cfg.RefreshToken) == "" {
|
||||
return nil, fmt.Errorf("google drive refresh token is required")
|
||||
}
|
||||
newClient := f.newClient
|
||||
if newClient == nil {
|
||||
newClient = NewFactory().newClient
|
||||
}
|
||||
client, err := newClient(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Provider{
|
||||
client: client,
|
||||
rootFolder: strings.TrimSpace(cfg.FolderID),
|
||||
folderCache: make(map[string]string),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Type() storage.ProviderType { return storage.ProviderTypeGoogleDrive }
|
||||
|
||||
// ensureFolderPath creates nested folders for a path like "BackupX/file/260308"
|
||||
// and returns the deepest folder's ID.
|
||||
func (p *Provider) ensureFolderPath(ctx context.Context, folderPath string) (string, error) {
|
||||
if folderPath == "" || folderPath == "." {
|
||||
return p.rootFolder, nil
|
||||
}
|
||||
if cached, ok := p.folderCache[folderPath]; ok {
|
||||
return cached, nil
|
||||
}
|
||||
parts := strings.Split(path.Clean(folderPath), "/")
|
||||
parentID := p.rootFolder
|
||||
builtPath := ""
|
||||
for _, part := range parts {
|
||||
if part == "" || part == "." {
|
||||
continue
|
||||
}
|
||||
if builtPath == "" {
|
||||
builtPath = part
|
||||
} else {
|
||||
builtPath = builtPath + "/" + part
|
||||
}
|
||||
if cached, ok := p.folderCache[builtPath]; ok {
|
||||
parentID = cached
|
||||
continue
|
||||
}
|
||||
folderID, err := p.client.EnsureFolder(ctx, parentID, part)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("ensure folder %s: %w", builtPath, err)
|
||||
}
|
||||
p.folderCache[builtPath] = folderID
|
||||
parentID = folderID
|
||||
}
|
||||
return parentID, nil
|
||||
}
|
||||
|
||||
func (p *Provider) TestConnection(ctx context.Context) error {
|
||||
return p.client.TestConnection(ctx, p.rootFolder)
|
||||
}
|
||||
|
||||
func (p *Provider) Upload(ctx context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error {
|
||||
dir := path.Dir(objectKey)
|
||||
folderID, err := p.ensureFolderPath(ctx, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.client.Upload(ctx, folderID, objectKey, reader)
|
||||
}
|
||||
|
||||
func (p *Provider) Download(ctx context.Context, objectKey string) (io.ReadCloser, error) {
|
||||
dir := path.Dir(objectKey)
|
||||
folderID, err := p.ensureFolderPath(ctx, dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.client.Download(ctx, folderID, objectKey)
|
||||
}
|
||||
|
||||
func (p *Provider) Delete(ctx context.Context, objectKey string) error {
|
||||
dir := path.Dir(objectKey)
|
||||
folderID, err := p.ensureFolderPath(ctx, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.client.Delete(ctx, folderID, objectKey)
|
||||
}
|
||||
|
||||
func (p *Provider) List(ctx context.Context, prefix string) ([]storage.ObjectInfo, error) {
|
||||
dir := path.Dir(prefix)
|
||||
folderID, err := p.ensureFolderPath(ctx, dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.client.List(ctx, folderID, prefix)
|
||||
}
|
||||
|
||||
type driveClient struct {
|
||||
service *drive.Service
|
||||
}
|
||||
|
||||
func newDriveClient(ctx context.Context, cfg storage.GoogleDriveConfig) (client, error) {
|
||||
cfg = cfg.Normalize()
|
||||
oauthCfg := &oauth2.Config{
|
||||
ClientID: cfg.ClientID,
|
||||
ClientSecret: cfg.ClientSecret,
|
||||
RedirectURL: cfg.RedirectURL,
|
||||
Endpoint: googleoauth.Endpoint,
|
||||
Scopes: []string{drive.DriveScope},
|
||||
}
|
||||
httpClient := oauthCfg.Client(ctx, &oauth2.Token{RefreshToken: cfg.RefreshToken})
|
||||
service, err := drive.NewService(ctx, option.WithHTTPClient(httpClient))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create google drive service: %w", err)
|
||||
}
|
||||
return &driveClient{service: service}, nil
|
||||
}
|
||||
|
||||
func (c *driveClient) TestConnection(ctx context.Context, folderID string) error {
|
||||
if strings.TrimSpace(folderID) == "" {
|
||||
_, err := c.service.About.Get().Fields("user").Context(ctx).Do()
|
||||
if err != nil {
|
||||
return fmt.Errorf("test google drive connection: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
_, err := c.service.Files.Get(folderID).Fields("id").Context(ctx).Do()
|
||||
if err != nil {
|
||||
return fmt.Errorf("test google drive folder: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *driveClient) EnsureFolder(ctx context.Context, parentID, name string) (string, error) {
|
||||
// Search for existing folder
|
||||
query := fmt.Sprintf("name = '%s' and mimeType = 'application/vnd.google-apps.folder' and trashed = false", escapeQuery(name))
|
||||
if strings.TrimSpace(parentID) != "" {
|
||||
query += fmt.Sprintf(" and '%s' in parents", escapeQuery(parentID))
|
||||
} else {
|
||||
query += " and 'root' in parents"
|
||||
}
|
||||
result, err := c.service.Files.List().Q(query).PageSize(1).Fields("files(id)").Context(ctx).Do()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("search for folder %s: %w", name, err)
|
||||
}
|
||||
if len(result.Files) > 0 {
|
||||
return result.Files[0].Id, nil
|
||||
}
|
||||
// Create the folder
|
||||
folder := &drive.File{
|
||||
Name: name,
|
||||
MimeType: "application/vnd.google-apps.folder",
|
||||
}
|
||||
if strings.TrimSpace(parentID) != "" {
|
||||
folder.Parents = []string{parentID}
|
||||
}
|
||||
created, err := c.service.Files.Create(folder).Fields("id").Context(ctx).Do()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create folder %s: %w", name, err)
|
||||
}
|
||||
return created.Id, nil
|
||||
}
|
||||
|
||||
func (c *driveClient) Upload(ctx context.Context, folderID, objectKey string, reader io.Reader) error {
|
||||
file := &drive.File{Name: path.Base(objectKey)}
|
||||
if strings.TrimSpace(folderID) != "" {
|
||||
file.Parents = []string{folderID}
|
||||
}
|
||||
_, err := c.service.Files.Create(file).Media(reader).Context(ctx).Do()
|
||||
if err != nil {
|
||||
return fmt.Errorf("upload google drive object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *driveClient) Download(ctx context.Context, folderID, objectKey string) (io.ReadCloser, error) {
|
||||
file, err := c.findFile(ctx, folderID, objectKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response, err := c.service.Files.Get(file.ID).Context(ctx).Download()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("download google drive object: %w", err)
|
||||
}
|
||||
return response.Body, nil
|
||||
}
|
||||
|
||||
func (c *driveClient) Delete(ctx context.Context, folderID, objectKey string) error {
|
||||
file, err := c.findFile(ctx, folderID, objectKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.service.Files.Delete(file.ID).Context(ctx).Do(); err != nil {
|
||||
return fmt.Errorf("delete google drive object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *driveClient) List(ctx context.Context, folderID, prefix string) ([]storage.ObjectInfo, error) {
|
||||
query := "trashed = false"
|
||||
if strings.TrimSpace(folderID) != "" {
|
||||
query += fmt.Sprintf(" and '%s' in parents", escapeQuery(folderID))
|
||||
}
|
||||
if strings.TrimSpace(prefix) != "" {
|
||||
query += fmt.Sprintf(" and name contains '%s'", escapeQuery(prefix))
|
||||
}
|
||||
result, err := c.service.Files.List().Q(query).Fields("files(id,name,size,modifiedTime)").Context(ctx).Do()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list google drive objects: %w", err)
|
||||
}
|
||||
items := make([]storage.ObjectInfo, 0, len(result.Files))
|
||||
for _, file := range result.Files {
|
||||
modifiedAt, _ := time.Parse(time.RFC3339, file.ModifiedTime)
|
||||
items = append(items, storage.ObjectInfo{Key: file.Name, Size: file.Size, UpdatedAt: modifiedAt.UTC()})
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (c *driveClient) findFile(ctx context.Context, folderID, objectKey string) (*fileInfo, error) {
|
||||
query := fmt.Sprintf("name = '%s' and trashed = false", escapeQuery(path.Base(objectKey)))
|
||||
if strings.TrimSpace(folderID) != "" {
|
||||
query += fmt.Sprintf(" and '%s' in parents", escapeQuery(folderID))
|
||||
}
|
||||
result, err := c.service.Files.List().Q(query).PageSize(1).Fields("files(id,name,size,modifiedTime)").Context(ctx).Do()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query google drive object: %w", err)
|
||||
}
|
||||
if len(result.Files) == 0 {
|
||||
return nil, fmt.Errorf("google drive object not found: %s", objectKey)
|
||||
}
|
||||
file := result.Files[0]
|
||||
modifiedAt, _ := time.Parse(time.RFC3339, file.ModifiedTime)
|
||||
return &fileInfo{ID: file.Id, Name: file.Name, Size: file.Size, ModifiedTime: modifiedAt.UTC()}, nil
|
||||
}
|
||||
|
||||
func escapeQuery(value string) string {
|
||||
return strings.ReplaceAll(value, "'", "\\'")
|
||||
}
|
||||
|
||||
@@ -1,75 +0,0 @@
|
||||
package googledrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
)
|
||||
|
||||
type fakeClient struct{ data map[string]string }
|
||||
|
||||
func (c *fakeClient) TestConnection(context.Context, string) error { return nil }
|
||||
func (c *fakeClient) Upload(_ context.Context, _ string, objectKey string, reader io.Reader) error {
|
||||
content, _ := io.ReadAll(reader)
|
||||
c.data[objectKey] = string(content)
|
||||
return nil
|
||||
}
|
||||
func (c *fakeClient) Download(_ context.Context, _ string, objectKey string) (io.ReadCloser, error) {
|
||||
return io.NopCloser(strings.NewReader(c.data[objectKey])), nil
|
||||
}
|
||||
func (c *fakeClient) Delete(_ context.Context, _ string, objectKey string) error {
|
||||
delete(c.data, objectKey)
|
||||
return nil
|
||||
}
|
||||
func (c *fakeClient) List(_ context.Context, _ string, prefix string) ([]storage.ObjectInfo, error) {
|
||||
items := make([]storage.ObjectInfo, 0)
|
||||
for key, value := range c.data {
|
||||
if prefix == "" || strings.HasPrefix(key, prefix) {
|
||||
items = append(items, storage.ObjectInfo{Key: key, Size: int64(len(value)), UpdatedAt: time.Now().UTC()})
|
||||
}
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
func (c *fakeClient) EnsureFolder(_ context.Context, _, name string) (string, error) {
|
||||
return "fake-folder-" + name, nil
|
||||
}
|
||||
|
||||
func TestGoogleDriveProviderCRUD(t *testing.T) {
|
||||
factory := Factory{newClient: func(context.Context, storage.GoogleDriveConfig) (client, error) {
|
||||
return &fakeClient{data: make(map[string]string)}, nil
|
||||
}}
|
||||
providerAny, err := factory.New(context.Background(), map[string]any{"clientId": "id", "clientSecret": "secret", "refreshToken": "refresh"})
|
||||
if err != nil {
|
||||
t.Fatalf("Factory.New returned error: %v", err)
|
||||
}
|
||||
provider := providerAny.(*Provider)
|
||||
if err := provider.TestConnection(context.Background()); err != nil {
|
||||
t.Fatalf("TestConnection returned error: %v", err)
|
||||
}
|
||||
if err := provider.Upload(context.Background(), "backup.tar.gz", strings.NewReader("payload"), 7, nil); err != nil {
|
||||
t.Fatalf("Upload returned error: %v", err)
|
||||
}
|
||||
reader, err := provider.Download(context.Background(), "backup.tar.gz")
|
||||
if err != nil {
|
||||
t.Fatalf("Download returned error: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
content, _ := io.ReadAll(reader)
|
||||
if string(content) != "payload" {
|
||||
t.Fatalf("unexpected content: %s", string(content))
|
||||
}
|
||||
items, err := provider.List(context.Background(), "backup")
|
||||
if err != nil {
|
||||
t.Fatalf("List returned error: %v", err)
|
||||
}
|
||||
if len(items) != 1 || items[0].Key != "backup.tar.gz" {
|
||||
t.Fatalf("unexpected list result: %#v", items)
|
||||
}
|
||||
if err := provider.Delete(context.Background(), "backup.tar.gz"); err != nil {
|
||||
t.Fatalf("Delete returned error: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1,137 +0,0 @@
|
||||
package localdisk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
)
|
||||
|
||||
type Provider struct {
|
||||
basePath string
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
|
||||
func NewFactory() Factory { return Factory{} }
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeLocalDisk }
|
||||
func (Factory) SensitiveFields() []string { return nil }
|
||||
|
||||
func (Factory) New(_ context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.LocalDiskConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.BasePath) == "" {
|
||||
return nil, fmt.Errorf("local disk basePath is required")
|
||||
}
|
||||
return &Provider{basePath: filepath.Clean(cfg.BasePath)}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Type() storage.ProviderType { return storage.ProviderTypeLocalDisk }
|
||||
|
||||
func (p *Provider) TestConnection(_ context.Context) error {
|
||||
if err := os.MkdirAll(p.basePath, 0o755); err != nil {
|
||||
return fmt.Errorf("ensure local disk base path: %w", err)
|
||||
}
|
||||
tempFile, err := os.CreateTemp(p.basePath, ".backupx-connection-test-*")
|
||||
if err != nil {
|
||||
return fmt.Errorf("write access check failed: %w", err)
|
||||
}
|
||||
name := tempFile.Name()
|
||||
_ = tempFile.Close()
|
||||
_ = os.Remove(name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error {
|
||||
targetPath, err := p.resolvePath(objectKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
|
||||
return fmt.Errorf("create local disk directories: %w", err)
|
||||
}
|
||||
file, err := os.Create(targetPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create local disk object: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
if _, err := io.Copy(file, reader); err != nil {
|
||||
return fmt.Errorf("write local disk object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Download(_ context.Context, objectKey string) (io.ReadCloser, error) {
|
||||
targetPath, err := p.resolvePath(objectKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
file, err := os.Open(targetPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open local disk object: %w", err)
|
||||
}
|
||||
return file, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Delete(_ context.Context, objectKey string) error {
|
||||
targetPath, err := p.resolvePath(objectKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(targetPath); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("delete local disk object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) List(_ context.Context, prefix string) ([]storage.ObjectInfo, error) {
|
||||
items := make([]storage.ObjectInfo, 0)
|
||||
err := filepath.WalkDir(p.basePath, func(path string, entry fs.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
if entry.IsDir() {
|
||||
return nil
|
||||
}
|
||||
rel, err := filepath.Rel(p.basePath, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := filepath.ToSlash(rel)
|
||||
if prefix != "" && !strings.HasPrefix(key, prefix) {
|
||||
return nil
|
||||
}
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
items = append(items, storage.ObjectInfo{Key: key, Size: info.Size(), UpdatedAt: info.ModTime().UTC()})
|
||||
return nil
|
||||
})
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("list local disk objects: %w", err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (p *Provider) resolvePath(objectKey string) (string, error) {
|
||||
cleanBase := filepath.Clean(p.basePath)
|
||||
cleanKey := filepath.Clean(filepath.FromSlash(strings.TrimSpace(objectKey)))
|
||||
if cleanKey == "." || cleanKey == string(filepath.Separator) || cleanKey == "" {
|
||||
return "", fmt.Errorf("object key is required")
|
||||
}
|
||||
fullPath := filepath.Clean(filepath.Join(cleanBase, cleanKey))
|
||||
baseWithSep := cleanBase + string(filepath.Separator)
|
||||
if fullPath != cleanBase && !strings.HasPrefix(fullPath, baseWithSep) {
|
||||
return "", fmt.Errorf("object key escapes base path")
|
||||
}
|
||||
return fullPath, nil
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
package localdisk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLocalDiskProviderCRUD(t *testing.T) {
|
||||
providerAny, err := (Factory{}).New(context.Background(), map[string]any{"basePath": t.TempDir()})
|
||||
if err != nil {
|
||||
t.Fatalf("Factory.New returned error: %v", err)
|
||||
}
|
||||
provider := providerAny.(*Provider)
|
||||
if err := provider.TestConnection(context.Background()); err != nil {
|
||||
t.Fatalf("TestConnection returned error: %v", err)
|
||||
}
|
||||
if err := provider.Upload(context.Background(), "daily/backup.txt", strings.NewReader("hello"), 5, nil); err != nil {
|
||||
t.Fatalf("Upload returned error: %v", err)
|
||||
}
|
||||
reader, err := provider.Download(context.Background(), "daily/backup.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("Download returned error: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
content, _ := io.ReadAll(reader)
|
||||
if string(content) != "hello" {
|
||||
t.Fatalf("expected downloaded content to match, got %s", string(content))
|
||||
}
|
||||
items, err := provider.List(context.Background(), "daily")
|
||||
if err != nil {
|
||||
t.Fatalf("List returned error: %v", err)
|
||||
}
|
||||
if len(items) != 1 || items[0].Key != "daily/backup.txt" {
|
||||
t.Fatalf("unexpected list result: %#v", items)
|
||||
}
|
||||
if err := provider.Delete(context.Background(), "daily/backup.txt"); err != nil {
|
||||
t.Fatalf("Delete returned error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalDiskProviderRejectsTraversal(t *testing.T) {
|
||||
providerAny, err := (Factory{}).New(context.Background(), map[string]any{"basePath": t.TempDir()})
|
||||
if err != nil {
|
||||
t.Fatalf("Factory.New returned error: %v", err)
|
||||
}
|
||||
provider := providerAny.(*Provider)
|
||||
if _, err := provider.resolvePath("../escape.txt"); err == nil {
|
||||
t.Fatalf("expected traversal to be rejected")
|
||||
}
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
// Package qiniu provides a Qiniu Cloud Kodo storage factory that delegates to the S3-compatible engine.
|
||||
// Qiniu Kodo is S3-compatible; we auto-assemble the endpoint from the user-provided region.
|
||||
package qiniu
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
"backupx/server/internal/storage/s3"
|
||||
)
|
||||
|
||||
// Config is the user-facing configuration for Qiniu Kodo.
|
||||
type Config struct {
|
||||
Region string `json:"region"` // e.g. z0, z1, z2, na0, as0
|
||||
Bucket string `json:"bucket"`
|
||||
AccessKey string `json:"accessKeyId"`
|
||||
SecretKey string `json:"secretAccessKey"`
|
||||
Endpoint string `json:"endpoint"` // optional override
|
||||
}
|
||||
|
||||
// regionEndpoints maps Qiniu storage region codes to their S3-compatible endpoints.
|
||||
var regionEndpoints = map[string]string{
|
||||
"z0": "https://s3-cn-east-1.qiniucs.com",
|
||||
"cn-east-2": "https://s3-cn-east-2.qiniucs.com",
|
||||
"z1": "https://s3-cn-north-1.qiniucs.com",
|
||||
"z2": "https://s3-cn-south-1.qiniucs.com",
|
||||
"na0": "https://s3-us-north-1.qiniucs.com",
|
||||
"as0": "https://s3-ap-southeast-1.qiniucs.com",
|
||||
}
|
||||
|
||||
// Factory creates Qiniu Kodo providers by composing the S3 engine.
|
||||
type Factory struct {
|
||||
s3Factory s3.Factory
|
||||
}
|
||||
|
||||
func NewFactory() Factory {
|
||||
return Factory{s3Factory: s3.NewFactory()}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeQiniuKodo }
|
||||
func (Factory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
func (f Factory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[Config](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endpoint := strings.TrimSpace(cfg.Endpoint)
|
||||
if endpoint == "" {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("qiniu kodo region is required")
|
||||
}
|
||||
var ok bool
|
||||
endpoint, ok = regionEndpoints[region]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unsupported qiniu region: %s (supported: z0, cn-east-2, z1, z2, na0, as0)", region)
|
||||
}
|
||||
}
|
||||
|
||||
s3Config := map[string]any{
|
||||
"endpoint": endpoint,
|
||||
"region": cfg.Region,
|
||||
"bucket": cfg.Bucket,
|
||||
"accessKeyId": cfg.AccessKey,
|
||||
"secretAccessKey": cfg.SecretKey,
|
||||
"forcePathStyle": true, // Qiniu S3-compatible uses path-style
|
||||
}
|
||||
return f.s3Factory.New(ctx, s3Config)
|
||||
}
|
||||
11
server/internal/storage/rclone/backends.go
Normal file
11
server/internal/storage/rclone/backends.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// Package rclone 提供基于 rclone 的统一存储后端实现。
|
||||
// 按需引入 rclone backend,避免 backend/all 导致二进制膨胀。
|
||||
package rclone
|
||||
|
||||
import (
|
||||
_ "github.com/rclone/rclone/backend/drive"
|
||||
_ "github.com/rclone/rclone/backend/ftp"
|
||||
_ "github.com/rclone/rclone/backend/local"
|
||||
_ "github.com/rclone/rclone/backend/s3"
|
||||
_ "github.com/rclone/rclone/backend/webdav"
|
||||
)
|
||||
347
server/internal/storage/rclone/factory.go
Normal file
347
server/internal/storage/rclone/factory.go
Normal file
@@ -0,0 +1,347 @@
|
||||
package rclone
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 辅助函数
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// quoteParam 对 rclone 连接字符串中含特殊字符的值加单引号保护。
|
||||
func quoteParam(s string) string {
|
||||
if s == "" {
|
||||
return s
|
||||
}
|
||||
if !strings.ContainsAny(s, ",:='") {
|
||||
return s
|
||||
}
|
||||
return "'" + strings.ReplaceAll(s, "'", "''") + "'"
|
||||
}
|
||||
|
||||
// newFs 创建 rclone fs.Fs 实例并包装为 Provider。
|
||||
func newFs(ctx context.Context, providerType storage.ProviderType, remote string) (*Provider, error) {
|
||||
rfs, err := fs.NewFs(ctx, remote)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create rclone fs for %s: %w", providerType, err)
|
||||
}
|
||||
return newProvider(providerType, rfs), nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// LocalDisk
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type LocalDiskFactory struct{}
|
||||
|
||||
func NewLocalDiskFactory() LocalDiskFactory { return LocalDiskFactory{} }
|
||||
|
||||
func (LocalDiskFactory) Type() storage.ProviderType { return storage.ProviderTypeLocalDisk }
|
||||
func (LocalDiskFactory) SensitiveFields() []string { return nil }
|
||||
|
||||
func (LocalDiskFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.LocalDiskConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
basePath := strings.TrimSpace(cfg.BasePath)
|
||||
if basePath == "" {
|
||||
return nil, fmt.Errorf("local disk basePath is required")
|
||||
}
|
||||
return newFs(ctx, storage.ProviderTypeLocalDisk, basePath)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type S3Factory struct{}
|
||||
|
||||
func NewS3Factory() S3Factory { return S3Factory{} }
|
||||
|
||||
func (S3Factory) Type() storage.ProviderType { return storage.ProviderTypeS3 }
|
||||
func (S3Factory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
func (S3Factory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.S3Config](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Bucket) == "" {
|
||||
return nil, fmt.Errorf("s3 bucket is required")
|
||||
}
|
||||
if strings.TrimSpace(cfg.AccessKeyID) == "" || strings.TrimSpace(cfg.SecretAccessKey) == "" {
|
||||
return nil, fmt.Errorf("s3 credentials are required")
|
||||
}
|
||||
return newFs(ctx, storage.ProviderTypeS3, buildS3Remote("Other", cfg.AccessKeyID, cfg.SecretAccessKey, cfg.Endpoint, cfg.Region, cfg.Bucket, cfg.ForcePathStyle))
|
||||
}
|
||||
|
||||
// buildS3Remote 构建 S3 兼容存储的 rclone 连接字符串。
|
||||
func buildS3Remote(provider, keyID, secret, endpoint, region, bucket string, pathStyle bool) string {
|
||||
var b strings.Builder
|
||||
b.WriteString(":s3,provider=")
|
||||
b.WriteString(quoteParam(provider))
|
||||
b.WriteString(",access_key_id=")
|
||||
b.WriteString(quoteParam(keyID))
|
||||
b.WriteString(",secret_access_key=")
|
||||
b.WriteString(quoteParam(secret))
|
||||
if strings.TrimSpace(endpoint) != "" {
|
||||
b.WriteString(",endpoint=")
|
||||
b.WriteString(quoteParam(strings.TrimRight(endpoint, "/")))
|
||||
}
|
||||
if strings.TrimSpace(region) != "" {
|
||||
b.WriteString(",region=")
|
||||
b.WriteString(quoteParam(region))
|
||||
}
|
||||
if pathStyle {
|
||||
b.WriteString(",force_path_style=true")
|
||||
}
|
||||
b.WriteString(",env_auth=false,no_check_bucket=true:")
|
||||
b.WriteString(bucket)
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// WebDAV
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type WebDAVFactory struct{}
|
||||
|
||||
func NewWebDAVFactory() WebDAVFactory { return WebDAVFactory{} }
|
||||
|
||||
func (WebDAVFactory) Type() storage.ProviderType { return storage.ProviderTypeWebDAV }
|
||||
func (WebDAVFactory) SensitiveFields() []string { return []string{"username", "password"} }
|
||||
|
||||
func (WebDAVFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.WebDAVConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Endpoint) == "" {
|
||||
return nil, fmt.Errorf("webdav endpoint is required")
|
||||
}
|
||||
remote := fmt.Sprintf(":webdav,url=%s,user=%s,pass=%s:%s",
|
||||
quoteParam(strings.TrimRight(cfg.Endpoint, "/")),
|
||||
quoteParam(cfg.Username),
|
||||
quoteParam(cfg.Password),
|
||||
strings.TrimSpace(cfg.BasePath))
|
||||
return newFs(ctx, storage.ProviderTypeWebDAV, remote)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Google Drive
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type GoogleDriveFactory struct{}
|
||||
|
||||
func NewGoogleDriveFactory() GoogleDriveFactory { return GoogleDriveFactory{} }
|
||||
|
||||
func (GoogleDriveFactory) Type() storage.ProviderType { return storage.ProviderTypeGoogleDrive }
|
||||
func (GoogleDriveFactory) SensitiveFields() []string {
|
||||
return []string{"clientId", "clientSecret", "refreshToken"}
|
||||
}
|
||||
|
||||
func (GoogleDriveFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.GoogleDriveConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg = cfg.Normalize()
|
||||
if strings.TrimSpace(cfg.ClientID) == "" || strings.TrimSpace(cfg.ClientSecret) == "" {
|
||||
return nil, fmt.Errorf("google drive client credentials are required")
|
||||
}
|
||||
if strings.TrimSpace(cfg.RefreshToken) == "" {
|
||||
return nil, fmt.Errorf("google drive refresh token is required")
|
||||
}
|
||||
// 构造 rclone 所需的 OAuth2 token JSON
|
||||
tokenJSON := fmt.Sprintf(`{"access_token":"","token_type":"Bearer","refresh_token":"%s","expiry":"0001-01-01T00:00:00Z"}`,
|
||||
strings.ReplaceAll(cfg.RefreshToken, `"`, `\"`))
|
||||
|
||||
var b strings.Builder
|
||||
b.WriteString(":drive,client_id=")
|
||||
b.WriteString(quoteParam(cfg.ClientID))
|
||||
b.WriteString(",client_secret=")
|
||||
b.WriteString(quoteParam(cfg.ClientSecret))
|
||||
b.WriteString(",token=")
|
||||
b.WriteString(quoteParam(tokenJSON))
|
||||
if strings.TrimSpace(cfg.FolderID) != "" {
|
||||
b.WriteString(",root_folder_id=")
|
||||
b.WriteString(quoteParam(cfg.FolderID))
|
||||
}
|
||||
b.WriteString(":")
|
||||
return newFs(ctx, storage.ProviderTypeGoogleDrive, b.String())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// FTP
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type FTPFactory struct{}
|
||||
|
||||
func NewFTPFactory() FTPFactory { return FTPFactory{} }
|
||||
|
||||
func (FTPFactory) Type() storage.ProviderType { return storage.ProviderTypeFTP }
|
||||
func (FTPFactory) SensitiveFields() []string { return []string{"username", "password"} }
|
||||
|
||||
func (FTPFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.FTPConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Host) == "" {
|
||||
return nil, fmt.Errorf("FTP host is required")
|
||||
}
|
||||
port := cfg.Port
|
||||
if port == 0 {
|
||||
port = 21
|
||||
}
|
||||
username := strings.TrimSpace(cfg.Username)
|
||||
if username == "" {
|
||||
username = "anonymous"
|
||||
}
|
||||
var b strings.Builder
|
||||
b.WriteString(fmt.Sprintf(":ftp,host=%s,port=%d,user=%s,pass=%s",
|
||||
quoteParam(cfg.Host), port, quoteParam(username), quoteParam(cfg.Password)))
|
||||
if cfg.UseTLS {
|
||||
b.WriteString(",tls=true,explicit_tls=true")
|
||||
}
|
||||
b.WriteString(":")
|
||||
basePath := strings.TrimSpace(cfg.BasePath)
|
||||
if basePath != "" {
|
||||
b.WriteString(basePath)
|
||||
}
|
||||
return newFs(ctx, storage.ProviderTypeFTP, b.String())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 阿里云 OSS(委托 S3 引擎)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type AliyunOSSFactory struct{}
|
||||
|
||||
func NewAliyunOSSFactory() AliyunOSSFactory { return AliyunOSSFactory{} }
|
||||
|
||||
func (AliyunOSSFactory) Type() storage.ProviderType { return storage.ProviderTypeAliyunOSS }
|
||||
func (AliyunOSSFactory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
// AliyunConfig 是阿里云 OSS 的用户配置。
|
||||
type AliyunConfig struct {
|
||||
Region string `json:"region"`
|
||||
Bucket string `json:"bucket"`
|
||||
AccessKeyID string `json:"accessKeyId"`
|
||||
SecretAccessKey string `json:"secretAccessKey"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
InternalNetwork bool `json:"internalNetwork"`
|
||||
}
|
||||
|
||||
func (AliyunOSSFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[AliyunConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
endpoint := strings.TrimSpace(cfg.Endpoint)
|
||||
if endpoint == "" {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("aliyun oss region is required")
|
||||
}
|
||||
if cfg.InternalNetwork {
|
||||
endpoint = fmt.Sprintf("https://oss-%s-internal.aliyuncs.com", region)
|
||||
} else {
|
||||
endpoint = fmt.Sprintf("https://oss-%s.aliyuncs.com", region)
|
||||
}
|
||||
}
|
||||
return newFs(ctx, storage.ProviderTypeAliyunOSS, buildS3Remote("Alibaba", cfg.AccessKeyID, cfg.SecretAccessKey, endpoint, cfg.Region, cfg.Bucket, false))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 腾讯云 COS(委托 S3 引擎)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type TencentCOSFactory struct{}
|
||||
|
||||
func NewTencentCOSFactory() TencentCOSFactory { return TencentCOSFactory{} }
|
||||
|
||||
func (TencentCOSFactory) Type() storage.ProviderType { return storage.ProviderTypeTencentCOS }
|
||||
func (TencentCOSFactory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
// TencentConfig 是腾讯云 COS 的用户配置。
|
||||
type TencentConfig struct {
|
||||
Region string `json:"region"`
|
||||
Bucket string `json:"bucket"`
|
||||
SecretID string `json:"accessKeyId"`
|
||||
SecretKey string `json:"secretAccessKey"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
}
|
||||
|
||||
func (TencentCOSFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[TencentConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
endpoint := strings.TrimSpace(cfg.Endpoint)
|
||||
if endpoint == "" {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("tencent cos region is required")
|
||||
}
|
||||
endpoint = fmt.Sprintf("https://cos.%s.myqcloud.com", region)
|
||||
}
|
||||
return newFs(ctx, storage.ProviderTypeTencentCOS, buildS3Remote("TencentCOS", cfg.SecretID, cfg.SecretKey, endpoint, cfg.Region, cfg.Bucket, false))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 七牛云 Kodo(委托 S3 引擎)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type QiniuKodoFactory struct{}
|
||||
|
||||
func NewQiniuKodoFactory() QiniuKodoFactory { return QiniuKodoFactory{} }
|
||||
|
||||
func (QiniuKodoFactory) Type() storage.ProviderType { return storage.ProviderTypeQiniuKodo }
|
||||
func (QiniuKodoFactory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
// QiniuConfig 是七牛云 Kodo 的用户配置。
|
||||
type QiniuConfig struct {
|
||||
Region string `json:"region"`
|
||||
Bucket string `json:"bucket"`
|
||||
AccessKey string `json:"accessKeyId"`
|
||||
SecretKey string `json:"secretAccessKey"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
}
|
||||
|
||||
// regionEndpoints 映射七牛区域代码到 S3 兼容 endpoint。
|
||||
var regionEndpoints = map[string]string{
|
||||
"z0": "https://s3-cn-east-1.qiniucs.com",
|
||||
"cn-east-2": "https://s3-cn-east-2.qiniucs.com",
|
||||
"z1": "https://s3-cn-north-1.qiniucs.com",
|
||||
"z2": "https://s3-cn-south-1.qiniucs.com",
|
||||
"na0": "https://s3-us-north-1.qiniucs.com",
|
||||
"as0": "https://s3-ap-southeast-1.qiniucs.com",
|
||||
}
|
||||
|
||||
func (QiniuKodoFactory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[QiniuConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
endpoint := strings.TrimSpace(cfg.Endpoint)
|
||||
if endpoint == "" {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("qiniu kodo region is required")
|
||||
}
|
||||
var ok bool
|
||||
endpoint, ok = regionEndpoints[region]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unsupported qiniu region: %s (supported: z0, cn-east-2, z1, z2, na0, as0)", region)
|
||||
}
|
||||
}
|
||||
return newFs(ctx, storage.ProviderTypeQiniuKodo, buildS3Remote("Qiniu", cfg.AccessKey, cfg.SecretKey, endpoint, cfg.Region, cfg.Bucket, true))
|
||||
}
|
||||
112
server/internal/storage/rclone/provider.go
Normal file
112
server/internal/storage/rclone/provider.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package rclone
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fs/walk"
|
||||
)
|
||||
|
||||
// Provider 包装 rclone fs.Fs,实现 storage.StorageProvider 接口。
|
||||
type Provider struct {
|
||||
providerType storage.ProviderType
|
||||
rfs fs.Fs
|
||||
}
|
||||
|
||||
func newProvider(providerType storage.ProviderType, rfs fs.Fs) *Provider {
|
||||
return &Provider{providerType: providerType, rfs: rfs}
|
||||
}
|
||||
|
||||
func (p *Provider) Type() storage.ProviderType { return p.providerType }
|
||||
|
||||
// TestConnection 通过列出根目录验证连通性。
|
||||
func (p *Provider) TestConnection(ctx context.Context) error {
|
||||
_, err := p.rfs.List(ctx, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("rclone test connection: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upload 通过 rclone fs.Fs.Put 上传文件。
|
||||
func (p *Provider) Upload(ctx context.Context, objectKey string, reader io.Reader, size int64, _ map[string]string) error {
|
||||
dir := pathDir(objectKey)
|
||||
if dir != "" && dir != "." {
|
||||
if err := p.rfs.Mkdir(ctx, dir); err != nil {
|
||||
return fmt.Errorf("rclone mkdir %s: %w", dir, err)
|
||||
}
|
||||
}
|
||||
info := object.NewStaticObjectInfo(objectKey, time.Now(), size, true, nil, nil)
|
||||
if _, err := p.rfs.Put(ctx, reader, info); err != nil {
|
||||
return fmt.Errorf("rclone upload %s: %w", objectKey, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Download 通过 rclone 获取对象并返回 io.ReadCloser。
|
||||
func (p *Provider) Download(ctx context.Context, objectKey string) (io.ReadCloser, error) {
|
||||
obj, err := p.rfs.NewObject(ctx, objectKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rclone find object %s: %w", objectKey, err)
|
||||
}
|
||||
reader, err := obj.Open(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rclone download %s: %w", objectKey, err)
|
||||
}
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
// Delete 通过 rclone 删除远端对象。
|
||||
func (p *Provider) Delete(ctx context.Context, objectKey string) error {
|
||||
obj, err := p.rfs.NewObject(ctx, objectKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rclone find object %s: %w", objectKey, err)
|
||||
}
|
||||
if err := obj.Remove(ctx); err != nil {
|
||||
return fmt.Errorf("rclone delete %s: %w", objectKey, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List 递归列出指定前缀下的所有对象。
|
||||
func (p *Provider) List(ctx context.Context, prefix string) ([]storage.ObjectInfo, error) {
|
||||
var items []storage.ObjectInfo
|
||||
err := walk.ListR(ctx, p.rfs, prefix, true, -1, walk.ListObjects, func(entries fs.DirEntries) error {
|
||||
for _, entry := range entries {
|
||||
obj, ok := entry.(fs.Object)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
key := obj.Remote()
|
||||
if prefix != "" && !strings.HasPrefix(key, prefix) {
|
||||
continue
|
||||
}
|
||||
items = append(items, storage.ObjectInfo{
|
||||
Key: key,
|
||||
Size: obj.Size(),
|
||||
UpdatedAt: obj.ModTime(ctx),
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rclone list %s: %w", prefix, err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// pathDir 返回 objectKey 的目录部分(正斜杠分隔)。
|
||||
func pathDir(objectKey string) string {
|
||||
idx := strings.LastIndex(objectKey, "/")
|
||||
if idx < 0 {
|
||||
return ""
|
||||
}
|
||||
return objectKey[:idx]
|
||||
}
|
||||
129
server/internal/storage/rclone/provider_test.go
Normal file
129
server/internal/storage/rclone/provider_test.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package rclone
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestProviderLocalDiskCRUD(t *testing.T) {
|
||||
factory := NewLocalDiskFactory()
|
||||
provider, err := factory.New(context.Background(), map[string]any{"basePath": t.TempDir()})
|
||||
if err != nil {
|
||||
t.Fatalf("Factory.New returned error: %v", err)
|
||||
}
|
||||
if err := provider.TestConnection(context.Background()); err != nil {
|
||||
t.Fatalf("TestConnection returned error: %v", err)
|
||||
}
|
||||
|
||||
// Upload
|
||||
if err := provider.Upload(context.Background(), "daily/backup.txt", strings.NewReader("hello"), 5, nil); err != nil {
|
||||
t.Fatalf("Upload returned error: %v", err)
|
||||
}
|
||||
|
||||
// Download
|
||||
reader, err := provider.Download(context.Background(), "daily/backup.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("Download returned error: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
content, _ := io.ReadAll(reader)
|
||||
if string(content) != "hello" {
|
||||
t.Fatalf("expected 'hello', got %q", string(content))
|
||||
}
|
||||
|
||||
// List with prefix
|
||||
items, err := provider.List(context.Background(), "daily")
|
||||
if err != nil {
|
||||
t.Fatalf("List returned error: %v", err)
|
||||
}
|
||||
if len(items) != 1 || items[0].Key != "daily/backup.txt" {
|
||||
t.Fatalf("unexpected list result: %#v", items)
|
||||
}
|
||||
|
||||
// Delete
|
||||
if err := provider.Delete(context.Background(), "daily/backup.txt"); err != nil {
|
||||
t.Fatalf("Delete returned error: %v", err)
|
||||
}
|
||||
|
||||
// List after delete should be empty
|
||||
items, err = provider.List(context.Background(), "daily")
|
||||
if err != nil {
|
||||
t.Fatalf("List after delete returned error: %v", err)
|
||||
}
|
||||
if len(items) != 0 {
|
||||
t.Fatalf("expected empty list after delete, got %d items", len(items))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProviderLocalDiskRequiresBasePath(t *testing.T) {
|
||||
_, err := NewLocalDiskFactory().New(context.Background(), map[string]any{"basePath": ""})
|
||||
if err == nil {
|
||||
t.Fatal("expected error for empty basePath")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProviderS3RequiresBucketAndCredentials(t *testing.T) {
|
||||
factory := NewS3Factory()
|
||||
_, err := factory.New(context.Background(), map[string]any{"bucket": "", "accessKeyId": "a", "secretAccessKey": "b"})
|
||||
if err == nil || !strings.Contains(err.Error(), "bucket") {
|
||||
t.Fatalf("expected bucket required error, got %v", err)
|
||||
}
|
||||
_, err = factory.New(context.Background(), map[string]any{"bucket": "demo", "accessKeyId": "", "secretAccessKey": "b"})
|
||||
if err == nil || !strings.Contains(err.Error(), "credentials") {
|
||||
t.Fatalf("expected credentials required error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuoteParam(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{"simple", "simple"},
|
||||
{"", ""},
|
||||
{"has,comma", "'has,comma'"},
|
||||
{"has:colon", "'has:colon'"},
|
||||
{"has=equals", "'has=equals'"},
|
||||
{"has'quote", "'has''quote'"},
|
||||
{"a,b:c=d'e", "'a,b:c=d''e'"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got := quoteParam(tt.input)
|
||||
if got != tt.expected {
|
||||
t.Errorf("quoteParam(%q) = %q, want %q", tt.input, got, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildS3Remote(t *testing.T) {
|
||||
remote := buildS3Remote("Alibaba", "keyID", "secret", "https://oss-cn-hangzhou.aliyuncs.com", "cn-hangzhou", "my-bucket", false)
|
||||
if !strings.Contains(remote, "provider=Alibaba") {
|
||||
t.Fatalf("expected provider=Alibaba in remote: %s", remote)
|
||||
}
|
||||
if !strings.Contains(remote, ":my-bucket") {
|
||||
t.Fatalf("expected :my-bucket suffix in remote: %s", remote)
|
||||
}
|
||||
if !strings.HasPrefix(remote, ":s3,") {
|
||||
t.Fatalf("expected :s3, prefix in remote: %s", remote)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPathDir(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{"BackupX/file/260308/backup.tar.gz", "BackupX/file/260308"},
|
||||
{"backup.tar.gz", ""},
|
||||
{"a/b", "a"},
|
||||
{"", ""},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
got := pathDir(tt.input)
|
||||
if got != tt.expected {
|
||||
t.Errorf("pathDir(%q) = %q, want %q", tt.input, got, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
awscore "github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
type client interface {
|
||||
HeadBucket(context.Context, *awss3.HeadBucketInput, ...func(*awss3.Options)) (*awss3.HeadBucketOutput, error)
|
||||
PutObject(context.Context, *awss3.PutObjectInput, ...func(*awss3.Options)) (*awss3.PutObjectOutput, error)
|
||||
GetObject(context.Context, *awss3.GetObjectInput, ...func(*awss3.Options)) (*awss3.GetObjectOutput, error)
|
||||
DeleteObject(context.Context, *awss3.DeleteObjectInput, ...func(*awss3.Options)) (*awss3.DeleteObjectOutput, error)
|
||||
ListObjectsV2(context.Context, *awss3.ListObjectsV2Input, ...func(*awss3.Options)) (*awss3.ListObjectsV2Output, error)
|
||||
}
|
||||
|
||||
type Provider struct {
|
||||
client client
|
||||
bucket string
|
||||
}
|
||||
|
||||
type Factory struct {
|
||||
newClient func(cfg storage.S3Config) client
|
||||
}
|
||||
|
||||
func NewFactory() Factory {
|
||||
return Factory{newClient: func(cfg storage.S3Config) client {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
}
|
||||
awsConfig := awscore.Config{
|
||||
Region: region,
|
||||
Credentials: credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
|
||||
}
|
||||
return awss3.NewFromConfig(awsConfig, func(options *awss3.Options) {
|
||||
options.UsePathStyle = cfg.ForcePathStyle
|
||||
if strings.TrimSpace(cfg.Endpoint) != "" {
|
||||
options.BaseEndpoint = awscore.String(strings.TrimRight(cfg.Endpoint, "/"))
|
||||
}
|
||||
})
|
||||
}}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeS3 }
|
||||
func (Factory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
func (f Factory) New(_ context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.S3Config](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Bucket) == "" {
|
||||
return nil, fmt.Errorf("s3 bucket is required")
|
||||
}
|
||||
if strings.TrimSpace(cfg.AccessKeyID) == "" || strings.TrimSpace(cfg.SecretAccessKey) == "" {
|
||||
return nil, fmt.Errorf("s3 credentials are required")
|
||||
}
|
||||
newClient := f.newClient
|
||||
if newClient == nil {
|
||||
factory := NewFactory()
|
||||
newClient = factory.newClient
|
||||
}
|
||||
return &Provider{client: newClient(cfg), bucket: cfg.Bucket}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Type() storage.ProviderType { return storage.ProviderTypeS3 }
|
||||
|
||||
func (p *Provider) TestConnection(ctx context.Context) error {
|
||||
_, err := p.client.HeadBucket(ctx, &awss3.HeadBucketInput{Bucket: awscore.String(p.bucket)})
|
||||
if err != nil {
|
||||
return fmt.Errorf("test s3 connection: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Upload(ctx context.Context, objectKey string, reader io.Reader, _ int64, metadata map[string]string) error {
|
||||
_, err := p.client.PutObject(ctx, &awss3.PutObjectInput{Bucket: awscore.String(p.bucket), Key: awscore.String(objectKey), Body: reader, Metadata: metadata})
|
||||
if err != nil {
|
||||
return fmt.Errorf("upload s3 object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Download(ctx context.Context, objectKey string) (io.ReadCloser, error) {
|
||||
result, err := p.client.GetObject(ctx, &awss3.GetObjectInput{Bucket: awscore.String(p.bucket), Key: awscore.String(objectKey)})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("download s3 object: %w", err)
|
||||
}
|
||||
return result.Body, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Delete(ctx context.Context, objectKey string) error {
|
||||
_, err := p.client.DeleteObject(ctx, &awss3.DeleteObjectInput{Bucket: awscore.String(p.bucket), Key: awscore.String(objectKey)})
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete s3 object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) List(ctx context.Context, prefix string) ([]storage.ObjectInfo, error) {
|
||||
result, err := p.client.ListObjectsV2(ctx, &awss3.ListObjectsV2Input{Bucket: awscore.String(p.bucket), Prefix: awscore.String(prefix)})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list s3 objects: %w", err)
|
||||
}
|
||||
items := make([]storage.ObjectInfo, 0, len(result.Contents))
|
||||
for _, object := range result.Contents {
|
||||
updatedAt := time.Time{}
|
||||
if object.LastModified != nil {
|
||||
updatedAt = object.LastModified.UTC()
|
||||
}
|
||||
size := int64(0)
|
||||
if object.Size != nil {
|
||||
size = *object.Size
|
||||
}
|
||||
items = append(items, storage.ObjectInfo{Key: awscore.ToString(object.Key), Size: size, UpdatedAt: updatedAt})
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
|
||||
awscore "github.com/aws/aws-sdk-go-v2/aws"
|
||||
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
awss3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
)
|
||||
|
||||
type fakeClient struct{ data map[string]string }
|
||||
|
||||
func (c *fakeClient) HeadBucket(context.Context, *awss3.HeadBucketInput, ...func(*awss3.Options)) (*awss3.HeadBucketOutput, error) {
|
||||
return &awss3.HeadBucketOutput{}, nil
|
||||
}
|
||||
|
||||
func (c *fakeClient) PutObject(_ context.Context, input *awss3.PutObjectInput, _ ...func(*awss3.Options)) (*awss3.PutObjectOutput, error) {
|
||||
body, _ := io.ReadAll(input.Body)
|
||||
c.data[awscore.ToString(input.Key)] = string(body)
|
||||
return &awss3.PutObjectOutput{}, nil
|
||||
}
|
||||
|
||||
func (c *fakeClient) GetObject(_ context.Context, input *awss3.GetObjectInput, _ ...func(*awss3.Options)) (*awss3.GetObjectOutput, error) {
|
||||
return &awss3.GetObjectOutput{Body: io.NopCloser(strings.NewReader(c.data[awscore.ToString(input.Key)]))}, nil
|
||||
}
|
||||
|
||||
func (c *fakeClient) DeleteObject(_ context.Context, input *awss3.DeleteObjectInput, _ ...func(*awss3.Options)) (*awss3.DeleteObjectOutput, error) {
|
||||
delete(c.data, awscore.ToString(input.Key))
|
||||
return &awss3.DeleteObjectOutput{}, nil
|
||||
}
|
||||
|
||||
func (c *fakeClient) ListObjectsV2(_ context.Context, _ *awss3.ListObjectsV2Input, _ ...func(*awss3.Options)) (*awss3.ListObjectsV2Output, error) {
|
||||
now := time.Now().UTC()
|
||||
return &awss3.ListObjectsV2Output{Contents: []awss3types.Object{{Key: awscore.String("backup.tar.gz"), Size: awscore.Int64(10), LastModified: &now}}}, nil
|
||||
}
|
||||
|
||||
func TestS3ProviderCRUD(t *testing.T) {
|
||||
factory := Factory{newClient: func(cfg storage.S3Config) client {
|
||||
return &fakeClient{data: make(map[string]string)}
|
||||
}}
|
||||
providerAny, err := factory.New(context.Background(), map[string]any{"bucket": "demo", "accessKeyId": "a", "secretAccessKey": "b"})
|
||||
if err != nil {
|
||||
t.Fatalf("Factory.New returned error: %v", err)
|
||||
}
|
||||
provider := providerAny.(*Provider)
|
||||
if err := provider.TestConnection(context.Background()); err != nil {
|
||||
t.Fatalf("TestConnection returned error: %v", err)
|
||||
}
|
||||
if err := provider.Upload(context.Background(), "backup.tar.gz", bytes.NewBufferString("payload"), 7, nil); err != nil {
|
||||
t.Fatalf("Upload returned error: %v", err)
|
||||
}
|
||||
reader, err := provider.Download(context.Background(), "backup.tar.gz")
|
||||
if err != nil {
|
||||
t.Fatalf("Download returned error: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
content, _ := io.ReadAll(reader)
|
||||
if string(content) != "payload" {
|
||||
t.Fatalf("unexpected content: %s", string(content))
|
||||
}
|
||||
items, err := provider.List(context.Background(), "backup")
|
||||
if err != nil {
|
||||
t.Fatalf("List returned error: %v", err)
|
||||
}
|
||||
if len(items) != 1 || items[0].Key != "backup.tar.gz" {
|
||||
t.Fatalf("unexpected list result: %#v", items)
|
||||
}
|
||||
if err := provider.Delete(context.Background(), "backup.tar.gz"); err != nil {
|
||||
t.Fatalf("Delete returned error: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1,9 +0,0 @@
|
||||
package s3provider
|
||||
|
||||
import "backupx/server/internal/storage/s3"
|
||||
|
||||
type Factory = s3.Factory
|
||||
|
||||
func NewFactory() Factory {
|
||||
return s3.NewFactory()
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
// Package tencent provides a Tencent Cloud COS storage factory that delegates to the S3-compatible engine.
|
||||
// Tencent COS is fully S3-compatible; we auto-assemble the endpoint from region and appId.
|
||||
package tencent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
"backupx/server/internal/storage/s3"
|
||||
)
|
||||
|
||||
// Config is the user-facing configuration for Tencent COS.
|
||||
type Config struct {
|
||||
Region string `json:"region"`
|
||||
Bucket string `json:"bucket"` // format: bucketname-appid
|
||||
SecretID string `json:"accessKeyId"`
|
||||
SecretKey string `json:"secretAccessKey"`
|
||||
Endpoint string `json:"endpoint"` // optional override
|
||||
}
|
||||
|
||||
// Factory creates Tencent COS providers by composing the S3 engine.
|
||||
type Factory struct {
|
||||
s3Factory s3.Factory
|
||||
}
|
||||
|
||||
func NewFactory() Factory {
|
||||
return Factory{s3Factory: s3.NewFactory()}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeTencentCOS }
|
||||
func (Factory) SensitiveFields() []string { return []string{"accessKeyId", "secretAccessKey"} }
|
||||
|
||||
func (f Factory) New(ctx context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[Config](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endpoint := strings.TrimSpace(cfg.Endpoint)
|
||||
if endpoint == "" {
|
||||
region := strings.TrimSpace(cfg.Region)
|
||||
if region == "" {
|
||||
return nil, fmt.Errorf("tencent cos region is required")
|
||||
}
|
||||
// Tencent COS S3-compatible endpoint format
|
||||
endpoint = fmt.Sprintf("https://cos.%s.myqcloud.com", region)
|
||||
}
|
||||
|
||||
s3Config := map[string]any{
|
||||
"endpoint": endpoint,
|
||||
"region": cfg.Region,
|
||||
"bucket": cfg.Bucket,
|
||||
"accessKeyId": cfg.SecretID,
|
||||
"secretAccessKey": cfg.SecretKey,
|
||||
"forcePathStyle": false, // COS uses virtual-hosted style
|
||||
}
|
||||
return f.s3Factory.New(ctx, s3Config)
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
package webdav
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
gowebdav "github.com/studio-b12/gowebdav"
|
||||
)
|
||||
|
||||
type client interface {
|
||||
ReadDir(path string) ([]os.FileInfo, error)
|
||||
WriteStream(path string, stream io.Reader, perm os.FileMode) error
|
||||
ReadStream(path string) (io.ReadCloser, error)
|
||||
Remove(path string) error
|
||||
MkdirAll(path string, perm os.FileMode) error
|
||||
Stat(path string) (os.FileInfo, error)
|
||||
}
|
||||
|
||||
type Provider struct {
|
||||
client client
|
||||
basePath string
|
||||
}
|
||||
|
||||
type Factory struct {
|
||||
newClient func(cfg storage.WebDAVConfig) client
|
||||
}
|
||||
|
||||
func NewFactory() Factory {
|
||||
return Factory{newClient: func(cfg storage.WebDAVConfig) client {
|
||||
return gowebdav.NewClient(strings.TrimRight(cfg.Endpoint, "/"), cfg.Username, cfg.Password)
|
||||
}}
|
||||
}
|
||||
|
||||
func (Factory) Type() storage.ProviderType { return storage.ProviderTypeWebDAV }
|
||||
func (Factory) SensitiveFields() []string { return []string{"username", "password"} }
|
||||
|
||||
func (f Factory) New(_ context.Context, rawConfig map[string]any) (storage.StorageProvider, error) {
|
||||
cfg, err := storage.DecodeConfig[storage.WebDAVConfig](rawConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Endpoint) == "" {
|
||||
return nil, fmt.Errorf("webdav endpoint is required")
|
||||
}
|
||||
newClient := f.newClient
|
||||
if newClient == nil {
|
||||
factory := NewFactory()
|
||||
newClient = factory.newClient
|
||||
}
|
||||
return &Provider{client: newClient(cfg), basePath: normalizeBasePath(cfg.BasePath)}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Type() storage.ProviderType { return storage.ProviderTypeWebDAV }
|
||||
|
||||
func (p *Provider) TestConnection(_ context.Context) error {
|
||||
if err := p.client.MkdirAll(p.basePath, 0o755); err != nil {
|
||||
return fmt.Errorf("ensure webdav base path: %w", err)
|
||||
}
|
||||
if _, err := p.client.Stat(p.basePath); err != nil {
|
||||
return fmt.Errorf("stat webdav base path: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Upload(_ context.Context, objectKey string, reader io.Reader, _ int64, _ map[string]string) error {
|
||||
objectPath := p.resolvePath(objectKey)
|
||||
if err := p.client.MkdirAll(path.Dir(objectPath), 0o755); err != nil {
|
||||
return fmt.Errorf("create webdav directories: %w", err)
|
||||
}
|
||||
if err := p.client.WriteStream(objectPath, reader, 0o644); err != nil {
|
||||
return fmt.Errorf("write webdav object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) Download(_ context.Context, objectKey string) (io.ReadCloser, error) {
|
||||
reader, err := p.client.ReadStream(p.resolvePath(objectKey))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read webdav object: %w", err)
|
||||
}
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (p *Provider) Delete(_ context.Context, objectKey string) error {
|
||||
if err := p.client.Remove(p.resolvePath(objectKey)); err != nil {
|
||||
return fmt.Errorf("delete webdav object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) List(_ context.Context, prefix string) ([]storage.ObjectInfo, error) {
|
||||
entries, err := p.client.ReadDir(p.basePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list webdav directory: %w", err)
|
||||
}
|
||||
items := make([]storage.ObjectInfo, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
key := strings.TrimPrefix(path.Join(strings.TrimPrefix(p.basePath, "/"), entry.Name()), "/")
|
||||
if prefix != "" && !strings.HasPrefix(key, prefix) {
|
||||
continue
|
||||
}
|
||||
items = append(items, storage.ObjectInfo{Key: key, Size: entry.Size(), UpdatedAt: entry.ModTime().UTC()})
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func normalizeBasePath(value string) string {
|
||||
clean := path.Clean("/" + strings.TrimSpace(value))
|
||||
if clean == "." {
|
||||
return "/"
|
||||
}
|
||||
return clean
|
||||
}
|
||||
|
||||
func (p *Provider) resolvePath(objectKey string) string {
|
||||
cleanKey := path.Clean("/" + strings.TrimSpace(objectKey))
|
||||
return path.Clean(path.Join(p.basePath, cleanKey))
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
package webdav
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"backupx/server/internal/storage"
|
||||
)
|
||||
|
||||
type fakeFileInfo struct {
|
||||
name string
|
||||
size int64
|
||||
mod time.Time
|
||||
dir bool
|
||||
}
|
||||
|
||||
func (f fakeFileInfo) Name() string { return f.name }
|
||||
func (f fakeFileInfo) Size() int64 { return f.size }
|
||||
func (f fakeFileInfo) Mode() os.FileMode { return 0 }
|
||||
func (f fakeFileInfo) ModTime() time.Time { return f.mod }
|
||||
func (f fakeFileInfo) IsDir() bool { return f.dir }
|
||||
func (f fakeFileInfo) Sys() any { return nil }
|
||||
|
||||
type fakeClient struct{ data map[string]string }
|
||||
|
||||
func (c *fakeClient) ReadDir(_ string) ([]os.FileInfo, error) {
|
||||
return []os.FileInfo{fakeFileInfo{name: "backup.tar.gz", size: int64(len(c.data["/storage/backup.tar.gz"])), mod: time.Now().UTC()}}, nil
|
||||
}
|
||||
func (c *fakeClient) WriteStream(path string, stream io.Reader, _ os.FileMode) error {
|
||||
content, _ := io.ReadAll(stream)
|
||||
c.data[path] = string(content)
|
||||
return nil
|
||||
}
|
||||
func (c *fakeClient) ReadStream(path string) (io.ReadCloser, error) {
|
||||
return io.NopCloser(strings.NewReader(c.data[path])), nil
|
||||
}
|
||||
func (c *fakeClient) Remove(path string) error { delete(c.data, path); return nil }
|
||||
func (c *fakeClient) MkdirAll(_ string, _ os.FileMode) error { return nil }
|
||||
func (c *fakeClient) Stat(path string) (os.FileInfo, error) {
|
||||
return fakeFileInfo{name: path, dir: true}, nil
|
||||
}
|
||||
|
||||
func TestWebDAVProviderCRUD(t *testing.T) {
|
||||
factory := Factory{newClient: func(storage.WebDAVConfig) client { return &fakeClient{data: make(map[string]string)} }}
|
||||
providerAny, err := factory.New(context.Background(), map[string]any{"endpoint": "http://dav.example.com", "basePath": "/storage"})
|
||||
if err != nil {
|
||||
t.Fatalf("Factory.New returned error: %v", err)
|
||||
}
|
||||
provider := providerAny.(*Provider)
|
||||
if err := provider.TestConnection(context.Background()); err != nil {
|
||||
t.Fatalf("TestConnection returned error: %v", err)
|
||||
}
|
||||
if err := provider.Upload(context.Background(), "backup.tar.gz", strings.NewReader("payload"), 7, nil); err != nil {
|
||||
t.Fatalf("Upload returned error: %v", err)
|
||||
}
|
||||
reader, err := provider.Download(context.Background(), "backup.tar.gz")
|
||||
if err != nil {
|
||||
t.Fatalf("Download returned error: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
content, _ := io.ReadAll(reader)
|
||||
if string(content) != "payload" {
|
||||
t.Fatalf("unexpected content: %s", string(content))
|
||||
}
|
||||
items, err := provider.List(context.Background(), "storage")
|
||||
if err != nil {
|
||||
t.Fatalf("List returned error: %v", err)
|
||||
}
|
||||
if len(items) != 1 || items[0].Key != "storage/backup.tar.gz" {
|
||||
t.Fatalf("unexpected list result: %#v", items)
|
||||
}
|
||||
if err := provider.Delete(context.Background(), "backup.tar.gz"); err != nil {
|
||||
t.Fatalf("Delete returned error: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1,9 +0,0 @@
|
||||
package webdavprovider
|
||||
|
||||
import "backupx/server/internal/storage/webdav"
|
||||
|
||||
type Factory = webdav.Factory
|
||||
|
||||
func NewFactory() Factory {
|
||||
return webdav.NewFactory()
|
||||
}
|
||||
Reference in New Issue
Block a user