fix: unitest

This commit is contained in:
xucong053
2022-05-23 18:05:48 +08:00
committed by 徐聪
parent 4044c15974
commit 37d222252d
4 changed files with 26 additions and 42 deletions

View File

@@ -219,13 +219,12 @@ func (b *HRPBoomer) PollTasks() {
if len(b.Boomer.GetTasksChan()) > 0 { if len(b.Boomer.GetTasksChan()) > 0 {
continue continue
} }
profile := boomer.BytesToProfile(tasks.Profile)
//Todo: 过滤掉已经传输过的task //Todo: 过滤掉已经传输过的task
if tasks.Tasks != nil { if tasks.Tasks != nil {
testCases := b.BytesToTestCases(tasks.Tasks) testCases := b.BytesToTestCases(tasks.Tasks)
go b.runTasks(testCases, profile) go b.runTasks(testCases, tasks.Profile)
} else { } else {
go b.rebalanceTasks(profile) go b.rebalanceTasks(tasks.Profile)
} }
case <-b.Boomer.GetCloseChan(): case <-b.Boomer.GetCloseChan():

View File

@@ -19,8 +19,8 @@ type genericMessage struct {
} }
type profileMessage struct { type profileMessage struct {
Profile []byte `json:"profile,omitempty"` Profile *Profile `json:"profile,omitempty"`
Tasks []byte `json:"tasks,omitempty"` Tasks []byte `json:"tasks,omitempty"`
} }
func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) { func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) {

View File

@@ -176,9 +176,6 @@ type runner struct {
controller *Controller controller *Controller
loop *Loop // specify loop count for testcase, count = loopCount * spawnCount loop *Loop // specify loop count for testcase, count = loopCount * spawnCount
// when this channel is closed, all statistics are reported successfully
reportedChan chan bool
// rebalance spawn // rebalance spawn
rebalance chan bool rebalance chan bool
@@ -331,7 +328,6 @@ func (r *runner) reset() {
r.stats.clearAll() r.stats.clearAll()
r.rebalance = make(chan bool) r.rebalance = make(chan bool)
r.stopChan = make(chan bool) r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
} }
func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
@@ -479,9 +475,7 @@ func (r *runner) statsStart() {
// report stats // report stats
case <-ticker.C: case <-ticker.C:
r.reportStats() r.reportStats()
// close reportedChan and return if the last stats is reported successfully
if !r.isStarted() { if !r.isStarted() {
close(r.reportedChan)
log.Info().Msg("Quitting statsStart") log.Info().Msg("Quitting statsStart")
return return
} }
@@ -546,15 +540,12 @@ func (r *localRunner) start() {
go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil) go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil)
// start stats report // start stats report
go r.statsStart() r.statsStart()
// stop // stop
<-r.stopChan <-r.stopChan
r.updateState(StateStopped) r.updateState(StateStopped)
// wait until all stats are reported successfully
<-r.reportedChan
// stop rate limiter // stop rate limiter
if r.rateLimitEnabled { if r.rateLimitEnabled {
r.rateLimiter.Stop() r.rateLimiter.Stop()
@@ -621,11 +612,15 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
if msg.Profile == nil { if msg.Profile == nil {
log.Error().Msg("miss profile") log.Error().Msg("miss profile")
} }
if msg.Tasks == nil { profile := BytesToProfile(msg.Profile)
r.setSpawnCount(profile.SpawnCount)
r.setSpawnRate(profile.SpawnRate)
if msg.Tasks == nil && len(r.tasks) == 0 {
log.Error().Msg("miss tasks") log.Error().Msg("miss tasks")
} }
r.tasksChan <- &profileMessage{ r.tasksChan <- &profileMessage{
Profile: msg.Profile, Profile: profile,
Tasks: msg.Tasks, Tasks: msg.Tasks,
} }
log.Info().Msg("on spawn message successful") log.Info().Msg("on spawn message successful")
@@ -635,8 +630,12 @@ func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
if msg.Profile == nil { if msg.Profile == nil {
log.Error().Msg("miss profile") log.Error().Msg("miss profile")
} }
profile := BytesToProfile(msg.Profile)
r.setSpawnCount(profile.SpawnCount)
r.setSpawnRate(profile.SpawnRate)
r.tasksChan <- &profileMessage{ r.tasksChan <- &profileMessage{
Profile: msg.Profile, Profile: profile,
} }
log.Info().Msg("on rebalance message successful") log.Info().Msg("on rebalance message successful")
} }
@@ -759,9 +758,7 @@ func (r *workerRunner) start() {
go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete) go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete)
// start stats report // start stats report
go r.statsStart() r.statsStart()
<-r.reportedChan
r.reportTestResult() r.reportTestResult()
r.outputOnStop() r.outputOnStop()

