refactor: fork boomer as sub module

This commit is contained in:
debugtalk
2021-12-21 20:52:48 +08:00
parent f11310a8bd
commit a845b58ff5
20 changed files with 2329 additions and 45 deletions

View File

@@ -0,0 +1,7 @@
# boomer
This module is initially forked from [myzhan/boomer] and made a lot of changes.
- remove distribute runner
[myzhan/boomer]: https://github.com/myzhan/boomer

139
internal/boomer/boomer.go Normal file
View File

@@ -0,0 +1,139 @@
package boomer
import (
"log"
"math"
"time"
)
// A Boomer is used to run tasks.
type Boomer struct {
rateLimiter RateLimiter
localRunner *localRunner
spawnCount int
spawnRate float64
cpuProfile string
cpuProfileDuration time.Duration
memoryProfile string
memoryProfileDuration time.Duration
outputs []Output
}
// NewStandaloneBoomer returns a new Boomer, which can run without master.
func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer {
return &Boomer{
spawnCount: spawnCount,
spawnRate: spawnRate,
}
}
// SetRateLimiter creates rate limiter with the given limit and burst.
func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
var rateLimiter RateLimiter
var err error
if requestIncreaseRate != "-1" {
if maxRPS > 0 {
log.Println("The max RPS that boomer may generate is limited to", maxRPS, "with a increase rate", requestIncreaseRate)
rateLimiter, err = NewRampUpRateLimiter(maxRPS, requestIncreaseRate, time.Second)
} else {
log.Println("The max RPS that boomer may generate is limited by a increase rate", requestIncreaseRate)
rateLimiter, err = NewRampUpRateLimiter(math.MaxInt64, requestIncreaseRate, time.Second)
}
} else {
if maxRPS > 0 {
log.Println("The max RPS that boomer may generate is limited to", maxRPS)
rateLimiter = NewStableRateLimiter(maxRPS, time.Second)
}
}
if err != nil {
return
}
b.rateLimiter = rateLimiter
}
// AddOutput accepts outputs which implements the boomer.Output interface.
func (b *Boomer) AddOutput(o Output) {
b.outputs = append(b.outputs, o)
}
// EnableCPUProfile will start cpu profiling after run.
func (b *Boomer) EnableCPUProfile(cpuProfile string, duration time.Duration) {
b.cpuProfile = cpuProfile
b.cpuProfileDuration = duration
}
// EnableMemoryProfile will start memory profiling after run.
func (b *Boomer) EnableMemoryProfile(memoryProfile string, duration time.Duration) {
b.memoryProfile = memoryProfile
b.memoryProfileDuration = duration
}
// Run accepts a slice of Task and connects to the locust master.
func (b *Boomer) Run(tasks ...*Task) {
if b.cpuProfile != "" {
err := startCPUProfile(b.cpuProfile, b.cpuProfileDuration)
if err != nil {
log.Printf("Error starting cpu profiling, %v", err)
}
}
if b.memoryProfile != "" {
err := startMemoryProfile(b.memoryProfile, b.memoryProfileDuration)
if err != nil {
log.Printf("Error starting memory profiling, %v", err)
}
}
b.localRunner = newLocalRunner(tasks, b.rateLimiter, b.spawnCount, b.spawnRate)
for _, o := range b.outputs {
b.localRunner.addOutput(o)
}
b.localRunner.run()
}
// RecordTransaction reports a transaction stat.
func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, contentSize int64) {
if b.localRunner == nil {
return
}
b.localRunner.stats.transactionChan <- &transaction{
name: name,
success: success,
elapsedTime: elapsedTime,
contentSize: contentSize,
}
}
// RecordSuccess reports a success.
func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) {
if b.localRunner == nil {
return
}
b.localRunner.stats.requestSuccessChan <- &requestSuccess{
requestType: requestType,
name: name,
responseTime: responseTime,
responseLength: responseLength,
}
}
// RecordFailure reports a failure.
func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) {
if b.localRunner == nil {
return
}
b.localRunner.stats.requestFailureChan <- &requestFailure{
requestType: requestType,
name: name,
responseTime: responseTime,
errMsg: exception,
}
}
// Quit will send a quit message to the master.
func (b *Boomer) Quit() {
b.localRunner.close()
}

View File

@@ -0,0 +1,146 @@
package boomer
import (
"math"
"os"
"runtime"
"sync/atomic"
"testing"
"time"
)
func TestNewStandaloneBoomer(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
if b.spawnCount != 100 {
t.Error("spawnCount should be 100")
}
if b.spawnRate != 10 {
t.Error("spawnRate should be 10")
}
}
func TestSetRateLimiter(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
b.SetRateLimiter(10, "10/1s")
if b.rateLimiter == nil {
t.Error("b.rateLimiter should not be nil")
}
}
func TestAddOutput(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
b.AddOutput(NewConsoleOutput())
b.AddOutput(NewConsoleOutput())
if len(b.outputs) != 2 {
t.Error("length of outputs should be 2")
}
}
func TestEnableCPUProfile(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
b.EnableCPUProfile("cpu.prof", time.Second)
if b.cpuProfile != "cpu.prof" {
t.Error("cpuProfile should be cpu.prof")
}
if b.cpuProfileDuration != time.Second {
t.Error("cpuProfileDuration should 1 second")
}
}
func TestEnableMemoryProfile(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
b.EnableMemoryProfile("mem.prof", time.Second)
if b.memoryProfile != "mem.prof" {
t.Error("memoryProfile should be mem.prof")
}
if b.memoryProfileDuration != time.Second {
t.Error("memoryProfileDuration should 1 second")
}
}
func TestStandaloneRun(t *testing.T) {
b := NewStandaloneBoomer(10, 10)
b.EnableCPUProfile("cpu.pprof", 2*time.Second)
b.EnableMemoryProfile("mem.pprof", 2*time.Second)
count := int64(0)
taskA := &Task{
Name: "increaseCount",
Fn: func() {
atomic.AddInt64(&count, 1)
runtime.Goexit()
},
}
go b.Run(taskA)
time.Sleep(5 * time.Second)
b.Quit()
if count != 10 {
t.Error("count is", count, "expected: 10")
}
if _, err := os.Stat("cpu.pprof"); os.IsNotExist(err) {
t.Error("File cpu.pprof is not generated")
} else {
os.Remove("cpu.pprof")
}
if _, err := os.Stat("mem.pprof"); os.IsNotExist(err) {
t.Error("File mem.pprof is not generated")
} else {
os.Remove("mem.pprof")
}
}
func TestCreateRatelimiter(t *testing.T) {
b := NewStandaloneBoomer(10, 10)
b.SetRateLimiter(100, "-1")
if stableRateLimiter, ok := b.rateLimiter.(*StableRateLimiter); !ok {
t.Error("Expected stableRateLimiter")
} else {
if stableRateLimiter.threshold != 100 {
t.Error("threshold should be equals to math.MaxInt64, was", stableRateLimiter.threshold)
}
}
b.SetRateLimiter(0, "1")
if rampUpRateLimiter, ok := b.rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter")
} else {
if rampUpRateLimiter.maxThreshold != math.MaxInt64 {
t.Error("maxThreshold should be equals to math.MaxInt64, was", rampUpRateLimiter.maxThreshold)
}
if rampUpRateLimiter.rampUpRate != "1" {
t.Error("rampUpRate should be equals to \"1\", was", rampUpRateLimiter.rampUpRate)
}
}
b.SetRateLimiter(10, "2/2s")
if rampUpRateLimiter, ok := b.rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter")
} else {
if rampUpRateLimiter.maxThreshold != 10 {
t.Error("maxThreshold should be equals to 10, was", rampUpRateLimiter.maxThreshold)
}
if rampUpRateLimiter.rampUpRate != "2/2s" {
t.Error("rampUpRate should be equals to \"2/2s\", was", rampUpRateLimiter.rampUpRate)
}
if rampUpRateLimiter.rampUpStep != 2 {
t.Error("rampUpStep should be equals to 2, was", rampUpRateLimiter.rampUpStep)
}
if rampUpRateLimiter.rampUpPeroid != 2*time.Second {
t.Error("rampUpPeroid should be equals to 2 seconds, was", rampUpRateLimiter.rampUpPeroid)
}
}
}

