diff --git a/cli/hrp/cmd/boom.go b/cli/hrp/cmd/boom.go index 35a7d6a9..e6bc89da 100644 --- a/cli/hrp/cmd/boom.go +++ b/cli/hrp/cmd/boom.go @@ -29,6 +29,9 @@ var boomCmd = &cobra.Command{ } hrpBoomer := hrp.NewBoomer(spawnCount, spawnRate) hrpBoomer.SetRateLimiter(maxRPS, requestIncreaseRate) + if loopCount > 0 { + hrpBoomer.SetLoopCount(loopCount) + } if !disableConsoleOutput { hrpBoomer.AddOutput(boomer.NewConsoleOutput()) } @@ -45,6 +48,7 @@ var ( spawnCount int spawnRate float64 maxRPS int64 + loopCount int64 requestIncreaseRate string memoryProfile string memoryProfileDuration time.Duration @@ -61,6 +65,7 @@ func init() { boomCmd.Flags().StringVar(&requestIncreaseRate, "request-increase-rate", "-1", "Request increase rate, disabled by default.") boomCmd.Flags().IntVar(&spawnCount, "spawn-count", 1, "The number of users to spawn for load testing") boomCmd.Flags().Float64Var(&spawnRate, "spawn-rate", 1, "The rate for spawning users") + boomCmd.Flags().Int64Var(&loopCount, "loop-count", -1, "The specify running cycles for load testing") boomCmd.Flags().StringVar(&memoryProfile, "mem-profile", "", "Enable memory profiling.") boomCmd.Flags().DurationVar(&memoryProfileDuration, "mem-profile-duration", 30*time.Second, "Memory profile duration.") boomCmd.Flags().StringVar(&cpuProfile, "cpu-profile", "", "Enable CPU profiling.") diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e7c708e6..ab72d19f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,5 +1,10 @@ # Release History +## v0.5.1 (2022-01-13) + +- feat: support specifying running cycles for load testing +- fix: ensure last stats reported when stop running + ## v0.5.0 (2022-01-08) - feat: support creating and calling custom functions with [go plugin](https://pkg.go.dev/plugin) diff --git a/docs/cmd/hrp_boom.md b/docs/cmd/hrp_boom.md index f421c9d9..1d0a6263 100644 --- a/docs/cmd/hrp_boom.md +++ b/docs/cmd/hrp_boom.md @@ -25,6 +25,7 @@ hrp boom [flags] --cpu-profile-duration duration CPU profile duration. (default 30s) --disable-console-output Disable console output. -h, --help help for boom + --loop-count int The specify running cycles for load testing (default -1) --max-rps int Max RPS that boomer can generate, disabled by default. --mem-profile string Enable memory profiling. --mem-profile-duration duration Memory profile duration. (default 30s) diff --git a/internal/boomer/boomer.go b/internal/boomer/boomer.go index 55a01e7f..0b22f6cb 100644 --- a/internal/boomer/boomer.go +++ b/internal/boomer/boomer.go @@ -52,6 +52,11 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) { } } +// SetLoopCount set loop count for test. +func (b *Boomer) SetLoopCount(loopCount int64) { + b.localRunner.loop = &Loop{loopCount: loopCount} +} + // AddOutput accepts outputs which implements the boomer.Output interface. func (b *Boomer) AddOutput(o Output) { b.localRunner.addOutput(o) diff --git a/internal/boomer/output.go b/internal/boomer/output.go index ac8c7538..5c7c7542 100644 --- a/internal/boomer/output.go +++ b/internal/boomer/output.go @@ -197,6 +197,8 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) { return nil, fmt.Errorf("stats is not []interface{}") } + errors := data["errors"].(map[string]map[string]interface{}) + transactions, ok := data["transactions"].(map[string]int64) if !ok { return nil, fmt.Errorf("transactions is not map[string]int64") @@ -223,6 +225,7 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) { TotalRPS: getCurrentRps(entryTotalOutput.NumRequests), TotalFailRatio: getTotalFailRatio(entryTotalOutput.NumRequests, entryTotalOutput.NumFailures), Stats: make([]*statsEntryOutput, 0, len(stats)), + Errors: errors, } // convert stats @@ -329,6 +332,24 @@ var ( ) ) +// summary for total +var ( + summaryResponseTime = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "response_time", + Help: "The summary of response time", + Objectives: map[float64]float64{ + 0.5: 0.01, + 0.9: 0.01, + 0.95: 0.005, + }, + AgeBuckets: 1, + MaxAge: 100000 * time.Second, + }, + []string{"method", "name"}, + ) +) + // gauges for total var ( gaugeUsers = prometheus.NewGauge( @@ -367,6 +388,13 @@ var ( Help: "The accumulated number of failed transactions", }, ) + gaugeErrors = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "errors", + Help: "The errors of load testing", + }, + []string{"method", "name", "error"}, + ) ) // NewPrometheusPusherOutput returns a PrometheusPusherOutput. @@ -397,6 +425,9 @@ func (o *PrometheusPusherOutput) OnStart() { gaugeAverageContentLength, gaugeCurrentRPS, gaugeCurrentFailPerSec, + gaugeErrors, + // summary for total + summaryResponseTime, // gauges for total gaugeUsers, gaugeState, @@ -449,6 +480,21 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) { gaugeAverageContentLength.WithLabelValues(method, name).Set(float64(stat.avgContentLength)) gaugeCurrentRPS.WithLabelValues(method, name).Set(stat.currentRps) gaugeCurrentFailPerSec.WithLabelValues(method, name).Set(float64(stat.currentFailPerSec)) + for responseTime, count := range stat.ResponseTimes { + var i int64 + for i = 0; i < count; i++ { + summaryResponseTime.WithLabelValues(method, name).Observe(float64(responseTime)) + } + } + } + + // errors + for _, requestError := range output.Errors { + gaugeErrors.WithLabelValues( + requestError["method"].(string), + requestError["name"].(string), + requestError["error"].(string), + ).Set(float64(requestError["occurrences"].(int64))) } if err := o.pusher.Push(); err != nil { diff --git a/internal/boomer/runner.go b/internal/boomer/runner.go index 2ddd6f30..9a7da130 100644 --- a/internal/boomer/runner.go +++ b/internal/boomer/runner.go @@ -24,6 +24,31 @@ const ( reportStatsInterval = 3 * time.Second ) +type Loop struct { + loopCount int64 // more than 0 + acquiredCount int64 // count acquired of load testing + finishedCount int64 // count finished of load testing +} + +func (l *Loop) isFinished() bool { + // return true when there are no remaining loop count to test + return atomic.LoadInt64(&l.finishedCount) == l.loopCount +} + +func (l *Loop) acquire() bool { + // get one ticket when there are still remaining loop count to test + // return true when getting ticket successfully + if atomic.LoadInt64(&l.acquiredCount) < l.loopCount { + atomic.AddInt64(&l.acquiredCount, 1) + return true + } + return false +} + +func (l *Loop) increaseFinishedCount() { + atomic.AddInt64(&l.finishedCount, 1) +} + type runner struct { state int32 @@ -37,6 +62,7 @@ type runner struct { currentClientsNum int32 // current clients count spawnCount int // target clients to spawn spawnRate float64 + loop *Loop // specify running cycles outputs []Output } @@ -78,7 +104,7 @@ func (r *runner) outputOnStart() { wg.Wait() } -func (r *runner) outputOnEevent(data map[string]interface{}) { +func (r *runner) outputOnEvent(data map[string]interface{}) { size := len(r.outputs) if size == 0 { return @@ -110,7 +136,14 @@ func (r *runner) outputOnStop() { wg.Wait() } -func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { +func (r *runner) reportStats() { + data := r.stats.collectReportData() + data["user_count"] = atomic.LoadInt32(&r.currentClientsNum) + data["state"] = atomic.LoadInt32(&r.state) + r.outputOnEvent(data) +} + +func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { log.Info(). Int("spawnCount", spawnCount). Float64("spawnRate", spawnRate). @@ -135,6 +168,9 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, case <-quit: return default: + if r.loop != nil && !r.loop.acquire() { + return + } if r.rateLimitEnabled { blocked := r.rateLimiter.Acquire() if !blocked { @@ -145,6 +181,12 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, task := r.getTask() r.safeRun(task.Fn) } + if r.loop != nil { + r.loop.increaseFinishedCount() + if r.loop.isFinished() { + r.stop() + } + } } } }() @@ -231,49 +273,59 @@ func (r *localRunner) start() { // all running workers(goroutines) will select on this channel. // close this channel will stop all running workers. quitChan := make(chan bool) + // when this channel is closed, all statistics are reported successfully + reportedChan := make(chan bool) go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil) // output setup r.outputOnStart() // start running - var ticker = time.NewTicker(reportStatsInterval) - for { - select { - // record stats - case t := <-r.stats.transactionChan: - r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) - case m := <-r.stats.requestSuccessChan: - r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) - case n := <-r.stats.requestFailureChan: - r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) - r.stats.logError(n.requestType, n.name, n.errMsg) - // report stats - case <-ticker.C: - data := r.stats.collectReportData() - data["user_count"] = atomic.LoadInt32(&r.currentClientsNum) - data["state"] = atomic.LoadInt32(&r.state) - r.outputOnEevent(data) - // stop - case <-r.stopChan: - atomic.StoreInt32(&r.state, stateQuitting) - - // stop previous goroutines without blocking - // those goroutines will exit when r.safeRun returns - close(quitChan) - - // stop rate limiter - if r.rateLimitEnabled { - r.rateLimiter.Stop() + go func() { + var ticker = time.NewTicker(reportStatsInterval) + for { + select { + // record stats + case t := <-r.stats.transactionChan: + r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) + case m := <-r.stats.requestSuccessChan: + r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) + case n := <-r.stats.requestFailureChan: + r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) + r.stats.logError(n.requestType, n.name, n.errMsg) + // report stats + case <-ticker.C: + r.reportStats() + // close reportedChan and return if the last stats is reported successfully + if atomic.LoadInt32(&r.state) == stateQuitting { + close(reportedChan) + return + } } - - // output teardown - r.outputOnStop() - - atomic.StoreInt32(&r.state, stateStopped) - return } + }() + + // stop + <-r.stopChan + atomic.StoreInt32(&r.state, stateQuitting) + + // stop previous goroutines without blocking + // those goroutines will exit when r.safeRun returns + close(quitChan) + + // wait until all stats are reported successfully + <-reportedChan + + // stop rate limiter + if r.rateLimitEnabled { + r.rateLimiter.Stop() } + + // output teardown + r.outputOnStop() + + atomic.StoreInt32(&r.state, stateStopped) + return } func (r *localRunner) stop() { diff --git a/internal/boomer/runner_test.go b/internal/boomer/runner_test.go index 22f6ad8b..e6e15992 100644 --- a/internal/boomer/runner_test.go +++ b/internal/boomer/runner_test.go @@ -1,8 +1,11 @@ package boomer import ( + "sync/atomic" "testing" "time" + + "github.com/stretchr/testify/assert" ) type HitOutput struct { @@ -45,13 +48,13 @@ func TestOutputOnStart(t *testing.T) { } } -func TestOutputOnEevent(t *testing.T) { +func TestOutputOnEvent(t *testing.T) { hitOutput := &HitOutput{} hitOutput2 := &HitOutput{} runner := &runner{} runner.addOutput(hitOutput) runner.addOutput(hitOutput2) - runner.outputOnEevent(nil) + runner.outputOnEvent(nil) if !hitOutput.onEvent { t.Error("hitOutput's OnEvent has not been called") } @@ -90,3 +93,24 @@ func TestLocalRunner(t *testing.T) { time.Sleep(4 * time.Second) runner.stop() } + +func TestLoopCount(t *testing.T) { + taskA := &Task{ + Weight: 10, + Fn: func() { + time.Sleep(time.Second) + }, + Name: "TaskA", + } + tasks := []*Task{taskA} + runner := newLocalRunner(2, 2) + runner.loop = &Loop{loopCount: 4} + runner.setTasks(tasks) + go runner.start() + ticker := time.NewTicker(4 * time.Second) + defer ticker.Stop() + <-ticker.C + if !assert.Equal(t, runner.loop.loopCount, atomic.LoadInt64(&runner.loop.finishedCount)) { + t.Fail() + } +} diff --git a/internal/version/init.go b/internal/version/init.go index 8b4b8520..53bdb1fa 100644 --- a/internal/version/init.go +++ b/internal/version/init.go @@ -1,3 +1,3 @@ package version -const VERSION = "v0.5.0" +const VERSION = "v0.5.1"