feat: add boomer state

This commit is contained in:
debugtalk
2021-12-23 22:15:30 +08:00
parent 0c0c154a28
commit 4ad57585cd
3 changed files with 31 additions and 9 deletions

View File

@@ -8,6 +8,7 @@
- feat: report GA events with version
- feat: run load test with the given limit and burst as rate limiter
- change: update API models
- feat: report runner state
## v0.2.2 (2021-12-07)

View File

@@ -125,8 +125,8 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) {
}
currentTime := time.Now()
println(fmt.Sprintf("Current time: %s, Users: %d, Total RPS: %d, Total Fail Ratio: %.1f%%",
currentTime.Format("2006/01/02 15:04:05"), output.UserCount, output.TotalRPS, output.TotalFailRatio*100))
println(fmt.Sprintf("Current time: %s, Users: %d, State: %d, Total RPS: %d, Total Fail Ratio: %.1f%%",
currentTime.Format("2006/01/02 15:04:05"), output.UserCount, output.State, output.TotalRPS, output.TotalFailRatio*100))
println(fmt.Sprintf("Accumulated Transactions: %d Passed, %d Failed",
output.TransactionsPassed, output.TransactionsFailed))
table := tablewriter.NewWriter(os.Stdout)
@@ -163,6 +163,7 @@ type statsEntryOutput struct {
type dataOutput struct {
UserCount int32 `json:"user_count"`
State int32 `json:"state"`
TotalStats *statsEntryOutput `json:"stats_total"`
TransactionsPassed int64 `json:"transactions_passed"`
TransactionsFailed int64 `json:"transactions_failed"`
@@ -177,6 +178,10 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) {
if !ok {
return nil, fmt.Errorf("user_count is not int32")
}
state, ok := data["state"].(int32)
if !ok {
return nil, fmt.Errorf("state is not int32")
}
stats, ok := data["stats"].([]interface{})
if !ok {
return nil, fmt.Errorf("stats is not []interface{}")
@@ -201,6 +206,7 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) {
output = &dataOutput{
UserCount: userCount,
State: state,
TotalStats: entryTotalOutput,
TransactionsPassed: transactionsPassed,
TransactionsFailed: transactionsFailed,
@@ -321,6 +327,12 @@ var (
Help: "The current number of users",
},
)
gaugeState = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "state",
Help: "The current runner state, 1=initializing, 2=spawning, 3=running, 4=quitting, 5=stopped",
},
)
gaugeTotalRPS = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "total_rps",
@@ -377,6 +389,7 @@ func (o *PrometheusPusherOutput) OnStart() {
gaugeCurrentFailPerSec,
// gauges for total
gaugeUsers,
gaugeState,
gaugeTotalRPS,
gaugeTotalFailRatio,
gaugeTransactionsPassed,
@@ -401,6 +414,9 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
// user count
gaugeUsers.Set(float64(output.UserCount))
// runner state
gaugeState.Set(float64(output.State))
// rps in total
gaugeTotalRPS.Set(float64(output.TotalRPS))

View File

@@ -13,11 +13,11 @@ import (
)
const (
stateInit = "ready"
stateSpawning = "spawning"
stateRunning = "running"
stateStopped = "stopped"
stateQuitting = "quitting"
stateInit = iota + 1 // initializing
stateSpawning // spawning
stateRunning // running
stateQuitting // quitting
stateStopped // stopped
)
const (
@@ -25,7 +25,7 @@ const (
)
type runner struct {
state string
state int32
tasks []*Task
totalTaskWeight int
@@ -116,6 +116,7 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool,
Float64("spawnRate", spawnRate).
Msg("Spawning workers")
atomic.StoreInt32(&r.state, stateSpawning)
// TODO: spawn workers with spawnRate
for i := 1; i <= spawnCount; i++ {
select {
@@ -150,6 +151,7 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool,
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
atomic.StoreInt32(&r.state, stateRunning)
}
// setTasks will set the runner's task list AND the total task weight
@@ -217,7 +219,7 @@ func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spaw
func (r *localRunner) start() {
// init state
r.state = stateInit
atomic.StoreInt32(&r.state, stateInit)
atomic.StoreInt32(&r.currentClientsNum, 0)
r.stats.clearAll()
@@ -248,6 +250,7 @@ func (r *localRunner) start() {
case <-ticker.C:
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEevent(data)
case <-r.stopChan:
// stop previous goroutines without blocking
@@ -265,5 +268,7 @@ func (r *localRunner) start() {
}
func (r *localRunner) stop() {
atomic.StoreInt32(&r.state, stateQuitting)
close(r.stopChan)
atomic.StoreInt32(&r.state, stateStopped)
}