447
internal/boomer/output.go Normal file
View File

@@ -0,0 +1,447 @@
package boomer
import (
"encoding/json"
"fmt"
"log"
"os"
"sort"
"strconv"
"time"
"github.com/olekukonko/tablewriter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)
// Output is primarily responsible for printing test results to different destinations
// such as consoles, files. You can write you own output and add to boomer.
// When running in standalone mode, the default output is ConsoleOutput, you can add more.
// When running in distribute mode, test results will be reported to master with or without
// an output.
// All the OnXXX function will be call in a separated goroutine, just in case some output will block.
// But it will wait for all outputs return to avoid data lost.
type Output interface {
// OnStart will be call before the test starts.
OnStart()
// By default, each output receive stats data from runner every three seconds.
// OnEvent is responsible for dealing with the data.
OnEvent(data map[string]interface{})
// OnStop will be called before the test ends.
OnStop()
}
// ConsoleOutput is the default output for standalone mode.
type ConsoleOutput struct {
}
// NewConsoleOutput returns a ConsoleOutput.
func NewConsoleOutput() *ConsoleOutput {
return &ConsoleOutput{}
}
func getMedianResponseTime(numRequests int64, responseTimes map[int64]int64) int64 {
medianResponseTime := int64(0)
if len(responseTimes) != 0 {
pos := (numRequests - 1) / 2
var sortedKeys []int64
for k := range responseTimes {
sortedKeys = append(sortedKeys, k)
}
sort.Slice(sortedKeys, func(i, j int) bool {
return sortedKeys[i] < sortedKeys[j]
})
for _, k := range sortedKeys {
if pos < responseTimes[k] {
medianResponseTime = k
break
}
pos -= responseTimes[k]
}
}
return medianResponseTime
}
func getAvgResponseTime(numRequests int64, totalResponseTime int64) (avgResponseTime float64) {
avgResponseTime = float64(0)
if numRequests != 0 {
avgResponseTime = float64(totalResponseTime) / float64(numRequests)
}
return avgResponseTime
}
func getAvgContentLength(numRequests int64, totalContentLength int64) (avgContentLength int64) {
avgContentLength = int64(0)
if numRequests != 0 {
avgContentLength = totalContentLength / numRequests
}
return avgContentLength
}
func getCurrentRps(numRequests int64, numReqsPerSecond map[int64]int64) (currentRps int64) {
currentRps = int64(0)
numReqsPerSecondLength := int64(len(numReqsPerSecond))
if numReqsPerSecondLength != 0 {
currentRps = numRequests / numReqsPerSecondLength
}
return currentRps
}
func getCurrentFailPerSec(numFailures int64, numFailPerSecond map[int64]int64) (currentFailPerSec int64) {
currentFailPerSec = int64(0)
numFailPerSecondLength := int64(len(numFailPerSecond))
if numFailPerSecondLength != 0 {
currentFailPerSec = numFailures / numFailPerSecondLength
}
return currentFailPerSec
}
func getTotalFailRatio(totalRequests, totalFailures int64) (failRatio float64) {
if totalRequests == 0 {
return 0
}
return float64(totalFailures) / float64(totalRequests)
}
// OnStart of ConsoleOutput has nothing to do.
func (o *ConsoleOutput) OnStart() {
}
// OnStop of ConsoleOutput has nothing to do.
func (o *ConsoleOutput) OnStop() {
}
// OnEvent will print to the console.
func (o *ConsoleOutput) OnEvent(data map[string]interface{}) {
output, err := convertData(data)
if err != nil {
log.Println(fmt.Sprintf("convert data error: %v", err))
return
}
currentTime := time.Now()
println(fmt.Sprintf("Current time: %s, Users: %d, Total RPS: %d, Total Fail Ratio: %.1f%%",
currentTime.Format("2006/01/02 15:04:05"), output.UserCount, output.TotalRPS, output.TotalFailRatio*100))
println(fmt.Sprintf("Accumulated Transactions: %d Passed, %d Failed",
output.TransactionsPassed, output.TransactionsFailed))
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Type", "Name", "# requests", "# fails", "Median", "Average", "Min", "Max", "Content Size", "# reqs/sec", "# fails/sec"})
for _, stat := range output.Stats {
row := make([]string, 11)
row[0] = stat.Method
row[1] = stat.Name
row[2] = strconv.FormatInt(stat.NumRequests, 10)
row[3] = strconv.FormatInt(stat.NumFailures, 10)
row[4] = strconv.FormatInt(stat.medianResponseTime, 10)
row[5] = strconv.FormatFloat(stat.avgResponseTime, 'f', 2, 64)
row[6] = strconv.FormatInt(stat.MinResponseTime, 10)
row[7] = strconv.FormatInt(stat.MaxResponseTime, 10)
row[8] = strconv.FormatInt(stat.avgContentLength, 10)
row[9] = strconv.FormatInt(stat.currentRps, 10)
row[10] = strconv.FormatInt(stat.currentFailPerSec, 10)
table.Append(row)
}
table.Render()
println()
}
type statsEntryOutput struct {
statsEntry
medianResponseTime int64 // median response time
avgResponseTime float64 // average response time, round float to 2 decimal places
avgContentLength int64 // average content size
currentRps int64 // # reqs/sec
currentFailPerSec int64 // # fails/sec
}
type dataOutput struct {
UserCount int32 `json:"user_count"`
TotalStats *statsEntryOutput `json:"stats_total"`
TransactionsPassed int64 `json:"transactions_passed"`
TransactionsFailed int64 `json:"transactions_failed"`
TotalRPS int64 `json:"total_rps"`
TotalFailRatio float64 `json:"total_fail_ratio"`
Stats []*statsEntryOutput `json:"stats"`
Errors map[string]map[string]interface{} `json:"errors"`
}
func convertData(data map[string]interface{}) (output *dataOutput, err error) {
userCount, ok := data["user_count"].(int32)
if !ok {
return nil, fmt.Errorf("user_count is not int32")
}
stats, ok := data["stats"].([]interface{})
if !ok {
return nil, fmt.Errorf("stats is not []interface{}")
}
transactions, ok := data["transactions"].(map[string]int64)
if !ok {
return nil, fmt.Errorf("transactions is not map[string]int64")
}
transactionsPassed := transactions["passed"]
transactionsFailed := transactions["failed"]
// convert stats in total
statsTotal, ok := data["stats_total"].(interface{})
if !ok {
return nil, fmt.Errorf("stats_total is not interface{}")
}
entryTotalOutput, err := deserializeStatsEntry(statsTotal)
if err != nil {
return nil, err
}
output = &dataOutput{
UserCount: userCount,
TotalStats: entryTotalOutput,
TransactionsPassed: transactionsPassed,
TransactionsFailed: transactionsFailed,
TotalRPS: getCurrentRps(entryTotalOutput.NumRequests, entryTotalOutput.NumReqsPerSec),
TotalFailRatio: getTotalFailRatio(entryTotalOutput.NumRequests, entryTotalOutput.NumFailures),
Stats: make([]*statsEntryOutput, 0, len(stats)),
}
// convert stats
for _, stat := range stats {
entryOutput, err := deserializeStatsEntry(stat)
if err != nil {
return nil, err
}
output.Stats = append(output.Stats, entryOutput)
}
// sort stats by type
sort.Slice(output.Stats, func(i, j int) bool {
return output.Stats[i].Method < output.Stats[j].Method
})
return
}
func deserializeStatsEntry(stat interface{}) (entryOutput *statsEntryOutput, err error) {
statBytes, err := json.Marshal(stat)
if err != nil {
return nil, err
}
entry := statsEntry{}
if err = json.Unmarshal(statBytes, &entry); err != nil {
return nil, err
}
numRequests := entry.NumRequests
entryOutput = &statsEntryOutput{
statsEntry: entry,
medianResponseTime: getMedianResponseTime(numRequests, entry.ResponseTimes),
avgResponseTime: getAvgResponseTime(numRequests, entry.TotalResponseTime),
avgContentLength: getAvgContentLength(numRequests, entry.TotalContentLength),
currentRps: getCurrentRps(numRequests, entry.NumReqsPerSec),
currentFailPerSec: getCurrentFailPerSec(entry.NumFailures, entry.NumFailPerSec),
}
return
}
const (
namespace = "boomer"
)
// gauge vectors for requests
var (
gaugeNumRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "num_requests",
Help: "The number of requests",
},
[]string{"method", "name"},
)
gaugeNumFailures = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "num_failures",
Help: "The number of failures",
},
[]string{"method", "name"},
)
gaugeMedianResponseTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "median_response_time",
Help: "The median response time",
},
[]string{"method", "name"},
)
gaugeAverageResponseTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "average_response_time",
Help: "The average response time",
},
[]string{"method", "name"},
)
gaugeMinResponseTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "min_response_time",
Help: "The min response time",
},
[]string{"method", "name"},
)
gaugeMaxResponseTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "max_response_time",
Help: "The max response time",
},
[]string{"method", "name"},
)
gaugeAverageContentLength = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "average_content_length",
Help: "The average content length",
},
[]string{"method", "name"},
)
gaugeCurrentRPS = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "current_rps",
Help: "The current requests per second",
},
[]string{"method", "name"},
)
gaugeCurrentFailPerSec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "current_fail_per_sec",
Help: "The current failure number per second",
},
[]string{"method", "name"},
)
)
// gauges for total
var (
gaugeUsers = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "users",
Help: "The current number of users",
},
)
gaugeTotalRPS = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "total_rps",
Help: "The requests per second in total",
},
)
gaugeTotalFailRatio = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "fail_ratio",
Help: "The ratio of request failures in total",
},
)
gaugeTransactionsPassed = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "transactions_passed",
Help: "The accumulated number of passed transactions",
},
)
gaugeTransactionsFailed = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "transactions_failed",
Help: "The accumulated number of failed transactions",
},
)
)
// NewPrometheusPusherOutput returns a PrometheusPusherOutput.
func NewPrometheusPusherOutput(gatewayURL, jobName string) *PrometheusPusherOutput {
return &PrometheusPusherOutput{
pusher: push.New(gatewayURL, jobName),
}
}
// PrometheusPusherOutput pushes boomer stats to Prometheus Pushgateway.
type PrometheusPusherOutput struct {
pusher *push.Pusher // Prometheus Pushgateway Pusher
}
// OnStart will register all prometheus metric collectors
func (o *PrometheusPusherOutput) OnStart() {
log.Println("register prometheus metric collectors")
registry := prometheus.NewRegistry()
registry.MustRegister(
// gauge vectors for requests
gaugeNumRequests,
gaugeNumFailures,
gaugeMedianResponseTime,
gaugeAverageResponseTime,
gaugeMinResponseTime,
gaugeMaxResponseTime,
gaugeAverageContentLength,
gaugeCurrentRPS,
gaugeCurrentFailPerSec,
// gauges for total
gaugeUsers,
gaugeTotalRPS,
gaugeTotalFailRatio,
gaugeTransactionsPassed,
gaugeTransactionsFailed,
)
o.pusher = o.pusher.Gatherer(registry)
}
// OnStop of PrometheusPusherOutput has nothing to do.
func (o *PrometheusPusherOutput) OnStop() {
}
// OnEvent will push metric to Prometheus Pushgataway
func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
output, err := convertData(data)
if err != nil {
log.Println(fmt.Sprintf("convert data error: %v", err))
return
}
// user count
gaugeUsers.Set(float64(output.UserCount))
// rps in total
gaugeTotalRPS.Set(float64(output.TotalRPS))
// failure ratio in total
gaugeTotalFailRatio.Set(output.TotalFailRatio)
// accumulated number of transactions
gaugeTransactionsPassed.Set(float64(output.TransactionsPassed))
gaugeTransactionsFailed.Set(float64(output.TransactionsFailed))
for _, stat := range output.Stats {
method := stat.Method
name := stat.Name
gaugeNumRequests.WithLabelValues(method, name).Set(float64(stat.NumRequests))
gaugeNumFailures.WithLabelValues(method, name).Set(float64(stat.NumFailures))
gaugeMedianResponseTime.WithLabelValues(method, name).Set(float64(stat.medianResponseTime))
gaugeAverageResponseTime.WithLabelValues(method, name).Set(float64(stat.avgResponseTime))
gaugeMinResponseTime.WithLabelValues(method, name).Set(float64(stat.MinResponseTime))
gaugeMaxResponseTime.WithLabelValues(method, name).Set(float64(stat.MaxResponseTime))
gaugeAverageContentLength.WithLabelValues(method, name).Set(float64(stat.avgContentLength))
gaugeCurrentRPS.WithLabelValues(method, name).Set(float64(stat.currentRps))
gaugeCurrentFailPerSec.WithLabelValues(method, name).Set(float64(stat.currentFailPerSec))
}
if err := o.pusher.Push(); err != nil {
log.Println(fmt.Sprintf("Could not push to Pushgateway: error: %v", err))
}
}

