功能: 一键部署 Agent 向导 (#44)

This commit is contained in:
Wu Qing
2026-04-19 17:25:34 +08:00
committed by GitHub
parent 66373fa8e4
commit 83bf5ec656
32 changed files with 3177 additions and 207 deletions

View File

@@ -0,0 +1,107 @@
package repository
import (
"context"
"errors"
"time"
"backupx/server/internal/model"
"gorm.io/gorm"
)
// AgentInstallTokenRepository 一次性安装令牌仓储。
type AgentInstallTokenRepository interface {
Create(ctx context.Context, t *model.AgentInstallToken) error
// FindByToken 按 token 字符串查询(不过滤状态),用于管理工具或审计场景。
FindByToken(ctx context.Context, token string) (*model.AgentInstallToken, error)
// FindValidByToken 查询且要求 consumed_at IS NULL 且 expires_at > now
// 适用于 compose 端点预检 Mode 等"只读不消费但需有效"的场景。
FindValidByToken(ctx context.Context, token string) (*model.AgentInstallToken, error)
// ConsumeByToken 原子消费:仅当 token 存在、未过期、未消费时成功,返回消费后的记录。
// 其它情况(不存在/已过期/已消费)一律返回 (nil, nil)。
ConsumeByToken(ctx context.Context, token string) (*model.AgentInstallToken, error)
// DeleteExpiredBefore 硬删除 ExpiresAt < threshold 的记录。
DeleteExpiredBefore(ctx context.Context, threshold time.Time) (int64, error)
// CountCreatedSince 统计 node 在 since 之后创建的数量(用于节点级限流)。
CountCreatedSince(ctx context.Context, nodeID uint, since time.Time) (int64, error)
}
type GormAgentInstallTokenRepository struct {
db *gorm.DB
}
func NewAgentInstallTokenRepository(db *gorm.DB) *GormAgentInstallTokenRepository {
return &GormAgentInstallTokenRepository{db: db}
}
func (r *GormAgentInstallTokenRepository) Create(ctx context.Context, t *model.AgentInstallToken) error {
return r.db.WithContext(ctx).Create(t).Error
}
func (r *GormAgentInstallTokenRepository) FindByToken(ctx context.Context, token string) (*model.AgentInstallToken, error) {
var item model.AgentInstallToken
if err := r.db.WithContext(ctx).Where("token = ?", token).First(&item).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &item, nil
}
// FindValidByToken 仅返回未消费且未过期的记录,过滤条件与 ConsumeByToken 对齐。
func (r *GormAgentInstallTokenRepository) FindValidByToken(ctx context.Context, token string) (*model.AgentInstallToken, error) {
var item model.AgentInstallToken
now := time.Now().UTC()
err := r.db.WithContext(ctx).
Where("token = ? AND consumed_at IS NULL AND expires_at > ?", token, now).
First(&item).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &item, nil
}
// ConsumeByToken 使用条件 UPDATE + RowsAffected 实现原子消费。
// SQLite 不支持 SELECT FOR UPDATE但 UPDATE 本身在 SQLite 中是原子的。
func (r *GormAgentInstallTokenRepository) ConsumeByToken(ctx context.Context, token string) (*model.AgentInstallToken, error) {
var consumed *model.AgentInstallToken
err := r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
now := time.Now().UTC()
result := tx.Model(&model.AgentInstallToken{}).
Where("token = ? AND consumed_at IS NULL AND expires_at > ?", token, now).
Update("consumed_at", &now)
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return nil
}
var item model.AgentInstallToken
if err := tx.Where("token = ?", token).First(&item).Error; err != nil {
return err
}
consumed = &item
return nil
})
if err != nil {
return nil, err
}
return consumed, nil
}
func (r *GormAgentInstallTokenRepository) DeleteExpiredBefore(ctx context.Context, threshold time.Time) (int64, error) {
result := r.db.WithContext(ctx).Where("expires_at < ?", threshold).Delete(&model.AgentInstallToken{})
return result.RowsAffected, result.Error
}
func (r *GormAgentInstallTokenRepository) CountCreatedSince(ctx context.Context, nodeID uint, since time.Time) (int64, error) {
var n int64
err := r.db.WithContext(ctx).Model(&model.AgentInstallToken{}).
Where("node_id = ? AND created_at >= ?", nodeID, since).
Count(&n).Error
return n, err
}

View File

