Files
httprunner/internal/boomer/runner.go
2021-12-22 20:45:53 +08:00

272 lines
4.9 KiB
Go

package boomer
import (
"fmt"
"math/rand"
"os"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog/log"
)
const (
stateInit = "ready"
stateSpawning = "spawning"
stateRunning = "running"
stateStopped = "stopped"
stateQuitting = "quitting"
)
const (
reportStatsInterval = 3 * time.Second
)
type runner struct {
state string
tasks []*Task
totalTaskWeight int
rateLimiter RateLimiter
rateLimitEnabled bool
stats *requestStats
numClients int32
spawnRate float64
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// close this channel will stop all goroutines used in runner.
closeChan chan bool
outputs []Output
}
// safeRun runs fn and recovers from unexpected panics.
// it prevents panics from Task.Fn crashing boomer.
func (r *runner) safeRun(fn func()) {
defer func() {
// don't panic
err := recover()
if err != nil {
stackTrace := debug.Stack()
errMsg := fmt.Sprintf("%v", err)
os.Stderr.Write([]byte(errMsg))
os.Stderr.Write([]byte("\n"))
os.Stderr.Write(stackTrace)
}
}()
fn()
}
func (r *runner) addOutput(o Output) {
r.outputs = append(r.outputs, o)
}
func (r *runner) outputOnStart() {
size := len(r.outputs)
if size == 0 {
return
}
wg := sync.WaitGroup{}
wg.Add(size)
for _, output := range r.outputs {
go func(o Output) {
o.OnStart()
wg.Done()
}(output)
}
wg.Wait()
}
func (r *runner) outputOnEevent(data map[string]interface{}) {
size := len(r.outputs)
if size == 0 {
return
}
wg := sync.WaitGroup{}
wg.Add(size)
for _, output := range r.outputs {
go func(o Output) {
o.OnEvent(data)
wg.Done()
}(output)
}
wg.Wait()
}
func (r *runner) outputOnStop() {
size := len(r.outputs)
if size == 0 {
return
}
wg := sync.WaitGroup{}
wg.Add(size)
for _, output := range r.outputs {
go func(o Output) {
o.OnStop()
wg.Done()
}(output)
}
wg.Wait()
}
func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {
log.Info().Int("spawnCount", spawnCount).Msg("Spawning clients immediately")
for i := 1; i <= spawnCount; i++ {
select {
case <-quit:
// quit spawning goroutine
return
default:
atomic.AddInt32(&r.numClients, 1)
go func() {
for {
select {
case <-quit:
return
default:
if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire()
if !blocked {
task := r.getTask()
r.safeRun(task.Fn)
}
} else {
task := r.getTask()
r.safeRun(task.Fn)
}
}
}
}()
}
}
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
}
// setTasks will set the runner's task list AND the total task weight
// which is used to get a random task later
func (r *runner) setTasks(t []*Task) {
r.tasks = t
weightSum := 0
for _, task := range r.tasks {
weightSum += task.Weight
}
r.totalTaskWeight = weightSum
}
func (r *runner) getTask() *Task {
tasksCount := len(r.tasks)
if tasksCount == 1 {
// Fast path
return r.tasks[0]
}
rs := rand.New(rand.NewSource(time.Now().UnixNano()))
totalWeight := r.totalTaskWeight
if totalWeight <= 0 {
// If all the tasks have not weights defined, they have the same chance to run
randNum := rs.Intn(tasksCount)
return r.tasks[randNum]
}
randNum := rs.Intn(totalWeight)
runningSum := 0
for _, task := range r.tasks {
runningSum += task.Weight
if runningSum > randNum {
return task
}
}
return nil
}
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
r.stats.clearStatsChan <- true
r.stopChan = make(chan bool)
r.numClients = 0
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
}
func (r *runner) stop() {
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
}
type localRunner struct {
runner
spawnCount int
}
func newLocalRunner(tasks []*Task, rateLimiter RateLimiter, spawnCount int, spawnRate float64) (r *localRunner) {
r = &localRunner{}
r.setTasks(tasks)
r.spawnRate = spawnRate
r.spawnCount = spawnCount
r.closeChan = make(chan bool)
if rateLimiter != nil {
r.rateLimitEnabled = true
r.rateLimiter = rateLimiter
}
r.stats = newRequestStats()
return r
}
func (r *localRunner) run() {
r.state = stateInit
r.stats.start()
r.outputOnStart()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case data := <-r.stats.messageToRunnerChan:
data["user_count"] = r.numClients
r.outputOnEevent(data)
case <-r.closeChan:
r.stop()
wg.Done()
r.outputOnStop()
return
}
}
}()
if r.rateLimitEnabled {
r.rateLimiter.Start()
}
r.startSpawning(r.spawnCount, r.spawnRate, nil)
wg.Wait()
}
func (r *localRunner) close() {
if r.stats != nil {
r.stats.close()
}
close(r.closeChan)
}