Add virtual bucket feature

This commit is contained in:
DullJZ
2025-08-22 22:16:26 +08:00
parent 37b6adb6de
commit b811414b93
7 changed files with 426 additions and 34 deletions

View File

@@ -1,4 +1,5 @@
# S3 Balance Service Configuration Example
# S3兼容负载均衡服务配置示例
# 服务器配置
server:
@@ -36,53 +37,70 @@ database:
# S3存储桶配置
buckets:
# 第一个存储桶 - AWS S3
# 真实存储桶 - AWS S3(用于存储数据)
- name: "my-bucket-1"
endpoint: "" # 留空使用默认AWS端点
region: "us-east-1"
access_key_id: "YOUR_AWS_ACCESS_KEY_ID"
secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY"
max_size: "10GB" # 最大容量限制
max_size: "100GB" # 最大容量限制
weight: 10 # 权重(用于加权负载均衡)
enabled: true
use_ssl: true
path_style: false # AWS S3使用虚拟主机风格
virtual: false # 这是真实存储桶
# 第二个存储桶 - MinIO
# 真实存储桶 - MinIO(用于存储数据)
- name: "my-bucket-2"
endpoint: "http://localhost:9000"
region: "us-east-1"
access_key_id: "minioadmin"
secret_access_key: "minioadmin"
max_size: "5GB"
max_size: "50GB"
weight: 5
enabled: true
use_ssl: false
path_style: true # MinIO通常使用路径风格
virtual: false # 这是真实存储桶
# 第三个存储桶 - 阿里云OSS兼容S3
# 虚拟存储桶 - user-bucket-1用户可见但实际存储在真实存储桶中
- name: "user-bucket-1"
endpoint: "" # 虚拟存储桶不需要端点
region: "us-east-1"
access_key_id: "" # 虚拟存储桶不需要密钥
secret_access_key: "" # 虚拟存储桶不需要密钥
max_size: "100GB" # 显示的最大容量
weight: 10 # 权重(用于在真实存储桶间负载均衡)
enabled: true
use_ssl: true
path_style: false
virtual: true # 这是虚拟存储桶
# 虚拟存储桶 - user-bucket-2用户可见但实际存储在真实存储桶中
- name: "user-bucket-2"
endpoint: ""
region: "us-east-1"
access_key_id: ""
secret_access_key: ""
max_size: "50GB"
weight: 10
enabled: true
use_ssl: true
path_style: false
virtual: true # 这是虚拟存储桶
# 真实存储桶 - 阿里云OSS备用存储桶
- name: "my-bucket-3"
endpoint: "https://oss-cn-hangzhou.aliyuncs.com"
region: "cn-hangzhou"
access_key_id: "YOUR_ALIYUN_ACCESS_KEY_ID"
secret_access_key: "YOUR_ALIYUN_SECRET_ACCESS_KEY"
max_size: "20GB"
max_size: "200GB"
weight: 15
enabled: true
use_ssl: true
path_style: false
# 备用存储桶(可以禁用)
- name: "backup-bucket"
endpoint: "http://backup-s3.example.com"
region: "us-west-2"
access_key_id: "BACKUP_ACCESS_KEY"
secret_access_key: "BACKUP_SECRET_KEY"
max_size: "100GB"
weight: 1
enabled: false # 当前禁用
use_ssl: false
path_style: true
virtual: false # 这是真实存储桶
# 负载均衡配置
balancer:

View File

