mirror of
https://github.com/DullJZ/s3-balance.git
synced 2026-06-28 22:41:23 +08:00
修复CopyObject实现,添加RealObjectKey实现真正零拷贝
This commit is contained in:
@@ -237,7 +237,7 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b
|
||||
targetBucket := realBuckets[0]
|
||||
|
||||
// 创建虚拟存储桶到真实存储桶的映射
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, "", targetBucket.Config.Name); err != nil {
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, "", targetBucket.Config.Name, ""); err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", bucketName)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -57,8 +57,8 @@ func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// 创建虚拟存储桶文件级映射
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
|
||||
// 创建虚拟存储桶文件级映射(对于Multipart,虚拟key和真实key相同)
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name, key); err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
||||
return
|
||||
}
|
||||
@@ -204,8 +204,8 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
// 创建虚拟存储桶文件级映射
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
|
||||
// 创建虚拟存储桶文件级映射(对于Multipart,虚拟key和真实key相同)
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name, key); err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -6,7 +6,9 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/DullJZ/s3-balance/internal/bucket"
|
||||
@@ -56,6 +58,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
|
||||
var err error
|
||||
var bucket1 *bucket.BucketInfo
|
||||
|
||||
var realKey string
|
||||
if requestedBucket.IsVirtual() {
|
||||
// 获取虚拟存储桶映射
|
||||
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
|
||||
@@ -64,21 +67,24 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
|
||||
return
|
||||
}
|
||||
|
||||
// 获取映射到的真实存储桶
|
||||
// 获取映射到的真实存储桶和真实key
|
||||
bucket1, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
|
||||
if !ok {
|
||||
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
|
||||
return
|
||||
}
|
||||
|
||||
realKey = mapping.RealObjectKey
|
||||
h.recordBackendOperation(bucket1, bucket.OperationTypeB)
|
||||
} else {
|
||||
realKey = key
|
||||
}
|
||||
|
||||
// 生成预签名下载URL
|
||||
// 生成预签名下载URL(使用真实key)
|
||||
downloadInfo, err := h.presigner.GenerateDownloadURL(
|
||||
context.Background(),
|
||||
bucket1,
|
||||
key,
|
||||
realKey,
|
||||
)
|
||||
if err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to generate download URL", key)
|
||||
@@ -182,6 +188,13 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc
|
||||
|
||||
// handlePutObject 上传对象
|
||||
func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
|
||||
// 检查是否是复制操作(CopyObject)
|
||||
copySource := r.Header.Get("x-amz-copy-source")
|
||||
if copySource != "" {
|
||||
h.handleCopyObject(w, r, bucketName, key, copySource)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查请求的存储桶是否为虚拟存储桶
|
||||
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
|
||||
if !ok {
|
||||
@@ -211,8 +224,8 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
|
||||
return
|
||||
}
|
||||
|
||||
// 创建虚拟存储桶文件级映射
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
|
||||
// 创建虚拟存储桶文件级映射(对于普通PUT,虚拟key和真实key相同)
|
||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name, key); err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
||||
return
|
||||
}
|
||||
@@ -290,6 +303,108 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
|
||||
}
|
||||
}
|
||||
|
||||
// handleCopyObject 复制对象(只在数据库中创建新映射)
|
||||
func (h *S3Handler) handleCopyObject(w http.ResponseWriter, r *http.Request, destBucket, destKey, copySource string) {
|
||||
// 解析复制源 (格式: /source-bucket/source-key 或 source-bucket/source-key)
|
||||
copySource = strings.TrimPrefix(copySource, "/")
|
||||
parts := strings.SplitN(copySource, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
h.sendS3Error(w, "InvalidArgument", "Invalid copy source format", copySource)
|
||||
return
|
||||
}
|
||||
|
||||
sourceBucket := parts[0]
|
||||
sourceKey := parts[1]
|
||||
|
||||
// URL 解码源对象键
|
||||
sourceKey, err := url.QueryUnescape(sourceKey)
|
||||
if err != nil {
|
||||
h.sendS3Error(w, "InvalidArgument", "Invalid source key encoding", sourceKey)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查目标存储桶是否存在
|
||||
_, ok := h.bucketManager.GetBucket(destBucket)
|
||||
if !ok {
|
||||
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", destBucket)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查源存储桶是否存在
|
||||
_, ok = h.bucketManager.GetBucket(sourceBucket)
|
||||
if !ok {
|
||||
h.sendS3Error(w, "NoSuchBucket", "The source bucket does not exist", sourceBucket)
|
||||
return
|
||||
}
|
||||
|
||||
// 获取元数据指令
|
||||
metadataDirective := r.Header.Get("x-amz-metadata-directive")
|
||||
var metadata map[string]string
|
||||
|
||||
if metadataDirective == "REPLACE" {
|
||||
// 使用请求中的新元数据
|
||||
metadata = make(map[string]string)
|
||||
for k, v := range r.Header {
|
||||
if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
|
||||
metaKey := strings.TrimPrefix(strings.ToLower(k), "x-amz-meta-")
|
||||
metadata[metaKey] = v[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
// 如果是 COPY 或未指定,则复制源对象的元数据(在 storage.CopyObject 中处理)
|
||||
|
||||
// 获取源对象的映射信息
|
||||
sourceMapping, err := h.storage.GetVirtualBucketMapping(sourceBucket, sourceKey)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get source object mapping %s: %v", sourceKey, err)
|
||||
h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", sourceKey)
|
||||
return
|
||||
}
|
||||
|
||||
// 复制操作只创建新的虚拟映射,指向相同的真实对象(零拷贝)
|
||||
destBucketInfo, destOk := h.bucketManager.GetBucket(destBucket)
|
||||
if !destOk || !destBucketInfo.IsVirtual() {
|
||||
h.sendS3Error(w, "NoSuchBucket", "The destination bucket does not exist", destBucket)
|
||||
return
|
||||
}
|
||||
|
||||
// 创建新的虚拟存储桶映射,指向相同的真实bucket和真实key
|
||||
if err := h.storage.CreateVirtualBucketMapping(destBucket, destKey, sourceMapping.RealBucketName, sourceMapping.RealObjectKey); err != nil {
|
||||
log.Printf("Failed to create virtual bucket mapping for copied object %s: %v", destKey, err)
|
||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", destKey)
|
||||
return
|
||||
}
|
||||
|
||||
// 获取源对象信息用于响应
|
||||
sourceObj, err := h.storage.GetObjectInfo(sourceMapping.RealObjectKey)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get source object info: %v", err)
|
||||
}
|
||||
|
||||
// 返回成功响应
|
||||
w.Header().Set("Content-Type", "application/xml")
|
||||
|
||||
// 生成 ETag
|
||||
etag := fmt.Sprintf("\"%x\"", time.Now().UnixNano())
|
||||
|
||||
// 构造 CopyObjectResult XML 响应
|
||||
lastModified := time.Now().UTC().Format(time.RFC3339)
|
||||
if sourceObj != nil {
|
||||
lastModified = sourceObj.UpdatedAt.UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
response := fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
|
||||
<CopyObjectResult>
|
||||
<LastModified>%s</LastModified>
|
||||
<ETag>%s</ETag>
|
||||
</CopyObjectResult>`, lastModified, etag)
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(response))
|
||||
|
||||
log.Printf("Object copied successfully: %s -> %s", sourceKey, destKey)
|
||||
}
|
||||
|
||||
// handleDeleteObject 删除对象
|
||||
func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
|
||||
// 检查请求的存储桶是否为虚拟存储桶
|
||||
@@ -301,6 +416,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
|
||||
|
||||
var targetBucket *bucket.BucketInfo
|
||||
var err error
|
||||
var realKey string
|
||||
|
||||
if requestedBucket.IsVirtual() {
|
||||
// 获取虚拟存储桶文件映射
|
||||
@@ -311,47 +427,59 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
|
||||
return
|
||||
}
|
||||
|
||||
// 获取映射到的真实存储桶
|
||||
// 获取映射到的真实存储桶和真实key
|
||||
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
|
||||
if !ok {
|
||||
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
|
||||
return
|
||||
}
|
||||
|
||||
realKey = mapping.RealObjectKey
|
||||
} else {
|
||||
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接DELETE操作
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
|
||||
|
||||
// 生成预签名删除URL
|
||||
deleteInfo, err := h.presigner.GenerateDeleteURL(
|
||||
context.Background(),
|
||||
targetBucket,
|
||||
key,
|
||||
)
|
||||
if err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to generate delete URL", key)
|
||||
return
|
||||
// 先删除虚拟存储桶映射
|
||||
if err := h.storage.DeleteVirtualBucketObjectMapping(bucketName, key); err != nil {
|
||||
log.Printf("Failed to delete virtual bucket mapping for %s/%s: %v", bucketName, key, err)
|
||||
}
|
||||
|
||||
// 执行删除
|
||||
req, _ := http.NewRequest("DELETE", deleteInfo.URL, nil)
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
// 检查是否还有其他映射指向同一个真实对象
|
||||
count, err := h.storage.CountMappingsToRealObject(targetBucket.Config.Name, realKey)
|
||||
if err != nil {
|
||||
h.sendS3Error(w, "InternalError", "Failed to delete object", key)
|
||||
return
|
||||
log.Printf("Failed to count mappings for real object %s: %v", realKey, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 从数据库中删除对象记录
|
||||
h.storage.DeleteObject(key)
|
||||
// 只有当没有其他映射引用时,才删除真实S3对象
|
||||
if count == 0 {
|
||||
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
|
||||
|
||||
// 如果是虚拟存储桶,还需要删除文件级别映射
|
||||
if requestedBucket.IsVirtual() {
|
||||
h.storage.DeleteVirtualBucketFileMapping(bucketName, key)
|
||||
// 生成预签名删除URL(使用真实key)
|
||||
deleteInfo, err := h.presigner.GenerateDeleteURL(
|
||||
context.Background(),
|
||||
targetBucket,
|
||||
realKey,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("Failed to generate delete URL for %s: %v", realKey, err)
|
||||
} else {
|
||||
// 执行删除真实S3对象
|
||||
req, _ := http.NewRequest("DELETE", deleteInfo.URL, nil)
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("Failed to delete real S3 object %s: %v", realKey, err)
|
||||
} else {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// 从数据库中删除对象记录
|
||||
if err := h.storage.DeleteObject(realKey); err != nil {
|
||||
log.Printf("Failed to delete object record for %s: %v", realKey, err)
|
||||
}
|
||||
}
|
||||
|
||||
// S3规范要求删除操作总是返回204
|
||||
|
||||
Reference in New Issue
Block a user