fix: graceful stop

This commit is contained in:
xucong053
2022-07-05 16:37:42 +08:00
committed by 徐聪
parent 4333998edc
commit ba57efe97d
2 changed files with 6 additions and 7 deletions

View File

@@ -378,9 +378,6 @@ func (b *Boomer) EnableGracefulQuit() {
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-c
if b.mode == DistributedWorkerMode {
b.workerRunner.ignoreQuit = false
}
b.Quit()
}()
}
@@ -536,6 +533,7 @@ func (b *Boomer) GetCloseChan() chan bool {
func (b *Boomer) Quit() {
switch b.mode {
case DistributedWorkerMode:
b.workerRunner.stop()
b.workerRunner.close()
case DistributedMasterMode:
b.masterRunner.close()

View File

@@ -620,6 +620,7 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
nodeID: getNodeID(),
tasksChan: make(chan *profileMessage, 10),
mutex: sync.Mutex{},
ignoreQuit: false,
}
return r
}
@@ -686,6 +687,10 @@ func (r *workerRunner) onMessage(msg *genericMessage) {
log.Info().Msg("Recv stop message from master, all the goroutines are stopped")
r.client.sendChannel() <- newGenericMessage("client_stopped", nil, r.nodeID)
case "quit":
r.stop()
if r.ignoreQuit {
break
}
r.close()
log.Info().Msg("Recv quit message from master, all the goroutines are stopped")
}
@@ -797,10 +802,6 @@ func (r *workerRunner) stop() {
}
func (r *workerRunner) close() {
r.stop()
if r.ignoreQuit {
return
}
// waiting report finished
time.Sleep(1 * time.Second)
close(r.closeChan)