mirror of
https://github.com/httprunner/httprunner.git
synced 2026-05-12 02:21:29 +08:00
fix: state machine
This commit is contained in:
@@ -57,7 +57,6 @@ type HRPBoomer struct {
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) InitBoomer() {
|
||||
// init output
|
||||
if !b.GetProfile().DisableConsoleOutput {
|
||||
b.AddOutput(boomer.NewConsoleOutput())
|
||||
}
|
||||
@@ -164,7 +163,7 @@ func (b *HRPBoomer) TestCasesToBytes(testcases ...ITestCase) []byte {
|
||||
return testCasesBytes
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) BytesToTestCases(testCasesBytes []byte) []*TCase {
|
||||
func (b *HRPBoomer) BytesToTCases(testCasesBytes []byte) []*TCase {
|
||||
var testcase []*TCase
|
||||
err := json.Unmarshal(testCasesBytes, &testcase)
|
||||
if err != nil {
|
||||
@@ -177,8 +176,7 @@ func (b *HRPBoomer) Quit() {
|
||||
b.Boomer.Quit()
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) runTestCases(testCases []*TCase, profile *boomer.Profile) {
|
||||
var testcases []ITestCase
|
||||
func (b *HRPBoomer) parseTCases(testCases []*TCase) (testcases []ITestCase) {
|
||||
for _, tc := range testCases {
|
||||
tesecase, err := tc.toTestCase()
|
||||
if err != nil {
|
||||
@@ -209,7 +207,11 @@ func (b *HRPBoomer) runTestCases(testCases []*TCase, profile *boomer.Profile) {
|
||||
|
||||
testcases = append(testcases, tesecase)
|
||||
}
|
||||
return testcases
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) initWorker(profile *boomer.Profile) {
|
||||
// if no IP address is specified, the default IP address is that of the master
|
||||
if profile.PrometheusPushgatewayURL != "" {
|
||||
urlSlice := strings.Split(profile.PrometheusPushgatewayURL, ":")
|
||||
if len(urlSlice) != 2 {
|
||||
@@ -224,16 +226,13 @@ func (b *HRPBoomer) runTestCases(testCases []*TCase, profile *boomer.Profile) {
|
||||
|
||||
b.SetProfile(profile)
|
||||
b.InitBoomer()
|
||||
log.Info().Interface("testcases", testcases).Interface("profile", profile).Msg("run tasks successful")
|
||||
b.Run(testcases...)
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) rebalanceBoomer(profile *boomer.Profile) {
|
||||
b.SetProfile(profile)
|
||||
b.SetSpawnCount(b.GetProfile().SpawnCount)
|
||||
b.SetSpawnRate(b.GetProfile().SpawnRate)
|
||||
func (b *HRPBoomer) rebalanceRunner(profile *boomer.Profile) {
|
||||
b.SetSpawnCount(profile.SpawnCount)
|
||||
b.SetSpawnRate(profile.SpawnRate)
|
||||
b.GetRebalanceChan() <- true
|
||||
log.Info().Interface("profile", profile).Msg("rebalance tasks successful")
|
||||
log.Info().Interface("profile", profile).Msg("rebalance tasks successfully")
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) PollTasks(ctx context.Context) {
|
||||
@@ -245,11 +244,17 @@ func (b *HRPBoomer) PollTasks(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
// TODO: filter out tasks that have already been transmitted
|
||||
if task.TestCases != nil {
|
||||
testCases := b.BytesToTestCases(task.TestCases)
|
||||
go b.runTestCases(testCases, task.Profile)
|
||||
if task.TestCasesBytes != nil {
|
||||
// init boomer with profile
|
||||
b.initWorker(task.Profile)
|
||||
// get testcases
|
||||
testcases := b.parseTCases(b.BytesToTCases(task.TestCasesBytes))
|
||||
log.Info().Interface("testcases", testcases).Interface("profile", b.GetProfile()).Msg("starting to run tasks")
|
||||
// run testcases
|
||||
go b.Run(testcases...)
|
||||
} else {
|
||||
go b.rebalanceBoomer(task.Profile)
|
||||
// rebalance runner with profile
|
||||
go b.rebalanceRunner(task.Profile)
|
||||
}
|
||||
|
||||
case <-b.Boomer.GetCloseChan():
|
||||
@@ -261,6 +266,15 @@ func (b *HRPBoomer) PollTasks(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (b *HRPBoomer) PollTestCases(ctx context.Context) {
|
||||
// quit all plugins
|
||||
defer func() {
|
||||
if len(pluginMap) > 0 {
|
||||
for _, plugin := range pluginMap {
|
||||
plugin.Quit()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-b.Boomer.ParseTestCasesChan():
|
||||
@@ -270,7 +284,7 @@ func (b *HRPBoomer) PollTestCases(ctx context.Context) {
|
||||
tcs = append(tcs, &tcp)
|
||||
}
|
||||
b.TestCaseBytesChan() <- b.TestCasesToBytes(tcs...)
|
||||
log.Info().Msg("put testcase successful")
|
||||
log.Info().Msg("put testcase successfully")
|
||||
case <-b.Boomer.GetCloseChan():
|
||||
return
|
||||
case <-ctx.Done():
|
||||
|
||||
@@ -63,6 +63,18 @@ type Profile struct {
|
||||
DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty" mapstructure:"disable-keepalive,omitempty"`
|
||||
}
|
||||
|
||||
func NewProfile() *Profile {
|
||||
return &Profile{
|
||||
SpawnCount: 1,
|
||||
SpawnRate: 1,
|
||||
MaxRPS: -1,
|
||||
LoopCount: -1,
|
||||
RequestIncreaseRate: "-1",
|
||||
CPUProfileDuration: 30 * time.Second,
|
||||
MemoryProfileDuration: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Boomer) GetProfile() *Profile {
|
||||
switch b.mode {
|
||||
case DistributedMasterMode:
|
||||
@@ -158,7 +170,18 @@ func (b *Boomer) RunWorker() {
|
||||
|
||||
// TestCaseBytesChan gets test case bytes chan
|
||||
func (b *Boomer) TestCaseBytesChan() chan []byte {
|
||||
return b.masterRunner.testCaseBytes
|
||||
return b.masterRunner.testCaseBytesChan
|
||||
}
|
||||
|
||||
func (b *Boomer) GetTestCaseBytes() []byte {
|
||||
switch b.mode {
|
||||
case DistributedMasterMode:
|
||||
return b.masterRunner.testCasesBytes
|
||||
case DistributedWorkerMode:
|
||||
return b.workerRunner.testCasesBytes
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func ProfileToBytes(profile *Profile) []byte {
|
||||
@@ -192,7 +215,7 @@ func (b *Boomer) GetTasksChan() chan *task {
|
||||
func (b *Boomer) GetRebalanceChan() chan bool {
|
||||
switch b.mode {
|
||||
case DistributedWorkerMode:
|
||||
return b.workerRunner.rebalance
|
||||
return b.workerRunner.controller.getRebalanceChan()
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
@@ -469,6 +492,9 @@ func (b *Boomer) Start(Args *Profile) error {
|
||||
if b.masterRunner.isStopping() {
|
||||
return errors.New("Please wait for all workers to finish")
|
||||
}
|
||||
if int(Args.SpawnCount) < b.masterRunner.server.getAvailableClientsLength() {
|
||||
return errors.New("spawn count should be greater than available worker count")
|
||||
}
|
||||
b.SetSpawnCount(Args.SpawnCount)
|
||||
b.SetSpawnRate(Args.SpawnRate)
|
||||
b.SetProfile(Args)
|
||||
@@ -481,6 +507,9 @@ func (b *Boomer) ReBalance(Args *Profile) error {
|
||||
if !b.masterRunner.isStarting() {
|
||||
return errors.New("no start")
|
||||
}
|
||||
if int(Args.SpawnCount) < b.masterRunner.server.getAvailableClientsLength() {
|
||||
return errors.New("spawn count should be greater than available worker count")
|
||||
}
|
||||
b.SetSpawnCount(Args.SpawnCount)
|
||||
b.SetSpawnRate(Args.SpawnRate)
|
||||
b.SetProfile(Args)
|
||||
@@ -505,8 +534,9 @@ func (b *Boomer) GetWorkersInfo() []WorkerNode {
|
||||
func (b *Boomer) GetMasterInfo() map[string]interface{} {
|
||||
masterInfo := make(map[string]interface{})
|
||||
masterInfo["state"] = b.masterRunner.getState()
|
||||
masterInfo["workers"] = b.masterRunner.server.getClientsLength()
|
||||
masterInfo["workers"] = b.masterRunner.server.getAvailableClientsLength()
|
||||
masterInfo["target_users"] = b.masterRunner.getSpawnCount()
|
||||
masterInfo["current_users"] = b.masterRunner.server.getCurrentUsers()
|
||||
return masterInfo
|
||||
}
|
||||
|
||||
|
||||
@@ -313,11 +313,12 @@ func (c *grpcClient) sendMessage(msg *genericMessage) {
|
||||
return
|
||||
}
|
||||
err := c.config.getBiStreamClient().Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID})
|
||||
switch err {
|
||||
case nil:
|
||||
if err == nil {
|
||||
atomic.StoreInt32(&c.failCount, 0)
|
||||
default:
|
||||
//log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message")
|
||||
return
|
||||
}
|
||||
//log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message")
|
||||
if msg.Type == "heartbeat" {
|
||||
atomic.AddInt32(&c.failCount, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,8 +19,8 @@ type genericMessage struct {
|
||||
}
|
||||
|
||||
type task struct {
|
||||
Profile *Profile `json:"profile,omitempty"`
|
||||
TestCases []byte `json:"testcases,omitempty"`
|
||||
Profile *Profile `json:"profile,omitempty"`
|
||||
TestCasesBytes []byte `json:"testcases,omitempty"`
|
||||
}
|
||||
|
||||
func newGenericMessage(t string, data map[string][]byte, nodeID string) (msg *genericMessage) {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -454,8 +455,8 @@ var (
|
||||
)
|
||||
|
||||
var (
|
||||
minResponseTimeMap = map[string]float64{}
|
||||
maxResponseTimeMap = map[string]float64{}
|
||||
minResponseTimeMap = sync.Map{}
|
||||
maxResponseTimeMap = sync.Map{}
|
||||
)
|
||||
|
||||
// NewPrometheusPusherOutput returns a PrometheusPusherOutput.
|
||||
@@ -577,19 +578,19 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
|
||||
}
|
||||
// every stat in total
|
||||
key := fmt.Sprintf("%v_%v", method, name)
|
||||
if _, ok := minResponseTimeMap[key]; !ok {
|
||||
minResponseTimeMap[key] = float64(stat.MinResponseTime)
|
||||
} else {
|
||||
minResponseTimeMap[key] = math.Min(float64(stat.MinResponseTime), minResponseTimeMap[key])
|
||||
minResponseTime, loaded := minResponseTimeMap.LoadOrStore(key, float64(stat.MinResponseTime))
|
||||
if loaded {
|
||||
minResponseTime = math.Min(minResponseTime.(float64), float64(stat.MinResponseTime))
|
||||
minResponseTimeMap.Store(key, minResponseTime)
|
||||
}
|
||||
gaugeTotalMinResponseTime.WithLabelValues(method, name).Set(minResponseTimeMap[key])
|
||||
gaugeTotalMinResponseTime.WithLabelValues(method, name).Set(minResponseTime.(float64))
|
||||
|
||||
if _, ok := maxResponseTimeMap[key]; !ok {
|
||||
maxResponseTimeMap[key] = float64(stat.MaxResponseTime)
|
||||
} else {
|
||||
maxResponseTimeMap[key] = math.Max(float64(stat.MaxResponseTime), maxResponseTimeMap[key])
|
||||
maxResponseTime, loaded := maxResponseTimeMap.LoadOrStore(key, float64(stat.MaxResponseTime))
|
||||
if loaded {
|
||||
maxResponseTime = math.Max(maxResponseTime.(float64), float64(stat.MaxResponseTime))
|
||||
maxResponseTimeMap.Store(key, maxResponseTime)
|
||||
}
|
||||
gaugeTotalMaxResponseTime.WithLabelValues(method, name).Set(maxResponseTimeMap[key])
|
||||
gaugeTotalMaxResponseTime.WithLabelValues(method, name).Set(maxResponseTime.(float64))
|
||||
|
||||
counterTotalNumRequests.WithLabelValues(method, name).Add(float64(stat.NumRequests))
|
||||
counterTotalNumFailures.WithLabelValues(method, name).Add(float64(stat.NumFailures))
|
||||
@@ -639,4 +640,7 @@ func resetPrometheusMetrics() {
|
||||
gaugeTotalFailPerSec.Set(0)
|
||||
gaugeTransactionsPassed.Set(0)
|
||||
gaugeTransactionsFailed.Set(0)
|
||||
|
||||
minResponseTimeMap = sync.Map{}
|
||||
maxResponseTimeMap = sync.Map{}
|
||||
}
|
||||
|
||||
@@ -49,10 +49,10 @@ func getStateName(state int32) (stateName string) {
|
||||
}
|
||||
|
||||
const (
|
||||
reportStatsInterval = 3 * time.Second
|
||||
heartbeatInterval = 1 * time.Second
|
||||
heartbeatLiveness = 3 * time.Second
|
||||
reconnectInterval = 3 * time.Second
|
||||
reportStatsInterval = 3 * time.Second
|
||||
heartbeatInterval = 1 * time.Second
|
||||
heartbeatLiveness = 3 * time.Second
|
||||
stateMachineInterval = 1 * time.Second
|
||||
)
|
||||
|
||||
type Loop struct {
|
||||
@@ -86,6 +86,7 @@ type Controller struct {
|
||||
currentClientsNum int64 // current clients count
|
||||
spawnCount int64 // target clients to spawn
|
||||
spawnRate float64
|
||||
rebalance chan bool // dynamically balance boomer running parameters
|
||||
spawnDone chan struct{}
|
||||
tasks []*Task
|
||||
}
|
||||
@@ -143,6 +144,12 @@ func (c *Controller) spawnCompete() {
|
||||
close(c.spawnDone)
|
||||
}
|
||||
|
||||
func (c *Controller) getRebalanceChan() chan bool {
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
return c.rebalance
|
||||
}
|
||||
|
||||
func (c *Controller) isFinished() bool {
|
||||
// return true when workers acquired
|
||||
return atomic.LoadInt64(&c.currentClientsNum) == atomic.LoadInt64(&c.spawnCount)
|
||||
@@ -178,6 +185,7 @@ func (c *Controller) reset() {
|
||||
c.spawnRate = 0
|
||||
atomic.StoreInt64(&c.currentClientsNum, 0)
|
||||
c.spawnDone = make(chan struct{})
|
||||
c.rebalance = make(chan bool)
|
||||
c.tasks = []*Task{}
|
||||
c.once = sync.Once{}
|
||||
}
|
||||
@@ -199,9 +207,6 @@ type runner struct {
|
||||
controller *Controller
|
||||
loop *Loop // specify loop count for testcase, count = loopCount * spawnCount
|
||||
|
||||
// dynamically balance boomer running parameters
|
||||
rebalance chan bool
|
||||
|
||||
// stop signals the run goroutine should shutdown.
|
||||
stopChan chan bool
|
||||
// all running workers(goroutines) will select on this channel.
|
||||
@@ -358,7 +363,6 @@ func (r *runner) reportTestResult() {
|
||||
func (r *runner) reset() {
|
||||
r.controller.reset()
|
||||
r.stats.clearAll()
|
||||
r.rebalance = make(chan bool)
|
||||
r.stoppingChan = make(chan bool)
|
||||
r.doneChan = make(chan bool)
|
||||
r.reportedChan = make(chan bool)
|
||||
@@ -430,16 +434,18 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
|
||||
continue
|
||||
}
|
||||
|
||||
r.controller.once.Do(func() {
|
||||
// spawning compete
|
||||
r.controller.spawnCompete()
|
||||
if spawnCompleteFunc != nil {
|
||||
spawnCompleteFunc()
|
||||
}
|
||||
r.updateState(StateRunning)
|
||||
})
|
||||
r.controller.once.Do(
|
||||
func() {
|
||||
// spawning compete
|
||||
r.controller.spawnCompete()
|
||||
if spawnCompleteFunc != nil {
|
||||
spawnCompleteFunc()
|
||||
}
|
||||
r.updateState(StateRunning)
|
||||
},
|
||||
)
|
||||
|
||||
<-r.rebalance
|
||||
<-r.controller.getRebalanceChan()
|
||||
if r.isStarting() {
|
||||
// rebalance spawn count
|
||||
r.controller.setSpawn(r.getSpawnCount(), r.getSpawnRate())
|
||||
@@ -629,7 +635,7 @@ func (r *localRunner) start() {
|
||||
r.wgMu.Lock()
|
||||
r.updateState(StateStopping)
|
||||
close(r.stoppingChan)
|
||||
close(r.rebalance)
|
||||
close(r.controller.rebalance)
|
||||
r.wgMu.Unlock()
|
||||
|
||||
// wait for goroutines before closing
|
||||
@@ -668,7 +674,8 @@ type workerRunner struct {
|
||||
masterPort int
|
||||
client *grpcClient
|
||||
|
||||
profile *Profile
|
||||
profile *Profile
|
||||
testCasesBytes []byte
|
||||
|
||||
tasksChan chan *task
|
||||
|
||||
@@ -714,10 +721,10 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
|
||||
log.Error().Msg("miss tasks")
|
||||
}
|
||||
r.tasksChan <- &task{
|
||||
Profile: profile,
|
||||
TestCases: msg.Tasks,
|
||||
Profile: profile,
|
||||
TestCasesBytes: msg.Tasks,
|
||||
}
|
||||
log.Info().Msg("on spawn message successful")
|
||||
log.Info().Msg("on spawn message successfully")
|
||||
}
|
||||
|
||||
func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
|
||||
@@ -731,7 +738,7 @@ func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
|
||||
r.tasksChan <- &task{
|
||||
Profile: profile,
|
||||
}
|
||||
log.Info().Msg("on rebalance message successful")
|
||||
log.Info().Msg("on rebalance message successfully")
|
||||
}
|
||||
|
||||
// Runner acts as a state machine.
|
||||
@@ -831,6 +838,9 @@ func (r *workerRunner) run() {
|
||||
// wait for goroutines before closing
|
||||
r.wg.Wait()
|
||||
|
||||
// notify master that worker is quitting
|
||||
r.onQuiting()
|
||||
|
||||
var ticker = time.NewTicker(1 * time.Second)
|
||||
if r.client != nil {
|
||||
// waiting for the quit message to be sent to master
|
||||
@@ -879,6 +889,7 @@ func (r *workerRunner) run() {
|
||||
if !r.isStarting() && !r.isStopping() {
|
||||
r.updateState(StateMissing)
|
||||
}
|
||||
continue
|
||||
}
|
||||
CPUUsage := GetCurrentCPUPercent()
|
||||
MemoryUsage := GetCurrentMemoryPercent()
|
||||
@@ -918,13 +929,18 @@ func (r *workerRunner) start() {
|
||||
// block concurrent waitgroup adds in GoAttach while stopping
|
||||
r.wgMu.Lock()
|
||||
r.updateState(StateStopping)
|
||||
close(r.controller.rebalance)
|
||||
close(r.stoppingChan)
|
||||
close(r.rebalance)
|
||||
r.wgMu.Unlock()
|
||||
|
||||
// wait for goroutines before closing
|
||||
r.wg.Wait()
|
||||
|
||||
// reset loop
|
||||
if r.loop != nil {
|
||||
r.loop = nil
|
||||
}
|
||||
|
||||
close(r.doneChan)
|
||||
|
||||
// wait until all stats are reported successfully
|
||||
@@ -951,7 +967,6 @@ func (r *workerRunner) stop() {
|
||||
}
|
||||
|
||||
func (r *workerRunner) close() {
|
||||
r.onQuiting()
|
||||
close(r.closeChan)
|
||||
}
|
||||
|
||||
@@ -970,8 +985,8 @@ type masterRunner struct {
|
||||
profile *Profile
|
||||
|
||||
parseTestCasesChan chan bool
|
||||
testCaseBytes chan []byte
|
||||
tcb []byte
|
||||
testCaseBytesChan chan []byte
|
||||
testCasesBytes []byte
|
||||
}
|
||||
|
||||
func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
|
||||
@@ -988,7 +1003,7 @@ func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
|
||||
masterBindPort: masterBindPort,
|
||||
server: newServer(masterBindHost, masterBindPort),
|
||||
parseTestCasesChan: make(chan bool),
|
||||
testCaseBytes: make(chan []byte),
|
||||
testCaseBytesChan: make(chan []byte),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1011,23 +1026,15 @@ func (r *masterRunner) heartbeatWorker() {
|
||||
if !ok {
|
||||
log.Error().Msg("failed to get worker information")
|
||||
}
|
||||
if atomic.LoadInt32(&workerInfo.Heartbeat) < 0 {
|
||||
if workerInfo.getState() == StateQuitting {
|
||||
return true
|
||||
}
|
||||
if workerInfo.getState() != StateMissing {
|
||||
workerInfo.setState(StateMissing)
|
||||
}
|
||||
if r.isStopping() {
|
||||
// all running workers missed, setting state to stopped
|
||||
if r.server.getClientsLength() <= 0 {
|
||||
r.updateState(StateStopped)
|
||||
go func() {
|
||||
if atomic.LoadInt32(&workerInfo.Heartbeat) < 0 {
|
||||
if workerInfo.getState() != StateMissing {
|
||||
workerInfo.setState(StateMissing)
|
||||
}
|
||||
return true
|
||||
} else {
|
||||
atomic.AddInt32(&workerInfo.Heartbeat, -1)
|
||||
}
|
||||
} else {
|
||||
atomic.AddInt32(&workerInfo.Heartbeat, -1)
|
||||
}
|
||||
}()
|
||||
return true
|
||||
})
|
||||
case <-reportTicker.C:
|
||||
@@ -1051,72 +1058,85 @@ func (r *masterRunner) clientListener() {
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
switch msg.Type {
|
||||
case typeClientReady:
|
||||
workerInfo.setState(StateInit)
|
||||
if r.getState() == StateRunning {
|
||||
log.Warn().Str("worker id", workerInfo.ID).Msg("worker joined, ready to rebalance the load of each worker")
|
||||
go func() {
|
||||
switch msg.Type {
|
||||
case typeClientReady:
|
||||
workerInfo.setState(StateInit)
|
||||
case typeClientStopped:
|
||||
workerInfo.setState(StateStopped)
|
||||
case typeHeartbeat:
|
||||
if workerInfo.getState() == StateMissing {
|
||||
workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
|
||||
}
|
||||
workerInfo.updateHeartbeat(3)
|
||||
currentCPUUsage, ok := msg.Data["current_cpu_usage"]
|
||||
if ok {
|
||||
workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage))
|
||||
}
|
||||
currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"]
|
||||
if ok {
|
||||
workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage))
|
||||
}
|
||||
currentMemoryUsage, ok := msg.Data["current_memory_usage"]
|
||||
if ok {
|
||||
workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage))
|
||||
}
|
||||
currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"]
|
||||
if ok {
|
||||
workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage))
|
||||
}
|
||||
currentUsers, ok := msg.Data["current_users"]
|
||||
if ok {
|
||||
workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers))
|
||||
}
|
||||
case typeSpawning:
|
||||
workerInfo.setState(StateSpawning)
|
||||
case typeSpawningComplete:
|
||||
workerInfo.setState(StateRunning)
|
||||
case typeQuit:
|
||||
if workerInfo.getState() == StateQuitting {
|
||||
break
|
||||
}
|
||||
workerInfo.setState(StateQuitting)
|
||||
case typeException:
|
||||
// Todo
|
||||
default:
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *masterRunner) stateMachine() {
|
||||
ticker := time.NewTicker(stateMachineInterval)
|
||||
for {
|
||||
select {
|
||||
case <-r.closeChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
switch r.getState() {
|
||||
case StateSpawning:
|
||||
if r.server.getCurrentUsers() == int(r.getSpawnCount()) {
|
||||
log.Warn().Msg("all workers spawn done, setting state as running")
|
||||
r.updateState(StateRunning)
|
||||
}
|
||||
case StateRunning:
|
||||
if r.server.getStartingClientsLength() == 0 {
|
||||
r.updateState(StateStopped)
|
||||
continue
|
||||
}
|
||||
if r.server.getWorkersLengthByState(StateInit) != 0 {
|
||||
err := r.rebalance()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to rebalance")
|
||||
}
|
||||
}
|
||||
case typeClientStopped:
|
||||
workerInfo.setState(StateStopped)
|
||||
if r.server.getWorkersLengthByState(StateStopped)+r.server.getWorkersLengthByState(StateInit) == r.server.getClientsLength() {
|
||||
case StateStopping:
|
||||
if r.server.getReadyClientsLength() == r.server.getAvailableClientsLength() {
|
||||
r.updateState(StateStopped)
|
||||
}
|
||||
case typeHeartbeat:
|
||||
if workerInfo.getState() != int32(builtin.BytesToInt64(msg.Data["state"])) {
|
||||
workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
|
||||
}
|
||||
workerInfo.updateHeartbeat(3)
|
||||
currentCPUUsage, ok := msg.Data["current_cpu_usage"]
|
||||
if ok {
|
||||
workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage))
|
||||
}
|
||||
currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"]
|
||||
if ok {
|
||||
workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage))
|
||||
}
|
||||
currentMemoryUsage, ok := msg.Data["current_memory_usage"]
|
||||
if ok {
|
||||
workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage))
|
||||
}
|
||||
currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"]
|
||||
if ok {
|
||||
workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage))
|
||||
}
|
||||
currentUsers, ok := msg.Data["current_users"]
|
||||
if ok {
|
||||
workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers))
|
||||
}
|
||||
case typeSpawning:
|
||||
workerInfo.setState(StateSpawning)
|
||||
case typeSpawningComplete:
|
||||
workerInfo.setState(StateRunning)
|
||||
if r.server.getWorkersLengthByState(StateRunning) == r.server.getClientsLength() {
|
||||
log.Warn().Msg("all workers spawn done, setting state as running")
|
||||
r.updateState(StateRunning)
|
||||
}
|
||||
case typeQuit:
|
||||
if workerInfo.getState() == StateQuitting {
|
||||
break
|
||||
}
|
||||
workerInfo.setState(StateQuitting)
|
||||
if r.isStarting() {
|
||||
if r.server.getClientsLength() > 0 {
|
||||
log.Warn().Str("worker id", workerInfo.ID).Msg("worker quited, ready to rebalance the load of each worker")
|
||||
err := r.rebalance()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to rebalance")
|
||||
}
|
||||
}
|
||||
}
|
||||
case typeException:
|
||||
// Todo
|
||||
default:
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1146,7 +1166,7 @@ func (r *masterRunner) run() {
|
||||
case <-r.closeChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
c := r.server.getClientsLength()
|
||||
c := r.server.getAvailableClientsLength()
|
||||
log.Info().Msg(fmt.Sprintf("expected worker number: %v, current worker count: %v", r.expectWorkers, c))
|
||||
if c >= r.expectWorkers {
|
||||
err = r.start()
|
||||
@@ -1165,6 +1185,9 @@ func (r *masterRunner) run() {
|
||||
}()
|
||||
}
|
||||
|
||||
// master state machine
|
||||
r.goAttach(r.stateMachine)
|
||||
|
||||
// listen and deal message from worker
|
||||
r.goAttach(r.clientListener)
|
||||
|
||||
@@ -1174,13 +1197,13 @@ func (r *masterRunner) run() {
|
||||
}
|
||||
|
||||
func (r *masterRunner) start() error {
|
||||
numWorkers := r.server.getClientsLength()
|
||||
numWorkers := r.server.getAvailableClientsLength()
|
||||
if numWorkers == 0 {
|
||||
return errors.New("current available workers: 0")
|
||||
}
|
||||
|
||||
// fetching testcase
|
||||
testcase, err := r.fetchTestCase()
|
||||
// fetching testcases
|
||||
testCasesBytes, err := r.fetchTestCases()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1223,19 +1246,19 @@ func (r *masterRunner) start() error {
|
||||
Type: "spawn",
|
||||
Profile: ProfileToBytes(workerProfile),
|
||||
NodeID: workerInfo.ID,
|
||||
Tasks: testcase,
|
||||
Tasks: testCasesBytes,
|
||||
}
|
||||
cur++
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
log.Warn().Interface("profile", r.profile).Msg("send spawn data to worker successful")
|
||||
log.Warn().Interface("profile", r.profile).Msg("send spawn data to worker successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *masterRunner) rebalance() error {
|
||||
numWorkers := r.server.getClientsLength()
|
||||
numWorkers := r.server.getAvailableClientsLength()
|
||||
if numWorkers == 0 {
|
||||
return errors.New("current available workers: 0")
|
||||
}
|
||||
@@ -1276,7 +1299,7 @@ func (r *masterRunner) rebalance() error {
|
||||
Type: "spawn",
|
||||
Profile: ProfileToBytes(workerProfile),
|
||||
NodeID: workerInfo.ID,
|
||||
Tasks: r.tcb,
|
||||
Tasks: r.testCasesBytes,
|
||||
}
|
||||
} else {
|
||||
workerInfo.getStream() <- &messager.StreamResponse{
|
||||
@@ -1290,22 +1313,22 @@ func (r *masterRunner) rebalance() error {
|
||||
return true
|
||||
})
|
||||
|
||||
log.Warn().Msg("send rebalance data to worker successful")
|
||||
log.Warn().Msg("send rebalance data to worker successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *masterRunner) fetchTestCase() ([]byte, error) {
|
||||
func (r *masterRunner) fetchTestCases() ([]byte, error) {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
if len(r.testCaseBytes) > 0 {
|
||||
<-r.testCaseBytes
|
||||
if len(r.testCaseBytesChan) > 0 {
|
||||
<-r.testCaseBytesChan
|
||||
}
|
||||
r.parseTestCasesChan <- true
|
||||
select {
|
||||
case <-ticker.C:
|
||||
return nil, errors.New("parse testcases timeout")
|
||||
case tcb := <-r.testCaseBytes:
|
||||
r.tcb = tcb
|
||||
return tcb, nil
|
||||
case testCasesBytes := <-r.testCaseBytesChan:
|
||||
r.testCasesBytes = testCasesBytes
|
||||
return testCasesBytes, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1336,22 +1359,23 @@ func (r *masterRunner) close() {
|
||||
func (r *masterRunner) reportStats() {
|
||||
currentTime := time.Now()
|
||||
println()
|
||||
println("========================= HttpRunner Master for Distributed Load Testing ========================= ")
|
||||
println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v",
|
||||
currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getClientsLength(), r.getSpawnCount()))
|
||||
println("==================================== HttpRunner Master for Distributed Load Testing ==================================== ")
|
||||
println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v, Current Users: %v",
|
||||
currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getAvailableClientsLength(), r.getSpawnCount(), r.server.getCurrentUsers()))
|
||||
table := tablewriter.NewWriter(os.Stdout)
|
||||
table.SetColMinWidth(0, 20)
|
||||
table.SetColMinWidth(0, 40)
|
||||
table.SetColMinWidth(1, 10)
|
||||
table.SetColMinWidth(2, 10)
|
||||
table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU Usage (%)", "Memory Usage (%)"})
|
||||
|
||||
for _, worker := range r.server.getAllWorkers() {
|
||||
row := make([]string, 6)
|
||||
row[0] = worker.ID
|
||||
row[1] = worker.IP
|
||||
row[2] = fmt.Sprintf("%v", getStateName(worker.getState()))
|
||||
row[3] = fmt.Sprintf("%v", worker.getUserCount())
|
||||
row[4] = fmt.Sprintf("%.2f", worker.getCPUUsage())
|
||||
row[5] = fmt.Sprintf("%.2f", worker.getMemoryUsage())
|
||||
row[2] = fmt.Sprintf("%v", getStateName(worker.State))
|
||||
row[3] = fmt.Sprintf("%v", worker.UserCount)
|
||||
row[4] = fmt.Sprintf("%.2f", worker.CPUUsage)
|
||||
row[5] = fmt.Sprintf("%.2f", worker.MemoryUsage)
|
||||
table.Append(row)
|
||||
}
|
||||
table.Render()
|
||||
|
||||
@@ -338,6 +338,7 @@ func TestOnQuitMessage(t *testing.T) {
|
||||
go runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
|
||||
close(runner.doneChan)
|
||||
<-runner.closeChan
|
||||
runner.onQuiting()
|
||||
if runner.getState() != StateQuitting {
|
||||
t.Error("Runner's state should be StateQuitting")
|
||||
}
|
||||
@@ -348,6 +349,7 @@ func TestOnQuitMessage(t *testing.T) {
|
||||
runner.client.shutdownChan = make(chan bool)
|
||||
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
|
||||
<-runner.closeChan
|
||||
runner.onQuiting()
|
||||
if runner.getState() != StateQuitting {
|
||||
t.Error("Runner's state should be StateQuitting")
|
||||
}
|
||||
@@ -395,7 +397,7 @@ func TestOnMessage(t *testing.T) {
|
||||
|
||||
// increase goroutines while running
|
||||
runner.onMessage(newMessageToWorker("rebalance", ProfileToBytes(&Profile{SpawnCount: 15, SpawnRate: 15}), nil, nil))
|
||||
runner.rebalance <- true
|
||||
runner.controller.rebalance <- true
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
if runner.getState() != StateRunning {
|
||||
@@ -460,6 +462,7 @@ func TestClientListener(t *testing.T) {
|
||||
runner.updateState(StateInit)
|
||||
runner.setSpawnCount(10)
|
||||
runner.setSpawnRate(10)
|
||||
go runner.stateMachine()
|
||||
go runner.clientListener()
|
||||
runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3, stream: make(chan *messager.StreamResponse, 10)})
|
||||
runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3, stream: make(chan *messager.StreamResponse, 10)})
|
||||
@@ -483,6 +486,7 @@ func TestClientListener(t *testing.T) {
|
||||
Type: typeClientStopped,
|
||||
NodeID: "testID2",
|
||||
}
|
||||
runner.updateState(StateRunning)
|
||||
worker2, ok := runner.server.getClients().Load("testID2")
|
||||
if !ok {
|
||||
t.Fatal("error")
|
||||
@@ -515,7 +519,7 @@ func TestHeartbeatWorker(t *testing.T) {
|
||||
runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit, stream: make(chan *messager.StreamResponse, 10)})
|
||||
go runner.clientListener()
|
||||
go runner.heartbeatWorker()
|
||||
time.Sleep(3 * time.Second)
|
||||
time.Sleep(4 * time.Second)
|
||||
worker1, ok := runner.server.getClients().Load("testID1")
|
||||
if !ok {
|
||||
t.Fatal()
|
||||
@@ -525,7 +529,7 @@ func TestHeartbeatWorker(t *testing.T) {
|
||||
t.Fatal()
|
||||
}
|
||||
if workerInfo1.getState() != StateMissing {
|
||||
t.Error("expected state of worker runner is missing, but got", workerInfo1.getState())
|
||||
t.Error("expected state of worker runner is missing, but got", getStateName(workerInfo1.getState()))
|
||||
}
|
||||
runner.server.recvChannel() <- &genericMessage{
|
||||
Type: typeHeartbeat,
|
||||
|
||||
@@ -53,6 +53,24 @@ func (w *WorkerNode) setState(state int32) {
|
||||
atomic.StoreInt32(&w.State, state)
|
||||
}
|
||||
|
||||
func (w *WorkerNode) isStarting() bool {
|
||||
return w.getState() == StateRunning || w.getState() == StateSpawning
|
||||
}
|
||||
|
||||
func (w *WorkerNode) isStopping() bool {
|
||||
return w.getState() == StateStopping
|
||||
}
|
||||
|
||||
func (w *WorkerNode) isAvailable() bool {
|
||||
state := w.getState()
|
||||
return state != StateMissing && state != StateQuitting
|
||||
}
|
||||
|
||||
func (w *WorkerNode) isReady() bool {
|
||||
state := w.getState()
|
||||
return state == StateInit || state == StateStopped
|
||||
}
|
||||
|
||||
func (w *WorkerNode) updateHeartbeat(heartbeat int32) {
|
||||
atomic.StoreInt32(&w.Heartbeat, heartbeat)
|
||||
}
|
||||
@@ -130,8 +148,8 @@ func (w *WorkerNode) getMemoryUsage() float64 {
|
||||
}
|
||||
|
||||
func (w *WorkerNode) setStream(stream chan *messager.StreamResponse) {
|
||||
w.mutex.RLock()
|
||||
defer w.mutex.RUnlock()
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
w.stream = stream
|
||||
}
|
||||
|
||||
@@ -302,26 +320,19 @@ func (s *grpcServer) Register(ctx context.Context, req *messager.RegisterRequest
|
||||
wn := newWorkerNode(req.NodeID, clientIp, req.Os, req.Arch)
|
||||
s.clients.Store(req.NodeID, wn)
|
||||
log.Warn().Str("worker id", req.NodeID).Msg("worker joined")
|
||||
return &messager.RegisterResponse{Code: "0", Message: "register successfully"}, nil
|
||||
return &messager.RegisterResponse{Code: "0", Message: "register successful"}, nil
|
||||
}
|
||||
|
||||
func (s *grpcServer) SignOut(_ context.Context, req *messager.SignOutRequest) (*messager.SignOutResponse, error) {
|
||||
// delete worker information
|
||||
s.clients.Delete(req.NodeID)
|
||||
log.Warn().Str("worker id", req.NodeID).Msg("worker quited")
|
||||
return &messager.SignOutResponse{Code: "0", Message: "sign out successfully"}, nil
|
||||
return &messager.SignOutResponse{Code: "0", Message: "sign out successful"}, nil
|
||||
}
|
||||
|
||||
func (s *grpcServer) valid(token string) (isValid bool) {
|
||||
s.clients.Range(func(key, value interface{}) bool {
|
||||
if workerInfo, ok := value.(*WorkerNode); ok {
|
||||
if workerInfo.ID == token {
|
||||
isValid = true
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
func (s *grpcServer) validClientToken(token string) bool {
|
||||
_, ok := s.clients.Load(token)
|
||||
return ok
|
||||
}
|
||||
|
||||
func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_BidirectionalStreamingMessageServer) error {
|
||||
@@ -332,7 +343,7 @@ func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_Bidirect
|
||||
return status.Error(codes.Unauthenticated, "missing token header")
|
||||
}
|
||||
|
||||
ok = s.valid(token)
|
||||
ok = s.validClientToken(token)
|
||||
if !ok {
|
||||
return status.Error(codes.Unauthenticated, "invalid token")
|
||||
}
|
||||
@@ -404,7 +415,7 @@ func (s *grpcServer) sendMsg(srv messager.Message_BidirectionalStreamingMessageS
|
||||
func (s *grpcServer) sendBroadcasts(msg *genericMessage) {
|
||||
s.clients.Range(func(key, value interface{}) bool {
|
||||
if workerInfo, ok := value.(*WorkerNode); ok {
|
||||
if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing {
|
||||
if !workerInfo.isAvailable() {
|
||||
return true
|
||||
}
|
||||
workerInfo.getStream() <- &messager.StreamResponse{
|
||||
@@ -517,10 +528,10 @@ func (s *grpcServer) getClients() *sync.Map {
|
||||
return s.clients
|
||||
}
|
||||
|
||||
func (s *grpcServer) getClientsLength() (l int) {
|
||||
func (s *grpcServer) getAvailableClientsLength() (l int) {
|
||||
s.clients.Range(func(key, value interface{}) bool {
|
||||
if workerInfo, ok := value.(*WorkerNode); ok {
|
||||
if workerInfo.getState() != StateQuitting && workerInfo.getState() != StateMissing {
|
||||
if workerInfo.isAvailable() {
|
||||
l++
|
||||
}
|
||||
}
|
||||
@@ -528,3 +539,39 @@ func (s *grpcServer) getClientsLength() (l int) {
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (s *grpcServer) getReadyClientsLength() (l int) {
|
||||
s.clients.Range(func(key, value interface{}) bool {
|
||||
if workerInfo, ok := value.(*WorkerNode); ok {
|
||||
if workerInfo.isReady() {
|
||||
l++
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (s *grpcServer) getStartingClientsLength() (l int) {
|
||||
s.clients.Range(func(key, value interface{}) bool {
|
||||
if workerInfo, ok := value.(*WorkerNode); ok {
|
||||
if workerInfo.isStarting() {
|
||||
l++
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (s *grpcServer) getCurrentUsers() (l int) {
|
||||
s.clients.Range(func(key, value interface{}) bool {
|
||||
if workerInfo, ok := value.(*WorkerNode); ok {
|
||||
if workerInfo.isStarting() {
|
||||
l += int(workerInfo.getUserCount())
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func (api *apiHandler) Start(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
req := StartRequestBody{
|
||||
Profile: *api.boomer.GetProfile(),
|
||||
Profile: *boomer.NewProfile(),
|
||||
}
|
||||
err = mapstructure.Decode(data, &req)
|
||||
if err != nil {
|
||||
|
||||
@@ -387,13 +387,16 @@ func runStepRequest(r *SessionRunner, step *TStep) (stepResult *StepResult, err
|
||||
if err != nil {
|
||||
return stepResult, errors.Wrap(err, "do request failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
|
||||
// decode response body in br/gzip/deflate formats
|
||||
err = decodeResponseBody(resp)
|
||||
if err != nil {
|
||||
return stepResult, errors.Wrap(err, "decode response body failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// log & print response
|
||||
if r.LogOn() {
|
||||
|
||||
Reference in New Issue
Block a user