diff --git a/hrp/internal/boomer/boomer.go b/hrp/internal/boomer/boomer.go index cfbd17ea..69b782b4 100644 --- a/hrp/internal/boomer/boomer.go +++ b/hrp/internal/boomer/boomer.go @@ -491,11 +491,20 @@ func (b *Boomer) Stop() error { return b.masterRunner.stop() } -// GetWorkersInfo gets workers +// GetWorkersInfo gets workers information func (b *Boomer) GetWorkersInfo() []WorkerNode { return b.masterRunner.server.getAllWorkers() } +// GetMasterInfo gets master information +func (b *Boomer) GetMasterInfo() map[string]interface{} { + masterInfo := make(map[string]interface{}) + masterInfo["state"] = getStateName(b.masterRunner.getState()) + masterInfo["workers"] = b.masterRunner.server.getClientsLength() + masterInfo["target_users"] = b.masterRunner.getSpawnCount() + return masterInfo +} + func (b *Boomer) GetCloseChan() chan bool { switch b.mode { case DistributedWorkerMode: diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index df68c155..9a2f8794 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -1064,11 +1064,26 @@ func (r *masterRunner) clientListener() { workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"]))) } workerInfo.updateHeartbeat(3) - workerInfo.updateCPUUsage(builtin.ByteToFloat64(msg.Data["current_cpu_usage"])) - workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(msg.Data["current_pid_cpu_usage"])) - workerInfo.updateMemoryUsage(builtin.ByteToFloat64(msg.Data["current_memory_usage"])) - workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(msg.Data["current_pid_memory_usage"])) - workerInfo.updateUserCount(builtin.BytesToInt64(msg.Data["current_users"])) + currentCPUUsage, ok := msg.Data["current_cpu_usage"] + if ok { + workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage)) + } + currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"] + if ok { + workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage)) + } + currentMemoryUsage, ok := msg.Data["current_memory_usage"] + if ok { + workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage)) + } + currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"] + if ok { + workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage)) + } + currentUsers, ok := msg.Data["current_users"] + if ok { + workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers)) + } case typeSpawning: workerInfo.setState(StateSpawning) case typeSpawningComplete: diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go index 5b71e451..f8a9e228 100644 --- a/hrp/internal/boomer/runner_test.go +++ b/hrp/internal/boomer/runner_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/httprunner/httprunner/v4/hrp/internal/boomer/grpc/messager" + "github.com/httprunner/httprunner/v4/hrp/internal/builtin" "github.com/stretchr/testify/assert" ) @@ -529,7 +530,7 @@ func TestHeartbeatWorker(t *testing.T) { runner.server.recvChannel() <- &genericMessage{ Type: typeHeartbeat, NodeID: "testID2", - Data: map[string]int64{"state": 3}, + Data: map[string][]byte{"state": builtin.Int64ToBytes(3)}, } worker2, ok := runner.server.getClients().Load("testID2") if !ok { diff --git a/hrp/server.go b/hrp/server.go index cb1c07b9..c060d409 100644 --- a/hrp/server.go +++ b/hrp/server.go @@ -135,10 +135,6 @@ type CommonResponseBody struct { } type APIGetWorkersRequestBody struct { - ID string `json:"id"` - State int32 `json:"state"` - CPUUsage float64 `json:"cpu_usage"` - MemoryUsage float64 `json:"memory_usage"` } type APIGetWorkersResponseBody struct { @@ -146,6 +142,14 @@ type APIGetWorkersResponseBody struct { Data []boomer.WorkerNode `json:"data"` } +type APIGetMasterRequestBody struct { +} + +type APIGetMasterResponseBody struct { + ServerStatus + Data map[string]interface{} `json:"data"` +} + type apiHandler struct { boomer *HRPBoomer } @@ -321,6 +325,16 @@ func (api *apiHandler) GetWorkersInfo(w http.ResponseWriter, r *http.Request) { writeJSON(w, body, http.StatusOK) } +func (api *apiHandler) GetMasterInfo(w http.ResponseWriter, r *http.Request) { + resp := &APIGetMasterResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + Data: api.boomer.GetMasterInfo(), + } + + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) +} + func (api *apiHandler) Handler() http.Handler { mux := http.NewServeMux() @@ -330,6 +344,7 @@ func (api *apiHandler) Handler() http.Handler { mux.HandleFunc("/stop", methods(api.Stop, "GET")) mux.HandleFunc("/quit", methods(api.Quit, "GET")) mux.HandleFunc("/workers", methods(api.GetWorkersInfo, "GET")) + mux.HandleFunc("/master", methods(api.GetMasterInfo, "GET")) return mux }