fix: remove unnecessary content

This commit is contained in:
徐聪
2022-07-13 16:45:43 +08:00
parent 5b085be6bc
commit b4409ad303
4 changed files with 14 additions and 28 deletions

View File

@@ -33,8 +33,6 @@ type grpcClient struct {
shutdownChan chan bool
failCount int32
wg *sync.WaitGroup
}
type grpcClientConfig struct {
@@ -152,7 +150,6 @@ func newClient(masterHost string, masterPort int, identity string) (client *grpc
ctxCancel: cancel,
mutex: sync.RWMutex{},
},
wg: &sync.WaitGroup{},
}
return client
}

View File

@@ -208,8 +208,8 @@ type runner struct {
stoppingChan chan bool
// done is closed when all goroutines from start() complete.
doneChan chan bool
reportChan chan bool
// when this channel is closed, all statistics are reported successfully
reportedChan chan bool
// close this channel will stop all goroutines used in runner.
closeChan chan bool
@@ -221,8 +221,6 @@ type runner struct {
wg sync.WaitGroup
outputs []Output
once *sync.Once
}
func (r *runner) setSpawnRate(spawnRate float64) {
@@ -362,7 +360,7 @@ func (r *runner) reset() {
r.rebalance = make(chan bool)
r.stoppingChan = make(chan bool)
r.doneChan = make(chan bool)
r.reportChan = make(chan bool)
r.reportedChan = make(chan bool)
}
func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
@@ -449,13 +447,13 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
// goAttach creates a goroutine on a given function and tracks it using
// the runner waitgroup.
// The passed function should interrupt on r.StoppingNotify().
// The passed function should interrupt on r.stoppingNotify().
func (r *runner) goAttach(f func()) {
r.wgMu.RLock() // this blocks with ongoing close(s.stopping)
defer r.wgMu.RUnlock()
select {
case <-r.stoppingChan:
log.Warn().Msg("server has stopped; skipping GoAttach")
log.Warn().Msg("runner has stopped; skipping GoAttach")
return
default:
}
@@ -530,8 +528,9 @@ func (r *runner) statsStart() {
// report stats
case <-ticker.C:
r.reportStats()
// close reportedChan and return if the last stats is reported successfully
if !r.isStarted() {
close(r.reportChan)
close(r.reportedChan)
log.Info().Msg("Quitting statsStart")
return
}
@@ -564,17 +563,9 @@ func (r *runner) gracefulStop() {
<-r.doneChan
}
// StopNotify returns a channel that receives a bool type value
// stopNotify returns a channel that receives a bool type value
// when the runner is stopped.
func (r *runner) StopNotify() <-chan bool { return r.doneChan }
// StoppingNotify returns a channel that receives a bool type value
// when the runner is being stopped.
func (r *runner) StoppingNotify() <-chan bool { return r.stoppingChan }
// RebalanceNotify returns a channel that receives a bool type value
// when the runner is being rebalance.
func (r *runner) RebalanceNotify() <-chan bool { return r.rebalance }
func (r *runner) stopNotify() <-chan bool { return r.doneChan }
func (r *runner) getState() int32 {
return atomic.LoadInt32(&r.state)
@@ -606,7 +597,6 @@ func newLocalRunner(spawnCount int64, spawnRate float64) *localRunner {
outputs: make([]Output, 0),
stopChan: make(chan bool),
closeChan: make(chan bool),
once: &sync.Once{},
wg: sync.WaitGroup{},
wgMu: sync.RWMutex{},
},
@@ -638,7 +628,8 @@ func (r *localRunner) start() {
r.updateState(StateStopping)
<-r.reportChan
// wait until all stats are reported successfully
<-r.reportedChan
// report test result
r.reportTestResult()
@@ -687,7 +678,6 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
controller: &Controller{},
stopChan: make(chan bool),
closeChan: make(chan bool),
once: &sync.Once{},
},
masterHost: masterHost,
masterPort: masterPort,
@@ -914,7 +904,7 @@ func (r *workerRunner) start() {
r.updateState(StateStopping)
<-r.reportChan
<-r.reportedChan
r.reportTestResult()
r.outputOnStop()

View File

@@ -126,7 +126,7 @@ func TestStopNotify(t *testing.T) {
close(r.doneChan)
}()
notifier := r.StopNotify()
notifier := r.stopNotify()
select {
case <-notifier:
t.Fatalf("received unexpected stop notification")

View File

@@ -133,13 +133,11 @@ type grpcServer struct {
masterHost string
masterPort int
server *grpc.Server
secure bool
clients *sync.Map
fromWorker chan *genericMessage
disconnectedChan chan bool
shutdownChan chan bool
wg *sync.WaitGroup
}
var (
@@ -148,6 +146,7 @@ var (
)
func logger(format string, a ...interface{}) {
// FIXME: support server-side and client-side logging to files
log.Info().Msg(fmt.Sprintf(format, a...))
}