feat: report worker system information to the master

This commit is contained in:
徐聪
2022-07-15 13:46:09 +08:00
parent 76e058d709
commit 89871e6c68
8 changed files with 279 additions and 159 deletions

View File

@@ -3,6 +3,7 @@ package boomer
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -181,7 +182,7 @@ func (c *grpcClient) start() (err error) {
func (c *grpcClient) register(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
res, err := c.Register(ctx, &messager.RegisterRequest{NodeID: c.identity})
res, err := c.Register(ctx, &messager.RegisterRequest{NodeID: c.identity, Os: runtime.GOOS, Arch: runtime.GOARCH})
if err != nil {
return err
}
@@ -212,6 +213,9 @@ func (c *grpcClient) newBiStreamClient() (err error) {
if err != nil {
return err
}
// reset failCount
atomic.StoreInt32(&c.failCount, 0)
// set bidirectional stream client
c.config.setBiStreamClient(biStream)
println("successful to establish bidirectional stream with master, press Ctrl+c to quit.")
return nil

View File

@@ -25,9 +25,9 @@ type StreamRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
Data map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
NodeID string `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
NodeID string `protobuf:"bytes,2,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
Data map[string][]byte `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (x *StreamRequest) Reset() {
@@ -69,13 +69,6 @@ func (x *StreamRequest) GetType() string {
return ""
}
func (x *StreamRequest) GetData() map[string]int64 {
if x != nil {
return x.Data
}
return nil
}
func (x *StreamRequest) GetNodeID() string {
if x != nil {
return x.NodeID
@@ -83,16 +76,23 @@ func (x *StreamRequest) GetNodeID() string {
return ""
}
func (x *StreamRequest) GetData() map[string][]byte {
if x != nil {
return x.Data
}
return nil
}
type StreamResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
Profile []byte `protobuf:"bytes,2,opt,name=profile,proto3" json:"profile,omitempty"`
Data map[string]int64 `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
NodeID string `protobuf:"bytes,4,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
Tasks []byte `protobuf:"bytes,5,opt,name=tasks,proto3" json:"tasks,omitempty"`
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
NodeID string `protobuf:"bytes,2,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
Profile []byte `protobuf:"bytes,3,opt,name=profile,proto3" json:"profile,omitempty"`
Tasks []byte `protobuf:"bytes,4,opt,name=tasks,proto3" json:"tasks,omitempty"`
Data map[string][]byte `protobuf:"bytes,5,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (x *StreamResponse) Reset() {
@@ -134,20 +134,6 @@ func (x *StreamResponse) GetType() string {
return ""
}
func (x *StreamResponse) GetProfile() []byte {
if x != nil {
return x.Profile
}
return nil
}
func (x *StreamResponse) GetData() map[string]int64 {
if x != nil {
return x.Data
}
return nil
}
func (x *StreamResponse) GetNodeID() string {
if x != nil {
return x.NodeID
@@ -155,6 +141,13 @@ func (x *StreamResponse) GetNodeID() string {
return ""
}
func (x *StreamResponse) GetProfile() []byte {
if x != nil {
return x.Profile
}
return nil
}
func (x *StreamResponse) GetTasks() []byte {
if x != nil {
return x.Tasks
@@ -162,12 +155,21 @@ func (x *StreamResponse) GetTasks() []byte {
return nil
}
func (x *StreamResponse) GetData() map[string][]byte {
if x != nil {
return x.Data
}
return nil
}
type RegisterRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
NodeID string `protobuf:"bytes,1,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
NodeID string `protobuf:"bytes,1,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
Os string `protobuf:"bytes,2,opt,name=os,proto3" json:"os,omitempty"`
Arch string `protobuf:"bytes,3,opt,name=arch,proto3" json:"arch,omitempty"`
}
func (x *RegisterRequest) Reset() {
@@ -209,6 +211,20 @@ func (x *RegisterRequest) GetNodeID() string {
return ""
}
func (x *RegisterRequest) GetOs() string {
if x != nil {
return x.Os
}
return ""
}
func (x *RegisterRequest) GetArch() string {
if x != nil {
return x.Arch
}
return ""
}
type RegisterResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -269,7 +285,7 @@ type SignOutRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
NodeID string `protobuf:"bytes,1,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
NodeID string `protobuf:"bytes,1,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
}
func (x *SignOutRequest) Reset() {
@@ -373,59 +389,61 @@ var file_grpc_proto_messager_proto_rawDesc = []byte{
0x73, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x22, 0xaa, 0x01, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x34, 0x0a, 0x04, 0x64, 0x61,
0x74, 0x61, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f,
0x64, 0x65, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65,
0x49, 0x44, 0x12, 0x34, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xdc, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x66,
0x69, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x66, 0x69,
0x6c, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e,
0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64,
0x65, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49,
0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45,
0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65,
0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44,
0x12, 0x18, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61,
0x73, 0x6b, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73,
0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21,
0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72,
0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45,
0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x22, 0x29, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x40, 0x0a, 0x10, 0x52,
0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63,
0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x28, 0x0a,
0x0e, 0x53, 0x69, 0x67, 0x6e, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x3f, 0x0a, 0x0f, 0x53, 0x69, 0x67, 0x6e, 0x4f,
0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f,
0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18,
0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xe4, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x12, 0x41, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72,
0x12, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73,
0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x07, 0x53, 0x69, 0x67, 0x6e, 0x4f,
0x75, 0x74, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x69, 0x67,
0x6e, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x1d, 0x42, 0x69, 0x64, 0x69, 0x72,
0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e,
0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42,
0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x72,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x22, 0x4d, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x6f,
0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x6f, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x61,
0x72, 0x63, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x63, 0x68, 0x22,
0x40, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x22, 0x28, 0x0a, 0x0e, 0x53, 0x69, 0x67, 0x6e, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x3f, 0x0a, 0x0f, 0x53,
0x69, 0x67, 0x6e, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12,
0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x6f,
0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xe4, 0x01, 0x0a,
0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x41, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69,
0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52,
0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19,
0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65,
0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x07, 0x53,
0x69, 0x67, 0x6e, 0x4f, 0x75, 0x74, 0x12, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x2e, 0x53, 0x69, 0x67, 0x6e, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x18, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x69, 0x67, 0x6e, 0x4f, 0x75,
0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x1d, 0x42,
0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28,
0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -12,20 +12,22 @@ service Message {
message StreamRequest{
string type = 1;
map<string, int64> data = 2;
string NodeID = 3;
string nodeID = 2;
map<string, bytes> data = 3;
}
message StreamResponse{
string type = 1;
bytes profile = 2;
map<string, int64> data = 3;
string NodeID = 4;
bytes tasks = 5;
string nodeID = 2;
bytes profile = 3;
bytes tasks = 4;
map<string, bytes> data = 5;
}
message RegisterRequest{
string NodeID = 1;
string nodeID = 1;
string os = 2;
string arch = 3;
}
message RegisterResponse{
@@ -34,7 +36,7 @@ message RegisterResponse{
}
message SignOutRequest{
string NodeID = 1;
string nodeID = 1;
}
message SignOutResponse{

View File

@@ -11,11 +11,11 @@ const (
)
type genericMessage struct {
Type string `json:"type,omitempty"`
Profile []byte `json:"profile,omitempty"`
Data map[string]int64 `json:"data,omitempty"`
NodeID string `json:"node_id,omitempty"`
Tasks []byte `json:"tasks,omitempty"`
Type string `json:"type,omitempty"`
Profile []byte `json:"profile,omitempty"`
Data map[string][]byte `json:"data,omitempty"`
NodeID string `json:"node_id,omitempty"`
Tasks []byte `json:"tasks,omitempty"`
}
type task struct {
@@ -23,7 +23,7 @@ type task struct {
TestCases []byte `json:"testcases,omitempty"`
}
func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) {
func newGenericMessage(t string, data map[string][]byte, nodeID string) (msg *genericMessage) {
return &genericMessage{
Type: t,
Data: data,
@@ -38,7 +38,7 @@ func newQuitMessage(nodeID string) (msg *genericMessage) {
}
}
func newMessageToWorker(t string, profile []byte, data map[string]int64, tasks []byte) (msg *genericMessage) {
func newMessageToWorker(t string, profile []byte, data map[string][]byte, tasks []byte) (msg *genericMessage) {
return &genericMessage{
Type: t,
Profile: profile,

View File

@@ -52,6 +52,7 @@ const (
reportStatsInterval = 3 * time.Second
heartbeatInterval = 1 * time.Second
heartbeatLiveness = 3 * time.Second
reconnectInterval = 3 * time.Second
)
type Loop struct {
@@ -392,10 +393,11 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
for {
select {
case <-quit:
atomic.AddInt64(&r.controller.currentClientsNum, -1)
r.controller.increaseFinishedCount()
return
default:
if workerLoop != nil && !workerLoop.acquire() {
r.controller.increaseFinishedCount()
return
}
if r.rateLimitEnabled {
@@ -415,6 +417,8 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
workerLoop.increaseFinishedCount()
if r.loop.isFinished() {
go r.stop()
r.controller.increaseFinishedCount()
return
}
}
if r.controller.erase() {
@@ -621,7 +625,8 @@ func (r *localRunner) start() {
go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stoppingChan, nil)
defer func() {
r.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
// block concurrent waitgroup adds in GoAttach while stopping
r.wgMu.Lock()
r.updateState(StateStopping)
close(r.stoppingChan)
close(r.rebalance)
@@ -634,10 +639,8 @@ func (r *localRunner) start() {
// wait until all stats are reported successfully
<-r.reportedChan
// report test result
r.reportTestResult()
// output teardown
r.outputOnStop()
@@ -693,8 +696,8 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
}
func (r *workerRunner) spawnComplete() {
data := make(map[string]int64)
data["count"] = r.controller.getSpawnCount()
data := make(map[string][]byte)
data["count"] = builtin.Int64ToBytes(r.controller.getSpawnCount())
r.client.sendChannel() <- newGenericMessage("spawning_complete", data, r.nodeID)
}
@@ -755,8 +758,6 @@ func (r *workerRunner) onMessage(msg *genericMessage) {
r.onRebalanceMessage(msg)
case "stop":
r.stop()
log.Info().Msg("Recv stop message from master, all the goroutines are stopped")
r.client.sendChannel() <- newGenericMessage("client_stopped", nil, r.nodeID)
case "quit":
r.stop()
if r.ignoreQuit {
@@ -780,6 +781,10 @@ func (r *workerRunner) onMessage(msg *genericMessage) {
}
}
func (r *workerRunner) onStopped() {
r.client.sendChannel() <- newGenericMessage("client_stopped", nil, r.nodeID)
}
func (r *workerRunner) onQuiting() {
if r.getState() != StateQuitting {
r.client.sendChannel() <- newQuitMessage(r.nodeID)
@@ -832,7 +837,7 @@ func (r *workerRunner) run() {
select {
case <-r.client.disconnectedChannel():
case <-ticker.C:
log.Warn().Msg("Timeout waiting for sending quit message to master, boomer will quit any way.")
log.Warn().Msg("timeout waiting for sending quit message to master, boomer will quit any way.")
}
// sign out from master
@@ -858,25 +863,31 @@ func (r *workerRunner) run() {
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&r.client.failCount) > 2 {
r.updateState(StateMissing)
}
if r.getState() == StateMissing {
err = r.client.register(r.client.config.ctx)
if err != nil {
continue
}
if r.client.newBiStreamClient() == nil {
r.updateState(StateInit)
err = r.client.newBiStreamClient()
if err != nil {
continue
}
r.updateState(StateInit)
}
CPUUsage := GetCurrentCPUUsage()
MemoryUsage := GetCurrentMemoryUsage()
data := map[string]int64{
"state": int64(r.getState()),
"current_cpu_usage": int64(CPUUsage), // percentage
"current_memory_usage": int64(MemoryUsage), // percentage
"current_users": r.controller.getCurrentClientsNum(),
if atomic.LoadInt32(&r.client.failCount) > 2 {
r.updateState(StateMissing)
}
CPUUsage := GetCurrentCPUPercent()
MemoryUsage := GetCurrentMemoryPercent()
PidCPUUsage := GetCurrentPidCPUUsage()
PidMemoryUsage := GetCurrentPidMemoryUsage()
data := map[string][]byte{
"state": builtin.Int64ToBytes(int64(r.getState())),
"current_cpu_usage": builtin.Float64ToByte(CPUUsage),
"current_pid_cpu_usage": builtin.Float64ToByte(PidCPUUsage),
"current_memory_usage": builtin.Float64ToByte(MemoryUsage),
"current_pid_memory_usage": builtin.Float64ToByte(PidMemoryUsage),
"current_users": builtin.Int64ToBytes(r.controller.getCurrentClientsNum()),
}
r.client.sendChannel() <- newGenericMessage("heartbeat", data, r.nodeID)
case <-r.closeChan:
@@ -901,7 +912,8 @@ func (r *workerRunner) start() {
go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stoppingChan, r.spawnComplete)
defer func() {
r.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
// block concurrent waitgroup adds in GoAttach while stopping
r.wgMu.Lock()
r.updateState(StateStopping)
close(r.stoppingChan)
close(r.rebalance)
@@ -912,9 +924,13 @@ func (r *workerRunner) start() {
close(r.doneChan)
// notify master that worker is stopped
r.onStopped()
// wait until all stats are reported successfully
<-r.reportedChan
// report test result
r.reportTestResult()
// output teardown
r.outputOnStop()
}()
@@ -991,13 +1007,16 @@ func (r *masterRunner) heartbeatWorker() {
if !ok {
log.Error().Msg("failed to get worker information")
}
if atomic.LoadInt32(&workerInfo.Heartbeat) <= 0 && workerInfo.getState() != StateMissing {
workerInfo.setState(StateMissing)
if atomic.LoadInt32(&workerInfo.Heartbeat) <= 0 {
if workerInfo.getState() != StateMissing {
workerInfo.setState(StateMissing)
}
if r.getState() == StateRunning {
// all running workers missed, stopping runner
if r.server.getClientsLength() <= 0 {
r.updateState(StateStopped)
}
return true
}
} else {
atomic.AddInt32(&workerInfo.Heartbeat, -1)
@@ -1041,19 +1060,15 @@ func (r *masterRunner) clientListener() {
r.updateState(StateStopped)
}
case typeHeartbeat:
if workerInfo.getState() != int32(msg.Data["state"]) {
workerInfo.setState(int32(msg.Data["state"]))
if workerInfo.getState() != int32(builtin.BytesToInt64(msg.Data["state"])) {
workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
}
workerInfo.updateHeartbeat(3)
if workerInfo.getCPUUsage() != float64(msg.Data["current_cpu_usage"]) {
workerInfo.updateCPUUsage(float64(msg.Data["current_cpu_usage"]))
}
if workerInfo.getMemoryUsage() != float64(msg.Data["current_memory_usage"]) {
workerInfo.updateMemoryUsage(float64(msg.Data["current_memory_usage"]))
}
if workerInfo.getSpawnCount() != msg.Data["current_users"] {
workerInfo.updateSpawnCount(msg.Data["current_users"])
}
workerInfo.updateCPUUsage(builtin.ByteToFloat64(msg.Data["current_cpu_usage"]))
workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(msg.Data["current_pid_cpu_usage"]))
workerInfo.updateMemoryUsage(builtin.ByteToFloat64(msg.Data["current_memory_usage"]))
workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(msg.Data["current_pid_memory_usage"]))
workerInfo.updateUserCount(builtin.BytesToInt64(msg.Data["current_users"]))
case typeSpawning:
workerInfo.setState(StateSpawning)
case typeSpawningComplete:
@@ -1095,12 +1110,6 @@ func (r *masterRunner) run() {
}
defer func() {
// disconnecting workers
close(r.server.disconnectedChan)
// waiting to close bidirectional stream
r.server.wg.Wait()
// close server
r.server.close()
}()
@@ -1191,7 +1200,6 @@ func (r *masterRunner) start() error {
workerInfo.getStream() <- &messager.StreamResponse{
Type: "spawn",
Profile: ProfileToBytes(workerProfile),
Data: map[string]int64{},
NodeID: workerInfo.ID,
Tasks: testcase,
}
@@ -1245,7 +1253,6 @@ func (r *masterRunner) rebalance() error {
workerInfo.getStream() <- &messager.StreamResponse{
Type: "spawn",
Profile: ProfileToBytes(workerProfile),
Data: map[string]int64{},
NodeID: workerInfo.ID,
Tasks: r.tcb,
}
@@ -1253,7 +1260,6 @@ func (r *masterRunner) rebalance() error {
workerInfo.getStream() <- &messager.StreamResponse{
Type: "rebalance",
Profile: ProfileToBytes(workerProfile),
Data: map[string]int64{},
NodeID: workerInfo.ID,
}
}
@@ -1284,7 +1290,7 @@ func (r *masterRunner) fetchTestCase() ([]byte, error) {
func (r *masterRunner) stop() error {
if r.isStarting() {
r.updateState(StateStopping)
r.server.sendBroadcasts(&genericMessage{Type: "stop", Data: map[string]int64{}})
r.server.sendBroadcasts(&genericMessage{Type: "stop"})
return nil
} else {
return errors.New("already stopped")
@@ -1314,16 +1320,16 @@ func (r *masterRunner) reportStats() {
table := tablewriter.NewWriter(os.Stdout)
table.SetColMinWidth(0, 20)
table.SetColMinWidth(1, 10)
table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU Usage (%)", "Memory Usage (%)"})
table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU (%)", "Memory (%)"})
for _, worker := range r.server.getAllWorkers() {
row := make([]string, 6)
row[0] = worker.ID
row[1] = worker.IP
row[2] = fmt.Sprintf("%v", getStateName(worker.getState()))
row[3] = fmt.Sprintf("%v", worker.getSpawnCount())
row[4] = fmt.Sprintf("%v", worker.getCPUUsage())
row[5] = fmt.Sprintf("%v", worker.getMemoryUsage())
row[3] = fmt.Sprintf("%v", worker.getUserCount())
row[4] = fmt.Sprintf("%.2f", worker.getCPUUsage())
row[5] = fmt.Sprintf("%.2f", worker.getMemoryUsage())
table.Append(row)
}
table.Render()

View File

@@ -25,20 +25,24 @@ import (
type WorkerNode struct {
ID string `json:"id"`
IP string `json:"ip"`
OS string `json:"os"`
Arch string `json:"arch"`
State int32 `json:"state"`
Heartbeat int32 `json:"heartbeat"`
SpawnCount int64 `json:"spawn_count"`
UserCount int64 `json:"user_count"`
WorkerCPUUsage float64 `json:"worker_cpu_usage"`
CPUUsage float64 `json:"cpu_usage"`
CPUWarningEmitted bool `json:"cpu_warning_emitted"`
WorkerMemoryUsage float64 `json:"worker_memory_usage"`
MemoryUsage float64 `json:"memory_usage"`
stream chan *messager.StreamResponse
mutex sync.RWMutex
disconnectedChan chan bool
}
func newWorkerNode(id, ip string) *WorkerNode {
func newWorkerNode(id, ip, os, arch string) *WorkerNode {
stream := make(chan *messager.StreamResponse, 100)
return &WorkerNode{State: StateInit, ID: id, IP: ip, Heartbeat: 3, stream: stream, disconnectedChan: make(chan bool)}
return &WorkerNode{State: StateInit, ID: id, IP: ip, OS: os, Arch: arch, Heartbeat: 3, stream: stream, disconnectedChan: make(chan bool)}
}
func (w *WorkerNode) getState() int32 {
@@ -57,12 +61,12 @@ func (w *WorkerNode) getHeartbeat() int32 {
return atomic.LoadInt32(&w.Heartbeat)
}
func (w *WorkerNode) updateSpawnCount(spawnCount int64) {
atomic.StoreInt64(&w.SpawnCount, spawnCount)
func (w *WorkerNode) updateUserCount(spawnCount int64) {
atomic.StoreInt64(&w.UserCount, spawnCount)
}
func (w *WorkerNode) getSpawnCount() int64 {
return atomic.LoadInt64(&w.SpawnCount)
func (w *WorkerNode) getUserCount() int64 {
return atomic.LoadInt64(&w.UserCount)
}
func (w *WorkerNode) updateCPUUsage(cpuUsage float64) {
@@ -77,6 +81,18 @@ func (w *WorkerNode) getCPUUsage() float64 {
return w.CPUUsage
}
func (w *WorkerNode) updateWorkerCPUUsage(workerCPUUsage float64) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.WorkerCPUUsage = workerCPUUsage
}
func (w *WorkerNode) getWorkerCPUUsage() float64 {
w.mutex.RLock()
defer w.mutex.RUnlock()
return w.WorkerCPUUsage
}
func (w *WorkerNode) updateCPUWarningEmitted(cpuWarningEmitted bool) {
w.mutex.Lock()
defer w.mutex.Unlock()
@@ -89,6 +105,18 @@ func (w *WorkerNode) getCPUWarningEmitted() bool {
return w.CPUWarningEmitted
}
func (w *WorkerNode) updateWorkerMemoryUsage(workerMemoryUsage float64) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.WorkerMemoryUsage = workerMemoryUsage
}
func (w *WorkerNode) getWorkerMemoryUsage() float64 {
w.mutex.RLock()
defer w.mutex.RUnlock()
return w.WorkerMemoryUsage
}
func (w *WorkerNode) updateMemoryUsage(memoryUsage float64) {
w.mutex.Lock()
defer w.mutex.Unlock()
@@ -119,11 +147,15 @@ func (w *WorkerNode) getWorkerInfo() WorkerNode {
return WorkerNode{
ID: w.ID,
IP: w.IP,
OS: w.OS,
Arch: w.Arch,
State: w.getState(),
Heartbeat: w.getHeartbeat(),
SpawnCount: w.getSpawnCount(),
UserCount: w.getUserCount(),
WorkerCPUUsage: w.getWorkerCPUUsage(),
CPUUsage: w.getCPUUsage(),
CPUWarningEmitted: w.getCPUWarningEmitted(),
WorkerMemoryUsage: w.getWorkerMemoryUsage(),
MemoryUsage: w.getMemoryUsage(),
}
}
@@ -267,7 +299,7 @@ func (s *grpcServer) Register(ctx context.Context, req *messager.RegisterRequest
p, _ := peer.FromContext(ctx)
clientIp := strings.Split(p.Addr.String(), ":")[0]
// store worker information
wn := newWorkerNode(req.NodeID, clientIp)
wn := newWorkerNode(req.NodeID, clientIp, req.Os, req.Arch)
s.clients.Store(req.NodeID, wn)
log.Warn().Str("worker id", req.NodeID).Msg("worker joined")
return &messager.RegisterResponse{Code: "0", Message: "register successfully"}, nil
@@ -415,6 +447,12 @@ func (s *grpcServer) close() {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
s.stopServer(ctx)
cancel()
// disconnecting workers
close(s.disconnectedChan)
// waiting to close bidirectional stream
s.wg.Wait()
}
func (s *grpcServer) recvChannel() chan *genericMessage {

View File

@@ -13,6 +13,8 @@ import (
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/process"
)
@@ -88,8 +90,8 @@ func getNodeID() (nodeID string) {
return
}
// GetCurrentCPUUsage get current CPU usage
func GetCurrentCPUUsage() float64 {
// GetCurrentPidCPUUsage get current pid CPU usage
func GetCurrentPidCPUUsage() float64 {
currentPid := os.Getpid()
p, err := process.NewProcess(int32(currentPid))
if err != nil {
@@ -104,8 +106,8 @@ func GetCurrentCPUUsage() float64 {
return percent
}
// GetCurrentCPUPercent get the percentage of current cpu used
func GetCurrentCPUPercent() float64 {
// GetCurrentPidCPUPercent get the percentage of current pid cpu used
func GetCurrentPidCPUPercent() float64 {
currentPid := os.Getpid()
p, err := process.NewProcess(int32(currentPid))
if err != nil {
@@ -120,8 +122,20 @@ func GetCurrentCPUPercent() float64 {
return percent
}
// GetCurrentMemoryUsage get current Memory usage
func GetCurrentMemoryUsage() float64 {
// GetCurrentCPUPercent get the percentage of current cpu used
func GetCurrentCPUPercent() float64 {
percent, _ := cpu.Percent(time.Second, false)
return percent[0]
}
// GetCurrentMemoryPercent get the percentage of current memory used
func GetCurrentMemoryPercent() float64 {
memInfo, _ := mem.VirtualMemory()
return memInfo.UsedPercent
}
// GetCurrentPidMemoryUsage get current Memory usage
func GetCurrentPidMemoryUsage() float64 {
currentPid := os.Getpid()
p, err := process.NewProcess(int32(currentPid))
if err != nil {

View File

@@ -3,9 +3,11 @@ package builtin
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/csv"
builtinJSON "encoding/json"
"fmt"
"math"
"math/rand"
"os"
"os/exec"
@@ -505,6 +507,42 @@ func Bytes2File(data []byte, filename string) error {
return nil
}
func Float32ToByte(v float32) []byte {
bits := math.Float32bits(v)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
func ByteToFloat32(v []byte) float32 {
bits := binary.LittleEndian.Uint32(v)
return math.Float32frombits(bits)
}
func Float64ToByte(v float64) []byte {
bits := math.Float64bits(v)
bts := make([]byte, 8)
binary.LittleEndian.PutUint64(bts, bits)
return bts
}
func ByteToFloat64(v []byte) float64 {
bits := binary.LittleEndian.Uint64(v)
return math.Float64frombits(bits)
}
func Int64ToBytes(n int64) []byte {
bytesBuf := bytes.NewBuffer([]byte{})
_ = binary.Write(bytesBuf, binary.BigEndian, n)
return bytesBuf.Bytes()
}
func BytesToInt64(bys []byte) (data int64) {
byteBuff := bytes.NewBuffer(bys)
_ = binary.Read(byteBuff, binary.BigEndian, &data)
return
}
func SplitInteger(m, n int) (ints []int) {
quotient := m / n
remainder := m % n