View File

@@ -0,0 +1,134 @@
package boomer
import (
"fmt"
"math"
"sort"
"testing"
)
func TestGetMedianResponseTime(t *testing.T) {
numRequests := int64(10)
responseTimes := map[int64]int64{
100: 1,
200: 3,
300: 6,
}
medianResponseTime := getMedianResponseTime(numRequests, responseTimes)
if medianResponseTime != 300 {
t.Error("medianResponseTime should be 300")
}
responseTimes = map[int64]int64{}
medianResponseTime = getMedianResponseTime(numRequests, responseTimes)
if medianResponseTime != 0 {
t.Error("medianResponseTime should be 0")
}
}
func TestGetAvgResponseTime(t *testing.T) {
numRequests := int64(3)
totalResponseTime := int64(100)
avgResponseTime := getAvgResponseTime(numRequests, totalResponseTime)
if math.Dim(float64(33.33), avgResponseTime) > 0.01 {
t.Error("avgResponseTime should be close to 33.33")
}
avgResponseTime = getAvgResponseTime(int64(0), totalResponseTime)
if avgResponseTime != float64(0) {
t.Error("avgResponseTime should be close to 0")
}
}
func TestGetAvgContentLength(t *testing.T) {
numRequests := int64(3)
totalContentLength := int64(100)
avgContentLength := getAvgContentLength(numRequests, totalContentLength)
if avgContentLength != 33 {
t.Error("avgContentLength should be 33")
}
avgContentLength = getAvgContentLength(int64(0), totalContentLength)
if avgContentLength != 0 {
t.Error("avgContentLength should be 0")
}
}
func TestGetCurrentRps(t *testing.T) {
numRequests := int64(10)
numReqsPerSecond := map[int64]int64{}
currentRps := getCurrentRps(numRequests, numReqsPerSecond)
if currentRps != 0 {
t.Error("currentRps should be 0")
}
numReqsPerSecond[1] = 2
numReqsPerSecond[2] = 3
numReqsPerSecond[3] = 2
numReqsPerSecond[4] = 3
currentRps = getCurrentRps(numRequests, numReqsPerSecond)
if currentRps != 2 {
t.Error("currentRps should be 2")
}
}
func TestConsoleOutput(t *testing.T) {
o := NewConsoleOutput()
o.OnStart()
data := map[string]interface{}{}
stat := map[string]interface{}{}
data["stats"] = []interface{}{stat}
stat["name"] = "http"
stat["method"] = "post"
stat["num_requests"] = int64(100)
stat["num_failures"] = int64(10)
stat["response_times"] = map[int64]int64{
10: 1,
100: 99,
}
stat["total_response_time"] = int64(9910)
stat["min_response_time"] = int64(10)
stat["max_response_time"] = int64(100)
stat["total_content_length"] = int64(100000)
stat["num_reqs_per_sec"] = map[int64]int64{
1: 20,
2: 40,
3: 40,
}
o.OnEvent(data)
o.OnStop()
}
func TestSortString(t *testing.T) {
stats := []struct {
method string
name string
}{
{"transaction", "Action"},
{"request-GET", "get with params"},
{"request-POST", "post form data"},
{"request-POST", "post json data"},
{"transaction", "tran1"},
}
sort.Slice(stats, func(i, j int) bool {
if stats[i].method < stats[j].method {
return true
}
return stats[i].name < stats[j].name
})
fmt.Println(stats)
}