@@ -223,7 +223,8 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) {
}
for _, b := range buckets {
if b.IsAvailable() {
// 只显示启用的存储桶(包括虚拟存储桶)
if b.IsAvailable() && b.Config.Enabled {
result.Buckets.Bucket = append(result.Buckets.Bucket, BucketInfo{
Name: b.Config.Name,
CreationDate: time.Now().Add(-24 * time.Hour), // 模拟创建时间
@@ -254,11 +255,18 @@ func (h *S3Handler) handleBucketOperations(w http.ResponseWriter, r *http.Reques
// handleListObjects 列出存储桶中的对象
func (h *S3Handler) handleListObjects(w http.ResponseWriter, r *http.Request, bucketName string) {
// 检查bucket是否存在
if _, ok := h.bucketManager.GetBucket(bucketName); !ok {
bucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 如果是虚拟存储桶,列出虚拟存储桶中的对象
if bucket.IsVirtual() {
h.handleListObjectsForVirtualBucket(w, r, bucketName)
return
}
// 解析查询参数
prefix := r.URL.Query().Get("prefix")
marker := r.URL.Query().Get("marker")
@@ -312,18 +320,121 @@ func (h *S3Handler) handleListObjects(w http.ResponseWriter, r *http.Request, bu
h.sendXMLResponse(w, http.StatusOK, result)
}
// handleListObjectsForVirtualBucket 列出虚拟存储桶中的对象
func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
// 解析查询参数
prefix := r.URL.Query().Get("prefix")
marker := r.URL.Query().Get("marker")
maxKeysStr := r.URL.Query().Get("max-keys")
// delimiter := r.URL.Query().Get("delimiter") // 暂时不支持delimiter
maxKeys := 1000
if maxKeysStr != "" {
if mk, err := strconv.Atoi(maxKeysStr); err == nil {
maxKeys = mk
}
}
// 从存储服务获取虚拟存储桶中的对象
objects, err := h.storage.GetVirtualBucketObjects(bucketName)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to list virtual bucket objects", bucketName)
return
}
result := ListBucketResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Name: bucketName,
Prefix: prefix,
Marker: marker,
MaxKeys: maxKeys,
IsTruncated: false,
Contents: make([]ObjectInfo, 0, len(objects)),
}
// 过滤对象并转换为S3格式
for _, obj := range objects {
// 前缀过滤
if prefix != "" && !strings.HasPrefix(obj.Key, prefix) {
continue
}
// Marker过滤
if marker != "" && obj.Key <= marker {
continue
}
result.Contents = append(result.Contents, ObjectInfo{
Key: obj.Key,
LastModified: obj.UpdatedAt,
ETag: fmt.Sprintf("\"%x\"", obj.ID),
Size: obj.Size,
})
}
// 如果超过了最大数量,设置截断标志
if len(result.Contents) > maxKeys {
result.Contents = result.Contents[:maxKeys]
result.IsTruncated = true
}
h.sendXMLResponse(w, http.StatusOK, result)
}
// handleHeadBucket 检查存储桶是否存在
func (h *S3Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
if _, ok := h.bucketManager.GetBucket(bucketName); !ok {
bucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
// 虚拟存储桶也应该返回成功状态
if bucket.IsVirtual() {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusOK)
}
// handleCreateBucket 创建存储桶(虚拟实现)
func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
// 检查是否已经存在同名存储桶
if bucket, exists := h.bucketManager.GetBucket(bucketName); exists {
// 如果是虚拟存储桶,检查是否已经有映射
if bucket.IsVirtual() {
if _, err := h.storage.GetVirtualBucketMapping(bucketName); err == nil {
h.sendS3Error(w, "BucketAlreadyExists", "The requested bucket name is not available", bucketName)
return
}
} else {
// 如果是真实存储桶,返回已存在错误
h.sendS3Error(w, "BucketAlreadyExists", "The requested bucket name is not available", bucketName)
return
}
}
// 检查是否为虚拟存储桶
if bucket, exists := h.bucketManager.GetBucket(bucketName); exists && bucket.IsVirtual() {
// 虚拟存储桶需要选择一个真实存储桶进行映射
realBuckets := h.bucketManager.GetRealBuckets()
if len(realBuckets) == 0 {
h.sendS3Error(w, "InternalError", "No real buckets available for virtual bucket mapping", bucketName)
return
}
// 简化:选择第一个可用的真实存储桶
// 实际应用中可能需要更复杂的策略
targetBucket := realBuckets[0]
// 创建虚拟存储桶到真实存储桶的映射
if err := h.storage.CreateVirtualBucketMapping(bucketName, targetBucket.Config.Name); err != nil {
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", bucketName)
return
}
}
// 在负载均衡场景下不真正创建bucket只返回成功
// 实际的bucket应该在配置中预先定义
w.Header().Set("Location", "/" + bucketName)
@@ -332,7 +443,24 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b
// handleDeleteBucket 删除存储桶(虚拟实现)
func (h *S3Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request, bucketName string) {
// 在负载均衡场景下不真正删除bucket
// 检查存储桶是否存在
bucket, exists := h.bucketManager.GetBucket(bucketName)
if !exists {
// 不存在的桶返回成功S3标准
w.WriteHeader(http.StatusNoContent)
return
}
// 虚拟存储桶需要删除映射关系
if bucket.IsVirtual() {
// 删除虚拟存储桶映射
if err := h.storage.DeleteVirtualBucketMapping(bucketName); err != nil {
h.sendS3Error(w, "InternalError", "Failed to delete virtual bucket mapping", bucketName)
return
}
}
// 在负载均衡场景下不真正删除真实bucket
w.WriteHeader(http.StatusNoContent)
}
@@ -356,8 +484,28 @@ func (h *S3Handler) handleObjectOperations(w http.ResponseWriter, r *http.Reques
// handleGetObject 获取对象默认使用预签名URL重定向
func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
var actualBucketName string
var err error
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射
_, err := h.storage.GetVirtualBucketMapping(bucketName)
if err != nil {
h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", key)
return
}
}
// 查找对象所在的实际存储桶
actualBucketName, err := h.storage.FindObjectBucket(key)
actualBucketName, err = h.storage.FindObjectBucket(key)
if err != nil {
h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", key)
return
@@ -405,6 +553,36 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
// handleHeadObject 获取对象元数据
func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射
mapping, err := h.storage.GetVirtualBucketMapping(bucketName)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
_ = mapping // 使用mapping变量避免编译错误
// 查找对象信息(在映射的真实存储桶中)
obj, err := h.storage.GetObjectInfo(key)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
h.setObjectHeaders(w, obj)
return
}
// 真实存储桶的直接处理
// 从存储中获取对象信息
obj, err := h.storage.GetObjectInfo(key)
if err != nil {
@@ -412,15 +590,31 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc
return
}
h.setObjectHeaders(w, obj)
w.WriteHeader(http.StatusOK)
}
// setObjectHeaders 设置对象响应头
func (h *S3Handler) setObjectHeaders(w http.ResponseWriter, obj *storage.Object) {
w.Header().Set("Content-Length", strconv.FormatInt(obj.Size, 10))
w.Header().Set("Last-Modified", obj.UpdatedAt.Format(http.TimeFormat))
w.Header().Set("ETag", fmt.Sprintf("\"%x\"", obj.ID))
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
if obj.ContentType != "" {
w.Header().Set("Content-Type", obj.ContentType)
} else {
w.Header().Set("Content-Type", "application/octet-stream")
}
}
// handlePutObject 上传对象默认使用预签名URL重定向
func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 获取内容长度
contentLength := r.ContentLength
if contentLength < 0 {
@@ -428,11 +622,50 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
return
}
// 选择目标存储桶
targetBucket, err := h.balancer.SelectBucket(key, contentLength)
if err != nil {
h.sendS3Error(w, "InsufficientStorage", "No bucket has enough space", key)
return
var targetBucket *bucket.BucketInfo
var err error
// 如果是虚拟存储桶,需要选择真实存储桶并创建映射
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射,如果不存在则创建
_, mappingErr := h.storage.GetVirtualBucketMapping(bucketName)
if mappingErr != nil {
// 映射不存在,选择真实存储桶并创建映射
realBuckets := h.bucketManager.GetRealBuckets()
if len(realBuckets) == 0 {
h.sendS3Error(w, "InternalError", "No real buckets available for virtual bucket mapping", key)
return
}
// 简化:选择第一个可用的真实存储桶
targetBucket = realBuckets[0]
// 创建虚拟存储桶映射
if err := h.storage.CreateVirtualBucketMapping(bucketName, targetBucket.Config.Name); err != nil {
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", key)
return
}
} else {
// 映射已存在,从存储服务获取对应的真实存储桶
mapping, err := h.storage.GetVirtualBucketMapping(bucketName)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to get virtual bucket mapping", key)
return
}
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
return
}
}
} else {
// 真实存储桶的直接处理
// 选择目标存储桶
targetBucket, err = h.balancer.SelectBucket(key, contentLength)
if err != nil {
h.sendS3Error(w, "InsufficientStorage", "No bucket has enough space", key)
return
}
}
// 生成预签名上传URL
@@ -498,6 +731,18 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
// handleDeleteObject 删除对象
func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
_ = requestedBucket // 使用requestedBucket变量避免编译错误
var bucket *bucket.BucketInfo
var err error
// 查找对象所在的实际存储桶
actualBucketName, err := h.storage.FindObjectBucket(key)
if err != nil {
@@ -506,7 +751,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
return
}
bucket, ok := h.bucketManager.GetBucket(actualBucketName)
bucket, ok = h.bucketManager.GetBucket(actualBucketName)
if !ok {
h.sendS3Error(w, "InternalError", "Internal server error", bucketName)
return

View File

@@ -261,7 +261,7 @@ func (m *Manager) GetAllBuckets() []*BucketInfo {
return buckets
}
// GetAvailableBuckets 获取所有可用的存储桶
// GetAvailableBuckets 获取所有可用的存储桶(排除虚拟存储桶)
func (m *Manager) GetAvailableBuckets() []*BucketInfo {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -269,7 +269,8 @@ func (m *Manager) GetAvailableBuckets() []*BucketInfo {
var available []*BucketInfo
for _, b := range m.buckets {
b.mu.RLock()
if b.Available && (b.Config.MaxSizeBytes == 0 || b.UsedSize < b.Config.MaxSizeBytes) {
// 虚拟存储桶不用于负载均衡,排除它们
if !b.Config.Virtual && b.Available && (b.Config.MaxSizeBytes == 0 || b.UsedSize < b.Config.MaxSizeBytes) {
available = append(available, b)
}
b.mu.RUnlock()
@@ -308,3 +309,38 @@ func (b *BucketInfo) UpdateUsedSize(delta int64) {
defer b.mu.Unlock()
b.UsedSize += delta
}
// IsVirtual 检查是否为虚拟存储桶
func (b *BucketInfo) IsVirtual() bool {
b.mu.RLock()
defer b.mu.RUnlock()
return b.Config.Virtual
}
// GetVirtualBuckets 获取所有虚拟存储桶
func (m *Manager) GetVirtualBuckets() []*BucketInfo {
m.mu.RLock()
defer m.mu.RUnlock()
var virtual []*BucketInfo
for _, b := range m.buckets {
if b.IsVirtual() {
virtual = append(virtual, b)
}
}
return virtual
}
// GetRealBuckets 获取所有真实存储桶
func (m *Manager) GetRealBuckets() []*BucketInfo {
m.mu.RLock()
defer m.mu.RUnlock()
var real []*BucketInfo
for _, b := range m.buckets {
if !b.IsVirtual() {
real = append(real, b)
}
}
return real
}

View File

@@ -40,6 +40,7 @@ type BucketConfig struct {
Enabled bool `yaml:"enabled"` // 是否启用
UseSSL bool `yaml:"use_ssl"` // 是否使用SSL
PathStyle bool `yaml:"path_style"` // 是否使用路径风格访问
Virtual bool `yaml:"virtual"` // 是否为虚拟存储桶仅S3 API中可见
}
// BalancerConfig 负载均衡配置

View File

@@ -43,6 +43,20 @@ func (BucketStats) TableName() string {
return "bucket_stats"
}
// VirtualBucketMapping 虚拟存储桶映射模型
type VirtualBucketMapping struct {
ID uint `gorm:"primaryKey" json:"id"`
VirtualBucketName string `gorm:"uniqueIndex;size:255;not null" json:"virtual_bucket_name"`
RealBucketName string `gorm:"index;size:255;not null" json:"real_bucket_name"`
CreatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP" json:"created_at"`
UpdatedAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP" json:"updated_at"`
}
// TableName 指定表名
func (VirtualBucketMapping) TableName() string {
return "virtual_bucket_mappings"
}
// UploadSession 上传会话模型(用于跟踪分片上传)
type UploadSession struct {
ID uint `gorm:"primaryKey" json:"id"`

View File

@@ -348,3 +348,81 @@ func (s *Service) GetAccessLogs(filter *AccessLogFilter) ([]*AccessLog, error) {
return logs, nil
}
// CreateVirtualBucketMapping 创建虚拟存储桶映射
func (s *Service) CreateVirtualBucketMapping(virtualBucketName, realBucketName string) error {
mapping := &VirtualBucketMapping{
VirtualBucketName: virtualBucketName,
RealBucketName: realBucketName,
}
if err := s.db.Create(mapping).Error; err != nil {
return fmt.Errorf("failed to create virtual bucket mapping: %w", err)
}
return nil
}
// GetVirtualBucketMapping 获取虚拟存储桶映射
func (s *Service) GetVirtualBucketMapping(virtualBucketName string) (*VirtualBucketMapping, error) {
var mapping VirtualBucketMapping
if err := s.db.Where("virtual_bucket_name = ?", virtualBucketName).First(&mapping).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("virtual bucket mapping not found: %s", virtualBucketName)
}
return nil, fmt.Errorf("failed to get virtual bucket mapping: %w", err)
}
return &mapping, nil
}
// GetVirtualBucketMappings 获取所有虚拟存储桶映射
func (s *Service) GetVirtualBucketMappings() ([]*VirtualBucketMapping, error) {
var mappings []*VirtualBucketMapping
if err := s.db.Find(&mappings).Error; err != nil {
return nil, fmt.Errorf("failed to get virtual bucket mappings: %w", err)
}
return mappings, nil
}
// UpdateVirtualBucketMapping 更新虚拟存储桶映射
func (s *Service) UpdateVirtualBucketMapping(virtualBucketName, realBucketName string) error {
updates := map[string]interface{}{
"real_bucket_name": realBucketName,
"updated_at": time.Now(),
}
if err := s.db.Model(&VirtualBucketMapping{}).
Where("virtual_bucket_name = ?", virtualBucketName).
Updates(updates).Error; err != nil {
return fmt.Errorf("failed to update virtual bucket mapping: %w", err)
}
return nil
}
// DeleteVirtualBucketMapping 删除虚拟存储桶映射
func (s *Service) DeleteVirtualBucketMapping(virtualBucketName string) error {
if err := s.db.Where("virtual_bucket_name = ?", virtualBucketName).
Delete(&VirtualBucketMapping{}).Error; err != nil {
return fmt.Errorf("failed to delete virtual bucket mapping: %w", err)
}
return nil
}
// GetVirtualBucketObjects 获取虚拟存储桶中的所有对象
func (s *Service) GetVirtualBucketObjects(virtualBucketName string) ([]*Object, error) {
// 首先找到虚拟存储桶对应的真实存储桶
mapping, err := s.GetVirtualBucketMapping(virtualBucketName)
if err != nil {
return nil, err
}
// 查找所有映射到该虚拟存储桶的对象
var objects []*Object
if err := s.db.Where("bucket_name = ?", mapping.RealBucketName).
Find(&objects).Error; err != nil {
return nil, fmt.Errorf("failed to get virtual bucket objects: %w", err)
}
return objects, nil
}

View File

@@ -24,7 +24,7 @@ ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE"
SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
# 测试配置
TEST_BUCKET = "test-bucket-1"
TEST_BUCKET = "test-virtual-1"
TEST_KEY_PREFIX = f"test-{int(time.time())}"
def create_s3_client():