fix: unittest

This commit is contained in:
xucong053
2022-05-19 13:02:41 +08:00
committed by 徐聪
parent 19c6b0821f
commit b795558543
3 changed files with 82 additions and 82 deletions

View File

@@ -45,9 +45,28 @@ var boomCmd = &cobra.Command{
}
}
// init boomer
var hrpBoomer *hrp.HRPBoomer
if boomArgs.master {
hrpBoomer = hrp.NewMasterBoomer(boomArgs.masterBindHost, boomArgs.masterBindPort)
} else if boomArgs.worker {
hrpBoomer = hrp.NewWorkerBoomer(boomArgs.masterHost, boomArgs.masterPort)
} else {
hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate)
}
hrpBoomer.EnableGracefulQuit()
// init output
if !boomArgs.DisableConsoleOutput {
hrpBoomer.AddOutput(boomer.NewConsoleOutput())
}
if boomArgs.PrometheusPushgatewayURL != "" {
hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode()))
}
// run boomer
switch hrpBoomer.GetMode() {
case "master":
hrpBoomer.SetTestCasesPath(args)
if boomArgs.autoStart {
hrpBoomer.SetAutoStart()
@@ -55,43 +74,28 @@ var boomCmd = &cobra.Command{
hrpBoomer.SetSpawnCount(boomArgs.SpawnCount)
hrpBoomer.SetSpawnRate(boomArgs.SpawnRate)
}
hrpBoomer.EnableGracefulQuit()
go hrpBoomer.StartServer()
go hrpBoomer.RunMaster()
hrpBoomer.LoopTestCases()
return
} else if boomArgs.worker {
hrpBoomer = hrp.NewWorkerBoomer(boomArgs.masterHost, boomArgs.masterPort)
case "worker":
if boomArgs.ignoreQuit {
hrpBoomer.SetIgnoreQuit()
}
go hrpBoomer.RunWorker()
} else {
hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate)
hrpBoomer.LoopTasks()
case "standalone":
if boomArgs.LoopCount > 0 {
hrpBoomer.SetLoopCount(boomArgs.LoopCount)
}
}
hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate)
if !boomArgs.DisableConsoleOutput {
hrpBoomer.AddOutput(boomer.NewConsoleOutput())
}
if boomArgs.PrometheusPushgatewayURL != "" {
hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode()))
}
hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive)
hrpBoomer.SetDisableCompression(boomArgs.DisableCompression)
hrpBoomer.SetClientTransport()
if venv != "" {
hrpBoomer.SetPython3Venv(venv)
}
hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration)
hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration)
hrpBoomer.EnableGracefulQuit()
if boomArgs.worker {
hrpBoomer.LoopTasks()
} else {
hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate)
hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive)
hrpBoomer.SetDisableCompression(boomArgs.DisableCompression)
hrpBoomer.SetClientTransport()
if venv != "" {
hrpBoomer.SetPython3Venv(venv)
}
hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration)
hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration)
hrpBoomer.Run(paths...)
}
},

View File

