mirror of
https://github.com/DullJZ/s3-balance.git
synced 2026-07-03 17:11:20 +08:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9258d4924 | ||
|
|
9738091c87 | ||
|
|
72fac92783 | ||
|
|
322d41fbb2 | ||
|
|
eecda55eb3 | ||
|
|
6b0d2e8596 | ||
|
|
f87a19c651 | ||
|
|
9c6351e9ad |
@@ -88,12 +88,12 @@ go build -o s3-balance cmd/s3-balance/main.go
|
|||||||
- `database`:GORM 支持 sqlite/mysql/postgres,并存储对象元数据、分片会话等。
|
- `database`:GORM 支持 sqlite/mysql/postgres,并存储对象元数据、分片会话等。
|
||||||
- `buckets`:列出真实与虚拟桶。`virtual: true` 的条目会对外暴露,真实桶为 `virtual: false`。可设置 `path_style` 与 `max_size`。
|
- `buckets`:列出真实与虚拟桶。`virtual: true` 的条目会对外暴露,真实桶为 `virtual: false`。可设置 `path_style` 与 `max_size`。
|
||||||
- `balancer`:策略 (`round-robin`|`least-space`|`weighted`)、健康检查周期、重试次数与延迟。
|
- `balancer`:策略 (`round-robin`|`least-space`|`weighted`)、健康检查周期、重试次数与延迟。
|
||||||
- `metrics`:是否启用 Prometheus 指标及路径。
|
- `metrics`:是否启用 Prometheus 指标、路径与可选抓取 Token。
|
||||||
- `s3api`:Access/Secret Key、`proxy_mode`(true=服务代理,false=重定向)、`auth_required`(SigV4 校验)、`virtual_host`(Host-style 路由)。
|
- `s3api`:Access/Secret Key、`proxy_mode`(true=服务代理,false=重定向)、`auth_required`(SigV4 校验)、`virtual_host`(Host-style 路由)。
|
||||||
|
|
||||||
## API & 测试
|
## API & 测试
|
||||||
|
|
||||||
- 默认监听 `http://localhost:8080`,支持 `GET /health` 健康检查、`GET /metrics` 指标。
|
- 默认监听 `http://localhost:8080`,支持 `GET /health` 健康检查、`GET /metrics` 指标(配置 `metrics.token` 后需携带 `Authorization: Bearer <token>`)。
|
||||||
- 可使用 AWS CLI、s3cmd、MinIO Client 或 `python3 test_virtual_bucket_s3.py` 验证兼容性;脚本运行前需修改 endpoint 与凭据。
|
- 可使用 AWS CLI、s3cmd、MinIO Client 或 `python3 test_virtual_bucket_s3.py` 验证兼容性;脚本运行前需修改 endpoint 与凭据。
|
||||||
|
|
||||||
## 项目结构
|
## 项目结构
|
||||||
|
|||||||
@@ -86,12 +86,12 @@ go build -o s3-balance cmd/s3-balance/main.go
|
|||||||
- `database`: GORM supports sqlite/mysql/postgres and stores object metadata, multipart sessions, etc.
|
- `database`: GORM supports sqlite/mysql/postgres and stores object metadata, multipart sessions, etc.
|
||||||
- `buckets`: Lists real and virtual buckets. Entries with `virtual: true` are exposed externally, while real buckets are marked as `virtual: false`. Supports `path_style` and `max_size` settings.
|
- `buckets`: Lists real and virtual buckets. Entries with `virtual: true` are exposed externally, while real buckets are marked as `virtual: false`. Supports `path_style` and `max_size` settings.
|
||||||
- `balancer`: Strategy (`round-robin`|`least-space`|`weighted`), health check intervals, retry counts, and delays.
|
- `balancer`: Strategy (`round-robin`|`least-space`|`weighted`), health check intervals, retry counts, and delays.
|
||||||
- `metrics`: Whether to enable Prometheus metrics and their path.
|
- `metrics`: Whether to enable Prometheus metrics, the path, and optional scrape token.
|
||||||
- `s3api`: Access/Secret Key, `proxy_mode` (true=proxy, false=redirect), `auth_required` (SigV4 validation), `virtual_host` (host-style routing).
|
- `s3api`: Access/Secret Key, `proxy_mode` (true=proxy, false=redirect), `auth_required` (SigV4 validation), `virtual_host` (host-style routing).
|
||||||
|
|
||||||
## API & Testing
|
## API & Testing
|
||||||
|
|
||||||
- Default listening at `http://localhost:8080`, supports `GET /health` for health checks and `GET /metrics` for metrics.
|
- Default listening at `http://localhost:8080`, supports `GET /health` for health checks and `GET /metrics` for metrics (if `metrics.token` is set, send `Authorization: Bearer <token>`).
|
||||||
- Compatibility can be verified using AWS CLI, s3cmd, MinIO Client, or `python3 test_virtual_bucket_s3.py`. Modify the endpoint and credentials in the script before running.
|
- Compatibility can be verified using AWS CLI, s3cmd, MinIO Client, or `python3 test_virtual_bucket_s3.py`. Modify the endpoint and credentials in the script before running.
|
||||||
|
|
||||||
## Project Structure
|
## Project Structure
|
||||||
@@ -106,4 +106,4 @@ internal/storage/ # GORM models and services
|
|||||||
pkg/presigner/ # Pre-signed URL utilities
|
pkg/presigner/ # Pre-signed URL utilities
|
||||||
config/ # Example configurations and deployment manifests
|
config/ # Example configurations and deployment manifests
|
||||||
deploy/ # Docker/Kubernetes/Helm manifests
|
deploy/ # Docker/Kubernetes/Helm manifests
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -137,8 +137,16 @@ func main() {
|
|||||||
|
|
||||||
// 添加指标端点
|
// 添加指标端点
|
||||||
if cfg.Metrics.Enabled {
|
if cfg.Metrics.Enabled {
|
||||||
router.Path(cfg.Metrics.Path).Handler(promhttp.Handler())
|
metricsHandler := promhttp.Handler()
|
||||||
log.Printf("Metrics server enabled at %s", cfg.Metrics.Path)
|
if cfg.Metrics.Token != "" {
|
||||||
|
metricsHandler = middleware.TokenAuthMiddleware(cfg.Metrics.Token)(metricsHandler)
|
||||||
|
}
|
||||||
|
router.Path(cfg.Metrics.Path).Handler(metricsHandler)
|
||||||
|
log.Printf(
|
||||||
|
"Metrics server enabled at %s (auth required: %t)",
|
||||||
|
cfg.Metrics.Path,
|
||||||
|
cfg.Metrics.Token != "",
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 注册管理API路由(如果启用)
|
// 注册管理API路由(如果启用)
|
||||||
|
|||||||
@@ -128,6 +128,8 @@ balancer:
|
|||||||
metrics:
|
metrics:
|
||||||
enabled: true
|
enabled: true
|
||||||
path: "/metrics"
|
path: "/metrics"
|
||||||
|
# Prometheus 抓取时需要携带的 Token(可选,配置后需使用 Authorization: Bearer <token> 访问)
|
||||||
|
token: ""
|
||||||
|
|
||||||
# S3兼容API配置
|
# S3兼容API配置
|
||||||
s3api:
|
s3api:
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ type BucketResponse struct {
|
|||||||
UsedSize int64 `json:"used_size"`
|
UsedSize int64 `json:"used_size"`
|
||||||
AvailableSize int64 `json:"available_size"`
|
AvailableSize int64 `json:"available_size"`
|
||||||
UsagePercent float64 `json:"usage_percent"`
|
UsagePercent float64 `json:"usage_percent"`
|
||||||
|
ObjectCount int64 `json:"object_count"`
|
||||||
Weight int `json:"weight"`
|
Weight int `json:"weight"`
|
||||||
Enabled bool `json:"enabled"`
|
Enabled bool `json:"enabled"`
|
||||||
Available bool `json:"available"`
|
Available bool `json:"available"`
|
||||||
@@ -59,18 +60,18 @@ type BucketResponse struct {
|
|||||||
|
|
||||||
// BucketsListResponse 存储桶列表响应结构
|
// BucketsListResponse 存储桶列表响应结构
|
||||||
type BucketsListResponse struct {
|
type BucketsListResponse struct {
|
||||||
Total int `json:"total"`
|
Total int `json:"total"`
|
||||||
Buckets []BucketResponse `json:"buckets"`
|
Buckets []BucketResponse `json:"buckets"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// HealthResponse 健康状态响应结构
|
// HealthResponse 健康状态响应结构
|
||||||
type HealthResponse struct {
|
type HealthResponse struct {
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
LoadBalancer string `json:"load_balancer_strategy"`
|
LoadBalancer string `json:"load_balancer_strategy"`
|
||||||
TotalBuckets int `json:"total_buckets"`
|
TotalBuckets int `json:"total_buckets"`
|
||||||
AvailableBuckets int `json:"available_buckets"`
|
AvailableBuckets int `json:"available_buckets"`
|
||||||
Database string `json:"database_type"`
|
Database string `json:"database_type"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterRoutes 注册管理API路由
|
// RegisterRoutes 注册管理API路由
|
||||||
@@ -205,6 +206,7 @@ func (h *AdminHandler) convertBucketInfo(b *bucket.BucketInfo) BucketResponse {
|
|||||||
MaxSize: b.Config.MaxSize,
|
MaxSize: b.Config.MaxSize,
|
||||||
MaxSizeBytes: b.Config.MaxSizeBytes,
|
MaxSizeBytes: b.Config.MaxSizeBytes,
|
||||||
UsedSize: b.UsedSize,
|
UsedSize: b.UsedSize,
|
||||||
|
ObjectCount: b.GetObjectCount(),
|
||||||
Weight: b.Config.Weight,
|
Weight: b.Config.Weight,
|
||||||
Enabled: b.Config.Enabled,
|
Enabled: b.Config.Enabled,
|
||||||
Available: b.Available,
|
Available: b.Available,
|
||||||
|
|||||||
@@ -237,7 +237,7 @@ func (h *S3Handler) handleCreateBucket(w http.ResponseWriter, r *http.Request, b
|
|||||||
targetBucket := realBuckets[0]
|
targetBucket := realBuckets[0]
|
||||||
|
|
||||||
// 创建虚拟存储桶到真实存储桶的映射
|
// 创建虚拟存储桶到真实存储桶的映射
|
||||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, "", targetBucket.Config.Name); err != nil {
|
if err := h.storage.CreateVirtualBucketMapping(bucketName, "", targetBucket.Config.Name, ""); err != nil {
|
||||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", bucketName)
|
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket mapping", bucketName)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,8 +57,8 @@ func (h *S3Handler) handleUploadPart(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建虚拟存储桶文件级映射
|
// 创建虚拟存储桶文件级映射(对于Multipart,虚拟key和真实key相同)
|
||||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
|
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name, key); err != nil {
|
||||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -204,8 +204,8 @@ func (h *S3Handler) handleMultipartUpload(w http.ResponseWriter, r *http.Request
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建虚拟存储桶文件级映射
|
// 创建虚拟存储桶文件级映射(对于Multipart,虚拟key和真实key相同)
|
||||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
|
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name, key); err != nil {
|
||||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,9 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/DullJZ/s3-balance/internal/bucket"
|
"github.com/DullJZ/s3-balance/internal/bucket"
|
||||||
@@ -56,6 +58,7 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
|
|||||||
var err error
|
var err error
|
||||||
var bucket1 *bucket.BucketInfo
|
var bucket1 *bucket.BucketInfo
|
||||||
|
|
||||||
|
var realKey string
|
||||||
if requestedBucket.IsVirtual() {
|
if requestedBucket.IsVirtual() {
|
||||||
// 获取虚拟存储桶映射
|
// 获取虚拟存储桶映射
|
||||||
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
|
mapping, err := h.storage.GetVirtualBucketMapping(bucketName, key)
|
||||||
@@ -64,21 +67,24 @@ func (h *S3Handler) handleGetObject(w http.ResponseWriter, r *http.Request, buck
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取映射到的真实存储桶
|
// 获取映射到的真实存储桶和真实key
|
||||||
bucket1, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
|
bucket1, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
|
||||||
if !ok {
|
if !ok {
|
||||||
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
|
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
realKey = mapping.RealObjectKey
|
||||||
h.recordBackendOperation(bucket1, bucket.OperationTypeB)
|
h.recordBackendOperation(bucket1, bucket.OperationTypeB)
|
||||||
|
} else {
|
||||||
|
realKey = key
|
||||||
}
|
}
|
||||||
|
|
||||||
// 生成预签名下载URL
|
// 生成预签名下载URL(使用真实key)
|
||||||
downloadInfo, err := h.presigner.GenerateDownloadURL(
|
downloadInfo, err := h.presigner.GenerateDownloadURL(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
bucket1,
|
bucket1,
|
||||||
key,
|
realKey,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.sendS3Error(w, "InternalError", "Failed to generate download URL", key)
|
h.sendS3Error(w, "InternalError", "Failed to generate download URL", key)
|
||||||
@@ -182,6 +188,13 @@ func (h *S3Handler) handleHeadObject(w http.ResponseWriter, r *http.Request, buc
|
|||||||
|
|
||||||
// handlePutObject 上传对象
|
// handlePutObject 上传对象
|
||||||
func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
|
func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
|
||||||
|
// 检查是否是复制操作(CopyObject)
|
||||||
|
copySource := r.Header.Get("x-amz-copy-source")
|
||||||
|
if copySource != "" {
|
||||||
|
h.handleCopyObject(w, r, bucketName, key, copySource)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 检查请求的存储桶是否为虚拟存储桶
|
// 检查请求的存储桶是否为虚拟存储桶
|
||||||
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
|
requestedBucket, ok := h.bucketManager.GetBucket(bucketName)
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -211,8 +224,8 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建虚拟存储桶文件级映射
|
// 创建虚拟存储桶文件级映射(对于普通PUT,虚拟key和真实key相同)
|
||||||
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name); err != nil {
|
if err := h.storage.CreateVirtualBucketMapping(bucketName, key, targetBucket.Config.Name, key); err != nil {
|
||||||
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -290,6 +303,108 @@ func (h *S3Handler) handlePutObject(w http.ResponseWriter, r *http.Request, buck
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleCopyObject 复制对象(只在数据库中创建新映射)
|
||||||
|
func (h *S3Handler) handleCopyObject(w http.ResponseWriter, r *http.Request, destBucket, destKey, copySource string) {
|
||||||
|
// 解析复制源 (格式: /source-bucket/source-key 或 source-bucket/source-key)
|
||||||
|
copySource = strings.TrimPrefix(copySource, "/")
|
||||||
|
parts := strings.SplitN(copySource, "/", 2)
|
||||||
|
if len(parts) != 2 {
|
||||||
|
h.sendS3Error(w, "InvalidArgument", "Invalid copy source format", copySource)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceBucket := parts[0]
|
||||||
|
sourceKey := parts[1]
|
||||||
|
|
||||||
|
// URL 解码源对象键
|
||||||
|
sourceKey, err := url.QueryUnescape(sourceKey)
|
||||||
|
if err != nil {
|
||||||
|
h.sendS3Error(w, "InvalidArgument", "Invalid source key encoding", sourceKey)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查目标存储桶是否存在
|
||||||
|
_, ok := h.bucketManager.GetBucket(destBucket)
|
||||||
|
if !ok {
|
||||||
|
h.sendS3Error(w, "NoSuchBucket", "The specified bucket does not exist", destBucket)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查源存储桶是否存在
|
||||||
|
_, ok = h.bucketManager.GetBucket(sourceBucket)
|
||||||
|
if !ok {
|
||||||
|
h.sendS3Error(w, "NoSuchBucket", "The source bucket does not exist", sourceBucket)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取元数据指令
|
||||||
|
metadataDirective := r.Header.Get("x-amz-metadata-directive")
|
||||||
|
var metadata map[string]string
|
||||||
|
|
||||||
|
if metadataDirective == "REPLACE" {
|
||||||
|
// 使用请求中的新元数据
|
||||||
|
metadata = make(map[string]string)
|
||||||
|
for k, v := range r.Header {
|
||||||
|
if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
|
||||||
|
metaKey := strings.TrimPrefix(strings.ToLower(k), "x-amz-meta-")
|
||||||
|
metadata[metaKey] = v[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 如果是 COPY 或未指定,则复制源对象的元数据(在 storage.CopyObject 中处理)
|
||||||
|
|
||||||
|
// 获取源对象的映射信息
|
||||||
|
sourceMapping, err := h.storage.GetVirtualBucketMapping(sourceBucket, sourceKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to get source object mapping %s: %v", sourceKey, err)
|
||||||
|
h.sendS3Error(w, "NoSuchKey", "The specified key does not exist", sourceKey)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 复制操作只创建新的虚拟映射,指向相同的真实对象(零拷贝)
|
||||||
|
destBucketInfo, destOk := h.bucketManager.GetBucket(destBucket)
|
||||||
|
if !destOk || !destBucketInfo.IsVirtual() {
|
||||||
|
h.sendS3Error(w, "NoSuchBucket", "The destination bucket does not exist", destBucket)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建新的虚拟存储桶映射,指向相同的真实bucket和真实key
|
||||||
|
if err := h.storage.CreateVirtualBucketMapping(destBucket, destKey, sourceMapping.RealBucketName, sourceMapping.RealObjectKey); err != nil {
|
||||||
|
log.Printf("Failed to create virtual bucket mapping for copied object %s: %v", destKey, err)
|
||||||
|
h.sendS3Error(w, "InternalError", "Failed to create virtual bucket file mapping", destKey)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取源对象信息用于响应
|
||||||
|
sourceObj, err := h.storage.GetObjectInfo(sourceMapping.RealObjectKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to get source object info: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 返回成功响应
|
||||||
|
w.Header().Set("Content-Type", "application/xml")
|
||||||
|
|
||||||
|
// 生成 ETag
|
||||||
|
etag := fmt.Sprintf("\"%x\"", time.Now().UnixNano())
|
||||||
|
|
||||||
|
// 构造 CopyObjectResult XML 响应
|
||||||
|
lastModified := time.Now().UTC().Format(time.RFC3339)
|
||||||
|
if sourceObj != nil {
|
||||||
|
lastModified = sourceObj.UpdatedAt.UTC().Format(time.RFC3339)
|
||||||
|
}
|
||||||
|
|
||||||
|
response := fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<CopyObjectResult>
|
||||||
|
<LastModified>%s</LastModified>
|
||||||
|
<ETag>%s</ETag>
|
||||||
|
</CopyObjectResult>`, lastModified, etag)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte(response))
|
||||||
|
|
||||||
|
log.Printf("Object copied successfully: %s -> %s", sourceKey, destKey)
|
||||||
|
}
|
||||||
|
|
||||||
// handleDeleteObject 删除对象
|
// handleDeleteObject 删除对象
|
||||||
func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
|
func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, bucketName string, key string) {
|
||||||
// 检查请求的存储桶是否为虚拟存储桶
|
// 检查请求的存储桶是否为虚拟存储桶
|
||||||
@@ -301,6 +416,7 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
|
|||||||
|
|
||||||
var targetBucket *bucket.BucketInfo
|
var targetBucket *bucket.BucketInfo
|
||||||
var err error
|
var err error
|
||||||
|
var realKey string
|
||||||
|
|
||||||
if requestedBucket.IsVirtual() {
|
if requestedBucket.IsVirtual() {
|
||||||
// 获取虚拟存储桶文件映射
|
// 获取虚拟存储桶文件映射
|
||||||
@@ -311,47 +427,59 @@ func (h *S3Handler) handleDeleteObject(w http.ResponseWriter, r *http.Request, b
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 获取映射到的真实存储桶
|
// 获取映射到的真实存储桶和真实key
|
||||||
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
|
targetBucket, ok = h.bucketManager.GetBucket(mapping.RealBucketName)
|
||||||
if !ok {
|
if !ok {
|
||||||
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
|
h.sendS3Error(w, "InternalError", "Mapped real bucket not found", key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
realKey = mapping.RealObjectKey
|
||||||
} else {
|
} else {
|
||||||
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接DELETE操作
|
// 如果不是虚拟存储桶,拒绝客户端对真实存储桶的直接DELETE操作
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
|
// 先删除虚拟存储桶映射
|
||||||
|
if err := h.storage.DeleteVirtualBucketObjectMapping(bucketName, key); err != nil {
|
||||||
// 生成预签名删除URL
|
log.Printf("Failed to delete virtual bucket mapping for %s/%s: %v", bucketName, key, err)
|
||||||
deleteInfo, err := h.presigner.GenerateDeleteURL(
|
|
||||||
context.Background(),
|
|
||||||
targetBucket,
|
|
||||||
key,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
h.sendS3Error(w, "InternalError", "Failed to generate delete URL", key)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行删除
|
// 检查是否还有其他映射指向同一个真实对象
|
||||||
req, _ := http.NewRequest("DELETE", deleteInfo.URL, nil)
|
count, err := h.storage.CountMappingsToRealObject(targetBucket.Config.Name, realKey)
|
||||||
client := &http.Client{Timeout: 30 * time.Second}
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.sendS3Error(w, "InternalError", "Failed to delete object", key)
|
log.Printf("Failed to count mappings for real object %s: %v", realKey, err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// 从数据库中删除对象记录
|
// 只有当没有其他映射引用时,才删除真实S3对象
|
||||||
h.storage.DeleteObject(key)
|
if count == 0 {
|
||||||
|
h.recordBackendOperation(targetBucket, bucket.OperationTypeA)
|
||||||
|
|
||||||
// 如果是虚拟存储桶,还需要删除文件级别映射
|
// 生成预签名删除URL(使用真实key)
|
||||||
if requestedBucket.IsVirtual() {
|
deleteInfo, err := h.presigner.GenerateDeleteURL(
|
||||||
h.storage.DeleteVirtualBucketFileMapping(bucketName, key)
|
context.Background(),
|
||||||
|
targetBucket,
|
||||||
|
realKey,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to generate delete URL for %s: %v", realKey, err)
|
||||||
|
} else {
|
||||||
|
// 执行删除真实S3对象
|
||||||
|
req, _ := http.NewRequest("DELETE", deleteInfo.URL, nil)
|
||||||
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Failed to delete real S3 object %s: %v", realKey, err)
|
||||||
|
} else {
|
||||||
|
defer resp.Body.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 从数据库中删除对象记录
|
||||||
|
if err := h.storage.DeleteObject(realKey); err != nil {
|
||||||
|
log.Printf("Failed to delete object record for %s: %v", realKey, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// S3规范要求删除操作总是返回204
|
// S3规范要求删除操作总是返回204
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/DullJZ/s3-balance/internal/balancer"
|
"github.com/DullJZ/s3-balance/internal/balancer"
|
||||||
@@ -73,6 +74,7 @@ func (h *S3Handler) initSettings(accessKey, secretKey string, proxyMode, authReq
|
|||||||
func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
|
func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
|
||||||
// 公共路由(不需要认证)
|
// 公共路由(不需要认证)
|
||||||
router.HandleFunc("/", h.handleListBuckets).Methods("GET")
|
router.HandleFunc("/", h.handleListBuckets).Methods("GET")
|
||||||
|
router.HandleFunc("/", h.handleOptions).Methods("OPTIONS")
|
||||||
|
|
||||||
// 带认证/虚拟主机的路由
|
// 带认证/虚拟主机的路由
|
||||||
protected := router.NewRoute().PathPrefix("/{bucket}").Subrouter()
|
protected := router.NewRoute().PathPrefix("/{bucket}").Subrouter()
|
||||||
@@ -80,7 +82,9 @@ func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
|
|||||||
|
|
||||||
// Bucket operations
|
// Bucket operations
|
||||||
protected.HandleFunc("", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE")
|
protected.HandleFunc("", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE")
|
||||||
|
protected.HandleFunc("", h.handleOptions).Methods("OPTIONS")
|
||||||
protected.HandleFunc("/", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE")
|
protected.HandleFunc("/", h.handleBucketOperations).Methods("GET", "HEAD", "PUT", "DELETE")
|
||||||
|
protected.HandleFunc("/", h.handleOptions).Methods("OPTIONS")
|
||||||
|
|
||||||
// Multipart upload operations - must be registered before generic object operations
|
// Multipart upload operations - must be registered before generic object operations
|
||||||
protected.HandleFunc("/{key:.*}", h.handleUploadPart).Methods("PUT").Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId}")
|
protected.HandleFunc("/{key:.*}", h.handleUploadPart).Methods("PUT").Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId}")
|
||||||
@@ -92,6 +96,7 @@ func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
|
|||||||
|
|
||||||
// Object operations - must be registered after multipart operations to avoid conflicts
|
// Object operations - must be registered after multipart operations to avoid conflicts
|
||||||
protected.HandleFunc("/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE")
|
protected.HandleFunc("/{key:.*}", h.handleObjectOperations).Methods("GET", "HEAD", "PUT", "DELETE")
|
||||||
|
protected.HandleFunc("/{key:.*}", h.handleOptions).Methods("OPTIONS")
|
||||||
|
|
||||||
// 添加中间件
|
// 添加中间件
|
||||||
protected.Use(h.accessLogMiddleware)
|
protected.Use(h.accessLogMiddleware)
|
||||||
@@ -110,6 +115,10 @@ func (h *S3Handler) RegisterS3Routes(router *mux.Router) {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *S3Handler) handleOptions(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *S3Handler) loadSettings() handlerSettings {
|
func (h *S3Handler) loadSettings() handlerSettings {
|
||||||
if v := h.settings.Load(); v != nil {
|
if v := h.settings.Load(); v != nil {
|
||||||
return v.(handlerSettings)
|
return v.(handlerSettings)
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ type BucketInfo struct {
|
|||||||
Config config.BucketConfig
|
Config config.BucketConfig
|
||||||
Client *s3.Client
|
Client *s3.Client
|
||||||
UsedSize int64 // 已使用容量(字节)
|
UsedSize int64 // 已使用容量(字节)
|
||||||
|
ObjectCount int64 // 对象数量
|
||||||
Available bool // 是否可用(由health监控更新)
|
Available bool // 是否可用(由health监控更新)
|
||||||
LastChecked time.Time // 最后检查时间(由health监控更新)
|
LastChecked time.Time // 最后检查时间(由health监控更新)
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@@ -320,6 +321,13 @@ func (b *BucketInfo) GetUsedSize() int64 {
|
|||||||
return b.UsedSize
|
return b.UsedSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetObjectCount 获取对象数量
|
||||||
|
func (b *BucketInfo) GetObjectCount() int64 {
|
||||||
|
b.mu.RLock()
|
||||||
|
defer b.mu.RUnlock()
|
||||||
|
return b.ObjectCount
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateUsedSize 更新已使用容量
|
// UpdateUsedSize 更新已使用容量
|
||||||
func (b *BucketInfo) UpdateUsedSize(delta int64) {
|
func (b *BucketInfo) UpdateUsedSize(delta int64) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ func (r *MetricsReporter) ReportStats(stats *health.Stats) {
|
|||||||
if exists {
|
if exists {
|
||||||
bucket.mu.Lock()
|
bucket.mu.Lock()
|
||||||
bucket.UsedSize = stats.UsedSize
|
bucket.UsedSize = stats.UsedSize
|
||||||
|
bucket.ObjectCount = stats.ObjectCount
|
||||||
bucket.mu.Unlock()
|
bucket.mu.Unlock()
|
||||||
|
|
||||||
// 更新 Prometheus 指标
|
// 更新 Prometheus 指标
|
||||||
|
|||||||
@@ -63,6 +63,7 @@ type BalancerConfig struct {
|
|||||||
type MetricsConfig struct {
|
type MetricsConfig struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
Path string `yaml:"path"`
|
Path string `yaml:"path"`
|
||||||
|
Token string `yaml:"token"` // 可选Token,保护Prometheus端点
|
||||||
// Port int `yaml:"port"` // 目前未使用,与主服务共享端口
|
// Port int `yaml:"port"` // 目前未使用,与主服务共享端口
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -66,8 +66,9 @@ func (BucketMonthlyStats) TableName() string {
|
|||||||
type VirtualBucketMapping struct {
|
type VirtualBucketMapping struct {
|
||||||
ID uint `gorm:"primaryKey" json:"id"`
|
ID uint `gorm:"primaryKey" json:"id"`
|
||||||
VirtualBucketName string `gorm:"index;size:255;not null" json:"virtual_bucket_name"`
|
VirtualBucketName string `gorm:"index;size:255;not null" json:"virtual_bucket_name"`
|
||||||
ObjectKey string `gorm:"index;size:512;not null" json:"object_key"`
|
ObjectKey string `gorm:"index;size:512;not null" json:"object_key"` // 虚拟对象key
|
||||||
RealBucketName string `gorm:"index;size:255;not null" json:"real_bucket_name"`
|
RealBucketName string `gorm:"index;size:255;not null" json:"real_bucket_name"`
|
||||||
|
RealObjectKey string `gorm:"size:512;not null" json:"real_object_key"` // 真实对象key
|
||||||
CreatedAt time.Time `gorm:"not null" json:"created_at"`
|
CreatedAt time.Time `gorm:"not null" json:"created_at"`
|
||||||
UpdatedAt time.Time `gorm:"not null" json:"updated_at"`
|
UpdatedAt time.Time `gorm:"not null" json:"updated_at"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -101,6 +101,63 @@ func (s *Service) GetObjectInfo(key string) (*Object, error) {
|
|||||||
return &obj, nil
|
return &obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CopyObject 复制对象(只创建新的数据库映射记录,不复制实际数据)
|
||||||
|
func (s *Service) CopyObject(sourceKey, destKey string, metadata map[string]string) error {
|
||||||
|
// 获取源对象信息
|
||||||
|
sourceObj, err := s.GetObjectInfo(sourceKey)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("source object not found: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查目标对象是否已存在
|
||||||
|
var existingObj Object
|
||||||
|
if err := s.db.Where("`key` = ?", destKey).Where("`deleted_at` IS NULL").First(&existingObj).Error; err == nil {
|
||||||
|
// 目标对象已存在,删除旧的
|
||||||
|
if err := s.DeleteObject(destKey); err != nil {
|
||||||
|
return fmt.Errorf("failed to delete existing destination object: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 清理已软删除的同名对象
|
||||||
|
var deletedObj Object
|
||||||
|
if err := s.db.Unscoped().Where("`key` = ?", destKey).Where("`deleted_at` IS NOT NULL").First(&deletedObj).Error; err == nil {
|
||||||
|
if err := s.db.Unscoped().Delete(&deletedObj).Error; err != nil {
|
||||||
|
return fmt.Errorf("failed to permanently delete soft-deleted object: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建新的对象记录,指向相同的真实存储桶
|
||||||
|
newObj := &Object{
|
||||||
|
Key: destKey,
|
||||||
|
BucketName: sourceObj.BucketName, // 指向相同的真实桶
|
||||||
|
Size: sourceObj.Size,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用提供的元数据,如果没有则复制源对象的元数据
|
||||||
|
if len(metadata) > 0 {
|
||||||
|
newObj.Metadata = make(JSON)
|
||||||
|
for k, v := range metadata {
|
||||||
|
newObj.Metadata[k] = v
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 复制源对象的元数据
|
||||||
|
newObj.Metadata = make(JSON)
|
||||||
|
for k, v := range sourceObj.Metadata {
|
||||||
|
newObj.Metadata[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 插入新记录
|
||||||
|
if err := s.db.Create(newObj).Error; err != nil {
|
||||||
|
return fmt.Errorf("failed to create copy object record: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新存储桶统计
|
||||||
|
s.updateBucketStats(sourceObj.BucketName)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteObject 删除对象记录(软删除)
|
// DeleteObject 删除对象记录(软删除)
|
||||||
func (s *Service) DeleteObject(key string) error {
|
func (s *Service) DeleteObject(key string) error {
|
||||||
var obj Object
|
var obj Object
|
||||||
@@ -527,11 +584,12 @@ func (s *Service) GetAccessLogs(filter *AccessLogFilter) ([]*AccessLog, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreateVirtualBucketMapping 创建虚拟存储桶文件级映射
|
// CreateVirtualBucketMapping 创建虚拟存储桶文件级映射
|
||||||
func (s *Service) CreateVirtualBucketMapping(virtualBucketName, objectKey, realBucketName string) error {
|
func (s *Service) CreateVirtualBucketMapping(virtualBucketName, objectKey, realBucketName, realObjectKey string) error {
|
||||||
mapping := &VirtualBucketMapping{
|
mapping := &VirtualBucketMapping{
|
||||||
VirtualBucketName: virtualBucketName,
|
VirtualBucketName: virtualBucketName,
|
||||||
ObjectKey: objectKey,
|
ObjectKey: objectKey,
|
||||||
RealBucketName: realBucketName,
|
RealBucketName: realBucketName,
|
||||||
|
RealObjectKey: realObjectKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.db.Create(mapping).Error; err != nil {
|
if err := s.db.Create(mapping).Error; err != nil {
|
||||||
@@ -553,6 +611,17 @@ func (s *Service) GetVirtualBucketMapping(virtualBucketName, objectKey string) (
|
|||||||
return &mapping, nil
|
return &mapping, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CountMappingsToRealObject 统计指向同一真实对象的映射数量
|
||||||
|
func (s *Service) CountMappingsToRealObject(realBucketName, realObjectKey string) (int64, error) {
|
||||||
|
var count int64
|
||||||
|
if err := s.db.Model(&VirtualBucketMapping{}).
|
||||||
|
Where("real_bucket_name = ? AND real_object_key = ?", realBucketName, realObjectKey).
|
||||||
|
Count(&count).Error; err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to count mappings: %w", err)
|
||||||
|
}
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetVirtualBucketMappings 获取所有虚拟存储桶映射
|
// GetVirtualBucketMappings 获取所有虚拟存储桶映射
|
||||||
func (s *Service) GetVirtualBucketMappings() ([]*VirtualBucketMapping, error) {
|
func (s *Service) GetVirtualBucketMappings() ([]*VirtualBucketMapping, error) {
|
||||||
var mappings []*VirtualBucketMapping
|
var mappings []*VirtualBucketMapping
|
||||||
@@ -618,19 +687,45 @@ func (s *Service) GetVirtualBucketObjects(virtualBucketName string) ([]*Object,
|
|||||||
return []*Object{}, nil
|
return []*Object{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 收集所有对象键
|
// 收集所有真实对象键
|
||||||
objectKeys := make([]string, 0, len(mappings))
|
realObjectKeys := make([]string, 0, len(mappings))
|
||||||
for _, mapping := range mappings {
|
for _, mapping := range mappings {
|
||||||
objectKeys = append(objectKeys, mapping.ObjectKey)
|
realObjectKeys = append(realObjectKeys, mapping.RealObjectKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 从对象表中查询这些对象
|
// 从对象表中查询这些真实对象
|
||||||
var objects []*Object
|
var realObjects []*Object
|
||||||
if err := s.db.Where("`key` IN ?", objectKeys).Find(&objects).Error; err != nil {
|
if err := s.db.Where("`key` IN ?", realObjectKeys).Find(&realObjects).Error; err != nil {
|
||||||
return nil, fmt.Errorf("failed to get objects for virtual bucket: %w", err)
|
return nil, fmt.Errorf("failed to get objects for virtual bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return objects, nil
|
// 创建真实key到对象的映射
|
||||||
|
realObjectMap := make(map[string]*Object)
|
||||||
|
for _, obj := range realObjects {
|
||||||
|
realObjectMap[obj.Key] = obj
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建虚拟对象列表(使用虚拟key,但其他信息来自真实对象)
|
||||||
|
virtualObjects := make([]*Object, 0, len(mappings))
|
||||||
|
for _, mapping := range mappings {
|
||||||
|
if realObj, exists := realObjectMap[mapping.RealObjectKey]; exists {
|
||||||
|
// 创建虚拟对象副本,使用虚拟key
|
||||||
|
virtualObj := &Object{
|
||||||
|
ID: realObj.ID,
|
||||||
|
Key: mapping.ObjectKey, // 使用虚拟key
|
||||||
|
BucketName: realObj.BucketName,
|
||||||
|
Size: realObj.Size,
|
||||||
|
Metadata: realObj.Metadata,
|
||||||
|
ContentType: realObj.ContentType,
|
||||||
|
ETag: realObj.ETag,
|
||||||
|
CreatedAt: realObj.CreatedAt,
|
||||||
|
UpdatedAt: realObj.UpdatedAt,
|
||||||
|
}
|
||||||
|
virtualObjects = append(virtualObjects, virtualObj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return virtualObjects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteVirtualBucketFileMapping 删除虚拟存储桶文件映射
|
// DeleteVirtualBucketFileMapping 删除虚拟存储桶文件映射
|
||||||
|
|||||||
Reference in New Issue
Block a user