From caed68d6dd82e35e4f30723c1b3804182d3e75c8 Mon Sep 17 00:00:00 2001 From: xucong053 Date: Tue, 5 Jul 2022 16:37:42 +0800 Subject: [PATCH] fix: graceful stop --- hrp/internal/boomer/boomer.go | 4 +--- hrp/internal/boomer/runner.go | 9 +++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/hrp/internal/boomer/boomer.go b/hrp/internal/boomer/boomer.go index ed314249..8ec9536b 100644 --- a/hrp/internal/boomer/boomer.go +++ b/hrp/internal/boomer/boomer.go @@ -378,9 +378,6 @@ func (b *Boomer) EnableGracefulQuit() { signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) go func() { <-c - if b.mode == DistributedWorkerMode { - b.workerRunner.ignoreQuit = false - } b.Quit() }() } @@ -536,6 +533,7 @@ func (b *Boomer) GetCloseChan() chan bool { func (b *Boomer) Quit() { switch b.mode { case DistributedWorkerMode: + b.workerRunner.stop() b.workerRunner.close() case DistributedMasterMode: b.masterRunner.close() diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 80d8552e..e30d0d7a 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -620,6 +620,7 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { nodeID: getNodeID(), tasksChan: make(chan *profileMessage, 10), mutex: sync.Mutex{}, + ignoreQuit: false, } return r } @@ -686,6 +687,10 @@ func (r *workerRunner) onMessage(msg *genericMessage) { log.Info().Msg("Recv stop message from master, all the goroutines are stopped") r.client.sendChannel() <- newGenericMessage("client_stopped", nil, r.nodeID) case "quit": + r.stop() + if r.ignoreQuit { + break + } r.close() log.Info().Msg("Recv quit message from master, all the goroutines are stopped") } @@ -797,10 +802,6 @@ func (r *workerRunner) stop() { } func (r *workerRunner) close() { - r.stop() - if r.ignoreQuit { - return - } // waiting report finished time.Sleep(1 * time.Second) close(r.closeChan)