diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 19770c9f..3dcce593 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,7 @@ - feat: report GA events with version - feat: run load test with the given limit and burst as rate limiter - change: update API models +- feat: report runner state ## v0.2.2 (2021-12-07) diff --git a/internal/boomer/output.go b/internal/boomer/output.go index 3f897e91..958134fe 100644 --- a/internal/boomer/output.go +++ b/internal/boomer/output.go @@ -125,8 +125,8 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) { } currentTime := time.Now() - println(fmt.Sprintf("Current time: %s, Users: %d, Total RPS: %d, Total Fail Ratio: %.1f%%", - currentTime.Format("2006/01/02 15:04:05"), output.UserCount, output.TotalRPS, output.TotalFailRatio*100)) + println(fmt.Sprintf("Current time: %s, Users: %d, State: %d, Total RPS: %d, Total Fail Ratio: %.1f%%", + currentTime.Format("2006/01/02 15:04:05"), output.UserCount, output.State, output.TotalRPS, output.TotalFailRatio*100)) println(fmt.Sprintf("Accumulated Transactions: %d Passed, %d Failed", output.TransactionsPassed, output.TransactionsFailed)) table := tablewriter.NewWriter(os.Stdout) @@ -163,6 +163,7 @@ type statsEntryOutput struct { type dataOutput struct { UserCount int32 `json:"user_count"` + State int32 `json:"state"` TotalStats *statsEntryOutput `json:"stats_total"` TransactionsPassed int64 `json:"transactions_passed"` TransactionsFailed int64 `json:"transactions_failed"` @@ -177,6 +178,10 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) { if !ok { return nil, fmt.Errorf("user_count is not int32") } + state, ok := data["state"].(int32) + if !ok { + return nil, fmt.Errorf("state is not int32") + } stats, ok := data["stats"].([]interface{}) if !ok { return nil, fmt.Errorf("stats is not []interface{}") @@ -201,6 +206,7 @@ func convertData(data map[string]interface{}) (output *dataOutput, err error) { output = &dataOutput{ UserCount: userCount, + State: state, TotalStats: entryTotalOutput, TransactionsPassed: transactionsPassed, TransactionsFailed: transactionsFailed, @@ -321,6 +327,12 @@ var ( Help: "The current number of users", }, ) + gaugeState = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "state", + Help: "The current runner state, 1=initializing, 2=spawning, 3=running, 4=quitting, 5=stopped", + }, + ) gaugeTotalRPS = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "total_rps", @@ -377,6 +389,7 @@ func (o *PrometheusPusherOutput) OnStart() { gaugeCurrentFailPerSec, // gauges for total gaugeUsers, + gaugeState, gaugeTotalRPS, gaugeTotalFailRatio, gaugeTransactionsPassed, @@ -401,6 +414,9 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) { // user count gaugeUsers.Set(float64(output.UserCount)) + // runner state + gaugeState.Set(float64(output.State)) + // rps in total gaugeTotalRPS.Set(float64(output.TotalRPS)) diff --git a/internal/boomer/runner.go b/internal/boomer/runner.go index a60ba805..4d7f56b8 100644 --- a/internal/boomer/runner.go +++ b/internal/boomer/runner.go @@ -13,11 +13,11 @@ import ( ) const ( - stateInit = "ready" - stateSpawning = "spawning" - stateRunning = "running" - stateStopped = "stopped" - stateQuitting = "quitting" + stateInit = iota + 1 // initializing + stateSpawning // spawning + stateRunning // running + stateQuitting // quitting + stateStopped // stopped ) const ( @@ -25,7 +25,7 @@ const ( ) type runner struct { - state string + state int32 tasks []*Task totalTaskWeight int @@ -116,6 +116,7 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, Float64("spawnRate", spawnRate). Msg("Spawning workers") + atomic.StoreInt32(&r.state, stateSpawning) // TODO: spawn workers with spawnRate for i := 1; i <= spawnCount; i++ { select { @@ -150,6 +151,7 @@ func (r *runner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, if spawnCompleteFunc != nil { spawnCompleteFunc() } + atomic.StoreInt32(&r.state, stateRunning) } // setTasks will set the runner's task list AND the total task weight @@ -217,7 +219,7 @@ func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spaw func (r *localRunner) start() { // init state - r.state = stateInit + atomic.StoreInt32(&r.state, stateInit) atomic.StoreInt32(&r.currentClientsNum, 0) r.stats.clearAll() @@ -248,6 +250,7 @@ func (r *localRunner) start() { case <-ticker.C: data := r.stats.collectReportData() data["user_count"] = atomic.LoadInt32(&r.currentClientsNum) + data["state"] = atomic.LoadInt32(&r.state) r.outputOnEevent(data) case <-r.stopChan: // stop previous goroutines without blocking @@ -265,5 +268,7 @@ func (r *localRunner) start() { } func (r *localRunner) stop() { + atomic.StoreInt32(&r.state, stateQuitting) close(r.stopChan) + atomic.StoreInt32(&r.state, stateStopped) }