Fix the deleted key issue

This commit is contained in:
DullJZ
2025-08-25 23:12:35 +08:00
parent 1d6a94e5b8
commit 8a340f2fa2
3 changed files with 178 additions and 173 deletions

View File

@@ -54,20 +54,20 @@ func NewS3Handler(
func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
// Service operations
router.HandleFunc("/", h.handleListBuckets).Methods("GET")
// Bucket operations
router.HandleFunc("/{bucket}", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE")
// Object operations
router.HandleFunc("/{bucket}/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE")
// Multipart upload operations
router.HandleFunc("/{bucket}/{key:.*}", h.handleMultipartUpload).Methods("POST").Queries("uploads", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartUploads).Methods("GET").Queries("uploads", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleListMultipartParts).Methods("GET").Queries("uploadId", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleCompleteMultipartUpload).Methods("POST").Queries("uploadId", "")
router.HandleFunc("/{bucket}/{key:.*}", h.handleAbortMultipartUpload).Methods("DELETE").Queries("uploadId", "")
// 添加认证中间件
router.Use(h.s3AuthMiddleware)
}
@@ -120,54 +120,54 @@ type CommonPrefix struct {
}
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
}
type ListMultipartUploadsResult struct {
XMLName xml.Name `xml:"ListMultipartUploadsResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
KeyMarker string `xml:"KeyMarker"`
UploadIdMarker string `xml:"UploadIdMarker"`
NextKeyMarker string `xml:"NextKeyMarker"`
NextUploadIdMarker string `xml:"NextUploadIdMarker"`
MaxUploads int `xml:"MaxUploads"`
IsTruncated bool `xml:"IsTruncated"`
Uploads []Upload `xml:"Upload"`
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes,omitempty"`
XMLName xml.Name `xml:"ListMultipartUploadsResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
KeyMarker string `xml:"KeyMarker"`
UploadIdMarker string `xml:"UploadIdMarker"`
NextKeyMarker string `xml:"NextKeyMarker"`
NextUploadIdMarker string `xml:"NextUploadIdMarker"`
MaxUploads int `xml:"MaxUploads"`
IsTruncated bool `xml:"IsTruncated"`
Uploads []Upload `xml:"Upload"`
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes,omitempty"`
}
type Upload struct {
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
Initiator Owner `xml:"Initiator"`
Owner Owner `xml:"Owner"`
StorageClass string `xml:"StorageClass"`
Initiated time.Time `xml:"Initiated"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
Initiator Owner `xml:"Initiator"`
Owner Owner `xml:"Owner"`
StorageClass string `xml:"StorageClass"`
Initiated time.Time `xml:"Initiated"`
}
type ListPartsResult struct {
XMLName xml.Name `xml:"ListPartsResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
PartNumberMarker int `xml:"PartNumberMarker"`
NextPartNumberMarker int `xml:"NextPartNumberMarker"`
MaxParts int `xml:"MaxParts"`
IsTruncated bool `xml:"IsTruncated"`
Parts []Part `xml:"Part"`
XMLName xml.Name `xml:"ListPartsResult"`
Xmlns string `xml:"xmlns,attr"`
Bucket string `xml:"Bucket"`
Key string `xml:"Key"`
UploadID string `xml:"UploadId"`
PartNumberMarker int `xml:"PartNumberMarker"`
NextPartNumberMarker int `xml:"NextPartNumberMarker"`
MaxParts int `xml:"MaxParts"`
IsTruncated bool `xml:"IsTruncated"`
Parts []Part `xml:"Part"`
}
type Part struct {
PartNumber int `xml:"PartNumber"`
LastModified time.Time `xml:"LastModified"`
ETag string `xml:"ETag"`
Size int64 `xml:"Size"`
ETag string `xml:"ETag"`
Size int64 `xml:"Size"`
}
type CompleteMultipartUpload struct {
@@ -202,7 +202,7 @@ func (h *S3Handler) s3AuthMiddleware(next http.Handler) http.Handler {
// 允许匿名访问(用于测试)
// 在生产环境中应该要求认证
}
next.ServeHTTP(w, r)
})
}
@@ -210,7 +210,7 @@ func (h *S3Handler) s3AuthMiddleware(next http.Handler) http.Handler {
// handleListBuckets 处理列出所有存储桶请求
func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) {
buckets := h.bucketManager.GetAllBuckets()
result := ListBucketsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Owner: Owner{
@@ -221,7 +221,7 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) {
Bucket: make([]BucketInfo, 0, len(buckets)),
},
}
for _, b := range buckets {
// 只显示启用的虚拟存储桶,对客户端隐藏底层真实存储桶
if b.IsAvailable() && b.Config.Enabled && b.Config.Virtual {
@@ -231,7 +231,7 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) {
})
}
}
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -239,7 +239,7 @@ func (h *S3Handler) handleListBuckets(w http.ResponseWriter, r *http.Request) {
func (h *S3Handler) handleBucketOperations(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucketName := vars["bucket"]
switch r.Method {
case "GET":
h.handleListObjects(w, r, bucketName)
@@ -260,13 +260,13 @@ func (h *S3Handler) handleListObjects(w http.ResponseWriter, r *http.Request, bu
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 如果是虚拟存储桶,列出虚拟存储桶中的对象
if bucket.IsVirtual() {
h.handleListObjectsForVirtualBucket(w, r, bucketName)
return
}
// 如果不是虚拟存储桶,拒绝客户端访问真实存储桶
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
}
@@ -278,21 +278,21 @@ func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r *
marker := r.URL.Query().Get("marker")
maxKeysStr := r.URL.Query().Get("max-keys")
// delimiter := r.URL.Query().Get("delimiter") // 暂时不支持delimiter
maxKeys := 1000
if maxKeysStr != "" {
if mk, err := strconv.Atoi(maxKeysStr); err == nil {
maxKeys = mk
}
}
// 从存储服务获取虚拟存储桶中的对象
objects, err := h.storage.GetVirtualBucketObjects(bucketName)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to list virtual bucket objects", bucketName)
return
}
result := ListBucketResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Name: bucketName,
@@ -302,19 +302,19 @@ func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r *
IsTruncated: false,
Contents: make([]ObjectInfo, 0, len(objects)),
}
// 过滤对象并转换为S3格式
for _, obj := range objects {
// 前缀过滤
if prefix != "" && !strings.HasPrefix(obj.Key, prefix) {
continue
}
// Marker过滤
if marker != "" && obj.Key <= marker {
continue
}
result.Contents = append(result.Contents, ObjectInfo{
Key: obj.Key,
LastModified: obj.UpdatedAt,
@@ -322,13 +322,13 @@ func (h *S3Handler) handleListObjectsForVirtualBucket(w http.ResponseWriter, r *
Size: obj.Size,
})
}
// 如果超过了最大数量,设置截断标志
if len(result.Contents) > maxKeys {
result.Contents = result.Contents[:maxKeys]
result.IsTruncated = true
}
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -339,13 +339,13 @@ func (h *S3Handler) handleHeadBucket(w http.ResponseWriter, r *http.Request, buc
w.WriteHeader(http.StatusNotFound)
return
}
// 虚拟存储桶也应该返回成功状态
if bucket.IsVirtual() {
w.WriteHeader(http.StatusOK)
return
}
// 如果不是虚拟存储桶,拒绝客户端访问真实存储桶
w.WriteHeader(http.StatusNotFound)
}
@@ -365,7 +365,7 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b
return
}
}
// 检查是否为虚拟存储桶
if requestedBucket, exists := h.bucketManager.GetBucket(bucketName); exists && requestedBucket.IsVirtual() {
// 虚拟存储桶需要选择一个真实存储桶进行映射
@@ -374,21 +374,21 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b
h.sendS3Error(w, "InternalError", "No real buckets available for virtual bucket mapping", bucketName)
return
}
// 简化:选择第一个可用的真实存储桶
// 实际应用中可能需要更复杂的策略
targetBucket := realBuckets[0]
// 创建虚拟存储桶到真实存储桶的映射
if err := h.storage.CreateVirtualBucketMapping(bucketName, "", targetBucket.Config.Name); err != nil {
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", bucketName)
return
}
}
// 在负载均衡场景下不真正创建bucket只返回成功
// 实际的bucket应该在配置中预先定义
w.Header().Set("Location", "/" + bucketName)
w.Header().Set("Location", "/"+bucketName)
w.WriteHeader(http.StatusOK)
}
@@ -401,7 +401,7 @@ func (h *S3Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request, b
w.WriteHeader(http.StatusNoContent)
return
}
// 虚拟存储桶需要删除映射关系
if bucket.IsVirtual() {
// 删除虚拟存储桶映射
@@ -410,7 +410,7 @@ func (h *S3Handler) handleDeleteBucket(w http.ResponseWriter, r *http.Request, b
return
}
}
// 在负载均衡场景下不真正删除真实bucket
w.WriteHeader(http.StatusNoContent)
}
@@ -420,7 +420,7 @@ func (h *S3Handler) handleObjectOperations(w http.ResponseWriter, r *http.Reques
vars := mux.Vars(r)
bucketName := vars["bucket"]
key := vars["key"]
switch r.Method {
case "GET":
h.handleGetObject(w, r, bucketName, key)
@@ -441,7 +441,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
var err error
var bucket1 *bucket.BucketInfo
@@ -453,7 +453,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", key)
return
}
// 获取映射到的真实存储桶
bucket1, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
@@ -461,7 +461,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
return
}
}
// 生成预签名下载URL
downloadInfo, err := h.presigner.GenerateDownloadURL(
context.Background(),
@@ -472,7 +472,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "InternalError", "Failed to generate download URL", key)
return
}
// 默认使用预签名重定向模式,只有明确指定时才使用代理模式
if r.URL.Query().Get("proxy") == "true" {
// 代理模式:服务器下载内容并返回给客户端
@@ -482,12 +482,12 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
return
}
defer resp.Body.Close()
// 复制响应头
for k, v := range resp.Header {
w.Header()[k] = v
}
// 复制响应体
io.Copy(w, resp.Body)
} else {
@@ -504,7 +504,7 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc
w.WriteHeader(http.StatusNotFound)
return
}
// 如果是虚拟存储桶,需要通过映射查找真实存储桶
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶映射
@@ -513,20 +513,20 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc
w.WriteHeader(http.StatusNotFound)
return
}
_ = mapping // 使用mapping变量避免编译错误
// 查找对象信息(在映射的真实存储桶中)
obj, err := h.storage.GetObjectInfo(key)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
h.setObjectHeaders(w, obj)
return
}
// 真实存储桶的直接处理
// 从存储中获取对象信息
obj, err := h.storage.GetObjectInfo(key)
@@ -534,7 +534,7 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc
w.WriteHeader(http.StatusNotFound)
return
}
h.setObjectHeaders(w, obj)
w.WriteHeader(http.StatusOK)
}
@@ -551,7 +551,7 @@ func (h *S3Handler) setObjectHeaders(w http.ResponseWriter, obj *storage.Object)
}
}
// handlePutObject 上传对象默认使用预签名URL重定向
// handlePutObject 上传对象
func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
// 检查请求的存储桶是否为虚拟存储桶
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
@@ -559,17 +559,17 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 获取内容长度
contentLength := r.ContentLength
if contentLength < 0 {
h.sendS3Error(w, "MissingContentLength", "Content-Length header is required", key)
return
}
var targetBucket *bucket.BucketInfo
var err error
// 如果是虚拟存储桶,需要选择真实存储桶并创建映射
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶文件映射,如果不存在则创建
@@ -581,7 +581,7 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "InsufficientStorage", "No bucket has enough space", key)
return
}
// 创建虚拟存储桶文件级映射
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
@@ -600,7 +600,7 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 生成预签名上传URL
uploadInfo, err := h.presigner.GenerateUploadURL(
context.Background(),
@@ -613,52 +613,45 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
h.sendS3Error(w, "InternalError", "Failed to generate upload URL", key)
return
}
// 默认使用预签名重定向模式,只有明确指定时才使用代理模式
if r.URL.Query().Get("proxy") == "true" {
// 代理模式读取请求体并上传到预签名URL
// 创建新的请求
req, err := http.NewRequest(uploadInfo.Method, uploadInfo.URL, r.Body)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to create upload request", key)
return
}
// 设置必要的头
req.ContentLength = contentLength
if ct := r.Header.Get("Content-Type"); ct != "" {
req.Header.Set("Content-Type", ct)
}
// 添加预签名URL所需的额外头
for k, v := range uploadInfo.Headers {
req.Header.Set(k, v)
}
// 执行上传
client := &http.Client{Timeout: 30 * time.Minute}
resp, err := client.Do(req)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to upload object", key)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// 记录对象元数据
h.storage.RecordObject(key, targetBucket.Config.Name, contentLength, nil)
targetBucket.UpdateUsedSize(contentLength)
// 返回成功响应
w.Header().Set("ETag", fmt.Sprintf("\"%x\"", time.Now().UnixNano()))
w.WriteHeader(http.StatusOK)
} else {
h.sendS3Error(w, "InternalError", "Upload failed", key)
}
// 只使用反向代理上传到真实预签名URL不再返回307重定向
// 创建新的请求
req, err := http.NewRequest(uploadInfo.Method, uploadInfo.URL, r.Body)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to create upload request", key)
return
}
// 设置必要的头
req.ContentLength = contentLength
if ct := r.Header.Get("Content-Type"); ct != "" {
req.Header.Set("Content-Type", ct)
}
// 添加预签名URL所需的额外头
for k, v := range uploadInfo.Headers {
req.Header.Set(k, v)
}
// 执行上传
client := &http.Client{Timeout: 30 * time.Minute}
resp, err := client.Do(req)
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to upload object", key)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// 记录对象元数据
h.storage.RecordObject(key, targetBucket.Config.Name, contentLength, nil)
targetBucket.UpdateUsedSize(contentLength)
// 返回成功响应
w.Header().Set("ETag", fmt.Sprintf("\"%x\"", time.Now().UnixNano()))
w.WriteHeader(http.StatusOK)
} else {
// 重定向模式返回307临时重定向让客户端直接上传默认
w.Header().Set("Location", uploadInfo.URL)
w.WriteHeader(http.StatusTemporaryRedirect)
h.sendS3Error(w, "InternalError", "Upload failed", key)
}
}
@@ -670,10 +663,10 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
var bucket *bucket.BucketInfo
var err error
if requestedBucket.IsVirtual() {
// 获取虚拟存储桶文件映射
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
@@ -682,7 +675,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
w.WriteHeader(http.StatusNoContent)
return
}
// 获取映射到的真实存储桶
bucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
if !ok {
@@ -694,7 +687,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
w.WriteHeader(http.StatusNoContent)
return
}
// 生成预签名删除URL
deleteInfo, err := h.presigner.GenerateDeleteURL(
context.Background(),
@@ -705,7 +698,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
h.sendS3Error(w, "InternalError", "Failed to generate delete URL", key)
return
}
// 执行删除
req, _ := http.NewRequest("DELETE", deleteInfo.URL, nil)
client := &http.Client{Timeout: 30 * time.Second}
@@ -715,15 +708,15 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
return
}
defer resp.Body.Close()
// 从数据库中删除对象记录
h.storage.DeleteObject(key)
// 如果是虚拟存储桶,还需要删除文件级别映射
if requestedBucket.IsVirtual() {
h.storage.DeleteVirtualBucketFileMapping(bucketName, key)
}
// S3规范要求删除操作总是返回204
w.WriteHeader(http.StatusNoContent)
}
@@ -732,14 +725,14 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
// 选择目标存储桶
targetBucket, err := h.balancer.SelectBucket(key, 0) // 分片上传时不检查空间
if err != nil {
h.sendS3Error(w, "InternalError", "Failed to select bucket for upload", key)
return
}
// 初始化分片上传
ctx := context.Background()
createResp, err := targetBucket.Client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
@@ -750,14 +743,14 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request
h.sendS3Error(w, "InternalError", "Failed to initiate multipart upload", key)
return
}
result := InitiateMultipartUploadResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Bucket: targetBucket.Config.Name,
Key: key,
UploadID: *createResp.UploadId,
}
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -766,13 +759,13 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re
vars := mux.Vars(r)
bucketName := vars["bucket"]
key := vars["key"]
// 检查bucket是否存在
if _, ok := h.bucketManager.GetBucket(bucketName); !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 简化实现:返回空列表
result := ListMultipartUploadsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
@@ -782,7 +775,7 @@ func (h *S3Handler) handleListMultipartUploads(w http.ResponseWriter, r *http.Re
IsTruncated: false,
Uploads: make([]Upload, 0),
}
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -792,25 +785,25 @@ func (h *S3Handler) handleListMultipartParts(w http.ResponseWriter, r *http.Requ
bucketName := vars["bucket"]
key := vars["key"]
uploadID := r.URL.Query().Get("uploadId")
// 检查bucket是否存在
if _, ok := h.bucketManager.GetBucket(bucketName); !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 简化实现:返回空列表
result := ListPartsResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Bucket: bucketName,
Key: key,
UploadID: uploadID,
PartNumberMarker: 0,
MaxParts: 1000,
IsTruncated: false,
Parts: make([]Part, 0),
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Bucket: bucketName,
Key: key,
UploadID: uploadID,
PartNumberMarker: 0,
MaxParts: 1000,
IsTruncated: false,
Parts: make([]Part, 0),
}
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -820,19 +813,19 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
bucketName := vars["bucket"]
key := vars["key"]
uploadID := r.URL.Query().Get("uploadId")
// 查找对象所在的实际存储桶简化实现使用配置的bucket
bucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 解析请求体以获取分片列表
var completeReq CompleteMultipartUpload
body, _ := io.ReadAll(r.Body)
xml.Unmarshal(body, &completeReq)
// 完成分片上传
ctx := context.Background()
var parts []types.CompletedPart
@@ -842,7 +835,7 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
PartNumber: aws.Int32(int32(part.PartNumber)),
})
}
completeResp, err := bucket.Client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket.Config.Name),
Key: aws.String(key),
@@ -855,18 +848,18 @@ func (h *S3Handler) handleCompleteMultipartUpload(w http.ResponseWriter, r *http
h.sendS3Error(w, "InternalError", "Failed to complete multipart upload", key)
return
}
result := CompleteMultipartUploadResult{
Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
Location: "/" + bucket.Config.Name + "/" + key,
Location: "/" + bucket.Config.Name + "/" + key,
Bucket: bucket.Config.Name,
Key: key,
ETag: *completeResp.ETag,
}
// 记录对象元数据(简化:假设总大小)
h.storage.RecordObject(key, bucket.Config.Name, 0, nil)
h.sendXMLResponse(w, http.StatusOK, result)
}
@@ -876,14 +869,14 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re
bucketName := vars["bucket"]
key := vars["key"]
uploadID := r.URL.Query().Get("uploadId")
// 查找对象所在的实际存储桶简化实现使用配置的bucket
bucket, ok := h.bucketManager.GetBucket(bucketName)
if !ok {
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", bucketName)
return
}
// 中止分片上传
ctx := context.Background()
_, err := bucket.Client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
@@ -895,7 +888,7 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re
h.sendS3Error(w, "InternalError", "Failed to abort multipart upload", key)
return
}
w.WriteHeader(http.StatusNoContent)
}
@@ -903,13 +896,13 @@ func (h *S3Handler) handleAbortMultipartUpload(w http.ResponseWriter, r *http.Re
func (h *S3Handler) sendXMLResponse(w http.ResponseWriter, statusCode int, data interface{}) {
w.Header().Set("Content-Type", "application/xml")
w.WriteHeader(statusCode)
encoder := xml.NewEncoder(w)
encoder.Indent("", " ")
// 写入XML声明
w.Write([]byte(xml.Header))
if err := encoder.Encode(data); err != nil {
// 如果编码失败,记录错误
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
@@ -924,7 +917,7 @@ func (h *S3Handler) sendS3Error(w http.ResponseWriter, code string, message stri
Resource: resource,
RequestID: fmt.Sprintf("%d", time.Now().UnixNano()),
}
statusCode := http.StatusBadRequest
switch code {
case "NoSuchBucket", "NoSuchKey":
@@ -938,7 +931,7 @@ func (h *S3Handler) sendS3Error(w http.ResponseWriter, code string, message stri
case "InsufficientStorage":
statusCode = http.StatusInsufficientStorage
}
h.sendXMLResponse(w, statusCode, errorResp)
}
@@ -946,14 +939,14 @@ func (h *S3Handler) sendS3Error(w http.ResponseWriter, code string, message stri
func parseS3Path(requestPath string) (bucket string, key string) {
requestPath = strings.TrimPrefix(requestPath, "/")
parts := strings.SplitN(requestPath, "/", 2)
if len(parts) > 0 {
bucket = parts[0]
}
if len(parts) > 1 {
key = parts[1]
}
return bucket, key
}

View File

@@ -11,7 +11,7 @@ import (
// Object 对象信息模型
type Object struct {
ID uint `gorm:"primaryKey" json:"id"`
Key string `gorm:"uniqueIndex;size:512;not null" json:"key"`
Key string `gorm:"size:512;not null" json:"key"`
BucketName string `gorm:"index;size:255;not null" json:"bucket_name"`
Size int64 `gorm:"not null;default:0" json:"size"`
Metadata JSON `gorm:"type:json" json:"metadata,omitempty"`

View File

@@ -21,6 +21,15 @@ func NewService(db *gorm.DB) *Service {
// RecordObject 记录对象信息
func (s *Service) RecordObject(key, bucketName string, size int64, metadata map[string]string) error {
// 首先检查是否存在已删除的同名对象
var deletedObj Object
if err := s.db.Unscoped().Where("`key` = ?", key).Where("`deleted_at` IS NOT NULL").First(&deletedObj).Error; err == nil {
// 存在已删除的同名对象,永久删除它
if err := s.db.Unscoped().Delete(&deletedObj).Error; err != nil {
return fmt.Errorf("failed to permanently delete soft-deleted object: %w", err)
}
}
obj := &Object{
Key: key,
BucketName: bucketName,
@@ -37,7 +46,7 @@ func (s *Service) RecordObject(key, bucketName string, size int64, metadata map[
}
// 使用 Upsert更新或插入
result := s.db.Where("`key` = ?", key).FirstOrCreate(&obj)
result := s.db.Where("`key` = ?", key).Where("`deleted_at` IS NULL").FirstOrCreate(&obj)
if result.Error != nil {
return fmt.Errorf("failed to record object: %w", result.Error)
}
@@ -197,7 +206,10 @@ func (s *Service) updateBucketStats(bucketName string) error {
// 获取或创建统计记录
result := s.db.Where("bucket_name = ?", bucketName).FirstOrCreate(&stats, BucketStats{
BucketName: bucketName,
BucketName: bucketName,
LastCheckedAt: time.Now(),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
})
if result.Error != nil {
return fmt.Errorf("failed to get bucket stats: %w", result.Error)