View File

@@ -0,0 +1,213 @@
package boomer
import (
"errors"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
)
// RateLimiter is used to put limits on task executions.
type RateLimiter interface {
// Start is used to enable the rate limiter.
// It can be implemented as a noop if not needed.
Start()
// Acquire() is called before executing a task.Fn function.
// If Acquire() returns true, the task.Fn function will be executed.
// If Acquire() returns false, the task.Fn function won't be executed this time, but Acquire() will be called very soon.
// It works like:
// for {
// blocked := rateLimiter.Acquire()
// if !blocked {
// task.Fn()
// }
// }
// Acquire() should block the caller until execution is allowed.
Acquire() bool
// Stop is used to disable the rate limiter.
// It can be implemented as a noop if not needed.
Stop()
}
// A StableRateLimiter uses the token bucket algorithm.
// the bucket is refilled according to the refill period, no burst is allowed.
type StableRateLimiter struct {
threshold int64
currentThreshold int64
refillPeriod time.Duration
broadcastChannel chan bool
quitChannel chan bool
}
// NewStableRateLimiter returns a StableRateLimiter.
func NewStableRateLimiter(threshold int64, refillPeriod time.Duration) (rateLimiter *StableRateLimiter) {
rateLimiter = &StableRateLimiter{
threshold: threshold,
currentThreshold: threshold,
refillPeriod: refillPeriod,
broadcastChannel: make(chan bool),
}
return rateLimiter
}
// Start to refill the bucket periodically.
func (limiter *StableRateLimiter) Start() {
limiter.quitChannel = make(chan bool)
quitChannel := limiter.quitChannel
go func() {
for {
select {
case <-quitChannel:
return
default:
atomic.StoreInt64(&limiter.currentThreshold, limiter.threshold)
time.Sleep(limiter.refillPeriod)
close(limiter.broadcastChannel)
limiter.broadcastChannel = make(chan bool)
}
}
}()
}
// Acquire a token from the bucket, returns true if the bucket is exhausted.
func (limiter *StableRateLimiter) Acquire() (blocked bool) {
permit := atomic.AddInt64(&limiter.currentThreshold, -1)
if permit < 0 {
blocked = true
// block until the bucket is refilled
<-limiter.broadcastChannel
} else {
blocked = false
}
return blocked
}
// Stop the rate limiter.
func (limiter *StableRateLimiter) Stop() {
close(limiter.quitChannel)
}
// ErrParsingRampUpRate is the error returned if the format of rampUpRate is invalid.
var ErrParsingRampUpRate = errors.New("ratelimiter: invalid format of rampUpRate, try \"1\" or \"1/1s\"")
// A RampUpRateLimiter uses the token bucket algorithm.
// the threshold is updated according to the warm up rate.
// the bucket is refilled according to the refill period, no burst is allowed.
type RampUpRateLimiter struct {
maxThreshold int64
nextThreshold int64
currentThreshold int64
refillPeriod time.Duration
rampUpRate string
rampUpStep int64
rampUpPeroid time.Duration
broadcastChannel chan bool
rampUpChannel chan bool
quitChannel chan bool
}
// NewRampUpRateLimiter returns a RampUpRateLimiter.
// Valid formats of rampUpRate are "1", "1/1s".
func NewRampUpRateLimiter(maxThreshold int64, rampUpRate string, refillPeriod time.Duration) (rateLimiter *RampUpRateLimiter, err error) {
rateLimiter = &RampUpRateLimiter{
maxThreshold: maxThreshold,
nextThreshold: 0,
currentThreshold: 0,
rampUpRate: rampUpRate,
refillPeriod: refillPeriod,
broadcastChannel: make(chan bool),
}
rateLimiter.rampUpStep, rateLimiter.rampUpPeroid, err = rateLimiter.parseRampUpRate(rateLimiter.rampUpRate)
if err != nil {
return nil, err
}
return rateLimiter, nil
}
func (limiter *RampUpRateLimiter) parseRampUpRate(rampUpRate string) (rampUpStep int64, rampUpPeroid time.Duration, err error) {
if strings.Contains(rampUpRate, "/") {
tmp := strings.Split(rampUpRate, "/")
if len(tmp) != 2 {
return rampUpStep, rampUpPeroid, ErrParsingRampUpRate
}
rampUpStep, err := strconv.ParseInt(tmp[0], 10, 64)
if err != nil {
return rampUpStep, rampUpPeroid, ErrParsingRampUpRate
}
rampUpPeroid, err := time.ParseDuration(tmp[1])
if err != nil {
return rampUpStep, rampUpPeroid, ErrParsingRampUpRate
}
return rampUpStep, rampUpPeroid, nil
}
rampUpStep, err = strconv.ParseInt(rampUpRate, 10, 64)
if err != nil {
return rampUpStep, rampUpPeroid, ErrParsingRampUpRate
}
rampUpPeroid = time.Second
return rampUpStep, rampUpPeroid, nil
}
// Start to refill the bucket periodically.
func (limiter *RampUpRateLimiter) Start() {
limiter.quitChannel = make(chan bool)
quitChannel := limiter.quitChannel
// bucket updater
go func() {
for {
select {
case <-quitChannel:
return
default:
atomic.StoreInt64(&limiter.currentThreshold, limiter.nextThreshold)
time.Sleep(limiter.refillPeriod)
close(limiter.broadcastChannel)
limiter.broadcastChannel = make(chan bool)
}
}
}()
// threshold updater
go func() {
for {
select {
case <-quitChannel:
return
default:
nextValue := limiter.nextThreshold + limiter.rampUpStep
if nextValue < 0 {
// int64 overflow
nextValue = int64(math.MaxInt64)
}
if nextValue > limiter.maxThreshold {
nextValue = limiter.maxThreshold
}
atomic.StoreInt64(&limiter.nextThreshold, nextValue)
time.Sleep(limiter.rampUpPeroid)
}
}
}()
}
// Acquire a token from the bucket, returns true if the bucket is exhausted.
func (limiter *RampUpRateLimiter) Acquire() (blocked bool) {
permit := atomic.AddInt64(&limiter.currentThreshold, -1)
if permit < 0 {
blocked = true
// block until the bucket is refilled
<-limiter.broadcastChannel
} else {
blocked = false
}
return blocked
}
// Stop the rate limiter.
func (limiter *RampUpRateLimiter) Stop() {
limiter.nextThreshold = 0
close(limiter.quitChannel)
}

View File

