feat: get client export ip on the server

This commit is contained in:
徐聪
2022-07-12 22:01:12 +08:00
parent 0c710dc103
commit 5b085be6bc
2 changed files with 20 additions and 13 deletions

View File

@@ -840,7 +840,7 @@ func (r *workerRunner) run() {
log.Warn().Msg("Timeout waiting for sending quit message to master, boomer will quit any way.")
}
if r.getState() != StateMissing {
if atomic.LoadInt32(&r.client.failCount) < 2 {
if err = r.client.signOut(r.client.config.ctx); err != nil {
log.Error().Err(err).Msg("failed to sign out")
}
@@ -1293,17 +1293,18 @@ func (r *masterRunner) reportStats() {
println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v",
currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getClientsLength(), r.getSpawnCount()))
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Worker ID", "State", "Current Users", "CPU Usage", "CPU Warning Emitted", "Memory Usage", "Heartbeat"})
table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU Usage", "CPU Warning Emitted", "Memory Usage", "Heartbeat"})
for _, worker := range r.server.getAllWorkers() {
row := make([]string, 7)
row := make([]string, 8)
row[0] = worker.ID
row[1] = fmt.Sprintf("%v", getStateName(worker.getState()))
row[2] = fmt.Sprintf("%v", worker.getSpawnCount())
row[3] = fmt.Sprintf("%v", worker.getCPUUsage())
row[4] = fmt.Sprintf("%v", worker.getCPUWarningEmitted())
row[5] = fmt.Sprintf("%v", worker.getMemoryUsage())
row[6] = fmt.Sprintf("%v", worker.getHeartbeat())
row[1] = worker.IP
row[2] = fmt.Sprintf("%v", getStateName(worker.getState()))
row[3] = fmt.Sprintf("%v", worker.getSpawnCount())
row[4] = fmt.Sprintf("%v", worker.getCPUUsage())
row[5] = fmt.Sprintf("%v", worker.getCPUWarningEmitted())
row[6] = fmt.Sprintf("%v", worker.getMemoryUsage())
row[7] = fmt.Sprintf("%v", worker.getHeartbeat())
table.Append(row)
}
table.Render()

View File

@@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
@@ -23,6 +24,7 @@ import (
type WorkerNode struct {
ID string `json:"id"`
IP string `json:"ip"`
State int32 `json:"state"`
Heartbeat int32 `json:"heartbeat"`
SpawnCount int64 `json:"spawn_count"`
@@ -34,9 +36,9 @@ type WorkerNode struct {
disconnectedChan chan bool
}
func newWorkerNode(id string) *WorkerNode {
func newWorkerNode(id, ip string) *WorkerNode {
stream := make(chan *messager.StreamResponse, 100)
return &WorkerNode{State: StateInit, ID: id, Heartbeat: 3, stream: stream, disconnectedChan: make(chan bool)}
return &WorkerNode{State: StateInit, ID: id, IP: ip, Heartbeat: 3, stream: stream, disconnectedChan: make(chan bool)}
}
func (w *WorkerNode) getState() int32 {
@@ -116,6 +118,7 @@ func (w *WorkerNode) getWorkerInfo() WorkerNode {
defer w.mutex.RUnlock()
return WorkerNode{
ID: w.ID,
IP: w.IP,
State: w.getState(),
Heartbeat: w.getHeartbeat(),
SpawnCount: w.getSpawnCount(),
@@ -257,9 +260,12 @@ func (s *grpcServer) start() (err error) {
return nil
}
func (s *grpcServer) Register(_ context.Context, req *messager.RegisterRequest) (*messager.RegisterResponse, error) {
func (s *grpcServer) Register(ctx context.Context, req *messager.RegisterRequest) (*messager.RegisterResponse, error) {
// get client ip
p, _ := peer.FromContext(ctx)
clientIp := strings.Split(p.Addr.String(), ":")[0]
// store worker information
wn := newWorkerNode(req.NodeID)
wn := newWorkerNode(req.NodeID, clientIp)
s.clients.Store(req.NodeID, wn)
log.Warn().Str("worker id", req.NodeID).Msg("worker joined")
return &messager.RegisterResponse{Code: "0", Message: "register successfully"}, nil