feat: add Minio storage support

Signed-off-by: ysicing <i@ysicing.me>
This commit is contained in:
ysicing
2025-03-11 21:29:35 +08:00
parent 3def9df4b4
commit 495ad3ea5c
9 changed files with 244 additions and 3 deletions

71
storage/minio/client.go Normal file
View File

@@ -0,0 +1,71 @@
package minio
import (
"context"
"fmt"
"path"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/logger"
"github.com/krau/SaveAny-Bot/types"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
type Minio struct {
config config.MinioStorageConfig
client *minio.Client
}
func (m *Minio) Init(cfg config.StorageConfig) error {
minioConfig, ok := cfg.(*config.MinioStorageConfig)
if !ok {
return fmt.Errorf("failed to cast minio config")
}
if err := minioConfig.Validate(); err != nil {
return err
}
m.config = *minioConfig
client, err := minio.New(m.config.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(m.config.AccessKeyID, m.config.SecretAccessKey, ""),
Secure: m.config.UseSSL,
})
if err != nil {
return fmt.Errorf("failed to create minio client: %w", err)
}
exists, err := client.BucketExists(context.Background(), m.config.BucketName)
if err != nil {
return fmt.Errorf("failed to check bucket existence: %w", err)
}
if !exists {
return fmt.Errorf("bucket %s does not exist", m.config.BucketName)
}
m.client = client
return nil
}
func (m *Minio) Type() types.StorageType {
return types.StorageTypeMinio
}
func (m *Minio) Name() string {
return m.config.Name
}
func (m *Minio) JoinStoragePath(task types.Task) string {
return path.Join(m.config.BasePath, task.StoragePath)
}
func (m *Minio) Save(ctx context.Context, localFilePath, storagePath string) error {
logger.L.Infof("Saving file %s to %s", localFilePath, storagePath)
_, err := m.client.FPutObject(ctx, m.config.BucketName, storagePath, localFilePath, minio.PutObjectOptions{})
if err != nil {
return fmt.Errorf("failed to upload file to minio: %w", err)
}
return nil
}

92
storage/minio/stream.go Normal file
View File

@@ -0,0 +1,92 @@
package minio
import (
"context"
"fmt"
"io"
"github.com/krau/SaveAny-Bot/logger"
"github.com/minio/minio-go/v7"
)
type MinioWriter struct {
pipeWriter *io.PipeWriter
done chan error
path string
ctx context.Context
closed bool
}
func (w *MinioWriter) Write(p []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, w.ctx.Err()
default:
return w.pipeWriter.Write(p)
}
}
func (w *MinioWriter) Close() error {
if w.closed {
return nil
}
w.closed = true
if err := w.pipeWriter.Close(); err != nil {
return fmt.Errorf("failed to close pipe writer: %w", err)
}
select {
case err := <-w.done:
if err != nil {
return fmt.Errorf("upload failed: %w", err)
}
return nil
case <-w.ctx.Done():
return fmt.Errorf("upload cancelled: %w", w.ctx.Err())
}
}
func (m *Minio) NewUploadStream(ctx context.Context, storagePath string) (io.WriteCloser, error) {
logger.L.Infof("Creating upload stream for %s", storagePath)
uploadCtx, cancel := context.WithCancel(ctx)
pipeReader, pipeWriter := io.Pipe()
done := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
done <- fmt.Errorf("panic during upload: %v", r)
}
pipeReader.Close()
cancel()
}()
info, err := m.client.PutObject(
uploadCtx,
m.config.BucketName,
storagePath,
pipeReader,
-1,
minio.PutObjectOptions{},
)
if err != nil {
logger.L.Errorf("Failed to upload to %s: %v", storagePath, err)
done <- err
return
}
logger.L.Infof("uploaded %d bytes to %s", info.Size, storagePath)
done <- nil
}()
return &MinioWriter{
pipeWriter: pipeWriter,
done: done,
path: storagePath,
ctx: uploadCtx,
closed: false,
}, nil
}