From b4409ad3036dacaacaa24ea6914d7ab18e0b98f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=81=AA?= Date: Wed, 13 Jul 2022 16:45:43 +0800 Subject: [PATCH] fix: remove unnecessary content --- hrp/internal/boomer/client_grpc.go | 3 --- hrp/internal/boomer/runner.go | 34 +++++++++++------------------- hrp/internal/boomer/runner_test.go | 2 +- hrp/internal/boomer/server_grpc.go | 3 +-- 4 files changed, 14 insertions(+), 28 deletions(-) diff --git a/hrp/internal/boomer/client_grpc.go b/hrp/internal/boomer/client_grpc.go index f96f789e..17310e42 100644 --- a/hrp/internal/boomer/client_grpc.go +++ b/hrp/internal/boomer/client_grpc.go @@ -33,8 +33,6 @@ type grpcClient struct { shutdownChan chan bool failCount int32 - - wg *sync.WaitGroup } type grpcClientConfig struct { @@ -152,7 +150,6 @@ func newClient(masterHost string, masterPort int, identity string) (client *grpc ctxCancel: cancel, mutex: sync.RWMutex{}, }, - wg: &sync.WaitGroup{}, } return client } diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index ffbcce72..933a2a72 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -208,8 +208,8 @@ type runner struct { stoppingChan chan bool // done is closed when all goroutines from start() complete. doneChan chan bool - - reportChan chan bool + // when this channel is closed, all statistics are reported successfully + reportedChan chan bool // close this channel will stop all goroutines used in runner. closeChan chan bool @@ -221,8 +221,6 @@ type runner struct { wg sync.WaitGroup outputs []Output - - once *sync.Once } func (r *runner) setSpawnRate(spawnRate float64) { @@ -362,7 +360,7 @@ func (r *runner) reset() { r.rebalance = make(chan bool) r.stoppingChan = make(chan bool) r.doneChan = make(chan bool) - r.reportChan = make(chan bool) + r.reportedChan = make(chan bool) } func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { @@ -449,13 +447,13 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo // goAttach creates a goroutine on a given function and tracks it using // the runner waitgroup. -// The passed function should interrupt on r.StoppingNotify(). +// The passed function should interrupt on r.stoppingNotify(). func (r *runner) goAttach(f func()) { r.wgMu.RLock() // this blocks with ongoing close(s.stopping) defer r.wgMu.RUnlock() select { case <-r.stoppingChan: - log.Warn().Msg("server has stopped; skipping GoAttach") + log.Warn().Msg("runner has stopped; skipping GoAttach") return default: } @@ -530,8 +528,9 @@ func (r *runner) statsStart() { // report stats case <-ticker.C: r.reportStats() + // close reportedChan and return if the last stats is reported successfully if !r.isStarted() { - close(r.reportChan) + close(r.reportedChan) log.Info().Msg("Quitting statsStart") return } @@ -564,17 +563,9 @@ func (r *runner) gracefulStop() { <-r.doneChan } -// StopNotify returns a channel that receives a bool type value +// stopNotify returns a channel that receives a bool type value // when the runner is stopped. -func (r *runner) StopNotify() <-chan bool { return r.doneChan } - -// StoppingNotify returns a channel that receives a bool type value -// when the runner is being stopped. -func (r *runner) StoppingNotify() <-chan bool { return r.stoppingChan } - -// RebalanceNotify returns a channel that receives a bool type value -// when the runner is being rebalance. -func (r *runner) RebalanceNotify() <-chan bool { return r.rebalance } +func (r *runner) stopNotify() <-chan bool { return r.doneChan } func (r *runner) getState() int32 { return atomic.LoadInt32(&r.state) @@ -606,7 +597,6 @@ func newLocalRunner(spawnCount int64, spawnRate float64) *localRunner { outputs: make([]Output, 0), stopChan: make(chan bool), closeChan: make(chan bool), - once: &sync.Once{}, wg: sync.WaitGroup{}, wgMu: sync.RWMutex{}, }, @@ -638,7 +628,8 @@ func (r *localRunner) start() { r.updateState(StateStopping) - <-r.reportChan + // wait until all stats are reported successfully + <-r.reportedChan // report test result r.reportTestResult() @@ -687,7 +678,6 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { controller: &Controller{}, stopChan: make(chan bool), closeChan: make(chan bool), - once: &sync.Once{}, }, masterHost: masterHost, masterPort: masterPort, @@ -914,7 +904,7 @@ func (r *workerRunner) start() { r.updateState(StateStopping) - <-r.reportChan + <-r.reportedChan r.reportTestResult() r.outputOnStop() diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go index 67089c1f..4293103b 100644 --- a/hrp/internal/boomer/runner_test.go +++ b/hrp/internal/boomer/runner_test.go @@ -126,7 +126,7 @@ func TestStopNotify(t *testing.T) { close(r.doneChan) }() - notifier := r.StopNotify() + notifier := r.stopNotify() select { case <-notifier: t.Fatalf("received unexpected stop notification") diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go index 8702ef74..b9d61cc6 100644 --- a/hrp/internal/boomer/server_grpc.go +++ b/hrp/internal/boomer/server_grpc.go @@ -133,13 +133,11 @@ type grpcServer struct { masterHost string masterPort int server *grpc.Server - secure bool clients *sync.Map fromWorker chan *genericMessage disconnectedChan chan bool shutdownChan chan bool - wg *sync.WaitGroup } var ( @@ -148,6 +146,7 @@ var ( ) func logger(format string, a ...interface{}) { + // FIXME: support server-side and client-side logging to files log.Info().Msg(fmt.Sprintf(format, a...)) }