From b795558543628bbf57389035c0f75a34215871d8 Mon Sep 17 00:00:00 2001 From: xucong053 Date: Thu, 19 May 2022 13:02:41 +0800 Subject: [PATCH] fix: unittest --- hrp/cmd/boom.go | 58 ++++++++++++------------ hrp/internal/boomer/runner.go | 71 ++++++++++++++---------------- hrp/internal/boomer/runner_test.go | 35 ++++++++------- 3 files changed, 82 insertions(+), 82 deletions(-) diff --git a/hrp/cmd/boom.go b/hrp/cmd/boom.go index 09d87f2a..1b465d36 100644 --- a/hrp/cmd/boom.go +++ b/hrp/cmd/boom.go @@ -45,9 +45,28 @@ var boomCmd = &cobra.Command{ } } + // init boomer var hrpBoomer *hrp.HRPBoomer if boomArgs.master { hrpBoomer = hrp.NewMasterBoomer(boomArgs.masterBindHost, boomArgs.masterBindPort) + } else if boomArgs.worker { + hrpBoomer = hrp.NewWorkerBoomer(boomArgs.masterHost, boomArgs.masterPort) + } else { + hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate) + } + hrpBoomer.EnableGracefulQuit() + + // init output + if !boomArgs.DisableConsoleOutput { + hrpBoomer.AddOutput(boomer.NewConsoleOutput()) + } + if boomArgs.PrometheusPushgatewayURL != "" { + hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode())) + } + + // run boomer + switch hrpBoomer.GetMode() { + case "master": hrpBoomer.SetTestCasesPath(args) if boomArgs.autoStart { hrpBoomer.SetAutoStart() @@ -55,43 +74,28 @@ var boomCmd = &cobra.Command{ hrpBoomer.SetSpawnCount(boomArgs.SpawnCount) hrpBoomer.SetSpawnRate(boomArgs.SpawnRate) } - hrpBoomer.EnableGracefulQuit() go hrpBoomer.StartServer() go hrpBoomer.RunMaster() hrpBoomer.LoopTestCases() - return - } else if boomArgs.worker { - hrpBoomer = hrp.NewWorkerBoomer(boomArgs.masterHost, boomArgs.masterPort) + case "worker": if boomArgs.ignoreQuit { hrpBoomer.SetIgnoreQuit() } go hrpBoomer.RunWorker() - } else { - hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate) + hrpBoomer.LoopTasks() + case "standalone": if boomArgs.LoopCount > 0 { 
hrpBoomer.SetLoopCount(boomArgs.LoopCount) } - } - hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate) - if !boomArgs.DisableConsoleOutput { - - hrpBoomer.AddOutput(boomer.NewConsoleOutput()) - } - if boomArgs.PrometheusPushgatewayURL != "" { - hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode())) - } - hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive) - hrpBoomer.SetDisableCompression(boomArgs.DisableCompression) - hrpBoomer.SetClientTransport() - if venv != "" { - hrpBoomer.SetPython3Venv(venv) - } - hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration) - hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration) - hrpBoomer.EnableGracefulQuit() - if boomArgs.worker { - hrpBoomer.LoopTasks() - } else { + hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate) + hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive) + hrpBoomer.SetDisableCompression(boomArgs.DisableCompression) + hrpBoomer.SetClientTransport() + if venv != "" { + hrpBoomer.SetPython3Venv(venv) + } + hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration) + hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration) hrpBoomer.Run(paths...) 
} }, diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 1181d6c2..67b89a19 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -58,12 +58,11 @@ func (l *Loop) increaseFinishedCount() { } type SpawnInfo struct { + mutex sync.RWMutex spawnCount int64 // target clients to spawn acquiredCount int64 // count acquired of workers spawnRate float64 spawnDone chan struct{} - - mutex sync.RWMutex } func (s *SpawnInfo) setSpawn(spawnCount int64, spawnRate float64) { @@ -154,6 +153,9 @@ type runner struct { // when this channel is closed, all statistics are reported successfully reportedChan chan bool + // rebalance spawn + rebalance chan bool + // all running workers(goroutines) will select on this channel. // close this channel will stop all running workers. stopChan chan bool @@ -273,12 +275,7 @@ func (r *runner) reportTestResult() { } func (r *runner) startSpawning(spawnCount int64, spawnRate float64, spawnCompleteFunc func()) { - r.stopChan = make(chan bool) - r.reportedChan = make(chan bool) r.spawn.reset() - - r.spawn.setSpawn(spawnCount, spawnRate) - atomic.StoreInt32(&r.currentClientsNum, 0) go r.spawnWorkers(spawnCount, spawnRate, r.stopChan, spawnCompleteFunc) @@ -290,6 +287,8 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo Float64("spawnRate", spawnRate). 
Msg("Spawning workers") + r.spawn.setSpawn(spawnCount, spawnRate) + r.updateState(StateSpawning) for { select { @@ -306,7 +305,7 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo // loop count per worker var workerLoop *Loop if r.loop != nil { - workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / int64(r.spawn.spawnCount)} + workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / r.spawn.spawnCount} } atomic.AddInt32(&r.currentClientsNum, 1) go func() { @@ -343,23 +342,19 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo atomic.AddInt32(&r.currentClientsNum, -1) return } - if !r.isStarted() { - atomic.AddInt64(&r.spawn.acquiredCount, -1) - atomic.AddInt32(&r.currentClientsNum, -1) - return - } } } }() - } else { - if r.getState() == StateSpawning { - r.spawn.done() - if spawnCompleteFunc != nil { - spawnCompleteFunc() - } - r.updateState(StateRunning) + } else if r.getState() == StateSpawning { + // spawning complete + r.spawn.done() + if spawnCompleteFunc != nil { + spawnCompleteFunc() } - time.Sleep(1 * time.Second) + r.updateState(StateRunning) + } else { + // block until a rebalance is triggered + <-r.rebalance } } } @@ -492,6 +487,10 @@ func (r *localRunner) start() { r.rateLimiter.Start() } + r.stopChan = make(chan bool) + r.reportedChan = make(chan bool) + r.rebalance = make(chan bool) + go r.spawnWorkers(r.spawn.spawnCount, r.spawn.spawnRate, r.stopChan, nil) // output setup @@ -525,6 +524,7 @@ func (r *localRunner) start() { func (r *localRunner) stop() { if r.runner.isStarted() { r.runner.stop() + close(r.rebalance) } } @@ -542,8 +542,6 @@ type workerRunner struct { // get testcase from master testCaseBytes chan []byte - startFlag bool - ignoreQuit bool } @@ -554,10 +552,8 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { spawn: &SpawnInfo{ spawnDone: make(chan struct{}), }, - stopChan: make(chan bool), - reportedChan: make(chan bool), - closeChan: 
make(chan bool), - once: &sync.Once{}, + closeChan: make(chan bool), + once: &sync.Once{}, }, masterHost: masterHost, masterPort: masterPort, @@ -572,7 +568,6 @@ func (r *workerRunner) spawnComplete() { data := make(map[string]int64) data["count"] = r.spawn.getSpawnCount() r.client.sendChannel() <- newGenericMessage("spawning_complete", data, r.nodeID) - r.updateState(StateRunning) } func (r *workerRunner) onSpawnMessage(msg *genericMessage) { @@ -607,6 +602,7 @@ func (r *workerRunner) onMessage(msg *genericMessage) { switch msg.Type { case "spawn": r.onSpawnMessage(msg) + r.rebalance <- true case "stop": r.stop() log.Info().Msg("Recv stop message from master, all the goroutines are stopped") @@ -644,7 +640,7 @@ func (r *workerRunner) startListener() { } } -// run starts service +// run worker service func (r *workerRunner) run() { r.updateState(StateInit) r.client = newClient(r.masterHost, r.masterPort, r.nodeID) @@ -694,11 +690,8 @@ func (r *workerRunner) run() { <-r.closeChan } +// start load test func (r *workerRunner) start() { - r.startFlag = true - defer func() { - r.startFlag = false - }() r.stats.clearAll() // start rate limiter @@ -706,6 +699,10 @@ func (r *workerRunner) start() { r.rateLimiter.Start() } + r.stopChan = make(chan bool) + r.reportedChan = make(chan bool) + r.rebalance = make(chan bool) + r.once.Do(r.outputOnStart) r.startSpawning(r.spawn.getSpawnCount(), r.spawn.getSpawnRate(), r.spawnComplete) @@ -722,6 +719,7 @@ func (r *workerRunner) start() { func (r *workerRunner) stop() { if r.isStarted() { close(r.stopChan) + close(r.rebalance) // stop rate limiter if r.rateLimitEnabled { r.rateLimiter.Stop() @@ -735,9 +733,8 @@ func (r *workerRunner) close() { if r.ignoreQuit { return } - for r.startFlag == true { - time.Sleep(1 * time.Second) - } + // wait for the report to finish + time.Sleep(3 * time.Second) close(r.closeChan) var ticker = time.NewTicker(1 * time.Second) if r.client != nil { @@ -768,8 +765,6 @@ type masterRunner struct { 
parseTestCasesChan chan bool startFlag bool testCaseBytes chan []byte - - mutex sync.Mutex } func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner { diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go index 28305752..e48856ac 100644 --- a/hrp/internal/boomer/runner_test.go +++ b/hrp/internal/boomer/runner_test.go @@ -107,9 +107,8 @@ func TestLoopCount(t *testing.T) { runner := newLocalRunner(2, 2) runner.loop = &Loop{loopCount: 4} runner.setTasks(tasks) - go runner.start() - <-runner.stopChan - if !assert.Equal(t, runner.loop.loopCount, atomic.LoadInt64(&runner.loop.finishedCount)) { + runner.start() + if !assert.Equal(t, atomic.LoadInt64(&runner.loop.loopCount), atomic.LoadInt64(&runner.loop.finishedCount)) { t.Fatal() } } @@ -129,8 +128,9 @@ func TestSpawnWorkers(t *testing.T) { runner.client = newClient("localhost", 5557, runner.nodeID) runner.setTasks(tasks) + runner.stopChan = make(chan bool) go runner.spawnWorkers(10, 10, runner.stopChan, runner.spawnComplete) - time.Sleep(10 * time.Millisecond) + time.Sleep(2 * time.Second) currentClients := atomic.LoadInt32(&runner.currentClientsNum) if currentClients != 10 { @@ -166,13 +166,14 @@ func TestSpawnWorkersWithManyTasks(t *testing.T) { runner.client = newClient("localhost", 5557, runner.nodeID) const numToSpawn int64 = 30 + runner.stopChan = make(chan bool) - runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete) + go runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete) time.Sleep(2 * time.Second) currentClients := atomic.LoadInt32(&runner.currentClientsNum) - assert.Equal(t, numToSpawn, int(currentClients)) + assert.Equal(t, numToSpawn, int64(currentClients)) lock.Lock() hundreds := taskCalls["one hundred"] tens := taskCalls["ten"] @@ -255,6 +256,7 @@ func TestStop(t *testing.T) { } tasks := []*Task{taskA} runner := newWorkerRunner("localhost", 5557) + runner.stopChan = make(chan 
bool) runner.setTasks(tasks) runner.spawn.setSpawn(10, 10) runner.updateState(StateSpawning) @@ -358,9 +360,6 @@ func TestOnMessage(t *testing.T) { // spawn complete and running time.Sleep(2 * time.Second) - if runner.getState() != StateRunning { - t.Error("State of runner is not running after spawn, got", runner.getState()) - } if atomic.LoadInt32(&runner.currentClientsNum) != 10 { t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum)) } @@ -368,6 +367,9 @@ func TestOnMessage(t *testing.T) { if msg.Type != "spawning_complete" { t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type) } + if runner.getState() != StateRunning { + t.Error("State of runner is not running after spawn, got", runner.getState()) + } // increase goroutines while running runner.onMessage(newGenericMessage("spawn", map[string]int64{ @@ -381,10 +383,6 @@ func TestOnMessage(t *testing.T) { } time.Sleep(2 * time.Second) - msg = <-runner.client.sendChannel() - if msg.Type != "spawning_complete" { - t.Error("Runner should send spawning_complete message, got", msg.Type) - } if runner.getState() != StateRunning { t.Error("State of runner is not running after spawn, got", runner.getState()) } @@ -402,6 +400,9 @@ func TestOnMessage(t *testing.T) { t.Error("Runner should send client_stopped message, got", msg.Type) } + time.Sleep(3 * time.Second) + + go runner.start() // spawn again runner.onMessage(newGenericMessage("spawn", map[string]int64{ "spawn_count": 10, @@ -414,13 +415,13 @@ func TestOnMessage(t *testing.T) { } // spawn complete and running - time.Sleep(2 * time.Second) - if runner.getState() != StateRunning { - t.Error("State of runner is not running after spawn, got", runner.getState()) - } + time.Sleep(3 * time.Second) if atomic.LoadInt32(&runner.currentClientsNum) != 10 { t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum)) } + 
if runner.getState() != StateRunning { + t.Error("State of runner is not running after spawn, got", runner.getState()) + } msg = <-runner.client.sendChannel() if msg.Type != "spawning_complete" { t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type)