diff --git a/internal/api/s3_handler.go b/internal/api/s3_handler.go index 3f6b7c6..2832f8c 100644 --- a/internal/api/s3_handler.go +++ b/internal/api/s3_handler.go @@ -54,20 +54,20 @@ func NewS3Handler( func (h *S3Handler) RegisterS3Routes(router *mux.Router) { // Service operations router.HandleFunc("/", h.handleListBuckets).Methods("GET") - + // Bucket operations router.HandleFunc("/{bucket}", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE") - + // Object operations router.HandleFunc("/{bucket}/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE") - + // Multipart upload operations router.HandleFunc("/{bucket}/{key:.*}", h.handleMultipartUpload).Methods("POST").Queries("uploads", "") router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartUploads).Methods("GET").Queries("uploads", "") router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartParts).Methods("GET").Queries("uploadId", "") router.HandleFunc("/{bucket}/{key:.*}", h.handleCompleteMultipartUpload).Methods("POST").Queries("uploadId", "") router.HandleFunc("/{bucket}/{key:.*}", h.handleAbortMultipartUpload).Methods("DELETE").Queries("uploadId", "") - + // 添加认证中间件 router.Use(h.s3AuthMiddleware) } @@ -120,54 +120,54 @@ type CommonPrefix struct { } type InitiateMultipartUploadResult struct { - XMLName xml.Name `xml:"InitiateMultipartUploadResult"` - Xmlns string `xml:"xmlns,attr"` - Bucket string `xml:"Bucket"` - Key string `xml:"Key"` - UploadID string `xml:"UploadId"` + XMLName xml.Name `xml:"InitiateMultipartUploadResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` } type ListMultipartUploadsResult struct { - XMLName xml.Name `xml:"ListMultipartUploadsResult"` - Xmlns string `xml:"xmlns,attr"` - Bucket string `xml:"Bucket"` - KeyMarker string `xml:"KeyMarker"` - UploadIdMarker string `xml:"UploadIdMarker"` - NextKeyMarker string `xml:"NextKeyMarker"` - NextUploadIdMarker string `xml:"NextUploadIdMarker"` - MaxUploads int `xml:"MaxUploads"` - IsTruncated bool `xml:"IsTruncated"` - Uploads []Upload `xml:"Upload"` - CommonPrefixes []CommonPrefix `xml:"CommonPrefixes,omitempty"` + XMLName xml.Name `xml:"ListMultipartUploadsResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + KeyMarker string `xml:"KeyMarker"` + UploadIdMarker string `xml:"UploadIdMarker"` + NextKeyMarker string `xml:"NextKeyMarker"` + NextUploadIdMarker string `xml:"NextUploadIdMarker"` + MaxUploads int `xml:"MaxUploads"` + IsTruncated bool `xml:"IsTruncated"` + Uploads []Upload `xml:"Upload"` + CommonPrefixes []CommonPrefix `xml:"CommonPrefixes,omitempty"` } type Upload struct { - Key string `xml:"Key"` - UploadID string `xml:"UploadId"` - Initiator Owner `xml:"Initiator"` - Owner Owner `xml:"Owner"` - StorageClass string `xml:"StorageClass"` - Initiated time.Time `xml:"Initiated"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` + Initiator Owner `xml:"Initiator"` + Owner Owner `xml:"Owner"` + StorageClass string `xml:"StorageClass"` + Initiated time.Time `xml:"Initiated"` } type ListPartsResult struct { - XMLName xml.Name `xml:"ListPartsResult"` - Xmlns string `xml:"xmlns,attr"` - Bucket string `xml:"Bucket"` - Key string `xml:"Key"` - UploadID string `xml:"UploadId"` - PartNumberMarker int `xml:"PartNumberMarker"` - NextPartNumberMarker int `xml:"NextPartNumberMarker"` - MaxParts int `xml:"MaxParts"` - IsTruncated bool `xml:"IsTruncated"` - Parts []Part `xml:"Part"` + XMLName xml.Name `xml:"ListPartsResult"` + Xmlns string `xml:"xmlns,attr"` + Bucket string `xml:"Bucket"` + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` + PartNumberMarker int `xml:"PartNumberMarker"` + NextPartNumberMarker int `xml:"NextPartNumberMarker"` + MaxParts int `xml:"MaxParts"` + IsTruncated bool `xml:"IsTruncated"` + Parts []Part `xml:"Part"` } type Part struct { PartNumber int `xml:"PartNumber"` LastModified time.Time `xml:"LastModified"` - ETag string `xml:"ETag"` - Size int64 `xml:"Size"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` } type CompleteMultipartUpload struct { @@ -202,7 +202,7 @@ func (h *S3Handler) s3AuthMiddleware(next http.Handler) http.Handler { // 允许匿名访问(用于测试) // 在生产环境中应该要求认证 } - + next.ServeHTTP(w, r) }) } @@ -210,7 +210,7 @@ func (h *S3Handler) s3AuthMiddleware(next http.Handler) http.Handler { // handleListBuckets 处理列出所有存储桶请求 func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) { buckets := h.bucketManager.GetAllBuckets() - + result := ListBucketsResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", Owner: Owner{ @@ -221,7 +221,7 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) { Bucket: make([]BucketInfo, 0, len(buckets)), }, } - + for _, b := range buckets { // 只显示启用的虚拟存储桶,对客户端隐藏底层真实存储桶 if b.IsAvailable() && b.Config.Enabled && b.Config.Virtual { @@ -231,7 +231,7 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) { }) } } - + h.sendXMLResponse(w, http.StatusOK, result) } @@ -239,7 +239,7 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) { func (h *S3Handler) handleBucketOperations(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) bucketName := vars["bucket"] - + switch r.Method { case "GET": h.handleListObjects(w, r, bucketName) @@ -260,13 +260,13 @@ func (h *S3Handler) handleListObjects(w http.ResponseWriter, r *http.Request, bu h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 如果是虚拟存储桶,列出虚拟存储桶中的对象 if bucket.IsVirtual() { h.handleListObjectsForVirtualBucket(w, r, bucketName) return } - + // 如果不是虚拟存储桶,拒绝客户端访问真实存储桶 h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) } @@ -278,21 +278,21 @@ func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r * marker := r.URL.Query().Get("marker") maxKeysStr := r.URL.Query().Get("max-keys") // delimiter := r.URL.Query().Get("delimiter") // 暂时不支持delimiter - + maxKeys := 1000 if maxKeysStr != "" { if mk, err := strconv.Atoi(maxKeysStr); err == nil { maxKeys = mk } } - + // 从存储服务获取虚拟存储桶中的对象 objects, err := h.storage.GetVirtualBucketObjects(bucketName) if err != nil { h.sendS3Error(w, "InternalError", "Failed to list virtual bucket objects", bucketName) return } - + result := ListBucketResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", Name: bucketName, @@ -302,19 +302,19 @@ func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r * IsTruncated: false, Contents: make([]ObjectInfo, 0, len(objects)), } - + // 过滤对象并转换为S3格式 for _, obj := range objects { // 前缀过滤 if prefix != "" && !strings.HasPrefix(obj.Key, prefix) { continue } - + // Marker过滤 if marker != "" && obj.Key <= marker { continue } - + result.Contents = append(result.Contents, ObjectInfo{ Key: obj.Key, LastModified: obj.UpdatedAt, @@ -322,13 +322,13 @@ func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r * Size: obj.Size, }) } - + // 如果超过了最大数量,设置截断标志 if len(result.Contents) > maxKeys { result.Contents = result.Contents[:maxKeys] result.IsTruncated = true } - + h.sendXMLResponse(w, http.StatusOK, result) } @@ -339,13 +339,13 @@ func (h *S3Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request, buc w.WriteHeader(http.StatusNotFound) return } - + // 虚拟存储桶也应该返回成功状态 if bucket.IsVirtual() { w.WriteHeader(http.StatusOK) return } - + // 如果不是虚拟存储桶,拒绝客户端访问真实存储桶 w.WriteHeader(http.StatusNotFound) } @@ -365,7 +365,7 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b return } } - + // 检查是否为虚拟存储桶 if requestedBucket, exists := h.bucketManager.GetBucket(bucketName); exists && requestedBucket.IsVirtual() { // 虚拟存储桶需要选择一个真实存储桶进行映射 @@ -374,21 +374,21 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b h.sendS3Error(w, "InternalError", "No real buckets available for virtual bucket mapping", bucketName) return } - + // 简化:选择第一个可用的真实存储桶 // 实际应用中可能需要更复杂的策略 targetBucket := realBuckets[0] - + // 创建虚拟存储桶到真实存储桶的映射 if err := h.storage.CreateVirtualBucketMapping(bucketName, "", targetBucket.Config.Name); err != nil { h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", bucketName) return } } - + // 在负载均衡场景下,不真正创建bucket,只返回成功 // 实际的bucket应该在配置中预先定义 - w.Header().Set("Location", "/" + bucketName) + w.Header().Set("Location", "/"+bucketName) w.WriteHeader(http.StatusOK) } @@ -401,7 +401,7 @@ func (h *S3Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request, b w.WriteHeader(http.StatusNoContent) return } - + // 虚拟存储桶需要删除映射关系 if bucket.IsVirtual() { // 删除虚拟存储桶映射 @@ -410,7 +410,7 @@ func (h *S3Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request, b return } } - + // 在负载均衡场景下,不真正删除真实bucket w.WriteHeader(http.StatusNoContent) } @@ -420,7 +420,7 @@ func (h *S3Handler) handleObjectOperations(w http.ResponseWriter, r *http.Reques vars := mux.Vars(r) bucketName := vars["bucket"] key := vars["key"] - + switch r.Method { case "GET": h.handleGetObject(w, r, bucketName, key) @@ -441,7 +441,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 如果是虚拟存储桶,需要通过映射查找真实存储桶 var err error var bucket1 *bucket.BucketInfo @@ -453,7 +453,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", key) return } - + // 获取映射到的真实存储桶 bucket1, ok = h.bucketManager.GetBucket(mapping.RealBucketName) if !ok { @@ -461,7 +461,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck return } } - + // 生成预签名下载URL downloadInfo, err := h.presigner.GenerateDownloadURL( context.Background(), @@ -472,7 +472,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "InternalError", "Failed to generate download URL", key) return } - + // 默认使用预签名重定向模式,只有明确指定时才使用代理模式 if r.URL.Query().Get("proxy") == "true" { // 代理模式:服务器下载内容并返回给客户端 @@ -482,12 +482,12 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck return } defer resp.Body.Close() - + // 复制响应头 for k, v := range resp.Header { w.Header()[k] = v } - + // 复制响应体 io.Copy(w, resp.Body) } else { @@ -504,7 +504,7 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc w.WriteHeader(http.StatusNotFound) return } - + // 如果是虚拟存储桶,需要通过映射查找真实存储桶 if requestedBucket.IsVirtual() { // 获取虚拟存储桶映射 @@ -513,20 +513,20 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc w.WriteHeader(http.StatusNotFound) return } - + _ = mapping // 使用mapping变量,避免编译错误 - + // 查找对象信息(在映射的真实存储桶中) obj, err := h.storage.GetObjectInfo(key) if err != nil { w.WriteHeader(http.StatusNotFound) return } - + h.setObjectHeaders(w, obj) return } - + // 真实存储桶的直接处理 // 从存储中获取对象信息 obj, err := h.storage.GetObjectInfo(key) @@ -534,7 +534,7 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc w.WriteHeader(http.StatusNotFound) return } - + h.setObjectHeaders(w, obj) w.WriteHeader(http.StatusOK) } @@ -551,7 +551,7 @@ func (h *S3Handler) setObjectHeaders(w http.ResponseWriter, obj *storage.Object) } } -// handlePutObject 上传对象(默认使用预签名URL重定向) +// handlePutObject 上传对象 func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) { // 检查请求的存储桶是否为虚拟存储桶 requestedBucket, ok := h.bucketManager.GetBucket(bucketName) @@ -559,17 +559,17 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 获取内容长度 contentLength := r.ContentLength if contentLength < 0 { h.sendS3Error(w, "MissingContentLength", "Content-Length header is required", key) return } - + var targetBucket *bucket.BucketInfo var err error - + // 如果是虚拟存储桶,需要选择真实存储桶并创建映射 if requestedBucket.IsVirtual() { // 获取虚拟存储桶文件映射,如果不存在则创建 @@ -581,7 +581,7 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "InsufficientStorage", "No bucket has enough space", key) return } - + // 创建虚拟存储桶文件级映射 if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil { h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key) @@ -600,7 +600,7 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 生成预签名上传URL uploadInfo, err := h.presigner.GenerateUploadURL( context.Background(), @@ -613,52 +613,45 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck h.sendS3Error(w, "InternalError", "Failed to generate upload URL", key) return } - - // 默认使用预签名重定向模式,只有明确指定时才使用代理模式 - if r.URL.Query().Get("proxy") == "true" { - // 代理模式:读取请求体并上传到预签名URL - // 创建新的请求 - req, err := http.NewRequest(uploadInfo.Method, uploadInfo.URL, r.Body) - if err != nil { - h.sendS3Error(w, "InternalError", "Failed to create upload request", key) - return - } - - // 设置必要的头 - req.ContentLength = contentLength - if ct := r.Header.Get("Content-Type"); ct != "" { - req.Header.Set("Content-Type", ct) - } - - // 添加预签名URL所需的额外头 - for k, v := range uploadInfo.Headers { - req.Header.Set(k, v) - } - - // 执行上传 - client := &http.Client{Timeout: 30 * time.Minute} - resp, err := client.Do(req) - if err != nil { - h.sendS3Error(w, "InternalError", "Failed to upload object", key) - return - } - defer resp.Body.Close() - - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - // 记录对象元数据 - h.storage.RecordObject(key, targetBucket.Config.Name, contentLength, nil) - targetBucket.UpdateUsedSize(contentLength) - - // 返回成功响应 - w.Header().Set("ETag", fmt.Sprintf("\"%x\"", time.Now().UnixNano())) - w.WriteHeader(http.StatusOK) - } else { - h.sendS3Error(w, "InternalError", "Upload failed", key) - } + + // 只使用反向代理上传到真实预签名URL,不再返回307重定向 + // 创建新的请求 + req, err := http.NewRequest(uploadInfo.Method, uploadInfo.URL, r.Body) + if err != nil { + h.sendS3Error(w, "InternalError", "Failed to create upload request", key) + return + } + + // 设置必要的头 + req.ContentLength = contentLength + if ct := r.Header.Get("Content-Type"); ct != "" { + req.Header.Set("Content-Type", ct) + } + + // 添加预签名URL所需的额外头 + for k, v := range uploadInfo.Headers { + req.Header.Set(k, v) + } + + // 执行上传 + client := &http.Client{Timeout: 30 * time.Minute} + resp, err := client.Do(req) + if err != nil { + h.sendS3Error(w, "InternalError", "Failed to upload object", key) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + // 记录对象元数据 + h.storage.RecordObject(key, targetBucket.Config.Name, contentLength, nil) + targetBucket.UpdateUsedSize(contentLength) + + // 返回成功响应 + w.Header().Set("ETag", fmt.Sprintf("\"%x\"", time.Now().UnixNano())) + w.WriteHeader(http.StatusOK) } else { - // 重定向模式:返回307临时重定向让客户端直接上传(默认) - w.Header().Set("Location", uploadInfo.URL) - w.WriteHeader(http.StatusTemporaryRedirect) + h.sendS3Error(w, "InternalError", "Upload failed", key) } } @@ -670,10 +663,10 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + var bucket *bucket.BucketInfo var err error - + if requestedBucket.IsVirtual() { // 获取虚拟存储桶文件映射 mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key) @@ -682,7 +675,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b w.WriteHeader(http.StatusNoContent) return } - + // 获取映射到的真实存储桶 bucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName) if !ok { @@ -694,7 +687,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b w.WriteHeader(http.StatusNoContent) return } - + // 生成预签名删除URL deleteInfo, err := h.presigner.GenerateDeleteURL( context.Background(), @@ -705,7 +698,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b h.sendS3Error(w, "InternalError", "Failed to generate delete URL", key) return } - + // 执行删除 req, _ := http.NewRequest("DELETE", deleteInfo.URL, nil) client := &http.Client{Timeout: 30 * time.Second} @@ -715,15 +708,15 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b return } defer resp.Body.Close() - + // 从数据库中删除对象记录 h.storage.DeleteObject(key) - + // 如果是虚拟存储桶,还需要删除文件级别映射 if requestedBucket.IsVirtual() { h.storage.DeleteVirtualBucketFileMapping(bucketName, key) } - + // S3规范要求删除操作总是返回204 w.WriteHeader(http.StatusNoContent) } @@ -732,14 +725,14 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) key := vars["key"] - + // 选择目标存储桶 targetBucket, err := h.balancer.SelectBucket(key, 0) // 分片上传时不检查空间 if err != nil { h.sendS3Error(w, "InternalError", "Failed to select bucket for upload", key) return } - + // 初始化分片上传 ctx := context.Background() createResp, err := targetBucket.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ @@ -750,14 +743,14 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request h.sendS3Error(w, "InternalError", "Failed to initiate multipart upload", key) return } - + result := InitiateMultipartUploadResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", Bucket: targetBucket.Config.Name, Key: key, UploadID: *createResp.UploadId, } - + h.sendXMLResponse(w, http.StatusOK, result) } @@ -766,13 +759,13 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re vars := mux.Vars(r) bucketName := vars["bucket"] key := vars["key"] - + // 检查bucket是否存在 if _, ok := h.bucketManager.GetBucket(bucketName); !ok { h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 简化实现:返回空列表 result := ListMultipartUploadsResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", @@ -782,7 +775,7 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re IsTruncated: false, Uploads: make([]Upload, 0), } - + h.sendXMLResponse(w, http.StatusOK, result) } @@ -792,25 +785,25 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ bucketName := vars["bucket"] key := vars["key"] uploadID := r.URL.Query().Get("uploadId") - + // 检查bucket是否存在 if _, ok := h.bucketManager.GetBucket(bucketName); !ok { h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 简化实现:返回空列表 result := ListPartsResult{ - Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", - Bucket: bucketName, - Key: key, - UploadID: uploadID, - PartNumberMarker: 0, - MaxParts: 1000, - IsTruncated: false, - Parts: make([]Part, 0), + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Bucket: bucketName, + Key: key, + UploadID: uploadID, + PartNumberMarker: 0, + MaxParts: 1000, + IsTruncated: false, + Parts: make([]Part, 0), } - + h.sendXMLResponse(w, http.StatusOK, result) } @@ -820,19 +813,19 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http bucketName := vars["bucket"] key := vars["key"] uploadID := r.URL.Query().Get("uploadId") - + // 查找对象所在的实际存储桶(简化实现,使用配置的bucket) bucket, ok := h.bucketManager.GetBucket(bucketName) if !ok { h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 解析请求体以获取分片列表 var completeReq CompleteMultipartUpload body, _ := io.ReadAll(r.Body) xml.Unmarshal(body, &completeReq) - + // 完成分片上传 ctx := context.Background() var parts []types.CompletedPart @@ -842,7 +835,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http PartNumber: aws.Int32(int32(part.PartNumber)), }) } - + completeResp, err := bucket.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ Bucket: aws.String(bucket.Config.Name), Key: aws.String(key), @@ -855,18 +848,18 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http h.sendS3Error(w, "InternalError", "Failed to complete multipart upload", key) return } - + result := CompleteMultipartUploadResult{ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", - Location: "/" + bucket.Config.Name + "/" + key, + Location: "/" + bucket.Config.Name + "/" + key, Bucket: bucket.Config.Name, Key: key, ETag: *completeResp.ETag, } - + // 记录对象元数据(简化:假设总大小) h.storage.RecordObject(key, bucket.Config.Name, 0, nil) - + h.sendXMLResponse(w, http.StatusOK, result) } @@ -876,14 +869,14 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re bucketName := vars["bucket"] key := vars["key"] uploadID := r.URL.Query().Get("uploadId") - + // 查找对象所在的实际存储桶(简化实现,使用配置的bucket) bucket, ok := h.bucketManager.GetBucket(bucketName) if !ok { h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName) return } - + // 中止分片上传 ctx := context.Background() _, err := bucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ @@ -895,7 +888,7 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re h.sendS3Error(w, "InternalError", "Failed to abort multipart upload", key) return } - + w.WriteHeader(http.StatusNoContent) } @@ -903,13 +896,13 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re func (h *S3Handler) sendXMLResponse(w http.ResponseWriter, statusCode int, data interface{}) { w.Header().Set("Content-Type", "application/xml") w.WriteHeader(statusCode) - + encoder := xml.NewEncoder(w) encoder.Indent("", " ") - + // 写入XML声明 w.Write([]byte(xml.Header)) - + if err := encoder.Encode(data); err != nil { // 如果编码失败,记录错误 http.Error(w, "Internal Server Error", http.StatusInternalServerError) @@ -924,7 +917,7 @@ func (h *S3Handler) sendS3Error(w http.ResponseWriter, code string, message stri Resource: resource, RequestID: fmt.Sprintf("%d", time.Now().UnixNano()), } - + statusCode := http.StatusBadRequest switch code { case "NoSuchBucket", "NoSuchKey": @@ -938,7 +931,7 @@ func (h *S3Handler) sendS3Error(w http.ResponseWriter, code string, message stri case "InsufficientStorage": statusCode = http.StatusInsufficientStorage } - + h.sendXMLResponse(w, statusCode, errorResp) } @@ -946,14 +939,14 @@ func (h *S3Handler) sendS3Error(w http.ResponseWriter, code string, message stri func parseS3Path(requestPath string) (bucket string, key string) { requestPath = strings.TrimPrefix(requestPath, "/") parts := strings.SplitN(requestPath, "/", 2) - + if len(parts) > 0 { bucket = parts[0] } if len(parts) > 1 { key = parts[1] } - + return bucket, key } diff --git a/internal/storage/models.go b/internal/storage/models.go index f2c4cb6..a187459 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -11,7 +11,7 @@ import ( // Object 对象信息模型 type Object struct { ID uint `gorm:"primaryKey" json:"id"` - Key string `gorm:"uniqueIndex;size:512;not null" json:"key"` + Key string `gorm:"size:512;not null" json:"key"` BucketName string `gorm:"index;size:255;not null" json:"bucket_name"` Size int64 `gorm:"not null;default:0" json:"size"` Metadata JSON `gorm:"type:json" json:"metadata,omitempty"` diff --git a/internal/storage/service.go b/internal/storage/service.go index 12887ea..a6f3dc3 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -21,6 +21,15 @@ func NewService(db *gorm.DB) *Service { // RecordObject 记录对象信息 func (s *Service) RecordObject(key, bucketName string, size int64, metadata map[string]string) error { + // 首先检查是否存在已删除的同名对象 + var deletedObj Object + if err := s.db.Unscoped().Where("`key` = ?", key).Where("`deleted_at` IS NOT NULL").First(&deletedObj).Error; err == nil { + // 存在已删除的同名对象,永久删除它 + if err := s.db.Unscoped().Delete(&deletedObj).Error; err != nil { + return fmt.Errorf("failed to permanently delete soft-deleted object: %w", err) + } + } + obj := &Object{ Key: key, BucketName: bucketName, @@ -37,7 +46,7 @@ func (s *Service) RecordObject(key, bucketName string, size int64, metadata map[ } // 使用 Upsert(更新或插入) - result := s.db.Where("`key` = ?", key).FirstOrCreate(&obj) + result := s.db.Where("`key` = ?", key).Where("`deleted_at` IS NULL").FirstOrCreate(&obj) if result.Error != nil { return fmt.Errorf("failed to record object: %w", result.Error) } @@ -197,7 +206,10 @@ func (s *Service) updateBucketStats(bucketName string) error { // 获取或创建统计记录 result := s.db.Where("bucket_name = ?", bucketName).FirstOrCreate(&stats, BucketStats{ - BucketName: bucketName, + BucketName: bucketName, + LastCheckedAt: time.Now(), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), }) if result.Error != nil { return fmt.Errorf("failed to get bucket stats: %w", result.Error)