@@ -0,0 +1,151 @@
package repository
import (
"context"
"path/filepath"
"testing"
"time"
"backupx/server/internal/model"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
gormlogger "gorm.io/gorm/logger"
)
func openTestInstallTokenDB(t *testing.T) *gorm.DB {
t.Helper()
path := filepath.Join(t.TempDir(), "install.db")
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{Logger: gormlogger.Default.LogMode(gormlogger.Silent)})
if err != nil {
t.Fatalf("open: %v", err)
}
if err := db.AutoMigrate(&model.AgentInstallToken{}); err != nil {
t.Fatalf("migrate: %v", err)
}
return db
}
func TestInstallTokenConsumeOnce(t *testing.T) {
db := openTestInstallTokenDB(t)
repo := NewAgentInstallTokenRepository(db)
ctx := context.Background()
tok := &model.AgentInstallToken{
Token: "abc", NodeID: 1, Mode: model.InstallModeSystemd,
Arch: model.InstallArchAuto, AgentVer: "v1.7.0",
DownloadSrc: model.InstallSourceGitHub,
ExpiresAt: time.Now().UTC().Add(15 * time.Minute),
CreatedByID: 1,
}
if err := repo.Create(ctx, tok); err != nil {
t.Fatalf("create: %v", err)
}
got, err := repo.ConsumeByToken(ctx, "abc")
if err != nil {
t.Fatalf("consume err: %v", err)
}
if got == nil || got.ConsumedAt == nil {
t.Fatalf("expected consumed token, got %+v", got)
}
got, err = repo.ConsumeByToken(ctx, "abc")
if err != nil {
t.Fatalf("second consume err: %v", err)
}
if got != nil {
t.Fatalf("expected nil on second consume, got %+v", got)
}
}
func TestInstallTokenConsumeExpired(t *testing.T) {
db := openTestInstallTokenDB(t)
repo := NewAgentInstallTokenRepository(db)
ctx := context.Background()
tok := &model.AgentInstallToken{
Token: "stale", NodeID: 1, Mode: model.InstallModeSystemd,
Arch: model.InstallArchAuto, AgentVer: "v1.7.0",
DownloadSrc: model.InstallSourceGitHub,
ExpiresAt: time.Now().UTC().Add(-time.Minute),
CreatedByID: 1,
}
if err := repo.Create(ctx, tok); err != nil {
t.Fatalf("create: %v", err)
}
got, err := repo.ConsumeByToken(ctx, "stale")
if err != nil {
t.Fatalf("consume err: %v", err)
}
if got != nil {
t.Fatalf("expected nil on expired, got %+v", got)
}
}
func TestInstallTokenGC(t *testing.T) {
db := openTestInstallTokenDB(t)
repo := NewAgentInstallTokenRepository(db)
ctx := context.Background()
old := &model.AgentInstallToken{
Token: "old", NodeID: 1, Mode: model.InstallModeSystemd,
Arch: model.InstallArchAuto, AgentVer: "v1.7.0",
DownloadSrc: model.InstallSourceGitHub,
ExpiresAt: time.Now().UTC().Add(-8 * 24 * time.Hour),
CreatedByID: 1,
}
if err := repo.Create(ctx, old); err != nil {
t.Fatalf("create old: %v", err)
}
fresh := &model.AgentInstallToken{
Token: "fresh", NodeID: 1, Mode: model.InstallModeSystemd,
Arch: model.InstallArchAuto, AgentVer: "v1.7.0",
DownloadSrc: model.InstallSourceGitHub,
ExpiresAt: time.Now().UTC().Add(-1 * time.Hour),
CreatedByID: 1,
}
if err := repo.Create(ctx, fresh); err != nil {
t.Fatalf("create fresh: %v", err)
}
n, err := repo.DeleteExpiredBefore(ctx, time.Now().UTC().Add(-7*24*time.Hour))
if err != nil {
t.Fatalf("gc err: %v", err)
}
if n != 1 {
t.Fatalf("expected 1 deleted, got %d", n)
}
}
func TestInstallTokenCountCreatedSince(t *testing.T) {
db := openTestInstallTokenDB(t)
repo := NewAgentInstallTokenRepository(db)
ctx := context.Background()
// 同一节点 3 条
for i := 0; i < 3; i++ {
_ = repo.Create(ctx, &model.AgentInstallToken{
Token: "t" + string(rune('a'+i)), NodeID: 1, Mode: "systemd", Arch: "auto",
AgentVer: "v1", DownloadSrc: "github",
ExpiresAt: time.Now().UTC().Add(time.Minute), CreatedByID: 1,
})
}
// 另一节点 2 条(不计入)
for i := 0; i < 2; i++ {
_ = repo.Create(ctx, &model.AgentInstallToken{
Token: "n2_" + string(rune('a'+i)), NodeID: 2, Mode: "systemd", Arch: "auto",
AgentVer: "v1", DownloadSrc: "github",
ExpiresAt: time.Now().UTC().Add(time.Minute), CreatedByID: 1,
})
}
n, err := repo.CountCreatedSince(ctx, 1, time.Now().UTC().Add(-time.Minute))
if err != nil {
t.Fatalf("count err: %v", err)
}
if n != 3 {
t.Fatalf("expected 3, got %d", n)
}
}

