From 5b085be6bcd9b2b0435b71a13879127cbf3a1807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=81=AA?= Date: Tue, 12 Jul 2022 22:01:12 +0800 Subject: [PATCH] feat: get client export ip on the server --- hrp/internal/boomer/runner.go | 19 ++++++++++--------- hrp/internal/boomer/server_grpc.go | 14 ++++++++++---- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index c6e87937..ffbcce72 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -840,7 +840,7 @@ func (r *workerRunner) run() { log.Warn().Msg("Timeout waiting for sending quit message to master, boomer will quit any way.") } - if r.getState() != StateMissing { + if atomic.LoadInt32(&r.client.failCount) < 2 { if err = r.client.signOut(r.client.config.ctx); err != nil { log.Error().Err(err).Msg("failed to sign out") } @@ -1293,17 +1293,18 @@ func (r *masterRunner) reportStats() { println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v", currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getClientsLength(), r.getSpawnCount())) table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"Worker ID", "State", "Current Users", "CPU Usage", "CPU Warning Emitted", "Memory Usage", "Heartbeat"}) + table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU Usage", "CPU Warning Emitted", "Memory Usage", "Heartbeat"}) for _, worker := range r.server.getAllWorkers() { - row := make([]string, 7) + row := make([]string, 8) row[0] = worker.ID - row[1] = fmt.Sprintf("%v", getStateName(worker.getState())) - row[2] = fmt.Sprintf("%v", worker.getSpawnCount()) - row[3] = fmt.Sprintf("%v", worker.getCPUUsage()) - row[4] = fmt.Sprintf("%v", worker.getCPUWarningEmitted()) - row[5] = fmt.Sprintf("%v", worker.getMemoryUsage()) - row[6] = fmt.Sprintf("%v", worker.getHeartbeat()) + row[1] = worker.IP + row[2] = fmt.Sprintf("%v", getStateName(worker.getState())) + row[3] = fmt.Sprintf("%v", worker.getSpawnCount()) + row[4] = fmt.Sprintf("%v", worker.getCPUUsage()) + row[5] = fmt.Sprintf("%v", worker.getCPUWarningEmitted()) + row[6] = fmt.Sprintf("%v", worker.getMemoryUsage()) + row[7] = fmt.Sprintf("%v", worker.getHeartbeat()) table.Append(row) } table.Render() diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go index 0c2f36f5..8702ef74 100644 --- a/hrp/internal/boomer/server_grpc.go +++ b/hrp/internal/boomer/server_grpc.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" @@ -23,6 +24,7 @@ import ( type WorkerNode struct { ID string `json:"id"` + IP string `json:"ip"` State int32 `json:"state"` Heartbeat int32 `json:"heartbeat"` SpawnCount int64 `json:"spawn_count"` @@ -34,9 +36,9 @@ type WorkerNode struct { disconnectedChan chan bool } -func newWorkerNode(id string) *WorkerNode { +func newWorkerNode(id, ip string) *WorkerNode { stream := make(chan *messager.StreamResponse, 100) - return &WorkerNode{State: StateInit, ID: id, Heartbeat: 3, stream: stream, disconnectedChan: make(chan bool)} + return &WorkerNode{State: StateInit, ID: id, IP: ip, Heartbeat: 3, stream: stream, disconnectedChan: make(chan bool)} } func (w *WorkerNode) getState() int32 { @@ -116,6 +118,7 @@ func (w *WorkerNode) getWorkerInfo() WorkerNode { defer w.mutex.RUnlock() return WorkerNode{ ID: w.ID, + IP: w.IP, State: w.getState(), Heartbeat: w.getHeartbeat(), SpawnCount: w.getSpawnCount(), @@ -257,9 +260,12 @@ func (s *grpcServer) start() (err error) { return nil } -func (s *grpcServer) Register(_ context.Context, req *messager.RegisterRequest) (*messager.RegisterResponse, error) { +func (s *grpcServer) Register(ctx context.Context, req *messager.RegisterRequest) (*messager.RegisterResponse, error) { + // get client ip + p, _ := peer.FromContext(ctx) + clientIp := strings.Split(p.Addr.String(), ":")[0] // store worker information - wn := newWorkerNode(req.NodeID) + wn := newWorkerNode(req.NodeID, clientIp) s.clients.Store(req.NodeID, wn) log.Warn().Str("worker id", req.NodeID).Msg("worker joined") return &messager.RegisterResponse{Code: "0", Message: "register successfully"}, nil