@@ -0,0 +1,101 @@
package boomer
import (
"testing"
"time"
)
func TestStableRateLimiter(t *testing.T) {
rateLimiter := NewStableRateLimiter(1, 10*time.Millisecond)
rateLimiter.Start()
defer rateLimiter.Stop()
blocked := rateLimiter.Acquire()
if blocked {
t.Error("Unexpected blocked by rate limiter")
}
blocked = rateLimiter.Acquire()
if !blocked {
t.Error("Should be blocked")
}
}
func TestRampUpRateLimiter(t *testing.T) {
rateLimiter, _ := NewRampUpRateLimiter(100, "10/200ms", 100*time.Millisecond)
rateLimiter.Start()
defer rateLimiter.Stop()
time.Sleep(110 * time.Millisecond)
for i := 0; i < 10; i++ {
blocked := rateLimiter.Acquire()
if blocked {
t.Error("Unexpected blocked by rate limiter")
}
}
blocked := rateLimiter.Acquire()
if !blocked {
t.Error("Should be blocked")
}
time.Sleep(110 * time.Millisecond)
// now, the threshold is 20
for i := 0; i < 20; i++ {
blocked := rateLimiter.Acquire()
if blocked {
t.Error("Unexpected blocked by rate limiter")
}
}
blocked = rateLimiter.Acquire()
if !blocked {
t.Error("Should be blocked")
}
}
func TestParseRampUpRate(t *testing.T) {
rateLimiter := &RampUpRateLimiter{}
rampUpStep, rampUpPeriod, _ := rateLimiter.parseRampUpRate("100")
if rampUpStep != 100 {
t.Error("Wrong rampUpStep, expected: 100, was:", rampUpStep)
}
if rampUpPeriod != time.Second {
t.Error("Wrong rampUpPeriod, expected: 1s, was:", rampUpPeriod)
}
rampUpStep, rampUpPeriod, _ = rateLimiter.parseRampUpRate("200/10s")
if rampUpStep != 200 {
t.Error("Wrong rampUpStep, expected: 200, was:", rampUpStep)
}
if rampUpPeriod != 10*time.Second {
t.Error("Wrong rampUpPeriod, expected: 10s, was:", rampUpPeriod)
}
}
func TestParseInvalidRampUpRate(t *testing.T) {
rateLimiter := &RampUpRateLimiter{}
_, _, err := rateLimiter.parseRampUpRate("A/1m")
if err == nil || err != ErrParsingRampUpRate {
t.Error("Expected ErrParsingRampUpRate")
}
_, _, err = rateLimiter.parseRampUpRate("A")
if err == nil || err != ErrParsingRampUpRate {
t.Error("Expected ErrParsingRampUpRate")
}
_, _, err = rateLimiter.parseRampUpRate("200/1s/")
if err == nil || err != ErrParsingRampUpRate {
t.Error("Expected ErrParsingRampUpRate")
}
_, _, err = rateLimiter.parseRampUpRate("200/1")
if err == nil || err != ErrParsingRampUpRate {
t.Error("Expected ErrParsingRampUpRate")
}
rateLimiter, err = NewRampUpRateLimiter(1, "200/1", time.Second)
if err == nil || err != ErrParsingRampUpRate {
t.Error("Expected ErrParsingRampUpRate")
}
}

270
internal/boomer/runner.go Normal file
View File

@@ -0,0 +1,270 @@
package boomer
import (
"fmt"
"log"
"math/rand"
"os"
"runtime/debug"
"sync"
"sync/atomic"
"time"
)
const (
stateInit = "ready"
stateSpawning = "spawning"
stateRunning = "running"
stateStopped = "stopped"
stateQuitting = "quitting"
)
const (
reportStatsInterval = 3 * time.Second
)
type runner struct {
state string
tasks []*Task
totalTaskWeight int
rateLimiter RateLimiter
rateLimitEnabled bool
stats *requestStats
numClients int32
spawnRate float64
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// close this channel will stop all goroutines used in runner.
closeChan chan bool
outputs []Output
}
// safeRun runs fn and recovers from unexpected panics.
// it prevents panics from Task.Fn crashing boomer.
func (r *runner) safeRun(fn func()) {
defer func() {
// don't panic
err := recover()
if err != nil {
stackTrace := debug.Stack()
errMsg := fmt.Sprintf("%v", err)
os.Stderr.Write([]byte(errMsg))
os.Stderr.Write([]byte("\n"))
os.Stderr.Write(stackTrace)
}
}()
fn()
}
func (r *runner) addOutput(o Output) {
r.outputs = append(r.outputs, o)
}
func (r *runner) outputOnStart() {
size := len(r.outputs)
if size == 0 {
return
}
wg := sync.WaitGroup{}
wg.Add(size)
for _, output := range r.outputs {
go func(o Output) {
o.OnStart()
wg.Done()
}(output)
}
wg.Wait()
}
func (r *runner) outputOnEevent(data map[string]interface{}) {
size := len(r.outputs)
if size == 0 {
return
}
wg := sync.WaitGroup{}
wg.Add(size)
for _, output := range r.outputs {
go func(o Output) {
o.OnEvent(data)
wg.Done()
}(output)
}
wg.Wait()
}
func (r *runner) outputOnStop() {
size := len(r.outputs)
if size == 0 {
return
}
wg := sync.WaitGroup{}
wg.Add(size)
for _, output := range r.outputs {
go func(o Output) {
o.OnStop()
wg.Done()
}(output)
}
wg.Wait()
}
func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {
log.Println("Spawning", spawnCount, "clients immediately")
for i := 1; i <= spawnCount; i++ {
select {
case <-quit:
// quit spawning goroutine
return
default:
atomic.AddInt32(&r.numClients, 1)
go func() {
for {
select {
case <-quit:
return
default:
if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire()
if !blocked {
task := r.getTask()
r.safeRun(task.Fn)
}
} else {
task := r.getTask()
r.safeRun(task.Fn)
}
}
}
}()
}
}
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
}
// setTasks will set the runner's task list AND the total task weight
// which is used to get a random task later
func (r *runner) setTasks(t []*Task) {
r.tasks = t
weightSum := 0
for _, task := range r.tasks {
weightSum += task.Weight
}
r.totalTaskWeight = weightSum
}
func (r *runner) getTask() *Task {
tasksCount := len(r.tasks)
if tasksCount == 1 {
// Fast path
return r.tasks[0]
}
rs := rand.New(rand.NewSource(time.Now().UnixNano()))
totalWeight := r.totalTaskWeight
if totalWeight <= 0 {
// If all the tasks have not weights defined, they have the same chance to run
randNum := rs.Intn(tasksCount)
return r.tasks[randNum]
}
randNum := rs.Intn(totalWeight)
runningSum := 0
for _, task := range r.tasks {
runningSum += task.Weight
if runningSum > randNum {
return task
}
}
return nil
}
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
r.stats.clearStatsChan <- true
r.stopChan = make(chan bool)
r.numClients = 0
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
}
func (r *runner) stop() {
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
}
type localRunner struct {
runner
spawnCount int
}
func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) {
r = &localRunner{}
r.setTasks(tasks)
r.spawnRate = spawnRate
r.spawnCount = spawnCount
r.closeChan = make(chan bool)
if rateLimiter != nil {
r.rateLimitEnabled = true
r.rateLimiter = rateLimiter
}
r.stats = newRequestStats()
return r
}
func (r *localRunner) run() {
r.state = stateInit
r.stats.start()
r.outputOnStart()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case data := <-r.stats.messageToRunnerChan:
data["user_count"] = r.numClients
r.outputOnEevent(data)
case <-r.closeChan:
r.stop()
wg.Done()
r.outputOnStop()
return
}
}
}()
if r.rateLimitEnabled {
r.rateLimiter.Start()
}
r.startSpawning(r.spawnCount, r.spawnRate, nil)
wg.Wait()
}
func (r *localRunner) close() {
if r.stats != nil {
r.stats.close()
}
close(r.closeChan)
}

View File

