package boomer import ( "fmt" "math/rand" "os" "runtime/debug" "sync" "sync/atomic" "time" "github.com/rs/zerolog/log" ) const ( stateInit = "ready" stateSpawning = "spawning" stateRunning = "running" stateStopped = "stopped" stateQuitting = "quitting" ) const ( reportStatsInterval = 3 * time.Second ) type runner struct { state string tasks []*Task totalTaskWeight int rateLimiter RateLimiter rateLimitEnabled bool stats *requestStats numClients int32 spawnRate float64 // all running workers(goroutines) will select on this channel. // close this channel will stop all running workers. stopChan chan bool // close this channel will stop all goroutines used in runner. closeChan chan bool outputs []Output } // safeRun runs fn and recovers from unexpected panics. // it prevents panics from Task.Fn crashing boomer. func (r *runner) safeRun(fn func()) { defer func() { // don't panic err := recover() if err != nil { stackTrace := debug.Stack() errMsg := fmt.Sprintf("%v", err) os.Stderr.Write([]byte(errMsg)) os.Stderr.Write([]byte("\n")) os.Stderr.Write(stackTrace) } }() fn() } func (r *runner) addOutput(o Output) { r.outputs = append(r.outputs, o) } func (r *runner) outputOnStart() { size := len(r.outputs) if size == 0 { return } wg := sync.WaitGroup{} wg.Add(size) for _, output := range r.outputs { go func(o Output) { o.OnStart() wg.Done() }(output) } wg.Wait() } func (r *runner) outputOnEevent(data map[string]interface{}) { size := len(r.outputs) if size == 0 { return } wg := sync.WaitGroup{} wg.Add(size) for _, output := range r.outputs { go func(o Output) { o.OnEvent(data) wg.Done() }(output) } wg.Wait() } func (r *runner) outputOnStop() { size := len(r.outputs) if size == 0 { return } wg := sync.WaitGroup{} wg.Add(size) for _, output := range r.outputs { go func(o Output) { o.OnStop() wg.Done() }(output) } wg.Wait() } func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { log.Info().Int("spawnCount", spawnCount).Msg("Spawning clients immediately") for i := 1; i <= spawnCount; i++ { select { case <-quit: // quit spawning goroutine return default: atomic.AddInt32(&r.numClients, 1) go func() { for { select { case <-quit: return default: if r.rateLimitEnabled { blocked := r.rateLimiter.Acquire() if !blocked { task := r.getTask() r.safeRun(task.Fn) } } else { task := r.getTask() r.safeRun(task.Fn) } } } }() } } if spawnCompleteFunc != nil { spawnCompleteFunc() } } // setTasks will set the runner's task list AND the total task weight // which is used to get a random task later func (r *runner) setTasks(t []*Task) { r.tasks = t weightSum := 0 for _, task := range r.tasks { weightSum += task.Weight } r.totalTaskWeight = weightSum } func (r *runner) getTask() *Task { tasksCount := len(r.tasks) if tasksCount == 1 { // Fast path return r.tasks[0] } rs := rand.New(rand.NewSource(time.Now().UnixNano())) totalWeight := r.totalTaskWeight if totalWeight <= 0 { // If all the tasks have not weights defined, they have the same chance to run randNum := rs.Intn(tasksCount) return r.tasks[randNum] } randNum := rs.Intn(totalWeight) runningSum := 0 for _, task := range r.tasks { runningSum += task.Weight if runningSum > randNum { return task } } return nil } func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { r.stats.clearStatsChan <- true r.stopChan = make(chan bool) r.numClients = 0 go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc) } func (r *runner) stop() { // stop previous goroutines without blocking // those goroutines will exit when r.safeRun returns close(r.stopChan) if r.rateLimitEnabled { r.rateLimiter.Stop() } } type localRunner struct { runner spawnCount int } func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) { r = &localRunner{} r.setTasks(tasks) r.spawnRate = spawnRate r.spawnCount = spawnCount r.closeChan = make(chan bool) if rateLimiter != nil { r.rateLimitEnabled = true r.rateLimiter = rateLimiter } r.stats = newRequestStats() return r } func (r *localRunner) run() { r.state = stateInit r.stats.start() r.outputOnStart() wg := sync.WaitGroup{} wg.Add(1) go func() { for { select { case data := <-r.stats.messageToRunnerChan: data["user_count"] = r.numClients r.outputOnEevent(data) case <-r.closeChan: r.stop() wg.Done() r.outputOnStop() return } } }() if r.rateLimitEnabled { r.rateLimiter.Start() } r.startSpawning(r.spawnCount, r.spawnRate, nil) wg.Wait() } func (r *localRunner) close() { if r.stats != nil { r.stats.close() } close(r.closeChan) }