fix: resolve the problem of reporting last stats failure.

This commit is contained in:
徐聪
2022-01-13 10:54:18 +08:00
parent 66bc29ddf4
commit b70dc3a47b
2 changed files with 45 additions and 35 deletions

View File

@@ -273,50 +273,59 @@ func (r *localRunner) start() {
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
quitChan := make(chan bool)
// when this channel is closed, all statistics are reported successfully
reportedChan := make(chan bool)
go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil)
// output setup
r.outputOnStart()
// start running
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
r.reportStats()
// stop
case <-r.stopChan:
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
go func() {
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
r.reportStats()
// close reportedChan and return if the last stats is reported successfully
if atomic.LoadInt32(&r.state) == stateQuitting {
close(reportedChan)
return
}
}
// report last stats
<-ticker.C
r.reportStats()
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
return
}
}()
// stop
<-r.stopChan
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// wait until all stats are reported successfully
<-reportedChan
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
return
}
func (r *localRunner) stop() {