@@ -0,0 +1,91 @@
package boomer
import (
"testing"
"time"
)
type HitOutput struct {
onStart bool
onEvent bool
onStop bool
}
func (o *HitOutput) OnStart() {
o.onStart = true
}
func (o *HitOutput) OnEvent(data map[string]interface{}) {
o.onEvent = true
}
func (o *HitOutput) OnStop() {
o.onStop = true
}
func TestSafeRun(t *testing.T) {
runner := &runner{}
runner.safeRun(func() {
panic("Runner will catch this panic")
})
}
func TestOutputOnStart(t *testing.T) {
hitOutput := &HitOutput{}
hitOutput2 := &HitOutput{}
runner := &runner{}
runner.addOutput(hitOutput)
runner.addOutput(hitOutput2)
runner.outputOnStart()
if !hitOutput.onStart {
t.Error("hitOutput's OnStart has not been called")
}
if !hitOutput2.onStart {
t.Error("hitOutput2's OnStart has not been called")
}
}
func TestOutputOnEevent(t *testing.T) {
hitOutput := &HitOutput{}
hitOutput2 := &HitOutput{}
runner := &runner{}
runner.addOutput(hitOutput)
runner.addOutput(hitOutput2)
runner.outputOnEevent(nil)
if !hitOutput.onEvent {
t.Error("hitOutput's OnEvent has not been called")
}
if !hitOutput2.onEvent {
t.Error("hitOutput2's OnEvent has not been called")
}
}
func TestOutputOnStop(t *testing.T) {
hitOutput := &HitOutput{}
hitOutput2 := &HitOutput{}
runner := &runner{}
runner.addOutput(hitOutput)
runner.addOutput(hitOutput2)
runner.outputOnStop()
if !hitOutput.onStop {
t.Error("hitOutput's OnStop has not been called")
}
if !hitOutput2.onStop {
t.Error("hitOutput2's OnStop has not been called")
}
}
func TestLocalRunner(t *testing.T) {
taskA := &Task{
Weight: 10,
Fn: func() {
time.Sleep(time.Second)
},
Name: "TaskA",
}
tasks := []*Task{taskA}
runner := newLocalRunner(tasks, nil, 2, 2)
go runner.run()
time.Sleep(4 * time.Second)
runner.close()
}

358
internal/boomer/stats.go Normal file
View File

@@ -0,0 +1,358 @@
package boomer
import (
"time"
)
type transaction struct {
name string
success bool
elapsedTime int64
contentSize int64
}
type requestSuccess struct {
requestType string
name string
responseTime int64
responseLength int64
}
type requestFailure struct {
requestType string
name string
responseTime int64
errMsg string
}
type requestStats struct {
entries map[string]*statsEntry
errors map[string]*statsError
total *statsEntry
startTime int64
transactionChan chan *transaction
transactionPassed int64 // accumulated number of passed transactions
transactionFailed int64 // accumulated number of failed transactions
requestSuccessChan chan *requestSuccess
requestFailureChan chan *requestFailure
clearStatsChan chan bool
messageToRunnerChan chan map[string]interface{}
shutdownChan chan bool
}
func newRequestStats() (stats *requestStats) {
entries := make(map[string]*statsEntry)
errors := make(map[string]*statsError)
stats = &requestStats{
entries: entries,
errors: errors,
}
stats.transactionChan = make(chan *transaction, 100)
stats.requestSuccessChan = make(chan *requestSuccess, 100)
stats.requestFailureChan = make(chan *requestFailure, 100)
stats.clearStatsChan = make(chan bool)
stats.messageToRunnerChan = make(chan map[string]interface{}, 10)
stats.shutdownChan = make(chan bool)
stats.total = &statsEntry{
Name: "Total",
Method: "",
}
stats.total.reset()
return stats
}
func (s *requestStats) logTransaction(name string, success bool, responseTime int64, contentLength int64) {
if success {
s.transactionPassed++
} else {
s.transactionFailed++
}
s.get(name, "transaction").log(responseTime, contentLength)
}
func (s *requestStats) logRequest(method, name string, responseTime int64, contentLength int64) {
s.total.log(responseTime, contentLength)
s.get(name, method).log(responseTime, contentLength)
}
func (s *requestStats) logError(method, name, err string) {
s.total.logError(err)
s.get(name, method).logError(err)
// store error in errors map
key := genMD5(method, name, err)
entry, ok := s.errors[key]
if !ok {
entry = &statsError{
name: name,
method: method,
error: err,
}
s.errors[key] = entry
}
entry.occured()
}
func (s *requestStats) get(name string, method string) (entry *statsEntry) {
entry, ok := s.entries[name+method]
if !ok {
newEntry := &statsEntry{
Name: name,
Method: method,
NumReqsPerSec: make(map[int64]int64),
ResponseTimes: make(map[int64]int64),
}
newEntry.reset()
s.entries[name+method] = newEntry
return newEntry
}
return entry
}
func (s *requestStats) clearAll() {
s.total = &statsEntry{
Name: "Total",
Method: "",
}
s.total.reset()
s.transactionPassed = 0
s.transactionFailed = 0
s.entries = make(map[string]*statsEntry)
s.errors = make(map[string]*statsError)
s.startTime = time.Now().Unix()
}
func (s *requestStats) serializeStats() []interface{} {
entries := make([]interface{}, 0, len(s.entries))
for _, v := range s.entries {
if !(v.NumRequests == 0 && v.NumFailures == 0) {
entries = append(entries, v.getStrippedReport())
}
}
return entries
}
func (s *requestStats) serializeErrors() map[string]map[string]interface{} {
errors := make(map[string]map[string]interface{})
for k, v := range s.errors {
errors[k] = v.toMap()
}
return errors
}
func (s *requestStats) collectReportData() map[string]interface{} {
data := make(map[string]interface{})
data["transactions"] = map[string]int64{
"passed": s.transactionPassed,
"failed": s.transactionFailed,
}
data["stats"] = s.serializeStats()
data["stats_total"] = s.total.getStrippedReport()
data["errors"] = s.serializeErrors()
s.errors = make(map[string]*statsError)
return data
}
func (s *requestStats) start() {
go func() {
var ticker = time.NewTicker(reportStatsInterval)
for {
select {
case t := <-s.transactionChan:
s.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-s.requestSuccessChan:
s.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-s.requestFailureChan:
s.logRequest(n.requestType, n.name, n.responseTime, 0)
s.logError(n.requestType, n.name, n.errMsg)
case <-s.clearStatsChan:
s.clearAll()
case <-ticker.C:
data := s.collectReportData()
// send data to channel, no network IO in this goroutine
s.messageToRunnerChan <- data
case <-s.shutdownChan:
return
}
}
}()
}
// close is used by unit tests to avoid leakage of goroutines
func (s *requestStats) close() {
close(s.shutdownChan)
}
// statsEntry represents a single stats entry (name and method)
type statsEntry struct {
// Name (URL) of this stats entry
Name string `json:"name"`
// Method (GET, POST, PUT, etc.)
Method string `json:"method"`
// The number of requests made
NumRequests int64 `json:"num_requests"`
// Number of failed request
NumFailures int64 `json:"num_failures"`
// Total sum of the response times
TotalResponseTime int64 `json:"total_response_time"`
// Minimum response time
MinResponseTime int64 `json:"min_response_time"`
// Maximum response time
MaxResponseTime int64 `json:"max_response_time"`
// A {second => request_count} dict that holds the number of requests made per second
NumReqsPerSec map[int64]int64 `json:"num_reqs_per_sec"`
// A (second => failure_count) dict that hold the number of failures per second
NumFailPerSec map[int64]int64 `json:"num_fail_per_sec"`
// A {response_time => count} dict that holds the response time distribution of all the requests
// The keys (the response time in ms) are rounded to store 1, 2, ... 9, 10, 20. .. 90,
// 100, 200 .. 900, 1000, 2000 ... 9000, in order to save memory.
// This dict is used to calculate the median and percentile response times.
ResponseTimes map[int64]int64 `json:"response_times"`
// The sum of the content length of all the requests for this entry
TotalContentLength int64 `json:"total_content_length"`
// Time of the first request for this entry
StartTime int64 `json:"start_time"`
// Time of the last request for this entry
LastRequestTimestamp int64 `json:"last_request_timestamp"`
// Boomer doesn't allow None response time for requests like locust.
// num_none_requests is added to keep compatible with locust.
NumNoneRequests int64 `json:"num_none_requests"`
}
func (s *statsEntry) reset() {
s.StartTime = time.Now().Unix()
s.NumRequests = 0
s.NumFailures = 0
s.TotalResponseTime = 0
s.ResponseTimes = make(map[int64]int64)
s.MinResponseTime = 0
s.MaxResponseTime = 0
s.LastRequestTimestamp = time.Now().Unix()
s.NumReqsPerSec = make(map[int64]int64)
s.NumFailPerSec = make(map[int64]int64)
s.TotalContentLength = 0
}
func (s *statsEntry) log(responseTime int64, contentLength int64) {
s.NumRequests++
s.logTimeOfRequest()
s.logResponseTime(responseTime)
s.TotalContentLength += contentLength
}
func (s *statsEntry) logTimeOfRequest() {
key := time.Now().Unix()
_, ok := s.NumReqsPerSec[key]
if !ok {
s.NumReqsPerSec[key] = 1
} else {
s.NumReqsPerSec[key]++
}
s.LastRequestTimestamp = key
}
func (s *statsEntry) logResponseTime(responseTime int64) {
s.TotalResponseTime += responseTime
if s.MinResponseTime == 0 {
s.MinResponseTime = responseTime
}
if responseTime < s.MinResponseTime {
s.MinResponseTime = responseTime
}
if responseTime > s.MaxResponseTime {
s.MaxResponseTime = responseTime
}
var roundedResponseTime int64
// to avoid too much data that has to be transferred to the master node when
// running in distributed mode, we save the response time rounded in a dict
// so that 147 becomes 150, 3432 becomes 3400 and 58760 becomes 59000
// see also locust's stats.py
if responseTime < 100 {
roundedResponseTime = responseTime
} else if responseTime < 1000 {
roundedResponseTime = int64(round(float64(responseTime), .5, -1))
} else if responseTime < 10000 {
roundedResponseTime = int64(round(float64(responseTime), .5, -2))
} else {
roundedResponseTime = int64(round(float64(responseTime), .5, -3))
}
_, ok := s.ResponseTimes[roundedResponseTime]
if !ok {
s.ResponseTimes[roundedResponseTime] = 1
} else {
s.ResponseTimes[roundedResponseTime]++
}
}
func (s *statsEntry) logError(err string) {
s.NumFailures++
key := time.Now().Unix()
_, ok := s.NumFailPerSec[key]
if !ok {
s.NumFailPerSec[key] = 1
} else {
s.NumFailPerSec[key]++
}
}
func (s *statsEntry) serialize() map[string]interface{} {
result := make(map[string]interface{})
result["name"] = s.Name
result["method"] = s.Method
result["last_request_timestamp"] = s.LastRequestTimestamp
result["start_time"] = s.StartTime
result["num_requests"] = s.NumRequests
// Boomer doesn't allow None response time for requests like locust.
// num_none_requests is added to keep compatible with locust.
result["num_none_requests"] = 0
result["num_failures"] = s.NumFailures
result["total_response_time"] = s.TotalResponseTime
result["max_response_time"] = s.MaxResponseTime
result["min_response_time"] = s.MinResponseTime
result["total_content_length"] = s.TotalContentLength
result["response_times"] = s.ResponseTimes
result["num_reqs_per_sec"] = s.NumReqsPerSec
result["num_fail_per_sec"] = s.NumFailPerSec
return result
}
func (s *statsEntry) getStrippedReport() map[string]interface{} {
report := s.serialize()
s.reset()
return report
}
type statsError struct {
name string
method string
error string
occurrences int64
}
func (err *statsError) occured() {
err.occurrences++
}
func (err *statsError) toMap() map[string]interface{} {
m := make(map[string]interface{})
m["method"] = err.method
m["name"] = err.name
m["error"] = err.error
m["occurrences"] = err.occurrences
return m
}

