From c5f482dbfa4ab3f6a926d61bb05f1d62a5912105 Mon Sep 17 00:00:00 2001 From: xucong053 Date: Sun, 22 May 2022 12:44:18 +0800 Subject: [PATCH] refactor --- hrp/cmd/boom.go | 2 +- hrp/internal/boomer/boomer.go | 68 +++--- hrp/internal/boomer/boomer_test.go | 4 +- hrp/internal/boomer/output.go | 4 +- hrp/internal/boomer/runner.go | 331 +++++++++++++++++------------ hrp/internal/boomer/runner_test.go | 71 +++---- hrp/server.go | 13 +- 7 files changed, 278 insertions(+), 215 deletions(-) diff --git a/hrp/cmd/boom.go b/hrp/cmd/boom.go index 1b465d36..a0be404c 100644 --- a/hrp/cmd/boom.go +++ b/hrp/cmd/boom.go @@ -71,7 +71,7 @@ var boomCmd = &cobra.Command{ if boomArgs.autoStart { hrpBoomer.SetAutoStart() hrpBoomer.SetExpectWorkers(boomArgs.expectWorkers, boomArgs.expectWorkersMaxWait) - hrpBoomer.SetSpawnCount(boomArgs.SpawnCount) + hrpBoomer.SetSpawnCount(int64(boomArgs.SpawnCount)) hrpBoomer.SetSpawnRate(boomArgs.SpawnRate) } go hrpBoomer.StartServer() diff --git a/hrp/internal/boomer/boomer.go b/hrp/internal/boomer/boomer.go index a4e9bb72..16cb0e5c 100644 --- a/hrp/internal/boomer/boomer.go +++ b/hrp/internal/boomer/boomer.go @@ -39,9 +39,6 @@ type Boomer struct { testcasePath []string - spawnCount int // target clients to spawn - spawnRate float64 - cpuProfile string cpuProfileDuration time.Duration @@ -86,8 +83,6 @@ func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer { return &Boomer{ mode: StandaloneMode, localRunner: newLocalRunner(spawnCount, spawnRate), - spawnCount: spawnCount, - spawnRate: spawnRate, } } @@ -161,18 +156,26 @@ func (b *Boomer) GetState() int32 { } // SetSpawnCount sets spawn count -func (b *Boomer) SetSpawnCount(spawnCount int) { - b.spawnCount = spawnCount - if b.mode == DistributedMasterMode { - b.masterRunner.spawn.setSpawn(int64(spawnCount), -1) +func (b *Boomer) SetSpawnCount(spawnCount int64) { + switch b.mode { + case DistributedMasterMode: + b.masterRunner.setSpawnCount(spawnCount) + case DistributedWorkerMode: + b.workerRunner.setSpawnCount(spawnCount) + default: + b.localRunner.setSpawnCount(spawnCount) } } // SetSpawnRate sets spawn rate func (b *Boomer) SetSpawnRate(spawnRate float64) { - b.spawnRate = spawnRate - if b.mode == DistributedMasterMode { - b.masterRunner.spawn.setSpawn(-1, spawnRate) + switch b.mode { + case DistributedMasterMode: + b.masterRunner.setSpawnRate(spawnRate) + case DistributedWorkerMode: + b.workerRunner.setSpawnRate(spawnRate) + default: + b.localRunner.setSpawnRate(spawnRate) } } @@ -242,11 +245,11 @@ func (b *Boomer) SetLoopCount(loopCount int64) { // total loop count for testcase, it will be evenly distributed to each worker switch b.mode { case DistributedWorkerMode: - b.workerRunner.loop = &Loop{loopCount: loopCount * b.workerRunner.spawn.getSpawnCount()} + b.workerRunner.loop = &Loop{loopCount: loopCount * b.workerRunner.getSpawnCount()} case DistributedMasterMode: - b.masterRunner.loop = &Loop{loopCount: loopCount * b.masterRunner.spawn.getSpawnCount()} + b.masterRunner.loop = &Loop{loopCount: loopCount * b.masterRunner.getSpawnCount()} case StandaloneMode: - b.localRunner.loop = &Loop{loopCount: loopCount * b.localRunner.spawn.getSpawnCount()} + b.localRunner.loop = &Loop{loopCount: loopCount * b.localRunner.getSpawnCount()} } } @@ -388,6 +391,9 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc // Start starts to run func (b *Boomer) Start(Args map[string]interface{}) error { + if b.masterRunner.isStarted() { + return errors.New("already started") + } spawnCount, ok := Args["spawn_count"] if ok { v, err := strconv.Atoi(spawnCount.(string)) @@ -395,7 +401,7 @@ func (b *Boomer) Start(Args map[string]interface{}) error { log.Error().Err(err).Msg("spawn_count sets error") return err } - b.SetSpawnCount(v) + b.SetSpawnCount(int64(v)) } else { return errors.New("spawn count error") } @@ -423,6 +429,9 @@ func (b *Boomer) Start(Args map[string]interface{}) error { // ReBalance starts to rebalance load test func (b *Boomer) ReBalance(Args map[string]interface{}) error { + if !b.masterRunner.isStarted() { + return errors.New("no start") + } spawnCount, ok := Args["spawn_count"] if ok { v, err := strconv.Atoi(spawnCount.(string)) @@ -430,7 +439,7 @@ func (b *Boomer) ReBalance(Args map[string]interface{}) error { log.Error().Err(err).Msg("spawn_count sets error") return err } - b.SetSpawnCount(v) + b.SetSpawnCount(int64(v)) } spawnRate, ok := Args["spawn_rate"] if ok { @@ -441,11 +450,6 @@ func (b *Boomer) ReBalance(Args map[string]interface{}) error { } b.SetSpawnRate(v) } - path, ok := Args["path"].(string) - if ok { - paths := strings.Split(path, ",") - b.SetTestCasesPath(paths) - } err := b.masterRunner.rebalance() if err != nil { log.Error().Err(err).Msg("failed to rebalance") @@ -454,12 +458,8 @@ func (b *Boomer) ReBalance(Args map[string]interface{}) error { } // Stop stops to load test -func (b *Boomer) Stop() { - switch b.mode { - case DistributedMasterMode: - b.masterRunner.stop() - default: - } +func (b *Boomer) Stop() error { + return b.masterRunner.stop() } // GetWorkersInfo gets workers @@ -493,22 +493,22 @@ func (b *Boomer) Quit() { func (b *Boomer) GetSpawnDoneChan() chan struct{} { switch b.mode { case DistributedWorkerMode: - return b.workerRunner.spawn.getSpawnDone() + return b.workerRunner.controller.getSpawnDone() case DistributedMasterMode: - return b.masterRunner.spawn.getSpawnDone() + return b.masterRunner.controller.getSpawnDone() default: - return b.localRunner.spawn.getSpawnDone() + return b.localRunner.controller.getSpawnDone() } } func (b *Boomer) GetSpawnCount() int { switch b.mode { case DistributedWorkerMode: - return int(b.workerRunner.spawn.getSpawnCount()) + return int(b.workerRunner.getSpawnCount()) case DistributedMasterMode: - return int(b.masterRunner.spawn.getSpawnCount()) + return int(b.masterRunner.getSpawnCount()) default: - return int(b.localRunner.spawn.getSpawnCount()) + return int(b.localRunner.getSpawnCount()) } } diff --git a/hrp/internal/boomer/boomer_test.go b/hrp/internal/boomer/boomer_test.go index fde9b37b..7f113f87 100644 --- a/hrp/internal/boomer/boomer_test.go +++ b/hrp/internal/boomer/boomer_test.go @@ -12,11 +12,11 @@ import ( func TestNewStandaloneBoomer(t *testing.T) { b := NewStandaloneBoomer(100, 10) - if b.localRunner.spawn.spawnCount != 100 { + if b.localRunner.spawnCount != 100 { t.Error("spawnCount should be 100") } - if b.localRunner.spawn.spawnRate != 10 { + if b.localRunner.spawnRate != 10 { t.Error("spawnRate should be 10") } } diff --git a/hrp/internal/boomer/output.go b/hrp/internal/boomer/output.go index 3ef8bdc9..a0866e02 100644 --- a/hrp/internal/boomer/output.go +++ b/hrp/internal/boomer/output.go @@ -169,7 +169,7 @@ type statsEntryOutput struct { } type dataOutput struct { - UserCount int32 `json:"user_count"` + UserCount int64 `json:"user_count"` State int32 `json:"state"` TotalStats *statsEntryOutput `json:"stats_total"` TransactionsPassed int64 `json:"transactions_passed"` @@ -186,7 +186,7 @@ type dataOutput struct { } func convertData(data map[string]interface{}) (output *dataOutput, err error) { - userCount, ok := data["user_count"].(int32) + userCount, ok := data["user_count"].(int64) if !ok { return nil, fmt.Errorf("user_count is not int32") } diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 67b89a19..38c37b7c 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -57,82 +57,106 @@ func (l *Loop) increaseFinishedCount() { atomic.AddInt64(&l.finishedCount, 1) } -type SpawnInfo struct { - mutex sync.RWMutex - spawnCount int64 // target clients to spawn - acquiredCount int64 // count acquired of workers - spawnRate float64 - spawnDone chan struct{} +type Controller struct { + mutex sync.RWMutex + once sync.Once + currentClientsNum int64 // current clients count + spawnCount int64 // target clients to spawn + spawnRate float64 + spawnDone chan struct{} + tasks []*Task } -func (s *SpawnInfo) setSpawn(spawnCount int64, spawnRate float64) { - s.mutex.Lock() - defer s.mutex.Unlock() +func (c *Controller) setSpawn(spawnCount int64, spawnRate float64) { + c.mutex.Lock() + defer c.mutex.Unlock() if spawnCount > 0 { - atomic.StoreInt64(&s.spawnCount, spawnCount) + atomic.StoreInt64(&c.spawnCount, spawnCount) } if spawnRate > 0 { - s.spawnRate = spawnRate + c.spawnRate = spawnRate } } -func (s *SpawnInfo) getSpawnCount() int64 { - s.mutex.RLock() - defer s.mutex.RUnlock() - return atomic.LoadInt64(&s.spawnCount) +func (c *Controller) setSpawnCount(spawnCount int64) { + if spawnCount > 0 { + atomic.StoreInt64(&c.spawnCount, spawnCount) + } } -func (s *SpawnInfo) getSpawnRate() float64 { - s.mutex.RLock() - defer s.mutex.RUnlock() - return s.spawnRate +func (c *Controller) setSpawnRate(spawnRate float64) { + c.mutex.Lock() + defer c.mutex.Unlock() + if spawnRate > 0 { + c.spawnRate = spawnRate + } } -func (s *SpawnInfo) getSpawnDone() chan struct{} { - s.mutex.RLock() - defer s.mutex.RUnlock() - return s.spawnDone +func (c *Controller) getSpawnCount() int64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + return atomic.LoadInt64(&c.spawnCount) } -func (s *SpawnInfo) done() { - close(s.spawnDone) +func (c *Controller) getSpawnRate() float64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.spawnRate } -func (s *SpawnInfo) isFinished() bool { +func (c *Controller) getSpawnDone() chan struct{} { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.spawnDone +} + +func (c *Controller) getCurrentClientsNum() int64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + return atomic.LoadInt64(&c.currentClientsNum) +} + +func (c *Controller) spawnCompete() { + close(c.spawnDone) +} + +func (c *Controller) isFinished() bool { // return true when workers acquired - return atomic.LoadInt64(&s.acquiredCount) == atomic.LoadInt64(&s.spawnCount) + return atomic.LoadInt64(&c.currentClientsNum) == atomic.LoadInt64(&c.spawnCount) } -func (s *SpawnInfo) acquire() bool { +func (c *Controller) acquire() bool { // get one ticket when there are still remaining spawn count to test // return true when getting ticket successfully - if atomic.LoadInt64(&s.acquiredCount) < atomic.LoadInt64(&s.spawnCount) { - atomic.AddInt64(&s.acquiredCount, 1) + if atomic.LoadInt64(&c.currentClientsNum) < atomic.LoadInt64(&c.spawnCount) { + atomic.AddInt64(&c.currentClientsNum, 1) return true } return false } -func (s *SpawnInfo) erase() bool { +func (c *Controller) erase() bool { // return true if acquiredCount > spawnCount - if atomic.LoadInt64(&s.acquiredCount) > atomic.LoadInt64(&s.spawnCount) { - atomic.AddInt64(&s.acquiredCount, -1) + if atomic.LoadInt64(&c.currentClientsNum) > atomic.LoadInt64(&c.spawnCount) { + atomic.AddInt64(&c.currentClientsNum, -1) return true } return false } -func (s *SpawnInfo) increaseFinishedCount() { - atomic.AddInt64(&s.acquiredCount, -1) +func (c *Controller) increaseFinishedCount() { + atomic.AddInt64(&c.currentClientsNum, -1) } -func (s *SpawnInfo) reset() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.spawnCount = 0 - s.spawnRate = 0 - s.acquiredCount = 0 - s.spawnDone = make(chan struct{}) +func (c *Controller) reset() { + c.mutex.Lock() + defer c.mutex.Unlock() + c.spawnCount = 0 + c.spawnRate = 0 + c.currentClientsNum = 0 + c.spawnDone = make(chan struct{}) + c.tasks = []*Task{} + c.once = sync.Once{} } type runner struct { @@ -146,9 +170,11 @@ type runner struct { rateLimitEnabled bool stats *requestStats - currentClientsNum int32 // current clients count - spawn *SpawnInfo - loop *Loop // specify loop count for testcase, count = loopCount * spawnCount + spawnCount int64 // target clients to spawn + spawnRate float64 + + controller *Controller + loop *Loop // specify loop count for testcase, count = loopCount * spawnCount // when this channel is closed, all statistics are reported successfully reportedChan chan bool @@ -168,6 +194,28 @@ type runner struct { once *sync.Once } +func (r *runner) setSpawnRate(spawnRate float64) { + r.mutex.Lock() + defer r.mutex.Unlock() + if spawnRate > 0 { + r.spawnRate = spawnRate + } +} + +func (r *runner) getSpawnRate() float64 { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.spawnRate +} + +func (r *runner) getSpawnCount() int64 { + return atomic.LoadInt64(&r.spawnCount) +} + +func (r *runner) setSpawnCount(spawnCount int64) { + atomic.StoreInt64(&r.spawnCount, spawnCount) +} + // safeRun runs fn and recovers from unexpected panics. // it prevents panics from Task.Fn crashing boomer. func (r *runner) safeRun(fn func()) { @@ -239,7 +287,7 @@ func (r *runner) outputOnStop() { func (r *runner) reportStats() { data := r.stats.collectReportData() - data["user_count"] = atomic.LoadInt32(&r.currentClientsNum) + data["user_count"] = r.controller.getCurrentClientsNum() data["state"] = atomic.LoadInt32(&r.state) r.outputOnEvent(data) } @@ -255,7 +303,7 @@ func (r *runner) reportTestResult() { currentTime := time.Now() println(fmt.Sprint("=========================================== Statistics Summary ==========================================")) println(fmt.Sprintf("Current time: %s, Users: %v, Duration: %v, Accumulated Transactions: %d Passed, %d Failed", - currentTime.Format("2006/01/02 15:04:05"), atomic.LoadInt32(&r.currentClientsNum), duration, r.stats.transactionPassed, r.stats.transactionFailed)) + currentTime.Format("2006/01/02 15:04:05"), r.controller.getCurrentClientsNum(), duration, r.stats.transactionPassed, r.stats.transactionFailed)) table := tablewriter.NewWriter(os.Stdout) table.SetHeader([]string{"Name", "# requests", "# fails", "Median", "Average", "Min", "Max", "Content Size", "# reqs/sec", "# fails/sec"}) row := make([]string, 10) @@ -274,11 +322,13 @@ func (r *runner) reportTestResult() { println() } -func (r *runner) startSpawning(spawnCount int64, spawnRate float64, spawnCompleteFunc func()) { - r.spawn.reset() - atomic.StoreInt32(&r.currentClientsNum, 0) - - go r.spawnWorkers(spawnCount, spawnRate, r.stopChan, spawnCompleteFunc) +func (r *runner) reset() { + r.updateState(StateInit) + r.controller.reset() + r.stats.clearAll() + r.rebalance = make(chan bool) + r.stopChan = make(chan bool) + r.reportedChan = make(chan bool) } func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { @@ -287,7 +337,7 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo Float64("spawnRate", spawnRate). Msg("Spawning workers") - r.spawn.setSpawn(spawnCount, spawnRate) + r.controller.setSpawn(spawnCount, spawnRate) r.updateState(StateSpawning) for { @@ -297,23 +347,21 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo log.Info().Msg("Quitting spawning workers") return default: - if r.isStarted() && r.spawn.acquire() { + if r.isStarted() && r.controller.acquire() { // spawn workers with rate limit - sleepTime := time.Duration(1000000/r.spawn.getSpawnRate()) * time.Microsecond + sleepTime := time.Duration(1000000/r.controller.getSpawnRate()) * time.Microsecond time.Sleep(sleepTime) // loop count per worker var workerLoop *Loop if r.loop != nil { - workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / r.spawn.spawnCount} + workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / r.controller.spawnCount} } - atomic.AddInt32(&r.currentClientsNum, 1) go func() { for { select { case <-quit: - atomic.AddInt64(&r.spawn.acquiredCount, -1) - atomic.AddInt32(&r.currentClientsNum, -1) + atomic.AddInt64(&r.controller.currentClientsNum, -1) return default: if workerLoop != nil && !workerLoop.acquire() { @@ -336,25 +384,31 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo workerLoop.increaseFinishedCount() if r.loop.isFinished() { r.stop() + close(r.rebalance) } } - if r.spawn.erase() { - atomic.AddInt32(&r.currentClientsNum, -1) + if r.controller.erase() { return } } } }() - } else if r.getState() == StateSpawning { + continue + } + + r.controller.once.Do(func() { // spawning compete - r.spawn.done() + r.controller.spawnCompete() if spawnCompleteFunc != nil { spawnCompleteFunc() } r.updateState(StateRunning) - } else { - // continue if rebalance - <-r.rebalance + }) + + <-r.rebalance + if r.isStarted() { + // rebalance spawn count + r.controller.setSpawn(r.getSpawnCount(), r.getSpawnRate()) } } } @@ -425,6 +479,7 @@ func (r *runner) statsStart() { // close reportedChan and return if the last stats is reported successfully if !r.isStarted() { close(r.reportedChan) + log.Info().Msg("Quitting statsStart") return } } @@ -460,38 +515,28 @@ type localRunner struct { func newLocalRunner(spawnCount int, spawnRate float64) *localRunner { return &localRunner{ runner: runner{ - state: StateInit, - stats: newRequestStats(), - outputs: make([]Output, 0), - spawn: &SpawnInfo{ - spawnCount: int64(spawnCount), - spawnRate: spawnRate, - spawnDone: make(chan struct{}), - }, - reportedChan: make(chan bool), - stopChan: make(chan bool), - closeChan: make(chan bool), - once: &sync.Once{}, + state: StateInit, + stats: newRequestStats(), + spawnCount: int64(spawnCount), + spawnRate: spawnRate, + controller: &Controller{}, + outputs: make([]Output, 0), + closeChan: make(chan bool), + once: &sync.Once{}, }, } } func (r *localRunner) start() { - // init state - r.updateState(StateInit) - atomic.StoreInt32(&r.currentClientsNum, 0) - r.stats.clearAll() + // init localRunner + r.reset() // start rate limiter if r.rateLimitEnabled { r.rateLimiter.Start() } - r.stopChan = make(chan bool) - r.reportedChan = make(chan bool) - r.rebalance = make(chan bool) - - go r.spawnWorkers(r.spawn.spawnCount, r.spawn.spawnRate, r.stopChan, nil) + r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil) // output setup r.outputOnStart() @@ -548,12 +593,10 @@ type workerRunner struct { func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { r = &workerRunner{ runner: runner{ - stats: newRequestStats(), - spawn: &SpawnInfo{ - spawnDone: make(chan struct{}), - }, - closeChan: make(chan bool), - once: &sync.Once{}, + stats: newRequestStats(), + controller: &Controller{}, + closeChan: make(chan bool), + once: &sync.Once{}, }, masterHost: masterHost, masterPort: masterPort, @@ -566,7 +609,7 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { func (r *workerRunner) spawnComplete() { data := make(map[string]int64) - data["count"] = r.spawn.getSpawnCount() + data["count"] = r.controller.getSpawnCount() r.client.sendChannel() <- newGenericMessage("spawning_complete", data, r.nodeID) } @@ -574,11 +617,11 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) { r.client.sendChannel() <- newGenericMessage("spawning", nil, r.nodeID) spawnCount, ok := msg.Data["spawn_count"] if ok { - r.spawn.setSpawn(spawnCount, -1) + r.setSpawnCount(spawnCount) } spawnRate, ok := msg.Data["spawn_rate"] if ok { - r.spawn.setSpawn(-1, float64(spawnRate)) + r.setSpawnRate(float64(spawnRate)) } if msg.Tasks != nil { r.testCaseBytes <- msg.Tasks @@ -586,6 +629,19 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) { log.Info().Msg("on spawn message successful") } +func (r *workerRunner) onRebalanceMessage(msg *genericMessage) { + spawnCount, ok := msg.Data["spawn_count"] + if ok { + r.setSpawnCount(spawnCount) + } + spawnRate, ok := msg.Data["spawn_rate"] + if ok { + r.setSpawnRate(float64(spawnRate)) + } + r.rebalance <- true + log.Info().Msg("on rebalance message successful") +} + // Runner acts as a state machine. func (r *workerRunner) onMessage(msg *genericMessage) { switch r.getState() { @@ -602,7 +658,8 @@ func (r *workerRunner) onMessage(msg *genericMessage) { switch msg.Type { case "spawn": r.onSpawnMessage(msg) - r.rebalance <- true + case "rebalance": + r.onRebalanceMessage(msg) case "stop": r.stop() log.Info().Msg("Recv stop message from master, all the goroutines are stopped") @@ -679,7 +736,7 @@ func (r *workerRunner) run() { data := map[string]int64{ "state": int64(r.getState()), "current_cpu_usage": int64(CPUUsage), - "spawn_count": int64(atomic.LoadInt32(&r.currentClientsNum)), + "spawn_count": r.controller.getCurrentClientsNum(), } r.client.sendChannel() <- newGenericMessage("heartbeat", data, r.nodeID) case <-r.closeChan: @@ -692,23 +749,19 @@ func (r *workerRunner) run() { // start load test func (r *workerRunner) start() { - r.stats.clearAll() + r.reset() // start rate limiter if r.rateLimitEnabled { r.rateLimiter.Start() } - r.stopChan = make(chan bool) - r.reportedChan = make(chan bool) - r.rebalance = make(chan bool) - r.once.Do(r.outputOnStart) - r.startSpawning(r.spawn.getSpawnCount(), r.spawn.getSpawnRate(), r.spawnComplete) + r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete) // start stats report - go r.runner.statsStart() + go r.statsStart() <-r.reportedChan @@ -718,12 +771,8 @@ func (r *workerRunner) start() { func (r *workerRunner) stop() { if r.isStarted() { - close(r.stopChan) + r.runner.stop() close(r.rebalance) - // stop rate limiter - if r.rateLimitEnabled { - r.rateLimiter.Stop() - } r.updateState(StateStopped) } } @@ -763,24 +812,19 @@ type masterRunner struct { expectWorkersMaxWait int parseTestCasesChan chan bool - startFlag bool testCaseBytes chan []byte } func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner { return &masterRunner{ runner: runner{ - state: StateInit, - spawn: &SpawnInfo{ - spawnDone: make(chan struct{}), - }, + state: StateInit, closeChan: make(chan bool), }, masterBindHost: masterBindHost, masterBindPort: masterBindPort, server: newServer(masterBindHost, masterBindPort), parseTestCasesChan: make(chan bool), - startFlag: false, testCaseBytes: make(chan []byte), } } @@ -946,15 +990,15 @@ func (r *masterRunner) start() error { if numWorkers == 0 { return errors.New("current workers: 0") } - workerSpawnRate := r.spawn.spawnRate / float64(numWorkers) - workerSpawnCount := r.spawn.getSpawnCount() / int64(numWorkers) + workerSpawnRate := r.getSpawnRate() / float64(numWorkers) + workerSpawnCount := r.getSpawnCount() / int64(numWorkers) log.Info().Msg("send spawn data to worker") r.updateState(StateSpawning) // waitting to fetch testcase - testcase, ok := r.fetchTestCase() - if !ok { - return errors.New("starting, do not retry frequently") + testcase, err := r.fetchTestCase() + if err != nil { + return err } r.server.sendChannel() <- newSpawnMessageToWorker("spawn", map[string]int64{ "spawn_count": workerSpawnCount, @@ -965,27 +1009,44 @@ func (r *masterRunner) start() error { return nil } -func (r *masterRunner) fetchTestCase() ([]byte, bool) { - if r.startFlag { - return nil, false - } - r.startFlag = true - defer func() { - r.startFlag = false - }() - r.parseTestCasesChan <- true - return <-r.testCaseBytes, true -} - func (r *masterRunner) rebalance() error { - return r.start() + numWorkers := r.server.getClientsLength() + if numWorkers == 0 { + return errors.New("current workers: 0") + } + workerSpawnRate := r.getSpawnRate() / float64(numWorkers) + workerSpawnCount := r.getSpawnCount() / int64(numWorkers) + + r.server.sendChannel() <- newSpawnMessageToWorker("rebalance", map[string]int64{ + "spawn_count": workerSpawnCount, + "spawn_rate": int64(workerSpawnRate), + }, nil) + println("send rebalance data to worker successful") + return nil } -func (r *masterRunner) stop() { +func (r *masterRunner) fetchTestCase() ([]byte, error) { + ticker := time.NewTicker(30 * time.Second) + if len(r.testCaseBytes) > 0 { + <-r.testCaseBytes + } + r.parseTestCasesChan <- true + select { + case <-ticker.C: + return nil, errors.New("parse testcases timeout") + case tcb := <-r.testCaseBytes: + return tcb, nil + } +} + +func (r *masterRunner) stop() error { if r.isStarted() { r.updateState(StateStopping) r.server.sendChannel() <- &genericMessage{Type: "stop", Data: map[string]int64{}} r.updateState(StateStopped) + return nil + } else { + return errors.New("already stopped") } } diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go index 356b5c74..ae41b5dd 100644 --- a/hrp/internal/boomer/runner_test.go +++ b/hrp/internal/boomer/runner_test.go @@ -127,13 +127,12 @@ func TestSpawnWorkers(t *testing.T) { defer runner.close() runner.client = newClient("localhost", 5557, runner.nodeID) + runner.reset() runner.setTasks(tasks) - runner.stopChan = make(chan bool) - runner.rebalance = make(chan bool) go runner.spawnWorkers(10, 10, runner.stopChan, runner.spawnComplete) time.Sleep(2 * time.Second) - currentClients := atomic.LoadInt32(&runner.currentClientsNum) + currentClients := runner.controller.getCurrentClientsNum() if currentClients != 10 { t.Error("Unexpected count", currentClients) } @@ -163,17 +162,16 @@ func TestSpawnWorkersWithManyTasks(t *testing.T) { runner := newWorkerRunner("localhost", 5557) defer runner.close() + runner.reset() runner.setTasks(tasks) runner.client = newClient("localhost", 5557, runner.nodeID) const numToSpawn int64 = 30 - runner.stopChan = make(chan bool) - runner.rebalance = make(chan bool) go runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete) time.Sleep(2 * time.Second) - currentClients := atomic.LoadInt32(&runner.currentClientsNum) + currentClients := runner.controller.getCurrentClientsNum() assert.Equal(t, numToSpawn, int64(currentClients)) lock.Lock() @@ -226,15 +224,15 @@ func TestSpawnAndStop(t *testing.T) { runner.client = newClient("localhost", 5557, runner.nodeID) runner.setTasks(tasks) - runner.spawn.setSpawn(10, 10) - runner.updateState(StateSpawning) + runner.setSpawnCount(10) + runner.setSpawnRate(10) go runner.start() // wait for spawning goroutines time.Sleep(2 * time.Second) - if atomic.LoadInt32(&runner.currentClientsNum) != 10 { - t.Error("Number of goroutines mismatches, expected: 10, current count", atomic.LoadInt32(&runner.currentClientsNum)) + if runner.controller.getCurrentClientsNum() != 10 { + t.Error("Number of goroutines mismatches, expected: 10, current count", runner.controller.getCurrentClientsNum()) } msg := <-runner.client.sendChannel() @@ -258,10 +256,8 @@ func TestStop(t *testing.T) { } tasks := []*Task{taskA} runner := newWorkerRunner("localhost", 5557) - runner.stopChan = make(chan bool) - runner.rebalance = make(chan bool) runner.setTasks(tasks) - runner.spawn.setSpawn(10, 10) + runner.reset() runner.updateState(StateSpawning) runner.stop() @@ -281,20 +277,21 @@ func TestOnSpawnMessage(t *testing.T) { defer runner.close() runner.client = newClient("localhost", 5557, runner.nodeID) runner.updateState(StateInit) + runner.reset() runner.setTasks([]*Task{taskA}) - runner.spawn.spawnCount = 100 - runner.spawn.spawnRate = 100 + runner.setSpawnCount(100) + runner.setSpawnRate(100) runner.onSpawnMessage(newGenericMessage("spawn", map[string]int64{ "spawn_count": 20, "spawn_rate": 20, }, runner.nodeID)) - if runner.spawn.spawnCount != 20 { - t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnCount) + if runner.getSpawnCount() != 20 { + t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnCount) } - if runner.spawn.spawnRate != 20 { - t.Error("spawnRate should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnRate) + if runner.getSpawnRate() != 20 { + t.Error("spawnRate should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnRate) } runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) @@ -309,9 +306,8 @@ func TestOnQuitMessage(t *testing.T) { <-runner.closeChan runner.updateState(StateRunning) + runner.reset() runner.closeChan = make(chan bool) - runner.stopChan = make(chan bool) - runner.rebalance = make(chan bool) runner.client.shutdownChan = make(chan bool) runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) <-runner.closeChan @@ -321,7 +317,7 @@ func TestOnQuitMessage(t *testing.T) { runner.updateState(StateStopped) runner.closeChan = make(chan bool) - runner.stopChan = make(chan bool) + runner.reset() runner.client.shutdownChan = make(chan bool) runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) <-runner.closeChan @@ -344,7 +340,6 @@ func TestOnMessage(t *testing.T) { tasks := []*Task{taskA, taskB} runner := newWorkerRunner("localhost", 5557) - defer runner.close() runner.client = newClient("localhost", 5557, runner.nodeID) runner.updateState(StateInit) runner.setTasks(tasks) @@ -364,8 +359,8 @@ func TestOnMessage(t *testing.T) { // spawn complete and running time.Sleep(2 * time.Second) - if atomic.LoadInt32(&runner.currentClientsNum) != 10 { - t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum)) + if runner.controller.getCurrentClientsNum() != 10 { + t.Error("Number of goroutines mismatches, expected: 10, current count:", runner.controller.getCurrentClientsNum()) } msg = <-runner.client.sendChannel() if msg.Type != "spawning_complete" { @@ -376,22 +371,17 @@ func TestOnMessage(t *testing.T) { } // increase goroutines while running - runner.onMessage(newGenericMessage("spawn", map[string]int64{ + runner.onMessage(newGenericMessage("rebalance", map[string]int64{ "spawn_count": 15, "spawn_rate": 15, }, runner.nodeID)) - msg = <-runner.client.sendChannel() - if msg.Type != "spawning" { - t.Error("Runner should send spawning message when starting spawn, got", msg.Type) - } - time.Sleep(2 * time.Second) if runner.getState() != StateRunning { t.Error("State of runner is not running after spawn, got", runner.getState()) } - if atomic.LoadInt32(&runner.currentClientsNum) != 15 { - t.Error("Number of goroutines mismatches, expected: 20, current count:", atomic.LoadInt32(&runner.currentClientsNum)) + if runner.controller.getCurrentClientsNum() != 15 { + t.Error("Number of goroutines mismatches, expected: 15, current count:", runner.controller.getCurrentClientsNum()) } // stop all the workers @@ -404,7 +394,7 @@ func TestOnMessage(t *testing.T) { t.Error("Runner should send client_stopped message, got", msg.Type) } - time.Sleep(3 * time.Second) + time.Sleep(4 * time.Second) go runner.start() // spawn again @@ -420,8 +410,8 @@ func TestOnMessage(t *testing.T) { // spawn complete and running time.Sleep(3 * time.Second) - if atomic.LoadInt32(&runner.currentClientsNum) != 10 { - t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum)) + if runner.controller.getCurrentClientsNum() != 10 { + t.Error("Number of goroutines mismatches, expected: 10, current count:", runner.controller.getCurrentClientsNum()) } if runner.getState() != StateRunning { t.Error("State of runner is not running after spawn, got", runner.getState()) @@ -440,13 +430,17 @@ func TestOnMessage(t *testing.T) { if msg.Type != "client_stopped" { t.Error("Runner should send client_stopped message, got", msg.Type) } + + // quit + runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) } func TestClientListener(t *testing.T) { runner := newMasterRunner("localhost", 5557) defer runner.close() runner.updateState(StateInit) - runner.spawn.setSpawn(10, 10) + runner.setSpawnCount(10) + runner.setSpawnRate(10) go runner.clientListener() runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3}) runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3}) @@ -496,7 +490,8 @@ func TestHeartbeatWorker(t *testing.T) { runner := newMasterRunner("localhost", 5557) defer runner.close() runner.updateState(StateInit) - runner.spawn.setSpawn(10, 10) + runner.setSpawnCount(10) + runner.setSpawnRate(10) runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 1, State: StateInit}) runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit}) go runner.clientListener() diff --git a/hrp/server.go b/hrp/server.go index dc51e8c4..68db4815 100644 --- a/hrp/server.go +++ b/hrp/server.go @@ -198,9 +198,16 @@ func (api *apiHandler) Stop(w http.ResponseWriter, r *http.Request) { } } - api.boomer.Stop() - resp := &CommonResponseBody{ - ServerStatus: EnumAPIResponseSuccess, + var resp *CommonResponseBody + err := api.boomer.Stop() + if err != nil { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseStopError(err.Error()), + } + } else { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + } } body, _ := json.Marshal(resp) writeJSON(w, body, http.StatusOK)