View File

@@ -281,11 +281,7 @@ func TestOnSpawnMessage(t *testing.T) {
runner.setTasks([]*Task{taskA}) runner.setTasks([]*Task{taskA})
runner.setSpawnCount(100) runner.setSpawnCount(100)
runner.setSpawnRate(100) runner.setSpawnRate(100)
runner.onSpawnMessage(newMessageToWorker("spawn", ProfileToBytes(&Profile{SpawnCount: 20, SpawnRate: 20}), nil, nil))
runner.onSpawnMessage(newGenericMessage("spawn", map[string]int64{
"spawn_count": 20,
"spawn_rate": 20,
}, runner.nodeID))
if runner.getSpawnCount() != 20 { if runner.getSpawnCount() != 20 {
t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnCount) t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnCount)
@@ -344,13 +340,9 @@ func TestOnMessage(t *testing.T) {
runner.updateState(StateInit) runner.updateState(StateInit)
runner.setTasks(tasks) runner.setTasks(tasks)
go runner.start()
// start spawning // start spawning
runner.onMessage(newGenericMessage("spawn", map[string]int64{ runner.onMessage(newMessageToWorker("spawn", ProfileToBytes(&Profile{SpawnCount: 10, SpawnRate: 10}), nil, nil))
"spawn_count": 10, go runner.start()
"spawn_rate": 10,
}, runner.nodeID))
msg := <-runner.client.sendChannel() msg := <-runner.client.sendChannel()
if msg.Type != "spawning" { if msg.Type != "spawning" {
@@ -371,10 +363,8 @@ func TestOnMessage(t *testing.T) {
} }
// increase goroutines while running // increase goroutines while running
runner.onMessage(newGenericMessage("rebalance", map[string]int64{ runner.onMessage(newMessageToWorker("rebalance", ProfileToBytes(&Profile{SpawnCount: 15, SpawnRate: 15}), nil, nil))
"spawn_count": 15, runner.rebalance <- true
"spawn_rate": 15,
}, runner.nodeID))
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
if runner.getState() != StateRunning { if runner.getState() != StateRunning {
@@ -394,14 +384,11 @@ func TestOnMessage(t *testing.T) {
t.Error("Runner should send client_stopped message, got", msg.Type) t.Error("Runner should send client_stopped message, got", msg.Type)
} }
time.Sleep(4 * time.Second) time.Sleep(3 * time.Second)
go runner.start()
// spawn again // spawn again
runner.onMessage(newGenericMessage("spawn", map[string]int64{ runner.onMessage(newMessageToWorker("spawn", ProfileToBytes(&Profile{SpawnCount: 10, SpawnRate: 10}), nil, nil))
"spawn_count": 10, go runner.start()
"spawn_rate": 10,
}, runner.nodeID))
msg = <-runner.client.sendChannel() msg = <-runner.client.sendChannel()
if msg.Type != "spawning" { if msg.Type != "spawning" {
@@ -431,6 +418,7 @@ func TestOnMessage(t *testing.T) {
t.Error("Runner should send client_stopped message, got", msg.Type) t.Error("Runner should send client_stopped message, got", msg.Type)
} }
time.Sleep(3 * time.Second)
// quit // quit
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
} }