refactor: simplify boomer runner

This commit is contained in:
debugtalk
2021-12-24 10:05:27 +08:00
parent bc8e70a742
commit f2afb8f9de
8 changed files with 40 additions and 57 deletions

View File

@@ -22,4 +22,4 @@ Copyright 2021 debugtalk
* [hrp har2case](hrp_har2case.md) - Convert HAR to json/yaml testcase files
* [hrp run](hrp_run.md) - run API test
###### Auto generated by spf13/cobra on 23-Dec-2021
###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -38,4 +38,4 @@ hrp boom [flags]
* [hrp](hrp.md) - One-stop solution for HTTP(S) testing.
###### Auto generated by spf13/cobra on 23-Dec-2021
###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -23,4 +23,4 @@ hrp har2case harPath... [flags]
* [hrp](hrp.md) - One-stop solution for HTTP(S) testing.
###### Auto generated by spf13/cobra on 23-Dec-2021
###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -31,4 +31,4 @@ hrp run path... [flags]
* [hrp](hrp.md) - One-stop solution for HTTP(S) testing.
###### Auto generated by spf13/cobra on 23-Dec-2021
###### Auto generated by spf13/cobra on 24-Dec-2021

View File

@@ -9,26 +9,19 @@ import (
// A Boomer is used to run tasks.
type Boomer struct {
rateLimiter RateLimiter
localRunner *localRunner
spawnCount int
spawnRate float64
cpuProfile string
cpuProfileDuration time.Duration
memoryProfile string
memoryProfileDuration time.Duration
outputs []Output
}
// NewStandaloneBoomer returns a new Boomer, which can run without master.
func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer {
return &Boomer{
spawnCount: spawnCount,
spawnRate: spawnRate,
localRunner: newLocalRunner(spawnCount, spawnRate),
}
}
@@ -52,12 +45,16 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
log.Error().Err(err).Msg("failed to create rate limiter")
return
}
b.rateLimiter = rateLimiter
if rateLimiter != nil {
b.localRunner.rateLimitEnabled = true
b.localRunner.rateLimiter = rateLimiter
}
}
// AddOutput accepts outputs which implements the boomer.Output interface.
func (b *Boomer) AddOutput(o Output) {
b.outputs = append(b.outputs, o)
b.localRunner.addOutput(o)
}
// EnableCPUProfile will start cpu profiling after run.
@@ -87,19 +84,12 @@ func (b *Boomer) Run(tasks ...*Task) {
}
}
b.localRunner = newLocalRunner(tasks, b.rateLimiter, b.spawnCount, b.spawnRate)
for _, o := range b.outputs {
b.localRunner.addOutput(o)
}
b.localRunner.setTasks(tasks)
b.localRunner.start()
}
// RecordTransaction reports a transaction stat.
func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, contentSize int64) {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stats.transactionChan <- &transaction{
name: name,
success: success,
@@ -110,10 +100,6 @@ func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64,
// RecordSuccess reports a success.
func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stats.requestSuccessChan <- &requestSuccess{
requestType: requestType,
name: name,
@@ -124,10 +110,6 @@ func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, res
// RecordFailure reports a failure.
func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stats.requestFailureChan <- &requestFailure{
requestType: requestType,
name: name,
@@ -138,9 +120,5 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc
// Quit will send a quit message to the master.
func (b *Boomer) Quit() {
if b.localRunner == nil {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.stop()
}

View File

@@ -12,11 +12,11 @@ import (
func TestNewStandaloneBoomer(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
if b.spawnCount != 100 {
if b.localRunner.spawnCount != 100 {
t.Error("spawnCount should be 100")
}
if b.spawnRate != 10 {
if b.localRunner.spawnRate != 10 {
t.Error("spawnRate should be 10")
}
}
@@ -25,7 +25,7 @@ func TestSetRateLimiter(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
b.SetRateLimiter(10, "10/1s")
if b.rateLimiter == nil {
if b.localRunner.rateLimiter == nil {
t.Error("b.rateLimiter should not be nil")
}
}
@@ -35,7 +35,7 @@ func TestAddOutput(t *testing.T) {
b.AddOutput(NewConsoleOutput())
b.AddOutput(NewConsoleOutput())
if len(b.outputs) != 2 {
if len(b.localRunner.outputs) != 2 {
t.Error("length of outputs should be 2")
}
}
@@ -106,7 +106,7 @@ func TestCreateRatelimiter(t *testing.T) {
b := NewStandaloneBoomer(10, 10)
b.SetRateLimiter(100, "-1")
if stableRateLimiter, ok := b.rateLimiter.(*StableRateLimiter); !ok {
if stableRateLimiter, ok := b.localRunner.rateLimiter.(*StableRateLimiter); !ok {
t.Error("Expected stableRateLimiter")
} else {
if stableRateLimiter.threshold != 100 {
@@ -115,7 +115,7 @@ func TestCreateRatelimiter(t *testing.T) {
}
b.SetRateLimiter(0, "1")
if rampUpRateLimiter, ok := b.rateLimiter.(*RampUpRateLimiter); !ok {
if rampUpRateLimiter, ok := b.localRunner.rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter")
} else {
if rampUpRateLimiter.maxThreshold != math.MaxInt64 {
@@ -127,7 +127,7 @@ func TestCreateRatelimiter(t *testing.T) {
}
b.SetRateLimiter(10, "2/2s")
if rampUpRateLimiter, ok := b.rateLimiter.(*RampUpRateLimiter); !ok {
if rampUpRateLimiter, ok := b.localRunner.rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter")
} else {
if rampUpRateLimiter.maxThreshold != 10 {

View File

@@ -201,20 +201,17 @@ type localRunner struct {
stopChan chan bool
}
func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) {
r = &localRunner{}
r.setTasks(tasks)
r.spawnRate = spawnRate
r.spawnCount = spawnCount
r.stopChan = make(chan bool)
if rateLimiter != nil {
r.rateLimitEnabled = true
r.rateLimiter = rateLimiter
func newLocalRunner(spawnCount int, spawnRate float64) *localRunner {
return &localRunner{
runner: runner{
state: stateInit,
spawnRate: spawnRate,
spawnCount: spawnCount,
stats: newRequestStats(),
outputs: make([]Output, 0),
},
stopChan: make(chan bool),
}
r.stats = newRequestStats()
return r
}
func (r *localRunner) start() {
@@ -240,6 +237,7 @@ func (r *localRunner) start() {
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
@@ -247,28 +245,34 @@ func (r *localRunner) start() {
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEevent(data)
// stop
case <-r.stopChan:
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
return
}
}
}
func (r *localRunner) stop() {
atomic.StoreInt32(&r.state, stateQuitting)
close(r.stopChan)
atomic.StoreInt32(&r.state, stateStopped)
}

View File

@@ -84,7 +84,8 @@ func TestLocalRunner(t *testing.T) {
Name: "TaskA",
}
tasks := []*Task{taskA}
runner := newLocalRunner(tasks, nil, 2, 2)
runner := newLocalRunner(2, 2)
runner.setTasks(tasks)
go runner.start()
time.Sleep(4 * time.Second)
runner.stop()