From b79bb960946eef126d53695c0f7eeaacf8364f46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=81=AA?= Date: Thu, 13 Jan 2022 10:54:18 +0800 Subject: [PATCH] fix: resolve the problem of reporting last stats failure. --- cli/hrp/cmd/boom.go | 1 + internal/boomer/runner.go | 79 ++++++++++++++++++++++----------------- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/cli/hrp/cmd/boom.go b/cli/hrp/cmd/boom.go index c2765c5a..e6bc89da 100644 --- a/cli/hrp/cmd/boom.go +++ b/cli/hrp/cmd/boom.go @@ -65,6 +65,7 @@ func init() { boomCmd.Flags().StringVar(&requestIncreaseRate, "request-increase-rate", "-1", "Request increase rate, disabled by default.") boomCmd.Flags().IntVar(&spawnCount, "spawn-count", 1, "The number of users to spawn for load testing") boomCmd.Flags().Float64Var(&spawnRate, "spawn-rate", 1, "The rate for spawning users") + boomCmd.Flags().Int64Var(&loopCount, "loop-count", -1, "The specify running cycles for load testing") boomCmd.Flags().StringVar(&memoryProfile, "mem-profile", "", "Enable memory profiling.") boomCmd.Flags().DurationVar(&memoryProfileDuration, "mem-profile-duration", 30*time.Second, "Memory profile duration.") boomCmd.Flags().StringVar(&cpuProfile, "cpu-profile", "", "Enable CPU profiling.") diff --git a/internal/boomer/runner.go b/internal/boomer/runner.go index a205b652..9a7da130 100644 --- a/internal/boomer/runner.go +++ b/internal/boomer/runner.go @@ -273,50 +273,59 @@ func (r *localRunner) start() { // all running workers(goroutines) will select on this channel. // close this channel will stop all running workers. quitChan := make(chan bool) + // when this channel is closed, all statistics are reported successfully + reportedChan := make(chan bool) go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil) // output setup r.outputOnStart() // start running - var ticker = time.NewTicker(reportStatsInterval) - for { - select { - // record stats - case t := <-r.stats.transactionChan: - r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) - case m := <-r.stats.requestSuccessChan: - r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) - case n := <-r.stats.requestFailureChan: - r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) - r.stats.logError(n.requestType, n.name, n.errMsg) - // report stats - case <-ticker.C: - r.reportStats() - // stop - case <-r.stopChan: - atomic.StoreInt32(&r.state, stateQuitting) - - // stop previous goroutines without blocking - // those goroutines will exit when r.safeRun returns - close(quitChan) - - // stop rate limiter - if r.rateLimitEnabled { - r.rateLimiter.Stop() + go func() { + var ticker = time.NewTicker(reportStatsInterval) + for { + select { + // record stats + case t := <-r.stats.transactionChan: + r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) + case m := <-r.stats.requestSuccessChan: + r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) + case n := <-r.stats.requestFailureChan: + r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) + r.stats.logError(n.requestType, n.name, n.errMsg) + // report stats + case <-ticker.C: + r.reportStats() + // close reportedChan and return if the last stats is reported successfully + if atomic.LoadInt32(&r.state) == stateQuitting { + close(reportedChan) + return + } } - - // report last stats - <-ticker.C - r.reportStats() - - // output teardown - r.outputOnStop() - - atomic.StoreInt32(&r.state, stateStopped) - return } + }() + + // stop + <-r.stopChan + atomic.StoreInt32(&r.state, stateQuitting) + + // stop previous goroutines without blocking + // those goroutines will exit when r.safeRun returns + close(quitChan) + + // wait until all stats are reported successfully + <-reportedChan + + // stop rate limiter + if r.rateLimitEnabled { + r.rateLimiter.Stop() } + + // output teardown + r.outputOnStop() + + atomic.StoreInt32(&r.state, stateStopped) + return } func (r *localRunner) stop() {