refactor: simplify boomer runner

This commit is contained in:
debugtalk
2021-12-23 22:01:35 +08:00
parent 970734607e
commit 0c0c154a28
5 changed files with 65 additions and 137 deletions

View File

@@ -91,7 +91,7 @@ func (b *Boomer) Run(tasks ...*Task) {
for _, o := range b.outputs {
b.localRunner.addOutput(o)
}
b.localRunner.run()
b.localRunner.start()
}
// RecordTransaction reports a transaction stat.
@@ -142,5 +142,5 @@ func (b *Boomer) Quit() {
log.Warn().Msg("boomer not initialized")
return
}
b.localRunner.close()
b.localRunner.stop()
}

View File

@@ -34,15 +34,9 @@ type runner struct {
rateLimitEnabled bool
stats *requestStats
numClients int32
spawnRate float64
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// close this channel will stop all goroutines used in runner.
closeChan chan bool
currentClientsNum int32 // current clients count
spawnCount int // target clients to spawn
spawnRate float64
outputs []Output
}
@@ -116,16 +110,21 @@ func (r *runner) outputOnStop() {
wg.Wait()
}
func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {
log.Info().Int("spawnCount", spawnCount).Msg("Spawning clients immediately")
func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
log.Info().
Int("spawnCount", spawnCount).
Float64("spawnRate", spawnRate).
Msg("Spawning workers")
// TODO: spawn workers with spawnRate
for i := 1; i <= spawnCount; i++ {
select {
case <-quit:
// quit spawning goroutine
log.Info().Msg("Quitting spawning workers")
return
default:
atomic.AddInt32(&r.numClients, 1)
atomic.AddInt32(&r.currentClientsNum, 1)
go func() {
for {
select {
@@ -193,28 +192,11 @@ func (r *runner) getTask() *Task {
return nil
}
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
r.stats.clearStatsChan <- true
r.stopChan = make(chan bool)
atomic.StoreInt32(&r.numClients, 0)
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
}
func (r *runner) stop() {
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
}
type localRunner struct {
runner
spawnCount int
// close this channel will stop all goroutines used in runner.
stopChan chan bool
}
func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) {
@@ -222,7 +204,7 @@ func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spaw
r.setTasks(tasks)
r.spawnRate = spawnRate
r.spawnCount = spawnCount
r.closeChan = make(chan bool)
r.stopChan = make(chan bool)
if rateLimiter != nil {
r.rateLimitEnabled = true
@@ -233,39 +215,55 @@ func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spaw
return r
}
func (r *localRunner) run() {
func (r *localRunner) start() {
// init state
r.state = stateInit
r.stats.start()
r.outputOnStart()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case data := <-r.stats.messageToRunnerChan:
data["user_count"] = atomic.LoadInt32(&r.numClients)
r.outputOnEevent(data)
case <-r.closeChan:
r.stop()
wg.Done()
r.outputOnStop()
return
}
}
}()
atomic.StoreInt32(&r.currentClientsNum, 0)
r.stats.clearAll()
// start rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Start()
}
r.startSpawning(r.spawnCount, r.spawnRate, nil)
wg.Wait()
}
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
quitChan := make(chan bool)
go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil)
func (r *localRunner) close() {
if r.stats != nil {
r.stats.close()
// output setup
r.outputOnStart()
// start running
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
case <-ticker.C:
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
r.outputOnEevent(data)
case <-r.stopChan:
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
// output teardown
r.outputOnStop()
return
}
}
close(r.closeChan)
}
func (r *localRunner) stop() {
close(r.stopChan)
}

View File

@@ -85,7 +85,7 @@ func TestLocalRunner(t *testing.T) {
}
tasks := []*Task{taskA}
runner := newLocalRunner(tasks, nil, 2, 2)
go runner.run()
go runner.start()
time.Sleep(4 * time.Second)
runner.close()
runner.stop()
}

View File

@@ -36,11 +36,8 @@ type requestStats struct {
transactionPassed int64 // accumulated number of passed transactions
transactionFailed int64 // accumulated number of failed transactions
requestSuccessChan chan *requestSuccess
requestFailureChan chan *requestFailure
clearStatsChan chan bool
messageToRunnerChan chan map[string]interface{}
shutdownChan chan bool
requestSuccessChan chan *requestSuccess
requestFailureChan chan *requestFailure
}
func newRequestStats() (stats *requestStats) {
@@ -54,9 +51,6 @@ func newRequestStats() (stats *requestStats) {
stats.transactionChan = make(chan *transaction, 100)
stats.requestSuccessChan = make(chan *requestSuccess, 100)
stats.requestFailureChan = make(chan *requestFailure, 100)
stats.clearStatsChan = make(chan bool)
stats.messageToRunnerChan = make(chan map[string]interface{}, 10)
stats.shutdownChan = make(chan bool)
stats.total = &statsEntry{
Name: "Total",
@@ -106,9 +100,9 @@ func (s *requestStats) get(name string, method string) (entry *statsEntry) {
Name: name,
Method: method,
NumReqsPerSec: make(map[int64]int64),
NumFailPerSec: make(map[int64]int64),
ResponseTimes: make(map[int64]int64),
}
newEntry.reset()
s.entries[name+method] = newEntry
return newEntry
}
@@ -155,36 +149,6 @@ func (s *requestStats) collectReportData() map[string]interface{} {
return data
}
func (s *requestStats) start() {
go func() {
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
case t := <-s.transactionChan:
s.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-s.requestSuccessChan:
s.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-s.requestFailureChan:
s.logRequest(n.requestType, n.name, n.responseTime, 0)
s.logError(n.requestType, n.name, n.errMsg)
case <-s.clearStatsChan:
s.clearAll()
case <-ticker.C:
data := s.collectReportData()
// send data to channel, no network IO in this goroutine
s.messageToRunnerChan <- data
case <-s.shutdownChan:
return
}
}
}()
}
// close is used by unit tests to avoid leakage of goroutines
func (s *requestStats) close() {
close(s.shutdownChan)
}
// statsEntry represents a single stats entry (name and method)
type statsEntry struct {
// Name (URL) of this stats entry

View File

@@ -2,7 +2,6 @@ package boomer
import (
"testing"
"time"
)
func TestLogRequest(t *testing.T) {
@@ -135,10 +134,8 @@ func TestClearAll(t *testing.T) {
func TestClearAllByChannel(t *testing.T) {
newStats := newRequestStats()
newStats.start()
defer newStats.close()
newStats.logRequest("http", "success", 1, 20)
newStats.clearStatsChan <- true
newStats.clearAll()
if newStats.total.NumRequests != 0 {
t.Error("After clearAll(), newStats.total.numRequests is wrong, expected: 0, got:", newStats.total.NumRequests)
@@ -217,34 +214,3 @@ func TestCollectReportData(t *testing.T) {
t.Error("Key stats not found")
}
}
func TestStatsStart(t *testing.T) {
newStats := newRequestStats()
newStats.start()
defer newStats.close()
newStats.requestSuccessChan <- &requestSuccess{
requestType: "http",
name: "success",
responseTime: 2,
responseLength: 30,
}
newStats.requestFailureChan <- &requestFailure{
requestType: "http",
name: "failure",
responseTime: 1,
errMsg: "500 error",
}
var ticker = time.NewTicker(reportStatsInterval + 500*time.Millisecond)
for {
select {
case <-ticker.C:
t.Error("Timeout waiting for stats reports to runner")
case <-newStats.messageToRunnerChan:
goto end
}
}
end:
}