This commit is contained in:
xucong053
2022-05-22 12:44:18 +08:00
committed by 徐聪
parent df20035234
commit c5f482dbfa
7 changed files with 278 additions and 215 deletions

View File

@@ -71,7 +71,7 @@ var boomCmd = &cobra.Command{
if boomArgs.autoStart {
hrpBoomer.SetAutoStart()
hrpBoomer.SetExpectWorkers(boomArgs.expectWorkers, boomArgs.expectWorkersMaxWait)
hrpBoomer.SetSpawnCount(boomArgs.SpawnCount)
hrpBoomer.SetSpawnCount(int64(boomArgs.SpawnCount))
hrpBoomer.SetSpawnRate(boomArgs.SpawnRate)
}
go hrpBoomer.StartServer()

View File

@@ -39,9 +39,6 @@ type Boomer struct {
testcasePath []string
spawnCount int // target clients to spawn
spawnRate float64
cpuProfile string
cpuProfileDuration time.Duration
@@ -86,8 +83,6 @@ func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer {
return &Boomer{
mode: StandaloneMode,
localRunner: newLocalRunner(spawnCount, spawnRate),
spawnCount: spawnCount,
spawnRate: spawnRate,
}
}
@@ -161,18 +156,26 @@ func (b *Boomer) GetState() int32 {
}
// SetSpawnCount sets spawn count
func (b *Boomer) SetSpawnCount(spawnCount int) {
b.spawnCount = spawnCount
if b.mode == DistributedMasterMode {
b.masterRunner.spawn.setSpawn(int64(spawnCount), -1)
func (b *Boomer) SetSpawnCount(spawnCount int64) {
switch b.mode {
case DistributedMasterMode:
b.masterRunner.setSpawnCount(spawnCount)
case DistributedWorkerMode:
b.workerRunner.setSpawnCount(spawnCount)
default:
b.localRunner.setSpawnCount(spawnCount)
}
}
// SetSpawnRate sets spawn rate
func (b *Boomer) SetSpawnRate(spawnRate float64) {
b.spawnRate = spawnRate
if b.mode == DistributedMasterMode {
b.masterRunner.spawn.setSpawn(-1, spawnRate)
switch b.mode {
case DistributedMasterMode:
b.masterRunner.setSpawnRate(spawnRate)
case DistributedWorkerMode:
b.workerRunner.setSpawnRate(spawnRate)
default:
b.localRunner.setSpawnRate(spawnRate)
}
}
@@ -242,11 +245,11 @@ func (b *Boomer) SetLoopCount(loopCount int64) {
// total loop count for testcase, it will be evenly distributed to each worker
switch b.mode {
case DistributedWorkerMode:
b.workerRunner.loop = &Loop{loopCount: loopCount * b.workerRunner.spawn.getSpawnCount()}
b.workerRunner.loop = &Loop{loopCount: loopCount * b.workerRunner.getSpawnCount()}
case DistributedMasterMode:
b.masterRunner.loop = &Loop{loopCount: loopCount * b.masterRunner.spawn.getSpawnCount()}
b.masterRunner.loop = &Loop{loopCount: loopCount * b.masterRunner.getSpawnCount()}
case StandaloneMode:
b.localRunner.loop = &Loop{loopCount: loopCount * b.localRunner.spawn.getSpawnCount()}
b.localRunner.loop = &Loop{loopCount: loopCount * b.localRunner.getSpawnCount()}
}
}
@@ -388,6 +391,9 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc
// Start starts to run
func (b *Boomer) Start(Args map[string]interface{}) error {
if b.masterRunner.isStarted() {
return errors.New("already started")
}
spawnCount, ok := Args["spawn_count"]
if ok {
v, err := strconv.Atoi(spawnCount.(string))
@@ -395,7 +401,7 @@ func (b *Boomer) Start(Args map[string]interface{}) error {
log.Error().Err(err).Msg("spawn_count sets error")
return err
}
b.SetSpawnCount(v)
b.SetSpawnCount(int64(v))
} else {
return errors.New("spawn count error")
}
@@ -423,6 +429,9 @@ func (b *Boomer) Start(Args map[string]interface{}) error {
// ReBalance starts to rebalance load test
func (b *Boomer) ReBalance(Args map[string]interface{}) error {
if !b.masterRunner.isStarted() {
return errors.New("no start")
}
spawnCount, ok := Args["spawn_count"]
if ok {
v, err := strconv.Atoi(spawnCount.(string))
@@ -430,7 +439,7 @@ func (b *Boomer) ReBalance(Args map[string]interface{}) error {
log.Error().Err(err).Msg("spawn_count sets error")
return err
}
b.SetSpawnCount(v)
b.SetSpawnCount(int64(v))
}
spawnRate, ok := Args["spawn_rate"]
if ok {
@@ -441,11 +450,6 @@ func (b *Boomer) ReBalance(Args map[string]interface{}) error {
}
b.SetSpawnRate(v)
}
path, ok := Args["path"].(string)
if ok {
paths := strings.Split(path, ",")
b.SetTestCasesPath(paths)
}
err := b.masterRunner.rebalance()
if err != nil {
log.Error().Err(err).Msg("failed to rebalance")
@@ -454,12 +458,8 @@ func (b *Boomer) ReBalance(Args map[string]interface{}) error {
}
// Stop stops to load test
func (b *Boomer) Stop() {
switch b.mode {
case DistributedMasterMode:
b.masterRunner.stop()
default:
}
func (b *Boomer) Stop() error {
return b.masterRunner.stop()
}
// GetWorkersInfo gets workers
@@ -493,22 +493,22 @@ func (b *Boomer) Quit() {
func (b *Boomer) GetSpawnDoneChan() chan struct{} {
switch b.mode {
case DistributedWorkerMode:
return b.workerRunner.spawn.getSpawnDone()
return b.workerRunner.controller.getSpawnDone()
case DistributedMasterMode:
return b.masterRunner.spawn.getSpawnDone()
return b.masterRunner.controller.getSpawnDone()
default:
return b.localRunner.spawn.getSpawnDone()
return b.localRunner.controller.getSpawnDone()
}
}
func (b *Boomer) GetSpawnCount() int {
switch b.mode {
case DistributedWorkerMode:
return int(b.workerRunner.spawn.getSpawnCount())
return int(b.workerRunner.getSpawnCount())
case DistributedMasterMode:
return int(b.masterRunner.spawn.getSpawnCount())
return int(b.masterRunner.getSpawnCount())
default:
return int(b.localRunner.spawn.getSpawnCount())
return int(b.localRunner.getSpawnCount())
}
}

View File

@@ -12,11 +12,11 @@ import (
func TestNewStandaloneBoomer(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
if b.localRunner.spawn.spawnCount != 100 {
if b.localRunner.spawnCount != 100 {
t.Error("spawnCount should be 100")
}
if b.localRunner.spawn.spawnRate != 10 {
if b.localRunner.spawnRate != 10 {
t.Error("spawnRate should be 10")
}
}

View File

@@ -169,7 +169,7 @@ type statsEntryOutput struct {
}
type dataOutput struct {
UserCount int32 `json:"user_count"`
UserCount int64 `json:"user_count"`
State int32 `json:"state"`
TotalStats *statsEntryOutput `json:"stats_total"`
TransactionsPassed int64 `json:"transactions_passed"`
@@ -186,7 +186,7 @@ type dataOutput struct {
}
func convertData(data map[string]interface{}) (output *dataOutput, err error) {
userCount, ok := data["user_count"].(int32)
userCount, ok := data["user_count"].(int64)
if !ok {
return nil, fmt.Errorf("user_count is not int32")
}

View File

@@ -57,82 +57,106 @@ func (l *Loop) increaseFinishedCount() {
atomic.AddInt64(&l.finishedCount, 1)
}
type SpawnInfo struct {
mutex sync.RWMutex
spawnCount int64 // target clients to spawn
acquiredCount int64 // count acquired of workers
spawnRate float64
spawnDone chan struct{}
type Controller struct {
mutex sync.RWMutex
once sync.Once
currentClientsNum int64 // current clients count
spawnCount int64 // target clients to spawn
spawnRate float64
spawnDone chan struct{}
tasks []*Task
}
func (s *SpawnInfo) setSpawn(spawnCount int64, spawnRate float64) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (c *Controller) setSpawn(spawnCount int64, spawnRate float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
if spawnCount > 0 {
atomic.StoreInt64(&s.spawnCount, spawnCount)
atomic.StoreInt64(&c.spawnCount, spawnCount)
}
if spawnRate > 0 {
s.spawnRate = spawnRate
c.spawnRate = spawnRate
}
}
func (s *SpawnInfo) getSpawnCount() int64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return atomic.LoadInt64(&s.spawnCount)
func (c *Controller) setSpawnCount(spawnCount int64) {
if spawnCount > 0 {
atomic.StoreInt64(&c.spawnCount, spawnCount)
}
}
func (s *SpawnInfo) getSpawnRate() float64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.spawnRate
func (c *Controller) setSpawnRate(spawnRate float64) {
c.mutex.Lock()
defer c.mutex.Unlock()
if spawnRate > 0 {
c.spawnRate = spawnRate
}
}
func (s *SpawnInfo) getSpawnDone() chan struct{} {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.spawnDone
func (c *Controller) getSpawnCount() int64 {
c.mutex.RLock()
defer c.mutex.RUnlock()
return atomic.LoadInt64(&c.spawnCount)
}
func (s *SpawnInfo) done() {
close(s.spawnDone)
func (c *Controller) getSpawnRate() float64 {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.spawnRate
}
func (s *SpawnInfo) isFinished() bool {
func (c *Controller) getSpawnDone() chan struct{} {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.spawnDone
}
func (c *Controller) getCurrentClientsNum() int64 {
c.mutex.RLock()
defer c.mutex.RUnlock()
return atomic.LoadInt64(&c.currentClientsNum)
}
func (c *Controller) spawnCompete() {
close(c.spawnDone)
}
func (c *Controller) isFinished() bool {
// return true when workers acquired
return atomic.LoadInt64(&s.acquiredCount) == atomic.LoadInt64(&s.spawnCount)
return atomic.LoadInt64(&c.currentClientsNum) == atomic.LoadInt64(&c.spawnCount)
}
func (s *SpawnInfo) acquire() bool {
func (c *Controller) acquire() bool {
// get one ticket when there are still remaining spawn count to test
// return true when getting ticket successfully
if atomic.LoadInt64(&s.acquiredCount) < atomic.LoadInt64(&s.spawnCount) {
atomic.AddInt64(&s.acquiredCount, 1)
if atomic.LoadInt64(&c.currentClientsNum) < atomic.LoadInt64(&c.spawnCount) {
atomic.AddInt64(&c.currentClientsNum, 1)
return true
}
return false
}
func (s *SpawnInfo) erase() bool {
func (c *Controller) erase() bool {
// return true if acquiredCount > spawnCount
if atomic.LoadInt64(&s.acquiredCount) > atomic.LoadInt64(&s.spawnCount) {
atomic.AddInt64(&s.acquiredCount, -1)
if atomic.LoadInt64(&c.currentClientsNum) > atomic.LoadInt64(&c.spawnCount) {
atomic.AddInt64(&c.currentClientsNum, -1)
return true
}
return false
}
func (s *SpawnInfo) increaseFinishedCount() {
atomic.AddInt64(&s.acquiredCount, -1)
func (c *Controller) increaseFinishedCount() {
atomic.AddInt64(&c.currentClientsNum, -1)
}
func (s *SpawnInfo) reset() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.spawnCount = 0
s.spawnRate = 0
s.acquiredCount = 0
s.spawnDone = make(chan struct{})
func (c *Controller) reset() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.spawnCount = 0
c.spawnRate = 0
c.currentClientsNum = 0
c.spawnDone = make(chan struct{})
c.tasks = []*Task{}
c.once = sync.Once{}
}
type runner struct {
@@ -146,9 +170,11 @@ type runner struct {
rateLimitEnabled bool
stats *requestStats
currentClientsNum int32 // current clients count
spawn *SpawnInfo
loop *Loop // specify loop count for testcase, count = loopCount * spawnCount
spawnCount int64 // target clients to spawn
spawnRate float64
controller *Controller
loop *Loop // specify loop count for testcase, count = loopCount * spawnCount
// when this channel is closed, all statistics are reported successfully
reportedChan chan bool
@@ -168,6 +194,28 @@ type runner struct {
once *sync.Once
}
func (r *runner) setSpawnRate(spawnRate float64) {
r.mutex.Lock()
defer r.mutex.Unlock()
if spawnRate > 0 {
r.spawnRate = spawnRate
}
}
func (r *runner) getSpawnRate() float64 {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.spawnRate
}
func (r *runner) getSpawnCount() int64 {
return atomic.LoadInt64(&r.spawnCount)
}
func (r *runner) setSpawnCount(spawnCount int64) {
atomic.StoreInt64(&r.spawnCount, spawnCount)
}
// safeRun runs fn and recovers from unexpected panics.
// it prevents panics from Task.Fn crashing boomer.
func (r *runner) safeRun(fn func()) {
@@ -239,7 +287,7 @@ func (r *runner) outputOnStop() {
func (r *runner) reportStats() {
data := r.stats.collectReportData()
data["user_count"] = atomic.LoadInt32(&r.currentClientsNum)
data["user_count"] = r.controller.getCurrentClientsNum()
data["state"] = atomic.LoadInt32(&r.state)
r.outputOnEvent(data)
}
@@ -255,7 +303,7 @@ func (r *runner) reportTestResult() {
currentTime := time.Now()
println(fmt.Sprint("=========================================== Statistics Summary =========================================="))
println(fmt.Sprintf("Current time: %s, Users: %v, Duration: %v, Accumulated Transactions: %d Passed, %d Failed",
currentTime.Format("2006/01/02 15:04:05"), atomic.LoadInt32(&r.currentClientsNum), duration, r.stats.transactionPassed, r.stats.transactionFailed))
currentTime.Format("2006/01/02 15:04:05"), r.controller.getCurrentClientsNum(), duration, r.stats.transactionPassed, r.stats.transactionFailed))
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Name", "# requests", "# fails", "Median", "Average", "Min", "Max", "Content Size", "# reqs/sec", "# fails/sec"})
row := make([]string, 10)
@@ -274,11 +322,13 @@ func (r *runner) reportTestResult() {
println()
}
func (r *runner) startSpawning(spawnCount int64, spawnRate float64, spawnCompleteFunc func()) {
r.spawn.reset()
atomic.StoreInt32(&r.currentClientsNum, 0)
go r.spawnWorkers(spawnCount, spawnRate, r.stopChan, spawnCompleteFunc)
func (r *runner) reset() {
r.updateState(StateInit)
r.controller.reset()
r.stats.clearAll()
r.rebalance = make(chan bool)
r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
}
func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
@@ -287,7 +337,7 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
Float64("spawnRate", spawnRate).
Msg("Spawning workers")
r.spawn.setSpawn(spawnCount, spawnRate)
r.controller.setSpawn(spawnCount, spawnRate)
r.updateState(StateSpawning)
for {
@@ -297,23 +347,21 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
log.Info().Msg("Quitting spawning workers")
return
default:
if r.isStarted() && r.spawn.acquire() {
if r.isStarted() && r.controller.acquire() {
// spawn workers with rate limit
sleepTime := time.Duration(1000000/r.spawn.getSpawnRate()) * time.Microsecond
sleepTime := time.Duration(1000000/r.controller.getSpawnRate()) * time.Microsecond
time.Sleep(sleepTime)
// loop count per worker
var workerLoop *Loop
if r.loop != nil {
workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / r.spawn.spawnCount}
workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / r.controller.spawnCount}
}
atomic.AddInt32(&r.currentClientsNum, 1)
go func() {
for {
select {
case <-quit:
atomic.AddInt64(&r.spawn.acquiredCount, -1)
atomic.AddInt32(&r.currentClientsNum, -1)
atomic.AddInt64(&r.controller.currentClientsNum, -1)
return
default:
if workerLoop != nil && !workerLoop.acquire() {
@@ -336,25 +384,31 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
workerLoop.increaseFinishedCount()
if r.loop.isFinished() {
r.stop()
close(r.rebalance)
}
}
if r.spawn.erase() {
atomic.AddInt32(&r.currentClientsNum, -1)
if r.controller.erase() {
return
}
}
}
}()
} else if r.getState() == StateSpawning {
continue
}
r.controller.once.Do(func() {
// spawning compete
r.spawn.done()
r.controller.spawnCompete()
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
r.updateState(StateRunning)
} else {
// continue if rebalance
<-r.rebalance
})
<-r.rebalance
if r.isStarted() {
// rebalance spawn count
r.controller.setSpawn(r.getSpawnCount(), r.getSpawnRate())
}
}
}
@@ -425,6 +479,7 @@ func (r *runner) statsStart() {
// close reportedChan and return if the last stats is reported successfully
if !r.isStarted() {
close(r.reportedChan)
log.Info().Msg("Quitting statsStart")
return
}
}
@@ -460,38 +515,28 @@ type localRunner struct {
func newLocalRunner(spawnCount int, spawnRate float64) *localRunner {
return &localRunner{
runner: runner{
state: StateInit,
stats: newRequestStats(),
outputs: make([]Output, 0),
spawn: &SpawnInfo{
spawnCount: int64(spawnCount),
spawnRate: spawnRate,
spawnDone: make(chan struct{}),
},
reportedChan: make(chan bool),
stopChan: make(chan bool),
closeChan: make(chan bool),
once: &sync.Once{},
state: StateInit,
stats: newRequestStats(),
spawnCount: int64(spawnCount),
spawnRate: spawnRate,
controller: &Controller{},
outputs: make([]Output, 0),
closeChan: make(chan bool),
once: &sync.Once{},
},
}
}
func (r *localRunner) start() {
// init state
r.updateState(StateInit)
atomic.StoreInt32(&r.currentClientsNum, 0)
r.stats.clearAll()
// init localRunner
r.reset()
// start rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Start()
}
r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
r.rebalance = make(chan bool)
go r.spawnWorkers(r.spawn.spawnCount, r.spawn.spawnRate, r.stopChan, nil)
r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil)
// output setup
r.outputOnStart()
@@ -548,12 +593,10 @@ type workerRunner struct {
func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
r = &workerRunner{
runner: runner{
stats: newRequestStats(),
spawn: &SpawnInfo{
spawnDone: make(chan struct{}),
},
closeChan: make(chan bool),
once: &sync.Once{},
stats: newRequestStats(),
controller: &Controller{},
closeChan: make(chan bool),
once: &sync.Once{},
},
masterHost: masterHost,
masterPort: masterPort,
@@ -566,7 +609,7 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
func (r *workerRunner) spawnComplete() {
data := make(map[string]int64)
data["count"] = r.spawn.getSpawnCount()
data["count"] = r.controller.getSpawnCount()
r.client.sendChannel() <- newGenericMessage("spawning_complete", data, r.nodeID)
}
@@ -574,11 +617,11 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
r.client.sendChannel() <- newGenericMessage("spawning", nil, r.nodeID)
spawnCount, ok := msg.Data["spawn_count"]
if ok {
r.spawn.setSpawn(spawnCount, -1)
r.setSpawnCount(spawnCount)
}
spawnRate, ok := msg.Data["spawn_rate"]
if ok {
r.spawn.setSpawn(-1, float64(spawnRate))
r.setSpawnRate(float64(spawnRate))
}
if msg.Tasks != nil {
r.testCaseBytes <- msg.Tasks
@@ -586,6 +629,19 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
log.Info().Msg("on spawn message successful")
}
func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
spawnCount, ok := msg.Data["spawn_count"]
if ok {
r.setSpawnCount(spawnCount)
}
spawnRate, ok := msg.Data["spawn_rate"]
if ok {
r.setSpawnRate(float64(spawnRate))
}
r.rebalance <- true
log.Info().Msg("on rebalance message successful")
}
// Runner acts as a state machine.
func (r *workerRunner) onMessage(msg *genericMessage) {
switch r.getState() {
@@ -602,7 +658,8 @@ func (r *workerRunner) onMessage(msg *genericMessage) {
switch msg.Type {
case "spawn":
r.onSpawnMessage(msg)
r.rebalance <- true
case "rebalance":
r.onRebalanceMessage(msg)
case "stop":
r.stop()
log.Info().Msg("Recv stop message from master, all the goroutines are stopped")
@@ -679,7 +736,7 @@ func (r *workerRunner) run() {
data := map[string]int64{
"state": int64(r.getState()),
"current_cpu_usage": int64(CPUUsage),
"spawn_count": int64(atomic.LoadInt32(&r.currentClientsNum)),
"spawn_count": r.controller.getCurrentClientsNum(),
}
r.client.sendChannel() <- newGenericMessage("heartbeat", data, r.nodeID)
case <-r.closeChan:
@@ -692,23 +749,19 @@ func (r *workerRunner) run() {
// start load test
func (r *workerRunner) start() {
r.stats.clearAll()
r.reset()
// start rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Start()
}
r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
r.rebalance = make(chan bool)
r.once.Do(r.outputOnStart)
r.startSpawning(r.spawn.getSpawnCount(), r.spawn.getSpawnRate(), r.spawnComplete)
r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete)
// start stats report
go r.runner.statsStart()
go r.statsStart()
<-r.reportedChan
@@ -718,12 +771,8 @@ func (r *workerRunner) start() {
func (r *workerRunner) stop() {
if r.isStarted() {
close(r.stopChan)
r.runner.stop()
close(r.rebalance)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
r.updateState(StateStopped)
}
}
@@ -763,24 +812,19 @@ type masterRunner struct {
expectWorkersMaxWait int
parseTestCasesChan chan bool
startFlag bool
testCaseBytes chan []byte
}
func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
return &masterRunner{
runner: runner{
state: StateInit,
spawn: &SpawnInfo{
spawnDone: make(chan struct{}),
},
state: StateInit,
closeChan: make(chan bool),
},
masterBindHost: masterBindHost,
masterBindPort: masterBindPort,
server: newServer(masterBindHost, masterBindPort),
parseTestCasesChan: make(chan bool),
startFlag: false,
testCaseBytes: make(chan []byte),
}
}
@@ -946,15 +990,15 @@ func (r *masterRunner) start() error {
if numWorkers == 0 {
return errors.New("current workers: 0")
}
workerSpawnRate := r.spawn.spawnRate / float64(numWorkers)
workerSpawnCount := r.spawn.getSpawnCount() / int64(numWorkers)
workerSpawnRate := r.getSpawnRate() / float64(numWorkers)
workerSpawnCount := r.getSpawnCount() / int64(numWorkers)
log.Info().Msg("send spawn data to worker")
r.updateState(StateSpawning)
// waitting to fetch testcase
testcase, ok := r.fetchTestCase()
if !ok {
return errors.New("starting, do not retry frequently")
testcase, err := r.fetchTestCase()
if err != nil {
return err
}
r.server.sendChannel() <- newSpawnMessageToWorker("spawn", map[string]int64{
"spawn_count": workerSpawnCount,
@@ -965,27 +1009,44 @@ func (r *masterRunner) start() error {
return nil
}
func (r *masterRunner) fetchTestCase() ([]byte, bool) {
if r.startFlag {
return nil, false
}
r.startFlag = true
defer func() {
r.startFlag = false
}()
r.parseTestCasesChan <- true
return <-r.testCaseBytes, true
}
func (r *masterRunner) rebalance() error {
return r.start()
numWorkers := r.server.getClientsLength()
if numWorkers == 0 {
return errors.New("current workers: 0")
}
workerSpawnRate := r.getSpawnRate() / float64(numWorkers)
workerSpawnCount := r.getSpawnCount() / int64(numWorkers)
r.server.sendChannel() <- newSpawnMessageToWorker("rebalance", map[string]int64{
"spawn_count": workerSpawnCount,
"spawn_rate": int64(workerSpawnRate),
}, nil)
println("send rebalance data to worker successful")
return nil
}
func (r *masterRunner) stop() {
func (r *masterRunner) fetchTestCase() ([]byte, error) {
ticker := time.NewTicker(30 * time.Second)
if len(r.testCaseBytes) > 0 {
<-r.testCaseBytes
}
r.parseTestCasesChan <- true
select {
case <-ticker.C:
return nil, errors.New("parse testcases timeout")
case tcb := <-r.testCaseBytes:
return tcb, nil
}
}
func (r *masterRunner) stop() error {
if r.isStarted() {
r.updateState(StateStopping)
r.server.sendChannel() <- &genericMessage{Type: "stop", Data: map[string]int64{}}
r.updateState(StateStopped)
return nil
} else {
return errors.New("already stopped")
}
}

View File

@@ -127,13 +127,12 @@ func TestSpawnWorkers(t *testing.T) {
defer runner.close()
runner.client = newClient("localhost", 5557, runner.nodeID)
runner.reset()
runner.setTasks(tasks)
runner.stopChan = make(chan bool)
runner.rebalance = make(chan bool)
go runner.spawnWorkers(10, 10, runner.stopChan, runner.spawnComplete)
time.Sleep(2 * time.Second)
currentClients := atomic.LoadInt32(&runner.currentClientsNum)
currentClients := runner.controller.getCurrentClientsNum()
if currentClients != 10 {
t.Error("Unexpected count", currentClients)
}
@@ -163,17 +162,16 @@ func TestSpawnWorkersWithManyTasks(t *testing.T) {
runner := newWorkerRunner("localhost", 5557)
defer runner.close()
runner.reset()
runner.setTasks(tasks)
runner.client = newClient("localhost", 5557, runner.nodeID)
const numToSpawn int64 = 30
runner.stopChan = make(chan bool)
runner.rebalance = make(chan bool)
go runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete)
time.Sleep(2 * time.Second)
currentClients := atomic.LoadInt32(&runner.currentClientsNum)
currentClients := runner.controller.getCurrentClientsNum()
assert.Equal(t, numToSpawn, int64(currentClients))
lock.Lock()
@@ -226,15 +224,15 @@ func TestSpawnAndStop(t *testing.T) {
runner.client = newClient("localhost", 5557, runner.nodeID)
runner.setTasks(tasks)
runner.spawn.setSpawn(10, 10)
runner.updateState(StateSpawning)
runner.setSpawnCount(10)
runner.setSpawnRate(10)
go runner.start()
// wait for spawning goroutines
time.Sleep(2 * time.Second)
if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count", atomic.LoadInt32(&runner.currentClientsNum))
if runner.controller.getCurrentClientsNum() != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count", runner.controller.getCurrentClientsNum())
}
msg := <-runner.client.sendChannel()
@@ -258,10 +256,8 @@ func TestStop(t *testing.T) {
}
tasks := []*Task{taskA}
runner := newWorkerRunner("localhost", 5557)
runner.stopChan = make(chan bool)
runner.rebalance = make(chan bool)
runner.setTasks(tasks)
runner.spawn.setSpawn(10, 10)
runner.reset()
runner.updateState(StateSpawning)
runner.stop()
@@ -281,20 +277,21 @@ func TestOnSpawnMessage(t *testing.T) {
defer runner.close()
runner.client = newClient("localhost", 5557, runner.nodeID)
runner.updateState(StateInit)
runner.reset()
runner.setTasks([]*Task{taskA})
runner.spawn.spawnCount = 100
runner.spawn.spawnRate = 100
runner.setSpawnCount(100)
runner.setSpawnRate(100)
runner.onSpawnMessage(newGenericMessage("spawn", map[string]int64{
"spawn_count": 20,
"spawn_rate": 20,
}, runner.nodeID))
if runner.spawn.spawnCount != 20 {
t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnCount)
if runner.getSpawnCount() != 20 {
t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnCount)
}
if runner.spawn.spawnRate != 20 {
t.Error("spawnRate should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnRate)
if runner.getSpawnRate() != 20 {
t.Error("spawnRate should be overwrote by onSpawnMessage, expected: 20, was:", runner.controller.spawnRate)
}
runner.onMessage(newGenericMessage("stop", nil, runner.nodeID))
@@ -309,9 +306,8 @@ func TestOnQuitMessage(t *testing.T) {
<-runner.closeChan
runner.updateState(StateRunning)
runner.reset()
runner.closeChan = make(chan bool)
runner.stopChan = make(chan bool)
runner.rebalance = make(chan bool)
runner.client.shutdownChan = make(chan bool)
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
<-runner.closeChan
@@ -321,7 +317,7 @@ func TestOnQuitMessage(t *testing.T) {
runner.updateState(StateStopped)
runner.closeChan = make(chan bool)
runner.stopChan = make(chan bool)
runner.reset()
runner.client.shutdownChan = make(chan bool)
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
<-runner.closeChan
@@ -344,7 +340,6 @@ func TestOnMessage(t *testing.T) {
tasks := []*Task{taskA, taskB}
runner := newWorkerRunner("localhost", 5557)
defer runner.close()
runner.client = newClient("localhost", 5557, runner.nodeID)
runner.updateState(StateInit)
runner.setTasks(tasks)
@@ -364,8 +359,8 @@ func TestOnMessage(t *testing.T) {
// spawn complete and running
time.Sleep(2 * time.Second)
if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum))
if runner.controller.getCurrentClientsNum() != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count:", runner.controller.getCurrentClientsNum())
}
msg = <-runner.client.sendChannel()
if msg.Type != "spawning_complete" {
@@ -376,22 +371,17 @@ func TestOnMessage(t *testing.T) {
}
// increase goroutines while running
runner.onMessage(newGenericMessage("spawn", map[string]int64{
runner.onMessage(newGenericMessage("rebalance", map[string]int64{
"spawn_count": 15,
"spawn_rate": 15,
}, runner.nodeID))
msg = <-runner.client.sendChannel()
if msg.Type != "spawning" {
t.Error("Runner should send spawning message when starting spawn, got", msg.Type)
}
time.Sleep(2 * time.Second)
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
}
if atomic.LoadInt32(&runner.currentClientsNum) != 15 {
t.Error("Number of goroutines mismatches, expected: 20, current count:", atomic.LoadInt32(&runner.currentClientsNum))
if runner.controller.getCurrentClientsNum() != 15 {
t.Error("Number of goroutines mismatches, expected: 15, current count:", runner.controller.getCurrentClientsNum())
}
// stop all the workers
@@ -404,7 +394,7 @@ func TestOnMessage(t *testing.T) {
t.Error("Runner should send client_stopped message, got", msg.Type)
}
time.Sleep(3 * time.Second)
time.Sleep(4 * time.Second)
go runner.start()
// spawn again
@@ -420,8 +410,8 @@ func TestOnMessage(t *testing.T) {
// spawn complete and running
time.Sleep(3 * time.Second)
if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum))
if runner.controller.getCurrentClientsNum() != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count:", runner.controller.getCurrentClientsNum())
}
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
@@ -440,13 +430,17 @@ func TestOnMessage(t *testing.T) {
if msg.Type != "client_stopped" {
t.Error("Runner should send client_stopped message, got", msg.Type)
}
// quit
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
}
func TestClientListener(t *testing.T) {
runner := newMasterRunner("localhost", 5557)
defer runner.close()
runner.updateState(StateInit)
runner.spawn.setSpawn(10, 10)
runner.setSpawnCount(10)
runner.setSpawnRate(10)
go runner.clientListener()
runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3})
runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3})
@@ -496,7 +490,8 @@ func TestHeartbeatWorker(t *testing.T) {
runner := newMasterRunner("localhost", 5557)
defer runner.close()
runner.updateState(StateInit)
runner.spawn.setSpawn(10, 10)
runner.setSpawnCount(10)
runner.setSpawnRate(10)
runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 1, State: StateInit})
runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit})
go runner.clientListener()

View File

@@ -198,9 +198,16 @@ func (api *apiHandler) Stop(w http.ResponseWriter, r *http.Request) {
}
}
api.boomer.Stop()
resp := &CommonResponseBody{
ServerStatus: EnumAPIResponseSuccess,
var resp *CommonResponseBody
err := api.boomer.Stop()
if err != nil {
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseStopError(err.Error()),
}
} else {
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseSuccess,
}
}
body, _ := json.Marshal(resp)
writeJSON(w, body, http.StatusOK)