From 0c0c154a28e97a37eac1c1afd03853eb3237d607 Mon Sep 17 00:00:00 2001 From: debugtalk Date: Thu, 23 Dec 2021 22:01:35 +0800 Subject: [PATCH] refactor: simplify boomer runner --- internal/boomer/boomer.go | 4 +- internal/boomer/runner.go | 116 ++++++++++++++++----------------- internal/boomer/runner_test.go | 4 +- internal/boomer/stats.go | 42 +----------- internal/boomer/stats_test.go | 36 +--------- 5 files changed, 65 insertions(+), 137 deletions(-) diff --git a/internal/boomer/boomer.go b/internal/boomer/boomer.go index 3f50a60b..a0c59b2a 100644 --- a/internal/boomer/boomer.go +++ b/internal/boomer/boomer.go @@ -91,7 +91,7 @@ func (b *Boomer) Run(tasks ...*Task) { for _, o := range b.outputs { b.localRunner.addOutput(o) } - b.localRunner.run() + b.localRunner.start() } // RecordTransaction reports a transaction stat. @@ -142,5 +142,5 @@ func (b *Boomer) Quit() { log.Warn().Msg("boomer not initialized") return } - b.localRunner.close() + b.localRunner.stop() } diff --git a/internal/boomer/runner.go b/internal/boomer/runner.go index c8c24fab..a60ba805 100644 --- a/internal/boomer/runner.go +++ b/internal/boomer/runner.go @@ -34,15 +34,9 @@ type runner struct { rateLimitEnabled bool stats *requestStats - numClients int32 - spawnRate float64 - - // all running workers(goroutines) will select on this channel. - // close this channel will stop all running workers. - stopChan chan bool - - // close this channel will stop all goroutines used in runner. - closeChan chan bool + currentClientsNum int32 // current clients count + spawnCount int // target clients to spawn + spawnRate float64 outputs []Output } @@ -116,16 +110,21 @@ func (r *runner) outputOnStop() { wg.Wait() } -func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { - log.Info().Int("spawnCount", spawnCount).Msg("Spawning clients immediately") +func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { + log.Info(). + Int("spawnCount", spawnCount). + Float64("spawnRate", spawnRate). + Msg("Spawning workers") + // TODO: spawn workers with spawnRate for i := 1; i <= spawnCount; i++ { select { case <-quit: // quit spawning goroutine + log.Info().Msg("Quitting spawning workers") return default: - atomic.AddInt32(&r.numClients, 1) + atomic.AddInt32(&r.currentClientsNum, 1) go func() { for { select { @@ -193,28 +192,11 @@ func (r *runner) getTask() *Task { return nil } -func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { - r.stats.clearStatsChan <- true - r.stopChan = make(chan bool) - - atomic.StoreInt32(&r.numClients, 0) - - go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc) -} - -func (r *runner) stop() { - // stop previous goroutines without blocking - // those goroutines will exit when r.safeRun returns - close(r.stopChan) - if r.rateLimitEnabled { - r.rateLimiter.Stop() - } -} - type localRunner struct { runner - spawnCount int + // close this channel will stop all goroutines used in runner. + stopChan chan bool } func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) { @@ -222,7 +204,7 @@ func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spaw r.setTasks(tasks) r.spawnRate = spawnRate r.spawnCount = spawnCount - r.closeChan = make(chan bool) + r.stopChan = make(chan bool) if rateLimiter != nil { r.rateLimitEnabled = true @@ -233,39 +215,55 @@ func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spaw return r } -func (r *localRunner) run() { +func (r *localRunner) start() { + // init state r.state = stateInit - r.stats.start() - r.outputOnStart() - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - for { - select { - case data := <-r.stats.messageToRunnerChan: - data["user_count"] = atomic.LoadInt32(&r.numClients) - r.outputOnEevent(data) - case <-r.closeChan: - r.stop() - wg.Done() - r.outputOnStop() - return - } - } - }() + atomic.StoreInt32(&r.currentClientsNum, 0) + r.stats.clearAll() + // start rate limiter if r.rateLimitEnabled { r.rateLimiter.Start() } - r.startSpawning(r.spawnCount, r.spawnRate, nil) - wg.Wait() -} + // all running workers(goroutines) will select on this channel. + // close this channel will stop all running workers. + quitChan := make(chan bool) + go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil) -func (r *localRunner) close() { - if r.stats != nil { - r.stats.close() + // output setup + r.outputOnStart() + + // start running + var ticker = time.NewTicker(reportStatsInterval) + for { + select { + case t := <-r.stats.transactionChan: + r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) + case m := <-r.stats.requestSuccessChan: + r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) + case n := <-r.stats.requestFailureChan: + r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) + r.stats.logError(n.requestType, n.name, n.errMsg) + case <-ticker.C: + data := r.stats.collectReportData() + data["user_count"] = atomic.LoadInt32(&r.currentClientsNum) + r.outputOnEevent(data) + case <-r.stopChan: + // stop previous goroutines without blocking + // those goroutines will exit when r.safeRun returns + close(quitChan) + // stop rate limiter + if r.rateLimitEnabled { + r.rateLimiter.Stop() + } + // output teardown + r.outputOnStop() + return + } } - close(r.closeChan) +} + +func (r *localRunner) stop() { + close(r.stopChan) } diff --git a/internal/boomer/runner_test.go b/internal/boomer/runner_test.go index e1a7e3f0..16a3bc6e 100644 --- a/internal/boomer/runner_test.go +++ b/internal/boomer/runner_test.go @@ -85,7 +85,7 @@ func TestLocalRunner(t *testing.T) { } tasks := []*Task{taskA} runner := newLocalRunner(tasks, nil, 2, 2) - go runner.run() + go runner.start() time.Sleep(4 * time.Second) - runner.close() + runner.stop() } diff --git a/internal/boomer/stats.go b/internal/boomer/stats.go index 18d9a5c3..0c5aee37 100644 --- a/internal/boomer/stats.go +++ b/internal/boomer/stats.go @@ -36,11 +36,8 @@ type requestStats struct { transactionPassed int64 // accumulated number of passed transactions transactionFailed int64 // accumulated number of failed transactions - requestSuccessChan chan *requestSuccess - requestFailureChan chan *requestFailure - clearStatsChan chan bool - messageToRunnerChan chan map[string]interface{} - shutdownChan chan bool + requestSuccessChan chan *requestSuccess + requestFailureChan chan *requestFailure } func newRequestStats() (stats *requestStats) { @@ -54,9 +51,6 @@ func newRequestStats() (stats *requestStats) { stats.transactionChan = make(chan *transaction, 100) stats.requestSuccessChan = make(chan *requestSuccess, 100) stats.requestFailureChan = make(chan *requestFailure, 100) - stats.clearStatsChan = make(chan bool) - stats.messageToRunnerChan = make(chan map[string]interface{}, 10) - stats.shutdownChan = make(chan bool) stats.total = &statsEntry{ Name: "Total", @@ -106,9 +100,9 @@ func (s *requestStats) get(name string, method string) (entry *statsEntry) { Name: name, Method: method, NumReqsPerSec: make(map[int64]int64), + NumFailPerSec: make(map[int64]int64), ResponseTimes: make(map[int64]int64), } - newEntry.reset() s.entries[name+method] = newEntry return newEntry } @@ -155,36 +149,6 @@ func (s *requestStats) collectReportData() map[string]interface{} { return data } -func (s *requestStats) start() { - go func() { - var ticker = time.NewTicker(reportStatsInterval) - for { - select { - case t := <-s.transactionChan: - s.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) - case m := <-s.requestSuccessChan: - s.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) - case n := <-s.requestFailureChan: - s.logRequest(n.requestType, n.name, n.responseTime, 0) - s.logError(n.requestType, n.name, n.errMsg) - case <-s.clearStatsChan: - s.clearAll() - case <-ticker.C: - data := s.collectReportData() - // send data to channel, no network IO in this goroutine - s.messageToRunnerChan <- data - case <-s.shutdownChan: - return - } - } - }() -} - -// close is used by unit tests to avoid leakage of goroutines -func (s *requestStats) close() { - close(s.shutdownChan) -} - // statsEntry represents a single stats entry (name and method) type statsEntry struct { // Name (URL) of this stats entry diff --git a/internal/boomer/stats_test.go b/internal/boomer/stats_test.go index 4c82b76c..666a9636 100644 --- a/internal/boomer/stats_test.go +++ b/internal/boomer/stats_test.go @@ -2,7 +2,6 @@ package boomer import ( "testing" - "time" ) func TestLogRequest(t *testing.T) { @@ -135,10 +134,8 @@ func TestClearAll(t *testing.T) { func TestClearAllByChannel(t *testing.T) { newStats := newRequestStats() - newStats.start() - defer newStats.close() newStats.logRequest("http", "success", 1, 20) - newStats.clearStatsChan <- true + newStats.clearAll() if newStats.total.NumRequests != 0 { t.Error("After clearAll(), newStats.total.numRequests is wrong, expected: 0, got:", newStats.total.NumRequests) @@ -217,34 +214,3 @@ func TestCollectReportData(t *testing.T) { t.Error("Key stats not found") } } - -func TestStatsStart(t *testing.T) { - newStats := newRequestStats() - newStats.start() - defer newStats.close() - - newStats.requestSuccessChan <- &requestSuccess{ - requestType: "http", - name: "success", - responseTime: 2, - responseLength: 30, - } - - newStats.requestFailureChan <- &requestFailure{ - requestType: "http", - name: "failure", - responseTime: 1, - errMsg: "500 error", - } - - var ticker = time.NewTicker(reportStatsInterval + 500*time.Millisecond) - for { - select { - case <-ticker.C: - t.Error("Timeout waiting for stats reports to runner") - case <-newStats.messageToRunnerChan: - goto end - } - } -end: -}