View File

@@ -0,0 +1,250 @@
package boomer
import (
"testing"
"time"
)
func TestLogRequest(t *testing.T) {
newStats := newRequestStats()
newStats.logRequest("http", "success", 2, 30)
newStats.logRequest("http", "success", 3, 40)
newStats.logRequest("http", "success", 2, 40)
newStats.logRequest("http", "success", 1, 20)
entry := newStats.get("success", "http")
if entry.NumRequests != 4 {
t.Error("numRequests is wrong, expected: 4, got:", entry.NumRequests)
}
if entry.MinResponseTime != 1 {
t.Error("minResponseTime is wrong, expected: 1, got:", entry.MinResponseTime)
}
if entry.MaxResponseTime != 3 {
t.Error("maxResponseTime is wrong, expected: 3, got:", entry.MaxResponseTime)
}
if entry.TotalResponseTime != 8 {
t.Error("totalResponseTime is wrong, expected: 8, got:", entry.TotalResponseTime)
}
if entry.TotalContentLength != 130 {
t.Error("totalContentLength is wrong, expected: 130, got:", entry.TotalContentLength)
}
// check newStats.total
if newStats.total.NumRequests != 4 {
t.Error("newStats.total.numRequests is wrong, expected: 4, got:", newStats.total.NumRequests)
}
if newStats.total.MinResponseTime != 1 {
t.Error("newStats.total.minResponseTime is wrong, expected: 1, got:", newStats.total.MinResponseTime)
}
if newStats.total.MaxResponseTime != 3 {
t.Error("newStats.total.maxResponseTime is wrong, expected: 3, got:", newStats.total.MaxResponseTime)
}
if newStats.total.TotalResponseTime != 8 {
t.Error("newStats.total.totalResponseTime is wrong, expected: 8, got:", newStats.total.TotalResponseTime)
}
if newStats.total.TotalContentLength != 130 {
t.Error("newStats.total.totalContentLength is wrong, expected: 130, got:", newStats.total.TotalContentLength)
}
}
func BenchmarkLogRequest(b *testing.B) {
newStats := newRequestStats()
for i := 0; i < b.N; i++ {
newStats.logRequest("http", "success", 2, 30)
}
}
func TestRoundedResponseTime(t *testing.T) {
newStats := newRequestStats()
newStats.logRequest("http", "success", 147, 1)
newStats.logRequest("http", "success", 3432, 1)
newStats.logRequest("http", "success", 58760, 1)
entry := newStats.get("success", "http")
responseTimes := entry.ResponseTimes
if len(responseTimes) != 3 {
t.Error("len(responseTimes) is wrong, expected: 3, got:", len(responseTimes))
}
if val, ok := responseTimes[150]; !ok || val != 1 {
t.Error("Rounded response time should be", 150)
}
if val, ok := responseTimes[3400]; !ok || val != 1 {
t.Error("Rounded response time should be", 3400)
}
if val, ok := responseTimes[59000]; !ok || val != 1 {
t.Error("Rounded response time should be", 59000)
}
}
func TestLogError(t *testing.T) {
newStats := newRequestStats()
newStats.logError("http", "failure", "500 error")
newStats.logError("http", "failure", "400 error")
newStats.logError("http", "failure", "400 error")
entry := newStats.get("failure", "http")
if entry.NumFailures != 3 {
t.Error("numFailures is wrong, expected: 3, got:", entry.NumFailures)
}
if newStats.total.NumFailures != 3 {
t.Error("newStats.total.numFailures is wrong, expected: 3, got:", newStats.total.NumFailures)
}
// md5("httpfailure500 error") = 547c38e4e4742c1c581f9e2809ba4f55
err500 := newStats.errors["547c38e4e4742c1c581f9e2809ba4f55"]
if err500.error != "500 error" {
t.Error("Error message is wrong, expected: 500 error, got:", err500.error)
}
if err500.occurrences != 1 {
t.Error("Error occurrences is wrong, expected: 1, got:", err500.occurrences)
}
// md5("httpfailure400 error") = f391c310401ad8e10e929f2ee1a614e4
err400 := newStats.errors["f391c310401ad8e10e929f2ee1a614e4"]
if err400.error != "400 error" {
t.Error("Error message is wrong, expected: 400 error, got:", err400.error)
}
if err400.occurrences != 2 {
t.Error("Error occurrences is wrong, expected: 2, got:", err400.occurrences)
}
}
func BenchmarkLogError(b *testing.B) {
newStats := newRequestStats()
for i := 0; i < b.N; i++ {
// LogError use md5 to calculate hash keys, it may slow down the only goroutine,
// which consumes both requestSuccessChannel and requestFailureChannel.
newStats.logError("http", "failure", "500 error")
}
}
func TestClearAll(t *testing.T) {
newStats := newRequestStats()
newStats.logRequest("http", "success", 1, 20)
newStats.clearAll()
if newStats.total.NumRequests != 0 {
t.Error("After clearAll(), newStats.total.numRequests is wrong, expected: 0, got:", newStats.total.NumRequests)
}
}
func TestClearAllByChannel(t *testing.T) {
newStats := newRequestStats()
newStats.start()
defer newStats.close()
newStats.logRequest("http", "success", 1, 20)
newStats.clearStatsChan <- true
if newStats.total.NumRequests != 0 {
t.Error("After clearAll(), newStats.total.numRequests is wrong, expected: 0, got:", newStats.total.NumRequests)
}
}
func TestSerializeStats(t *testing.T) {
newStats := newRequestStats()
newStats.logRequest("http", "success", 1, 20)
serialized := newStats.serializeStats()
if len(serialized) != 1 {
t.Error("The length of serialized results is wrong, expected: 1, got:", len(serialized))
return
}
first := serialized[0]
entry, err := deserializeStatsEntry(first)
if err != nil {
t.Fail()
}
if entry.Name != "success" {
t.Error("The name is wrong, expected:", "success", "got:", entry.Name)
}
if entry.Method != "http" {
t.Error("The method is wrong, expected:", "http", "got:", entry.Method)
}
if entry.NumRequests != int64(1) {
t.Error("The num_requests is wrong, expected:", 1, "got:", entry.NumRequests)
}
if entry.NumFailures != int64(0) {
t.Error("The num_failures is wrong, expected:", 0, "got:", entry.NumFailures)
}
}
func TestSerializeErrors(t *testing.T) {
newStats := newRequestStats()
newStats.logError("http", "failure", "500 error")
newStats.logError("http", "failure", "400 error")
newStats.logError("http", "failure", "400 error")
serialized := newStats.serializeErrors()
if len(serialized) != 2 {
t.Error("The length of serialized results is wrong, expected: 2, got:", len(serialized))
return
}
for key, value := range serialized {
if key == "f391c310401ad8e10e929f2ee1a614e4" {
err := value["error"].(string)
if err != "400 error" {
t.Error("expected: 400 error, got:", err)
}
occurrences := value["occurrences"].(int64)
if occurrences != int64(2) {
t.Error("expected: 2, got:", occurrences)
}
}
}
}
func TestCollectReportData(t *testing.T) {
newStats := newRequestStats()
newStats.logRequest("http", "success", 2, 30)
newStats.logError("http", "failure", "500 error")
result := newStats.collectReportData()
if _, ok := result["stats"]; !ok {
t.Error("Key stats not found")
}
if _, ok := result["stats_total"]; !ok {
t.Error("Key stats not found")
}
if _, ok := result["errors"]; !ok {
t.Error("Key stats not found")
}
}
func TestStatsStart(t *testing.T) {
newStats := newRequestStats()
newStats.start()
defer newStats.close()
newStats.requestSuccessChan <- &requestSuccess{
requestType: "http",
name: "success",
responseTime: 2,
responseLength: 30,
}
newStats.requestFailureChan <- &requestFailure{
requestType: "http",
name: "failure",
responseTime: 1,
errMsg: "500 error",
}
var ticker = time.NewTicker(reportStatsInterval + 500*time.Millisecond)
for {
select {
case <-ticker.C:
t.Error("Timeout waiting for stats reports to runner")
case <-newStats.messageToRunnerChan:
goto end
}
}
end:
}

