From 2ca94381efa2249c8086eb85afd136db8af81489 Mon Sep 17 00:00:00 2001 From: xucong053 Date: Mon, 4 Jul 2022 22:00:39 +0800 Subject: [PATCH] fix: report stats for httprunner master --- hrp/internal/boomer/output.go | 14 +------ hrp/internal/boomer/runner.go | 62 ++++++++++++++++++++++++++---- hrp/internal/boomer/server_grpc.go | 4 +- 3 files changed, 57 insertions(+), 23 deletions(-) diff --git a/hrp/internal/boomer/output.go b/hrp/internal/boomer/output.go index a0866e02..152f0768 100644 --- a/hrp/internal/boomer/output.go +++ b/hrp/internal/boomer/output.go @@ -116,19 +116,7 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) { return } - var state string - switch output.State { - case StateInit: - state = "initializing" - case StateSpawning: - state = "spawning" - case StateRunning: - state = "running" - case StateQuitting: - state = "quitting" - case StateStopped: - state = "stopped" - } + state := getStateName(output.State) currentTime := time.Now() println(fmt.Sprintf("Current time: %s, Users: %d, State: %s, Total RPS: %.1f, Total Average Response Time: %.1fms, Total Fail Ratio: %.1f%%", diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 8b87d6a7..80d8552e 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -26,6 +26,26 @@ const ( StateMissing // missing ) +func getStateName(state int32) (stateName string) { + switch state { + case StateInit: + stateName = "initializing" + case StateSpawning: + stateName = "spawning" + case StateRunning: + stateName = "running" + case StateStopping: + stateName = "stopping" + case StateStopped: + stateName = "stopped" + case StateQuitting: + stateName = "quitting" + case StateMissing: + stateName = "stopped" + } + return +} + const ( reportStatsInterval = 3 * time.Second heartbeatInterval = 1 * time.Second @@ -839,12 +859,13 @@ func (r *masterRunner) setExpectWorkers(expectWorkers int, expectWorkersMaxWait func (r *masterRunner) heartbeatWorker() { 
log.Info().Msg("heartbeatWorker, listen and record heartbeat from worker") - var ticker = time.NewTicker(heartbeatInterval) + var heartBeatTicker = time.NewTicker(heartbeatInterval) + var reportTicker = time.NewTicker(heartbeatLiveness) for { select { case <-r.closeChan: return - case <-ticker.C: + case <-heartBeatTicker.C: r.server.clients.Range(func(key, value interface{}) bool { workerInfo, ok := value.(*WorkerNode) if !ok { @@ -863,6 +884,8 @@ func (r *masterRunner) heartbeatWorker() { } return true }) + case <-reportTicker.C: + r.reportStats() } } } @@ -889,7 +912,7 @@ func (r *masterRunner) clientListener() { } workerInfo.setState(StateInit) if r.getState() == StateRunning { - println(fmt.Sprintf("worker(%s) joined, ready to rebalance the load of each worker", workerInfo.ID)) + log.Warn().Str("worker id", workerInfo.ID).Msg("worker joined, ready to rebalance the load of each worker") err := r.rebalance() if err != nil { log.Error().Err(err).Msg("failed to rebalance") @@ -916,7 +939,7 @@ func (r *masterRunner) clientListener() { case typeSpawningComplete: workerInfo.setState(StateRunning) if r.server.getWorkersLengthByState(StateRunning) == r.server.getClientsLength() { - println(fmt.Sprintf("all(%v) workers spawn done, setting state as running", r.server.getClientsLength())) + log.Warn().Msg("all workers spawn done, setting state as running") r.updateState(StateRunning) } case typeQuit: @@ -926,7 +949,7 @@ func (r *masterRunner) clientListener() { workerInfo.setState(StateQuitting) if r.isStarted() { if r.server.getClientsLength() > 0 { - println(fmt.Sprintf("worker(%s) quited, ready to rebalance the load of each worker", workerInfo.ID)) + log.Warn().Str("worker id", workerInfo.ID).Msg("worker quited, ready to rebalance the load of each worker") err := r.rebalance() if err != nil { log.Error().Err(err).Msg("failed to rebalance") @@ -1004,8 +1027,7 @@ func (r *masterRunner) start() error { profile := r.profile.dispatch(int64(numWorkers)) r.server.sendChannel() 
<- newMessageToWorker("spawn", ProfileToBytes(profile), nil, testcase) - println("send spawn data to worker successful") - log.Info().Msg("send spawn data to worker successful") + log.Warn().Interface("profile", profile).Msg("send spawn data to worker successful") return nil } @@ -1017,7 +1039,7 @@ func (r *masterRunner) rebalance() error { profile := r.profile.dispatch(int64(numWorkers)) r.server.sendChannel() <- newMessageToWorker("rebalance", ProfileToBytes(profile), nil, nil) - println("send rebalance data to worker successful") + log.Warn().Msg("send rebalance data to worker successful") return nil } @@ -1061,3 +1083,27 @@ func (r *masterRunner) close() { close(r.closeChan) r.server.close() } + +func (r *masterRunner) reportStats() { + currentTime := time.Now() + println() + println("===================== HttpRunner Master for Distributed Load Testing ===================== ") + println(fmt.Sprintf("Current time: %s, State: %v, Current Valid Workers: %v, Target Users: %v", + currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getClientsLength(), r.getSpawnCount())) + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader([]string{"Worker ID", "State", "Current Users", "CPU Usage", "CPU Warning Emitted", "Memory Usage", "Heartbeat"}) + + for _, worker := range r.server.getAllWorkers() { + row := make([]string, 7) + row[0] = worker.ID + row[1] = fmt.Sprintf("%v", getStateName(worker.getState())) + row[2] = fmt.Sprintf("%v", worker.getSpawnCount()) + row[3] = fmt.Sprintf("%v", worker.getCPUUsage()) + row[4] = fmt.Sprintf("%v", worker.getCPUWarningEmitted()) + row[5] = fmt.Sprintf("%v", worker.getMemoryUsage()) + row[6] = fmt.Sprintf("%v", worker.getHeartbeat()) + table.Append(row) + } + table.Render() + println() +} diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go index bcd85001..0bf593ea 100644 --- a/hrp/internal/boomer/server_grpc.go +++ b/hrp/internal/boomer/server_grpc.go @@ -35,10 +35,10 @@ 
func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_Bidirect } wn := &WorkerNode{messenger: srv, ID: req.NodeID, Heartbeat: 3} s.clients.Store(req.NodeID, wn) - println(fmt.Sprintf("worker(%v) joined, current worker count: %v", req.NodeID, s.getClientsLength())) + log.Warn().Str("worker id", req.NodeID).Msg("worker joined") <-s.disconnectedChannel() s.clients.Delete(req.NodeID) - println(fmt.Sprintf("worker(%v) quited, current worker count: %v", req.NodeID, s.getClientsLength())) + log.Warn().Str("worker id", req.NodeID).Msg("worker quit") return nil }