merge main

This commit is contained in:
debugtalk
2022-01-13 18:45:27 +08:00
8 changed files with 177 additions and 39 deletions

View File

@@ -29,6 +29,9 @@ var boomCmd = &cobra.Command{
}
hrpBoomer := hrp.NewBoomer(spawnCount, spawnRate)
hrpBoomer.SetRateLimiter(maxRPS, requestIncreaseRate)
if loopCount > 0 {
hrpBoomer.SetLoopCount(loopCount)
}
if !disableConsoleOutput {
hrpBoomer.AddOutput(boomer.NewConsoleOutput())
}
@@ -45,6 +48,7 @@ var (
spawnCount int
spawnRate float64
maxRPS int64
loopCount int64
requestIncreaseRate string
memoryProfile string
memoryProfileDuration time.Duration
@@ -61,6 +65,7 @@ func init() {
boomCmd.Flags().StringVar(&requestIncreaseRate, "request-increase-rate", "-1", "Request increase rate, disabled by default.")
boomCmd.Flags().IntVar(&spawnCount, "spawn-count", 1, "The number of users to spawn for load testing")
boomCmd.Flags().Float64Var(&spawnRate, "spawn-rate", 1, "The rate for spawning users")
boomCmd.Flags().Int64Var(&loopCount, "loop-count", -1, "Specify running cycles for load testing (default -1, run forever)")
boomCmd.Flags().StringVar(&memoryProfile, "mem-profile", "", "Enable memory profiling.")
boomCmd.Flags().DurationVar(&memoryProfileDuration, "mem-profile-duration", 30*time.Second, "Memory profile duration.")
boomCmd.Flags().StringVar(&cpuProfile, "cpu-profile", "", "Enable CPU profiling.")

View File

@@ -1,5 +1,10 @@
# Release History
## v0.5.1 (2022-01-13)
- feat: support specifying running cycles for load testing
- fix: ensure last stats reported when stop running
## v0.5.0 (2022-01-08)
- feat: support creating and calling custom functions with [go plugin](https://pkg.go.dev/plugin)

View File

@@ -25,6 +25,7 @@ hrp boom [flags]
--cpu-profile-duration duration CPU profile duration. (default 30s)
--disable-console-output Disable console output.
-h, --help help for boom
      --loop-count int                 Specify running cycles for load testing (default -1, run forever)
--max-rps int Max RPS that boomer can generate, disabled by default.
--mem-profile string Enable memory profiling.
--mem-profile-duration duration Memory profile duration. (default 30s)

View File

@@ -52,6 +52,11 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
}
}
// SetLoopCount set loop count for test.
func (b *Boomer) SetLoopCount(loopCount int64) {
b.localRunner.loop = &Loop{loopCount: loopCount}
}
// AddOutput accepts outputs which implements the boomer.Output interface.
func (b *Boomer) AddOutput(o Output) {
b.localRunner.addOutput(o)

View File

@@ -197,6 +197,8 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) {
return nil, fmt.Errorf("stats is not []interface{}")
}
errors := data["errors"].(map[string]map[string]interface{})
transactions, ok := data["transactions"].(map[string]int64)
if !ok {
return nil, fmt.Errorf("transactions is not map[string]int64")
@@ -223,6 +225,7 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) {
TotalRPS: getCurrentRps(entryTotalOutput.NumRequests),
TotalFailRatio: getTotalFailRatio(entryTotalOutput.NumRequests, entryTotalOutput.NumFailures),
Stats: make([]*statsEntryOutput, 0, len(stats)),
Errors: errors,
}
// convert stats
@@ -329,6 +332,24 @@ var (
)
)
// summary for total
var (
summaryResponseTime = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "response_time",
Help: "The summary of response time",
Objectives: map[float64]float64{
0.5: 0.01,
0.9: 0.01,
0.95: 0.005,
},
AgeBuckets: 1,
MaxAge: 100000 * time.Second,
},
[]string{"method", "name"},
)
)
// gauges for total
var (
gaugeUsers = prometheus.NewGauge(
@@ -367,6 +388,13 @@ var (
Help: "The accumulated number of failed transactions",
},
)
gaugeErrors = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "errors",
Help: "The errors of load testing",
},
[]string{"method", "name", "error"},
)
)
// NewPrometheusPusherOutput returns a PrometheusPusherOutput.
@@ -397,6 +425,9 @@ func (o *PrometheusPusherOutput) OnStart() {
gaugeAverageContentLength,
gaugeCurrentRPS,
gaugeCurrentFailPerSec,
gaugeErrors,
// summary for total
summaryResponseTime,
// gauges for total
gaugeUsers,
gaugeState,
@@ -449,6 +480,21 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
gaugeAverageContentLength.WithLabelValues(method, name).Set(float64(stat.avgContentLength))
gaugeCurrentRPS.WithLabelValues(method, name).Set(stat.currentRps)
gaugeCurrentFailPerSec.WithLabelValues(method, name).Set(float64(stat.currentFailPerSec))
for responseTime, count := range stat.ResponseTimes {
var i int64
for i = 0; i < count; i++ {
summaryResponseTime.WithLabelValues(method, name).Observe(float64(responseTime))
}
}
}
// errors
for _, requestError := range output.Errors {
gaugeErrors.WithLabelValues(
requestError["method"].(string),
requestError["name"].(string),
requestError["error"].(string),
).Set(float64(requestError["occurrences"].(int64)))
}
if err := o.pusher.Push(); err != nil {

View File

@@ -24,6 +24,31 @@ const (
reportStatsInterval = 3 * time.Second
)
type Loop struct {
loopCount int64 // more than 0
acquiredCount int64 // count acquired of load testing
finishedCount int64 // count finished of load testing
}
func (l *Loop) isFinished() bool {
// return true when there are no remaining loop count to test
return atomic.LoadInt64(&l.finishedCount) == l.loopCount
}
func (l *Loop) acquire() bool {
// get one ticket when there are still remaining loop count to test
// return true when getting ticket successfully
if atomic.LoadInt64(&l.acquiredCount) < l.loopCount {
atomic.AddInt64(&l.acquiredCount, 1)
return true
}
return false
}
func (l *Loop) increaseFinishedCount() {
atomic.AddInt64(&l.finishedCount, 1)
}
type runner struct {
state int32
@@ -37,6 +62,7 @@ type runner struct {
currentClientsNum int32 // current clients count
spawnCount int // target clients to spawn
spawnRate float64
loop *Loop // specify running cycles
outputs []Output
}
@@ -78,7 +104,7 @@ func (r *runner) outputOnStart() {
wg.Wait()
}
func (r *runner) outputOnEevent(data map[string]interface{}) {
func (r *runner) outputOnEvent(data map[string]interface{}) {
size := len(r.outputs)
if size == 0 {
return
@@ -110,7 +136,14 @@ func (r *runner) outputOnStop() {
wg.Wait()
}
func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
func (r *runner) reportStats() {
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEvent(data)
}
func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
log.Info().
Int("spawnCount", spawnCount).
Float64("spawnRate", spawnRate).
@@ -135,6 +168,9 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool,
case <-quit:
return
default:
if r.loop != nil && !r.loop.acquire() {
return
}
if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire()
if !blocked {
@@ -145,6 +181,12 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool,
task := r.getTask()
r.safeRun(task.Fn)
}
if r.loop != nil {
r.loop.increaseFinishedCount()
if r.loop.isFinished() {
r.stop()
}
}
}
}
}()
@@ -231,49 +273,59 @@ func (r *localRunner) start() {
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
quitChan := make(chan bool)
// when this channel is closed, all statistics are reported successfully
reportedChan := make(chan bool)
go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil)
// output setup
r.outputOnStart()
// start running
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEevent(data)
// stop
case <-r.stopChan:
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
go func() {
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
r.reportStats()
// close reportedChan and return if the last stats is reported successfully
if atomic.LoadInt32(&r.state) == stateQuitting {
close(reportedChan)
return
}
}
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
return
}
}()
// stop
<-r.stopChan
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
// wait until all stats are reported successfully
<-reportedChan
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
return
}
func (r *localRunner) stop() {

View File

@@ -1,8 +1,11 @@
package boomer
import (
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type HitOutput struct {
@@ -45,13 +48,13 @@ func TestOutputOnStart(t *testing.T) {
}
}
func TestOutputOnEevent(t *testing.T) {
func TestOutputOnEvent(t *testing.T) {
hitOutput := &HitOutput{}
hitOutput2 := &HitOutput{}
runner := &runner{}
runner.addOutput(hitOutput)
runner.addOutput(hitOutput2)
runner.outputOnEevent(nil)
runner.outputOnEvent(nil)
if !hitOutput.onEvent {
t.Error("hitOutput's OnEvent has not been called")
}
@@ -90,3 +93,24 @@ func TestLocalRunner(t *testing.T) {
time.Sleep(4 * time.Second)
runner.stop()
}
func TestLoopCount(t *testing.T) {
taskA := &Task{
Weight: 10,
Fn: func() {
time.Sleep(time.Second)
},
Name: "TaskA",
}
tasks := []*Task{taskA}
runner := newLocalRunner(2, 2)
runner.loop = &Loop{loopCount: 4}
runner.setTasks(tasks)
go runner.start()
ticker := time.NewTicker(4 * time.Second)
defer ticker.Stop()
<-ticker.C
if !assert.Equal(t, runner.loop.loopCount, atomic.LoadInt64(&runner.loop.finishedCount)) {
t.Fail()
}
}

View File

@@ -1,3 +1,3 @@
package version
const VERSION = "v0.5.0"
const VERSION = "v0.5.1"