13
internal/boomer/task.go Normal file
View File

@@ -0,0 +1,13 @@
package boomer
// Task is like the "Locust object" in locust, the python version.
// When boomer receives a start message from master, it will spawn several goroutines to run Task.Fn.
// But users can keep some information in the python version, they can't do the same things in boomer.
// Because Task.Fn is a pure function.
type Task struct {
// The weight is used to distribute goroutines over multiple tasks.
Weight int
// Fn is called by the goroutines allocated to this task, in a loop.
Fn func()
Name string
}

76
internal/boomer/utils.go Normal file
View File

@@ -0,0 +1,76 @@
package boomer
import (
"crypto/md5"
"fmt"
"io"
"log"
"math"
"os"
"runtime/pprof"
"time"
)
func round(val float64, roundOn float64, places int) (newVal float64) {
var round float64
pow := math.Pow(10, float64(places))
digit := pow * val
_, div := math.Modf(digit)
if div >= roundOn {
round = math.Ceil(digit)
} else {
round = math.Floor(digit)
}
newVal = round / pow
return
}
// genMD5 returns the md5 hash of strings.
func genMD5(slice ...string) string {
h := md5.New()
for _, v := range slice {
io.WriteString(h, v)
}
return fmt.Sprintf("%x", h.Sum(nil))
}
// startMemoryProfile starts memory profiling and save the results in file.
func startMemoryProfile(file string, duration time.Duration) (err error) {
f, err := os.Create(file)
if err != nil {
return err
}
log.Println("Start memory profiling for", duration)
time.AfterFunc(duration, func() {
err = pprof.WriteHeapProfile(f)
if err != nil {
log.Println(err)
}
f.Close()
log.Println("Stop memory profiling after", duration)
})
return nil
}
// startCPUProfile starts cpu profiling and save the results in file.
func startCPUProfile(file string, duration time.Duration) (err error) {
f, err := os.Create(file)
if err != nil {
return err
}
log.Println("Start cpu profiling for", duration)
err = pprof.StartCPUProfile(f)
if err != nil {
f.Close()
return err
}
time.AfterFunc(duration, func() {
pprof.StopCPUProfile()
f.Close()
log.Println("Stop CPU profiling after", duration)
})
return nil
}

View File

@@ -0,0 +1,73 @@
package boomer
import (
"os"
"testing"
"time"
)
func TestRound(t *testing.T) {
if int(round(float64(147.5002), .5, -1)) != 150 {
t.Error("147.5002 should be rounded to 150")
}
if int(round(float64(3432.5002), .5, -2)) != 3400 {
t.Error("3432.5002 should be rounded to 3400")
}
roundOne := round(float64(58760.5002), .5, -3)
roundTwo := round(float64(58960.6003), .5, -3)
if roundOne != roundTwo {
t.Error("round(58760.5002) should be equal to round(58960.6003)")
}
roundOne = round(float64(58360.5002), .5, -3)
roundTwo = round(float64(58460.6003), .5, -3)
if roundOne != roundTwo {
t.Error("round(58360.5002) should be equal to round(58460.6003)")
}
roundOne = round(float64(58360), .5, -3)
roundTwo = round(float64(58460), .5, -3)
if roundOne != roundTwo {
t.Error("round(58360) should be equal to round(58460)")
}
}
func TestGenMD5(t *testing.T) {
hashValue := genMD5("Hello", "World!")
if hashValue != "06e0e6637d27b2622ab52022db713ce2" {
t.Error("Expected: 06e0e6637d27b2622ab52022db713ce2, Got: ", hashValue)
}
}
func TestStartMemoryProfile(t *testing.T) {
if _, err := os.Stat("mem.pprof"); os.IsExist(err) {
os.Remove("mem.pprof")
}
if err := startMemoryProfile("mem.pprof", 2*time.Second); err != nil {
t.Error("Error starting memory profiling")
}
time.Sleep(2100 * time.Millisecond)
if _, err := os.Stat("mem.pprof"); os.IsNotExist(err) {
t.Error("File mem.pprof is not generated")
} else {
os.Remove("mem.pprof")
}
}
func TestStartCPUProfile(t *testing.T) {
if _, err := os.Stat("cpu.pprof"); os.IsExist(err) {
os.Remove("cpu.pprof")
}
if err := startCPUProfile("cpu.pprof", 2*time.Second); err != nil {
t.Error("Error starting cpu profiling")
}
time.Sleep(2100 * time.Millisecond)
if _, err := os.Stat("cpu.pprof"); os.IsNotExist(err) {
t.Error("File cpu.pprof is not generated")
} else {
os.Remove("cpu.pprof")
}
}