Fix multipart upload

This commit is contained in:
DullJZ
2025-09-11 14:29:24 +08:00
parent 34629444db
commit 7311ffeeae

View File

@@ -59,15 +59,22 @@ func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
// Bucket operations
router.HandleFunc("/{bucket}", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE")
// Object operations
router.HandleFunc("/{bucket}/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE")
// Multipart upload operations
// Multipart upload operations - must be registered before generic object operations
// Upload part handler - must handle PUT with partNumber and uploadId
router.HandleFunc("/{bucket}/{key:.*}", h.handleUploadPart).Methods("PUT").Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId}")
// Initiate multipart upload
router.HandleFunc("/{bucket}/{key:.*}", h.handleMultipartUpload).Methods("POST").Queries("uploads", "")
// List multipart uploads
router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartUploads).Methods("GET").Queries("uploads", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartParts).Methods("GET").Queries("uploadId", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleCompleteMultipartUpload).Methods("POST").Queries("uploadId", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleAbortMultipartUpload).Methods("DELETE").Queries("uploadId", "")
// List parts
router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartParts).Methods("GET").Queries("uploadId", "{uploadId}")
// Complete multipart upload
router.HandleFunc("/{bucket}/{key:.*}", h.handleCompleteMultipartUpload).Methods("POST").Queries("uploadId", "{uploadId}")
// Abort multipart upload
router.HandleFunc("/{bucket}/{key:.*}", h.handleAbortMultipartUpload).Methods("DELETE").Queries("uploadId", "{uploadId}")
// Object operations - must be registered after multipart operations to avoid conflicts
router.HandleFunc("/{bucket}/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE")
// 添加认证中间件
router.Use(h.s3AuthMiddleware)
@@ -726,15 +733,158 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
w.WriteHeader(http.StatusNoContent)
}
// handleUploadPart 处理分片上传的单个分片
func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucketName := vars["bucket"]
key := vars["key"]
partNumber := vars["partNumber"]
uploadID := vars["uploadId"]
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 获取内容长度
contentLength := r.ContentLength
if contentLength < 0 {
h.sendS3Error(w, "MissingContentLength", "Content-Length header is required", key)
return
}
var targetBucket *bucket.BucketInfo
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
if err != nil {
// 对于分片上传,映射应该在初始化分片上传时已经创建
// 如果没有找到映射,使用负载均衡器选择一个新的存储桶
targetBucket, err = h.balancer.SelectBucket(key, contentLength)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to select bucket for multipart upload", key)
return
}
// 创建虚拟存储桶文件级映射
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
return
}
} else {
// 映射已存在,获取对应的真实存储桶
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
return
}
}
} else {
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 转换partNumber为整数
partNum, err := strconv.Atoi(partNumber)
if err != nil {
h.sendS3Error(w, "InvalidArgument", "Invalid part number", key)
return
}
// 生成预签名上传分片URL
presignClient := s3.NewPresignClient(targetBucket.Client)
uploadPartInput := &s3.UploadPartInput{
Bucket: aws.String(targetBucket.Config.Name),
Key: aws.String(key),
UploadId: aws.String(uploadID),
PartNumber: aws.Int32(int32(partNum)),
}
presignRequest, err := presignClient.PresignUploadPart(context.Background(), uploadPartInput, func(opts *s3.PresignOptions) {
opts.Expires = 15 * time.Minute
})
if err != nil {
log.Printf("Failed to generate upload part URL for key %s, part %s: %v", key, partNumber, err)
h.sendS3Error(w, "InternalError", "Failed to generate upload part URL", key)
return
}
// 使用反向代理上传分片到真实预签名URL
req, err := http.NewRequest("PUT", presignRequest.URL, r.Body)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to create upload part request", key)
return
}
// 设置必要的头
req.ContentLength = contentLength
if ct := r.Header.Get("Content-Type"); ct != "" {
req.Header.Set("Content-Type", ct)
}
// 执行上传
client := &http.Client{Timeout: 30 * time.Minute}
resp, err := client.Do(req)
if err != nil {
log.Printf("Failed to upload part %s for key %s: %v", partNumber, key, err)
h.sendS3Error(w, "InternalError", "Failed to upload part", key)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// 从响应中获取ETag并返回给客户端
etag := resp.Header.Get("ETag")
if etag != "" {
w.Header().Set("ETag", etag)
}
w.WriteHeader(http.StatusOK)
} else {
// 读取错误响应体以获取详细信息
body, _ := io.ReadAll(resp.Body)
log.Printf("Upload part failed with status %d: %s", resp.StatusCode, string(body))
h.sendS3Error(w, "InternalError", fmt.Sprintf("Upload part failed with status %d", resp.StatusCode), key)
}
}
// handleMultipartUpload 初始化分片上传
func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucketName := vars["bucket"]
key := vars["key"]
// 选择目标存储桶
targetBucket, err := h.balancer.SelectBucket(key, 0) // 分片上传时不检查空间
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to select bucket for upload", key)
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
var targetBucket *bucket.BucketInfo
var err error
// 如果是虚拟存储桶,需要选择真实存储桶并创建映射
if requestedBucket.IsVirtual() {
// 选择目标存储桶
targetBucket, err = h.balancer.SelectBucket(key, 0) // 分片上传时不检查空间
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to select bucket for upload", key)
return
}
// 创建虚拟存储桶文件级映射
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
return
}
} else {
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
@@ -751,7 +901,7 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request
result := InitiateMultipartUploadResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Bucket: targetBucket.Config.Name,
Bucket: bucketName, // 返回虚拟存储桶名称给客户端
Key: key,
UploadID: *createResp.UploadId,
}
@@ -819,13 +969,36 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
key := vars["key"]
uploadID := r.URL.Query().Get("uploadId")
// 查找对象所在的实际存储桶简化实现使用配置的bucket
bucket, ok := h.bucketManager.GetBucket(bucketName)
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
var targetBucket *bucket.BucketInfo
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
if err != nil {
h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", key)
return
}
// 获取映射到的真实存储桶
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
return
}
} else {
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 解析请求体以获取分片列表
var completeReq CompleteMultipartUpload
body, _ := io.ReadAll(r.Body)
@@ -841,8 +1014,8 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
})
}
completeResp, err := bucket.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket.Config.Name),
completeResp, err := targetBucket.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(targetBucket.Config.Name),
Key: aws.String(key),
UploadId: aws.String(uploadID),
MultipartUpload: &types.CompletedMultipartUpload{
@@ -856,14 +1029,33 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
result := CompleteMultipartUploadResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Location: "/" + bucket.Config.Name + "/" + key,
Bucket: bucket.Config.Name,
Location: "/" + bucketName + "/" + key, // 返回虚拟存储桶路径
Bucket: bucketName, // 返回虚拟存储桶名称
Key: key,
ETag: *completeResp.ETag,
}
// 记录对象元数据(简化:假设总大小
h.storage.RecordObject(key, bucket.Config.Name, 0, nil)
// 获取完成上传后的对象大小
var objectSize int64
headResp, err := targetBucket.Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(targetBucket.Config.Name),
Key: aws.String(key),
})
if err != nil {
// 如果获取大小失败,记录警告但不影响响应
log.Printf("Warning: Failed to get object size after multipart upload for key %s: %v", key, err)
objectSize = 0
} else if headResp.ContentLength != nil {
objectSize = *headResp.ContentLength
}
// 记录对象元数据(使用实际大小)
h.storage.RecordObject(key, targetBucket.Config.Name, objectSize, nil)
// 更新存储桶使用量
if objectSize > 0 {
targetBucket.UpdateUsedSize(objectSize)
}
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -875,23 +1067,52 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re
key := vars["key"]
uploadID := r.URL.Query().Get("uploadId")
// 查找对象所在的实际存储桶简化实现使用配置的bucket
bucket, ok := h.bucketManager.GetBucket(bucketName)
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
var targetBucket *bucket.BucketInfo
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
if err != nil {
// 如果映射不存在,可能是上传已经被中止了,返回成功
w.WriteHeader(http.StatusNoContent)
return
}
// 获取映射到的真实存储桶
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
return
}
} else {
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作
w.WriteHeader(http.StatusNoContent)
return
}
// 中止分片上传
ctx := context.Background()
_, err := bucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket.Config.Name),
_, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(targetBucket.Config.Name),
Key: aws.String(key),
UploadId: aws.String(uploadID),
})
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to abort multipart upload", key)
return
// 如果中止失败,可能是因为上传已经完成或中止,不需要报错
log.Printf("Failed to abort multipart upload for key %s: %v", key, err)
}
// 如果是虚拟存储桶,还需要删除文件级别映射
if requestedBucket.IsVirtual() {
h.storage.DeleteVirtualBucketFileMapping(bucketName, key)
}
w.WriteHeader(http.StatusNoContent)