View File

@@ -15,6 +15,8 @@ type NodeRepository interface {
FindByToken(context.Context, string) (*model.Node, error)
FindLocal(context.Context) (*model.Node, error)
Create(context.Context, *model.Node) error
// BatchCreate 在单一事务内批量创建节点,任一失败即全部回滚。
BatchCreate(ctx context.Context, nodes []*model.Node) error
Update(context.Context, *model.Node) error
Delete(context.Context, uint) error
MarkStaleOffline(ctx context.Context, threshold time.Time) (int64, error)
@@ -49,7 +51,20 @@ func (r *GormNodeRepository) FindByID(ctx context.Context, id uint) (*model.Node
func (r *GormNodeRepository) FindByToken(ctx context.Context, token string) (*model.Node, error) {
var item model.Node
if err := r.db.WithContext(ctx).Where("token = ?", token).First(&item).Error; err != nil {
// 主 token 查询
err := r.db.WithContext(ctx).Where("token = ?", token).First(&item).Error
if err == nil {
return &item, nil
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, err
}
// 回退prev_token 且未过期
now := time.Now().UTC()
err = r.db.WithContext(ctx).
Where("prev_token = ? AND prev_token_expires IS NOT NULL AND prev_token_expires > ?", token, now).
First(&item).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
@@ -73,6 +88,22 @@ func (r *GormNodeRepository) Create(ctx context.Context, item *model.Node) error
return r.db.WithContext(ctx).Create(item).Error
}
// BatchCreate 在单一事务中批量创建节点。任一记录失败即事务回滚。
// 节点 ID 在事务提交后回填到入参切片元素上。
func (r *GormNodeRepository) BatchCreate(ctx context.Context, nodes []*model.Node) error {
if len(nodes) == 0 {
return nil
}
return r.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
for _, n := range nodes {
if err := tx.Create(n).Error; err != nil {
return err
}
}
return nil
})
}
func (r *GormNodeRepository) Update(ctx context.Context, item *model.Node) error {
return r.db.WithContext(ctx).Save(item).Error
}

View File

@@ -0,0 +1,76 @@
package repository
import (
"context"
"path/filepath"
"testing"
"time"
"backupx/server/internal/model"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
gormlogger "gorm.io/gorm/logger"
)
func openTestNodeDB(t *testing.T) *gorm.DB {
t.Helper()
path := filepath.Join(t.TempDir(), "nodes.db")
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{Logger: gormlogger.Default.LogMode(gormlogger.Silent)})
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
if err := db.AutoMigrate(&model.Node{}); err != nil {
t.Fatalf("migrate: %v", err)
}
return db
}
func TestFindByTokenFallsBackToPrevToken(t *testing.T) {
db := openTestNodeDB(t)
repo := NewNodeRepository(db)
ctx := context.Background()
future := time.Now().UTC().Add(24 * time.Hour)
node := &model.Node{
Name: "test", Token: "new-token",
PrevToken: "old-token", PrevTokenExpires: &future,
}
if err := repo.Create(ctx, node); err != nil {
t.Fatalf("create: %v", err)
}
// 新 token 能查到
got, err := repo.FindByToken(ctx, "new-token")
if err != nil || got == nil || got.ID != node.ID {
t.Fatalf("new token lookup failed: err=%v got=%v", err, got)
}
// 旧 token 也能查到(未过期)
got, err = repo.FindByToken(ctx, "old-token")
if err != nil || got == nil || got.ID != node.ID {
t.Fatalf("prev_token lookup failed: err=%v got=%v", err, got)
}
}
func TestFindByTokenRejectsExpiredPrevToken(t *testing.T) {
db := openTestNodeDB(t)
repo := NewNodeRepository(db)
ctx := context.Background()
past := time.Now().UTC().Add(-1 * time.Hour)
node := &model.Node{
Name: "test", Token: "new-token",
PrevToken: "stale", PrevTokenExpires: &past,
}
if err := repo.Create(ctx, node); err != nil {
t.Fatalf("create: %v", err)
}
got, err := repo.FindByToken(ctx, "stale")
if err != nil {
t.Fatalf("err=%v", err)
}
if got != nil {
t.Fatalf("expected stale prev_token rejected, got %v", got)
}
}