feat: specify running cycles for load testing.

This commit is contained in:
徐聪
2022-01-12 14:48:43 +08:00
parent b80730d8a4
commit 7a15df3177
4 changed files with 83 additions and 8 deletions

View File

@@ -29,6 +29,9 @@ var boomCmd = &cobra.Command{
} }
hrpBoomer := hrp.NewBoomer(spawnCount, spawnRate) hrpBoomer := hrp.NewBoomer(spawnCount, spawnRate)
hrpBoomer.SetRateLimiter(maxRPS, requestIncreaseRate) hrpBoomer.SetRateLimiter(maxRPS, requestIncreaseRate)
if loopCount > 0 {
hrpBoomer.SetLoopCount(loopCount)
}
if !disableConsoleOutput { if !disableConsoleOutput {
hrpBoomer.AddOutput(boomer.NewConsoleOutput()) hrpBoomer.AddOutput(boomer.NewConsoleOutput())
} }
@@ -45,6 +48,7 @@ var (
spawnCount int spawnCount int
spawnRate float64 spawnRate float64
maxRPS int64 maxRPS int64
loopCount int64
requestIncreaseRate string requestIncreaseRate string
memoryProfile string memoryProfile string
memoryProfileDuration time.Duration memoryProfileDuration time.Duration

View File

@@ -52,6 +52,11 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
} }
} }
// SetLoopCount set loop count for test.
func (b *Boomer) SetLoopCount(loopCount int64) {
b.localRunner.loop = &Loop{loopCount: loopCount}
}
// AddOutput accepts outputs which implements the boomer.Output interface. // AddOutput accepts outputs which implements the boomer.Output interface.
func (b *Boomer) AddOutput(o Output) { func (b *Boomer) AddOutput(o Output) {
b.localRunner.addOutput(o) b.localRunner.addOutput(o)

View File

@@ -24,6 +24,31 @@ const (
reportStatsInterval = 3 * time.Second reportStatsInterval = 3 * time.Second
) )
type Loop struct {
loopCount int64 // more than 0
acquiredCount int64 // count acquired of load testing
finishedCount int64 // count finished of load testing
}
func (l *Loop) isFinished() bool {
// return true when there are no remaining loop count to test
return atomic.LoadInt64(&l.finishedCount) == l.loopCount
}
func (l *Loop) acquire() bool {
// get one ticket when there are still remaining loop count to test
// return true when getting ticket successfully
if atomic.LoadInt64(&l.acquiredCount) < l.loopCount {
atomic.AddInt64(&l.acquiredCount, 1)
return true
}
return false
}
func (l *Loop) increaseFinishedCount() {
atomic.AddInt64(&l.finishedCount, 1)
}
type runner struct { type runner struct {
state int32 state int32
@@ -37,6 +62,7 @@ type runner struct {
currentClientsNum int32 // current clients count currentClientsNum int32 // current clients count
spawnCount int // target clients to spawn spawnCount int // target clients to spawn
spawnRate float64 spawnRate float64
loop *Loop // specify running cycles
outputs []Output outputs []Output
} }
@@ -78,7 +104,7 @@ func (r *runner) outputOnStart() {
wg.Wait() wg.Wait()
} }
func (r *runner) outputOnEevent(data map[string]interface{}) { func (r *runner) outputOnEvent(data map[string]interface{}) {
size := len(r.outputs) size := len(r.outputs)
if size == 0 { if size == 0 {
return return
@@ -110,7 +136,14 @@ func (r *runner) outputOnStop() {
wg.Wait() wg.Wait()
} }
func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { func (r *runner) reportStats() {
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEvent(data)
}
func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
log.Info(). log.Info().
Int("spawnCount", spawnCount). Int("spawnCount", spawnCount).
Float64("spawnRate", spawnRate). Float64("spawnRate", spawnRate).
@@ -135,6 +168,9 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool,
case <-quit: case <-quit:
return return
default: default:
if r.loop != nil && !r.loop.acquire() {
return
}
if r.rateLimitEnabled { if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire() blocked := r.rateLimiter.Acquire()
if !blocked { if !blocked {
@@ -145,6 +181,12 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool,
task := r.getTask() task := r.getTask()
r.safeRun(task.Fn) r.safeRun(task.Fn)
} }
if r.loop != nil {
r.loop.increaseFinishedCount()
if r.loop.isFinished() {
r.stop()
}
}
} }
} }
}() }()
@@ -250,10 +292,7 @@ func (r *localRunner) start() {
r.stats.logError(n.requestType, n.name, n.errMsg) r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats // report stats
case <-ticker.C: case <-ticker.C:
data := r.stats.collectReportData() r.reportStats()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEevent(data)
// stop // stop
case <-r.stopChan: case <-r.stopChan:
atomic.StoreInt32(&r.state, stateQuitting) atomic.StoreInt32(&r.state, stateQuitting)
@@ -267,6 +306,10 @@ func (r *localRunner) start() {
r.rateLimiter.Stop() r.rateLimiter.Stop()
} }
// report last stats
<-ticker.C
r.reportStats()
// output teardown // output teardown
r.outputOnStop() r.outputOnStop()

View File

@@ -3,6 +3,8 @@ package boomer
import ( import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
) )
type HitOutput struct { type HitOutput struct {
@@ -45,13 +47,13 @@ func TestOutputOnStart(t *testing.T) {
} }
} }
func TestOutputOnEevent(t *testing.T) { func TestOutputOnEvent(t *testing.T) {
hitOutput := &HitOutput{} hitOutput := &HitOutput{}
hitOutput2 := &HitOutput{} hitOutput2 := &HitOutput{}
runner := &runner{} runner := &runner{}
runner.addOutput(hitOutput) runner.addOutput(hitOutput)
runner.addOutput(hitOutput2) runner.addOutput(hitOutput2)
runner.outputOnEevent(nil) runner.outputOnEvent(nil)
if !hitOutput.onEvent { if !hitOutput.onEvent {
t.Error("hitOutput's OnEvent has not been called") t.Error("hitOutput's OnEvent has not been called")
} }
@@ -90,3 +92,24 @@ func TestLocalRunner(t *testing.T) {
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
runner.stop() runner.stop()
} }
func TestLoopCount(t *testing.T) {
taskA := &Task{
Weight: 10,
Fn: func() {
time.Sleep(time.Second)
},
Name: "TaskA",
}
tasks := []*Task{taskA}
runner := newLocalRunner(2, 2)
runner.loop = &Loop{loopCount: 4}
runner.setTasks(tasks)
go runner.start()
ticker := time.NewTicker(4 * time.Second)
defer ticker.Stop()
<-ticker.C
if !assert.Equal(t, runner.loop.loopCount, runner.loop.finishedCount) {
t.Fail()
}
}