diff --git a/internal/api/s3_handler.go b/internal/api/s3_handler.go index 6eee3f6..54d8051 100644 --- a/internal/api/s3_handler.go +++ b/internal/api/s3_handler.go @@ -59,15 +59,22 @@ func (h *S3Handler) RegisterS3Routes(router *mux.Router) { // Bucket operations router.HandleFunc("/{bucket}", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE") - // Object operations - router.HandleFunc("/{bucket}/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE") - - // Multipart upload operations + // Multipart upload operations - must be registered before generic object operations + // Upload part handler - must handle PUT with partNumber and uploadId + router.HandleFunc("/{bucket}/{key:.*}", h.handleUploadPart).Methods("PUT").Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId}") + // Initiate multipart upload router.HandleFunc("/{bucket}/{key:.*}", h.handleMultipartUpload).Methods("POST").Queries("uploads", "") + // List multipart uploads router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartUploads).Methods("GET").Queries("uploads", "") - router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartParts).Methods("GET").Queries("uploadId", "") - router.HandleFunc("/{bucket}/{key:.*}", h.handleCompleteMultipartUpload).Methods("POST").Queries("uploadId", "") - router.HandleFunc("/{bucket}/{key:.*}", h.handleAbortMultipartUpload).Methods("DELETE").Queries("uploadId", "") + // List parts + router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartParts).Methods("GET").Queries("uploadId", "{uploadId}") + // Complete multipart upload + router.HandleFunc("/{bucket}/{key:.*}", h.handleCompleteMultipartUpload).Methods("POST").Queries("uploadId", "{uploadId}") + // Abort multipart upload + router.HandleFunc("/{bucket}/{key:.*}", h.handleAbortMultipartUpload).Methods("DELETE").Queries("uploadId", "{uploadId}") + + // Object operations - must be registered after multipart operations to avoid conflicts + router.HandleFunc("/{bucket}/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE") // 添加认证中间件 router.Use(h.s3AuthMiddleware) @@ -726,15 +733,158 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b w.WriteHeader(http.StatusNoContent) } +// handleUploadPart 处理分片上传的单个分片 +func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + bucketName := vars["bucket"] + key := vars["key"] + partNumber := vars["partNumber"] + uploadID := vars["uploadId"] + + // 检查请求的存储桶是否为虚拟存储桶 + requestedBucket, ok := h.bucketManager.GetBucket(bucketName) + if !ok { + h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) + return + } + + // 获取内容长度 + contentLength := r.ContentLength + if contentLength < 0 { + h.sendS3Error(w, "MissingContentLength", "Content-Length header is required", key) + return + } + + var targetBucket *bucket.BucketInfo + + // 如果是虚拟存储桶,需要通过映射查找真实存储桶 + if requestedBucket.IsVirtual() { + // 获取虚拟存储桶映射 + mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key) + if err != nil { + // 对于分片上传,映射应该在初始化分片上传时已经创建 + // 如果没有找到映射,使用负载均衡器选择一个新的存储桶 + targetBucket, err = h.balancer.SelectBucket(key, contentLength) + if err != nil { + h.sendS3Error(w, "InternalError", "Failed to select bucket for multipart upload", key) + return + } + + // 创建虚拟存储桶文件级映射 + if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil { + h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key) + return + } + } else { + // 映射已存在,获取对应的真实存储桶 + targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) + if !ok { + h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) + return + } + } + } else { + // 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作 + h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) + return + } + + // 转换partNumber为整数 + partNum, err := strconv.Atoi(partNumber) + if err != nil { + h.sendS3Error(w, "InvalidArgument", "Invalid part number", key) + return + } + + // 生成预签名上传分片URL + presignClient := s3.NewPresignClient(targetBucket.Client) + uploadPartInput := &s3.UploadPartInput{ + Bucket: aws.String(targetBucket.Config.Name), + Key: aws.String(key), + UploadId: aws.String(uploadID), + PartNumber: aws.Int32(int32(partNum)), + } + + presignRequest, err := presignClient.PresignUploadPart(context.Background(), uploadPartInput, func(opts *s3.PresignOptions) { + opts.Expires = 15 * time.Minute + }) + if err != nil { + log.Printf("Failed to generate upload part URL for key %s, part %s: %v", key, partNumber, err) + h.sendS3Error(w, "InternalError", "Failed to generate upload part URL", key) + return + } + + // 使用反向代理上传分片到真实预签名URL + req, err := http.NewRequest("PUT", presignRequest.URL, r.Body) + if err != nil { + h.sendS3Error(w, "InternalError", "Failed to create upload part request", key) + return + } + + // 设置必要的头 + req.ContentLength = contentLength + if ct := r.Header.Get("Content-Type"); ct != "" { + req.Header.Set("Content-Type", ct) + } + + // 执行上传 + client := &http.Client{Timeout: 30 * time.Minute} + resp, err := client.Do(req) + if err != nil { + log.Printf("Failed to upload part %s for key %s: %v", partNumber, key, err) + h.sendS3Error(w, "InternalError", "Failed to upload part", key) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + // 从响应中获取ETag并返回给客户端 + etag := resp.Header.Get("ETag") + if etag != "" { + w.Header().Set("ETag", etag) + } + w.WriteHeader(http.StatusOK) + } else { + // 读取错误响应体以获取详细信息 + body, _ := io.ReadAll(resp.Body) + log.Printf("Upload part failed with status %d: %s", resp.StatusCode, string(body)) + h.sendS3Error(w, "InternalError", fmt.Sprintf("Upload part failed with status %d", resp.StatusCode), key) + } +} + // handleMultipartUpload 初始化分片上传 func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) + bucketName := vars["bucket"] key := vars["key"] - // 选择目标存储桶 - targetBucket, err := h.balancer.SelectBucket(key, 0) // 分片上传时不检查空间 - if err != nil { - h.sendS3Error(w, "InternalError", "Failed to select bucket for upload", key) + // 检查请求的存储桶是否为虚拟存储桶 + requestedBucket, ok := h.bucketManager.GetBucket(bucketName) + if !ok { + h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) + return + } + + var targetBucket *bucket.BucketInfo + var err error + + // 如果是虚拟存储桶,需要选择真实存储桶并创建映射 + if requestedBucket.IsVirtual() { + // 选择目标存储桶 + targetBucket, err = h.balancer.SelectBucket(key, 0) // 分片上传时不检查空间 + if err != nil { + h.sendS3Error(w, "InternalError", "Failed to select bucket for upload", key) + return + } + + // 创建虚拟存储桶文件级映射 + if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil { + h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key) + return + } + } else { + // 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作 + h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } @@ -751,7 +901,7 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request result := InitiateMultipartUploadResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", - Bucket: targetBucket.Config.Name, + Bucket: bucketName, // 返回虚拟存储桶名称给客户端 Key: key, UploadID: *createResp.UploadId, } @@ -819,13 +969,36 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http key := vars["key"] uploadID := r.URL.Query().Get("uploadId") - // 查找对象所在的实际存储桶(简化实现,使用配置的bucket) - bucket, ok := h.bucketManager.GetBucket(bucketName) + // 检查请求的存储桶是否为虚拟存储桶 + requestedBucket, ok := h.bucketManager.GetBucket(bucketName) if !ok { h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } + var targetBucket *bucket.BucketInfo + + // 如果是虚拟存储桶,需要通过映射查找真实存储桶 + if requestedBucket.IsVirtual() { + // 获取虚拟存储桶映射 + mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key) + if err != nil { + h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", key) + return + } + + // 获取映射到的真实存储桶 + targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) + if !ok { + h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) + return + } + } else { + // 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作 + h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) + return + } + // 解析请求体以获取分片列表 var completeReq CompleteMultipartUpload body, _ := io.ReadAll(r.Body) @@ -841,8 +1014,8 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http }) } - completeResp, err := bucket.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ - Bucket: aws.String(bucket.Config.Name), + completeResp, err := targetBucket.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(targetBucket.Config.Name), Key: aws.String(key), UploadId: aws.String(uploadID), MultipartUpload: &types.CompletedMultipartUpload{ @@ -856,14 +1029,33 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http result := CompleteMultipartUploadResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", - Location: "/" + bucket.Config.Name + "/" + key, - Bucket: bucket.Config.Name, + Location: "/" + bucketName + "/" + key, // 返回虚拟存储桶路径 + Bucket: bucketName, // 返回虚拟存储桶名称 Key: key, ETag: *completeResp.ETag, } - // 记录对象元数据(简化:假设总大小) - h.storage.RecordObject(key, bucket.Config.Name, 0, nil) + // 获取完成上传后的对象大小 + var objectSize int64 + headResp, err := targetBucket.Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(targetBucket.Config.Name), + Key: aws.String(key), + }) + if err != nil { + // 如果获取大小失败,记录警告但不影响响应 + log.Printf("Warning: Failed to get object size after multipart upload for key %s: %v", key, err) + objectSize = 0 + } else if headResp.ContentLength != nil { + objectSize = *headResp.ContentLength + } + + // 记录对象元数据(使用实际大小) + h.storage.RecordObject(key, targetBucket.Config.Name, objectSize, nil) + + // 更新存储桶使用量 + if objectSize > 0 { + targetBucket.UpdateUsedSize(objectSize) + } h.sendXMLResponse(w, http.StatusOK, result) } @@ -875,23 +1067,52 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re key := vars["key"] uploadID := r.URL.Query().Get("uploadId") - // 查找对象所在的实际存储桶(简化实现,使用配置的bucket) - bucket, ok := h.bucketManager.GetBucket(bucketName) + // 检查请求的存储桶是否为虚拟存储桶 + requestedBucket, ok := h.bucketManager.GetBucket(bucketName) if !ok { h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } + var targetBucket *bucket.BucketInfo + + // 如果是虚拟存储桶,需要通过映射查找真实存储桶 + if requestedBucket.IsVirtual() { + // 获取虚拟存储桶映射 + mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key) + if err != nil { + // 如果映射不存在,可能是上传已经被中止了,返回成功 + w.WriteHeader(http.StatusNoContent) + return + } + + // 获取映射到的真实存储桶 + targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) + if !ok { + h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key) + return + } + } else { + // 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接操作 + w.WriteHeader(http.StatusNoContent) + return + } + // 中止分片上传 ctx := context.Background() - _, err := bucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ - Bucket: aws.String(bucket.Config.Name), + _, err := targetBucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(targetBucket.Config.Name), Key: aws.String(key), UploadId: aws.String(uploadID), }) if err != nil { - h.sendS3Error(w, "InternalError", "Failed to abort multipart upload", key) - return + // 如果中止失败,可能是因为上传已经完成或中止,不需要报错 + log.Printf("Failed to abort multipart upload for key %s: %v", key, err) + } + + // 如果是虚拟存储桶,还需要删除文件级别映射 + if requestedBucket.IsVirtual() { + h.storage.DeleteVirtualBucketFileMapping(bucketName, key) } w.WriteHeader(http.StatusNoContent)