From 0f7ea53e56b5aef80b3a1d0cd290864fe289d522 Mon Sep 17 00:00:00 2001 From: buyuxiang <347586493@qq.com> Date: Thu, 17 Feb 2022 12:39:55 +0800 Subject: [PATCH] fix: broadcast to all rendezvous at once when spawn done Change-Id: Ic04bc2f56aa40c2af42a1fdb3c261a10e8346033 --- boomer.go | 17 +++++++++++------ docs/CHANGELOG.md | 1 + internal/boomer/boomer.go | 4 ++-- internal/boomer/runner.go | 5 +++-- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/boomer.go b/boomer.go index c9baf9dd..ab4d7655 100644 --- a/boomer.go +++ b/boomer.go @@ -26,6 +26,7 @@ type HRPBoomer struct { plugins []common.Plugin // each task has its own plugin process pluginsMutex *sync.RWMutex // avoid data race debug bool + once sync.Once } // SetDebug configures whether to log HTTP request and response content. @@ -57,7 +58,7 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) { panic(err) } rendezvousList := initRendezvous(testcase, int64(b.GetSpawnCount())) - task := b.convertBoomerTask(testcase) + task := b.convertBoomerTask(testcase, rendezvousList) taskSlice = append(taskSlice, task) waitRendezvous(rendezvousList) } @@ -74,7 +75,7 @@ func (b *HRPBoomer) Quit() { b.Boomer.Quit() } -func (b *HRPBoomer) convertBoomerTask(testcase *TestCase) *boomer.Task { +func (b *HRPBoomer) convertBoomerTask(testcase *TestCase, rendezvousList []*Rendezvous) *boomer.Task { hrpRunner := NewRunner(nil).SetDebug(b.debug) config := testcase.Config @@ -86,6 +87,14 @@ func (b *HRPBoomer) convertBoomerTask(testcase *TestCase) *boomer.Task { b.pluginsMutex.Unlock() } + // broadcast to all rendezvous at once when spawn done + go func() { + <-b.GetSpawnDoneChan() + for _, rendezvous := range rendezvousList { + rendezvous.setSpawnDone() + } + }() + return &boomer.Task{ Name: config.Name, Weight: config.Weight, @@ -149,10 +158,6 @@ func (b *HRPBoomer) convertBoomerTask(testcase *TestCase) *boomer.Task { } else if stepData.StepType == stepTypeRendezvous { // rendezvous // TODO: implement rendezvous in boomer - rendezvous := step.ToStruct().Rendezvous - if !rendezvous.isSpawnDone() && b.IsSpawnDone() { - rendezvous.setSpawnDone() - } } else { // request or testcase step b.RecordSuccess(step.Type(), step.Name(), stepData.Elapsed, stepData.ContentSize) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 4c68900e..ca41fc52 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -3,6 +3,7 @@ ## v0.6.1 (2022-02-11) - fix: assertion function errors and json number parse rule +- fix: broadcast to all rendezvous at once when spawn done ## v0.6.0 (2022-02-08) diff --git a/internal/boomer/boomer.go b/internal/boomer/boomer.go index 563ed6f6..a66b5563 100644 --- a/internal/boomer/boomer.go +++ b/internal/boomer/boomer.go @@ -128,8 +128,8 @@ func (b *Boomer) Quit() { b.localRunner.stop() } -func (b *Boomer) IsSpawnDone() bool { - return b.localRunner.isSpawnDone +func (b *Boomer) GetSpawnDoneChan() chan struct{} { + return b.localRunner.spawnDone } func (b *Boomer) GetSpawnCount() int { diff --git a/internal/boomer/runner.go b/internal/boomer/runner.go index 90ed9615..8b8d6444 100644 --- a/internal/boomer/runner.go +++ b/internal/boomer/runner.go @@ -63,7 +63,7 @@ type runner struct { spawnCount int // target clients to spawn spawnRate float64 loop *Loop // specify running cycles - isSpawnDone bool + spawnDone chan struct{} outputs []Output } @@ -194,7 +194,7 @@ func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan } } - r.isSpawnDone = true + close(r.spawnDone) if spawnCompleteFunc != nil { spawnCompleteFunc() } @@ -256,6 +256,7 @@ func newLocalRunner(spawnCount int, spawnRate float64) *localRunner { spawnCount: spawnCount, stats: newRequestStats(), outputs: make([]Output, 0), + spawnDone: make(chan struct{}), }, stopChan: make(chan bool), }