@@ -58,12 +58,11 @@ func (l *Loop) increaseFinishedCount() {
}
type SpawnInfo struct {
mutex sync.RWMutex
spawnCount int64 // target clients to spawn
acquiredCount int64 // count acquired of workers
spawnRate float64
spawnDone chan struct{}
mutex sync.RWMutex
}
func (s *SpawnInfo) setSpawn(spawnCount int64, spawnRate float64) {
@@ -154,6 +153,9 @@ type runner struct {
// when this channel is closed, all statistics are reported successfully
reportedChan chan bool
// rebalance spawn
rebalance chan bool
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
@@ -273,12 +275,7 @@ func (r *runner) reportTestResult() {
}
func (r *runner) startSpawning(spawnCount int64, spawnRate float64, spawnCompleteFunc func()) {
r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
r.spawn.reset()
r.spawn.setSpawn(spawnCount, spawnRate)
atomic.StoreInt32(&r.currentClientsNum, 0)
go r.spawnWorkers(spawnCount, spawnRate, r.stopChan, spawnCompleteFunc)
@@ -290,6 +287,8 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
Float64("spawnRate", spawnRate).
Msg("Spawning workers")
r.spawn.setSpawn(spawnCount, spawnRate)
r.updateState(StateSpawning)
for {
select {
@@ -306,7 +305,7 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
// loop count per worker
var workerLoop *Loop
if r.loop != nil {
workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / int64(r.spawn.spawnCount)}
workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / r.spawn.spawnCount}
}
atomic.AddInt32(&r.currentClientsNum, 1)
go func() {
@@ -343,23 +342,19 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
atomic.AddInt32(&r.currentClientsNum, -1)
return
}
if !r.isStarted() {
atomic.AddInt64(&r.spawn.acquiredCount, -1)
atomic.AddInt32(&r.currentClientsNum, -1)
return
}
}
}
}()
} else {
if r.getState() == StateSpawning {
r.spawn.done()
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
r.updateState(StateRunning)
} else if r.getState() == StateSpawning {
// spawning complete
r.spawn.done()
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
time.Sleep(1 * time.Second)
r.updateState(StateRunning)
} else {
// continue if rebalance
<-r.rebalance
}
}
}
@@ -492,6 +487,10 @@ func (r *localRunner) start() {
r.rateLimiter.Start()
}
r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
r.rebalance = make(chan bool)
go r.spawnWorkers(r.spawn.spawnCount, r.spawn.spawnRate, r.stopChan, nil)
// output setup
@@ -525,6 +524,7 @@ func (r *localRunner) start() {
func (r *localRunner) stop() {
if r.runner.isStarted() {
r.runner.stop()
close(r.rebalance)
}
}
@@ -542,8 +542,6 @@ type workerRunner struct {
// get testcase from master
testCaseBytes chan []byte
startFlag bool
ignoreQuit bool
}
@@ -554,10 +552,8 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
spawn: &SpawnInfo{
spawnDone: make(chan struct{}),
},
stopChan: make(chan bool),
reportedChan: make(chan bool),
closeChan: make(chan bool),
once: &sync.Once{},
closeChan: make(chan bool),
once: &sync.Once{},
},
masterHost: masterHost,
masterPort: masterPort,
@@ -572,7 +568,6 @@ func (r *workerRunner) spawnComplete() {
data := make(map[string]int64)
data["count"] = r.spawn.getSpawnCount()
r.client.sendChannel() <- newGenericMessage("spawning_complete", data, r.nodeID)
r.updateState(StateRunning)
}
func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
@@ -607,6 +602,7 @@ func (r *workerRunner) onMessage(msg *genericMessage) {
switch msg.Type {
case "spawn":
r.onSpawnMessage(msg)
r.rebalance <- true
case "stop":
r.stop()
log.Info().Msg("Recv stop message from master, all the goroutines are stopped")
@@ -644,7 +640,7 @@ func (r *workerRunner) startListener() {
}
}
// run starts service
// run worker service
func (r *workerRunner) run() {
r.updateState(StateInit)
r.client = newClient(r.masterHost, r.masterPort, r.nodeID)
@@ -694,11 +690,8 @@ func (r *workerRunner) run() {
<-r.closeChan
}
// start load test
func (r *workerRunner) start() {
r.startFlag = true
defer func() {
r.startFlag = false
}()
r.stats.clearAll()
// start rate limiter
@@ -706,6 +699,10 @@ func (r *workerRunner) start() {
r.rateLimiter.Start()
}
r.stopChan = make(chan bool)
r.reportedChan = make(chan bool)
r.rebalance = make(chan bool)
r.once.Do(r.outputOnStart)
r.startSpawning(r.spawn.getSpawnCount(), r.spawn.getSpawnRate(), r.spawnComplete)
@@ -722,6 +719,7 @@ func (r *workerRunner) start() {
func (r *workerRunner) stop() {
if r.isStarted() {
close(r.stopChan)
close(r.rebalance)
// stop rate limiter
if r.rateLimitEnabled {
r.rateLimiter.Stop()
@@ -735,9 +733,8 @@ func (r *workerRunner) close() {
if r.ignoreQuit {
return
}
for r.startFlag == true {
time.Sleep(1 * time.Second)
}
// wait for the report to finish
time.Sleep(3 * time.Second)
close(r.closeChan)
var ticker = time.NewTicker(1 * time.Second)
if r.client != nil {
@@ -768,8 +765,6 @@ type masterRunner struct {
parseTestCasesChan chan bool
startFlag bool
testCaseBytes chan []byte
mutex sync.Mutex
}
func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {

View File

@@ -107,9 +107,8 @@ func TestLoopCount(t *testing.T) {
runner := newLocalRunner(2, 2)
runner.loop = &Loop{loopCount: 4}
runner.setTasks(tasks)
go runner.start()
<-runner.stopChan
if !assert.Equal(t, runner.loop.loopCount, atomic.LoadInt64(&runner.loop.finishedCount)) {
runner.start()
if !assert.Equal(t, atomic.LoadInt64(&runner.loop.loopCount), atomic.LoadInt64(&runner.loop.finishedCount)) {
t.Fatal()
}
}
@@ -129,8 +128,9 @@ func TestSpawnWorkers(t *testing.T) {
runner.client = newClient("localhost", 5557, runner.nodeID)
runner.setTasks(tasks)
runner.stopChan = make(chan bool)
go runner.spawnWorkers(10, 10, runner.stopChan, runner.spawnComplete)
time.Sleep(10 * time.Millisecond)
time.Sleep(2 * time.Second)
currentClients := atomic.LoadInt32(&runner.currentClientsNum)
if currentClients != 10 {
@@ -166,13 +166,14 @@ func TestSpawnWorkersWithManyTasks(t *testing.T) {
runner.client = newClient("localhost", 5557, runner.nodeID)
const numToSpawn int64 = 30
runner.stopChan = make(chan bool)
runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete)
go runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete)
time.Sleep(2 * time.Second)
currentClients := atomic.LoadInt32(&runner.currentClientsNum)
assert.Equal(t, numToSpawn, int(currentClients))
assert.Equal(t, numToSpawn, int64(currentClients))
lock.Lock()
hundreds := taskCalls["one hundred"]
tens := taskCalls["ten"]
@@ -255,6 +256,7 @@ func TestStop(t *testing.T) {
}
tasks := []*Task{taskA}
runner := newWorkerRunner("localhost", 5557)
runner.stopChan = make(chan bool)
runner.setTasks(tasks)
runner.spawn.setSpawn(10, 10)
runner.updateState(StateSpawning)
@@ -358,9 +360,6 @@ func TestOnMessage(t *testing.T) {
// spawn complete and running
time.Sleep(2 * time.Second)
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
}
if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum))
}
@@ -368,6 +367,9 @@ func TestOnMessage(t *testing.T) {
if msg.Type != "spawning_complete" {
t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type)
}
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
}
// increase goroutines while running
runner.onMessage(newGenericMessage("spawn", map[string]int64{
@@ -381,10 +383,6 @@ func TestOnMessage(t *testing.T) {
}
time.Sleep(2 * time.Second)
msg = <-runner.client.sendChannel()
if msg.Type != "spawning_complete" {
t.Error("Runner should send spawning_complete message, got", msg.Type)
}
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
}
@@ -402,6 +400,9 @@ func TestOnMessage(t *testing.T) {
t.Error("Runner should send client_stopped message, got", msg.Type)
}
time.Sleep(3 * time.Second)
go runner.start()
// spawn again
runner.onMessage(newGenericMessage("spawn", map[string]int64{
"spawn_count": 10,
@@ -414,13 +415,13 @@ func TestOnMessage(t *testing.T) {
}
// spawn complete and running
time.Sleep(2 * time.Second)
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
}
time.Sleep(3 * time.Second)
if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum))
}
if runner.getState() != StateRunning {
t.Error("State of runner is not running after spawn, got", runner.getState())
}
msg = <-runner.client.sendChannel()
if msg.Type != "spawning_complete" {
t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type)