From 37d222252d8a7ca39aa6ef7dbfb6481f76d1a7c0 Mon Sep 17 00:00:00 2001 From: xucong053 Date: Mon, 23 May 2022 18:05:48 +0800 Subject: [PATCH] fix: unitest --- hrp/boomer.go | 5 ++--- hrp/internal/boomer/message.go | 4 ++-- hrp/internal/boomer/runner.go | 29 +++++++++++++---------------- hrp/internal/boomer/runner_test.go | 30 +++++++++--------------------- 4 files changed, 26 insertions(+), 42 deletions(-) diff --git a/hrp/boomer.go b/hrp/boomer.go index 5bb37f4c..8a076a4a 100644 --- a/hrp/boomer.go +++ b/hrp/boomer.go @@ -219,13 +219,12 @@ func (b *HRPBoomer) PollTasks() { if len(b.Boomer.GetTasksChan()) > 0 { continue } - profile := boomer.BytesToProfile(tasks.Profile) //Todo: 过滤掉已经传输过的task if tasks.Tasks != nil { testCases := b.BytesToTestCases(tasks.Tasks) - go b.runTasks(testCases, profile) + go b.runTasks(testCases, tasks.Profile) } else { - go b.rebalanceTasks(profile) + go b.rebalanceTasks(tasks.Profile) } case <-b.Boomer.GetCloseChan(): diff --git a/hrp/internal/boomer/message.go b/hrp/internal/boomer/message.go index 69819854..a9168384 100644 --- a/hrp/internal/boomer/message.go +++ b/hrp/internal/boomer/message.go @@ -19,8 +19,8 @@ type genericMessage struct { } type profileMessage struct { - Profile []byte `json:"profile,omitempty"` - Tasks []byte `json:"tasks,omitempty"` + Profile *Profile `json:"profile,omitempty"` + Tasks []byte `json:"tasks,omitempty"` } func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) { diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 61506db2..bc6bab58 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -176,9 +176,6 @@ type runner struct { controller *Controller loop *Loop // specify loop count for testcase, count = loopCount * spawnCount - // when this channel is closed, all statistics are reported successfully - reportedChan chan bool - // rebalance spawn rebalance chan bool @@ -331,7 +328,6 @@ func (r *runner) reset() { r.stats.clearAll() r.rebalance = make(chan bool) r.stopChan = make(chan bool) - r.reportedChan = make(chan bool) } func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { @@ -479,9 +475,7 @@ func (r *runner) statsStart() { // report stats case <-ticker.C: r.reportStats() - // close reportedChan and return if the last stats is reported successfully if !r.isStarted() { - close(r.reportedChan) log.Info().Msg("Quitting statsStart") return } @@ -546,15 +540,12 @@ func (r *localRunner) start() { go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil) // start stats report - go r.statsStart() + r.statsStart() // stop <-r.stopChan r.updateState(StateStopped) - // wait until all stats are reported successfully - <-r.reportedChan - // stop rate limiter if r.rateLimitEnabled { r.rateLimiter.Stop() @@ -621,11 +612,15 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) { if msg.Profile == nil { log.Error().Msg("miss profile") } - if msg.Tasks == nil { + profile := BytesToProfile(msg.Profile) + r.setSpawnCount(profile.SpawnCount) + r.setSpawnRate(profile.SpawnRate) + + if msg.Tasks == nil && len(r.tasks) == 0 { log.Error().Msg("miss tasks") } r.tasksChan <- &profileMessage{ - Profile: msg.Profile, + Profile: profile, Tasks: msg.Tasks, } log.Info().Msg("on spawn message successful") @@ -635,8 +630,12 @@ func (r *workerRunner) onRebalanceMessage(msg *genericMessage) { if msg.Profile == nil { log.Error().Msg("miss profile") } + profile := BytesToProfile(msg.Profile) + r.setSpawnCount(profile.SpawnCount) + r.setSpawnRate(profile.SpawnRate) + r.tasksChan <- &profileMessage{ - Profile: msg.Profile, + Profile: profile, } log.Info().Msg("on rebalance message successful") } @@ -759,9 +758,7 @@ func (r *workerRunner) start() { go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete) // start stats report - go r.statsStart() - - <-r.reportedChan + r.statsStart() r.reportTestResult() r.outputOnStop() diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go index ae41b5dd..5bfccf6e 100644 --- a/hrp/internal/boomer/runner_test.go +++ b/hrp/internal/boomer/runner_test.go @@ -281,11 +281,7 @@ func TestOnSpawnMessage(t *testing.T) { runner.setTasks([]*Task{taskA}) runner.setSpawnCount(100) runner.setSpawnRate(100) - - runner.onSpawnMessage(newGenericMessage("spawn", map[string]int64{ - "spawn_count": 20, - "spawn_rate": 20, - }, runner.nodeID)) + runner.onSpawnMessage(newMessageToWorker("spawn", ProfileToBytes(&Profile{SpawnCount: 20, SpawnRate: 20}), nil, nil)) if runner.getSpawnCount() != 20 { t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnCount) @@ -344,13 +340,9 @@ func TestOnMessage(t *testing.T) { runner.updateState(StateInit) runner.setTasks(tasks) - go runner.start() - // start spawning - runner.onMessage(newGenericMessage("spawn", map[string]int64{ - "spawn_count": 10, - "spawn_rate": 10, - }, runner.nodeID)) + runner.onMessage(newMessageToWorker("spawn", ProfileToBytes(&Profile{SpawnCount: 10, SpawnRate: 10}), nil, nil)) + go runner.start() msg := <-runner.client.sendChannel() if msg.Type != "spawning" { @@ -371,10 +363,8 @@ func TestOnMessage(t *testing.T) { } // increase goroutines while running - runner.onMessage(newGenericMessage("rebalance", map[string]int64{ - "spawn_count": 15, - "spawn_rate": 15, - }, runner.nodeID)) + runner.onMessage(newMessageToWorker("rebalance", ProfileToBytes(&Profile{SpawnCount: 15, SpawnRate: 15}), nil, nil)) + runner.rebalance <- true time.Sleep(2 * time.Second) if runner.getState() != StateRunning { @@ -394,14 +384,11 @@ func TestOnMessage(t *testing.T) { t.Error("Runner should send client_stopped message, got", msg.Type) } - time.Sleep(4 * time.Second) + time.Sleep(3 * time.Second) - go runner.start() // spawn again - runner.onMessage(newGenericMessage("spawn", map[string]int64{ - "spawn_count": 10, - "spawn_rate": 10, - }, runner.nodeID)) + runner.onMessage(newMessageToWorker("spawn", ProfileToBytes(&Profile{SpawnCount: 10, SpawnRate: 10}), nil, nil)) + go runner.start() msg = <-runner.client.sendChannel() if msg.Type != "spawning" { @@ -431,6 +418,7 @@ func TestOnMessage(t *testing.T) { t.Error("Runner should send client_stopped message, got", msg.Type) } + time.Sleep(3 * time.Second) // quit runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) }