From b9f1468ae2beb048ca930a6e416ae356bfcfe60c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BE=90=E8=81=AA?=
Date: Fri, 29 Jul 2022 19:57:05 +0800
Subject: [PATCH 1/5] fix: testcase compatibility in the worker

---
 hrp/boomer.go   | 22 +++++++++++-----------
 hrp/runner.go   | 20 +++++++++++---------
 hrp/testcase.go |  9 +++++----
 3 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/hrp/boomer.go b/hrp/boomer.go
index c8cd9349..df8e423b 100644
--- a/hrp/boomer.go
+++ b/hrp/boomer.go
@@ -2,8 +2,6 @@ package hrp

 import (
     "fmt"
-    "github.com/httprunner/httprunner/v4/hrp/internal/builtin"
-    "golang.org/x/net/context"
     "io/ioutil"
     "os"
     "path/filepath"
@@ -13,9 +11,11 @@ import (

     "github.com/httprunner/funplugin"
     "github.com/httprunner/httprunner/v4/hrp/internal/boomer"
+    "github.com/httprunner/httprunner/v4/hrp/internal/builtin"
     "github.com/httprunner/httprunner/v4/hrp/internal/json"
     "github.com/httprunner/httprunner/v4/hrp/internal/sdk"
     "github.com/rs/zerolog/log"
+    "golang.org/x/net/context"
 )

 func NewStandaloneBoomer(spawnCount int64, spawnRate float64) *HRPBoomer {
@@ -100,6 +100,15 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) {
     // report execution timing event
     defer sdk.SendEvent(event.StartTiming("execution"))

+    // quit all plugins
+    defer func() {
+        if len(pluginMap) > 0 {
+            for _, plugin := range pluginMap {
+                plugin.Quit()
+            }
+        }
+    }()
+
     taskSlice := b.ConvertTestCasesToBoomerTasks(testcases...)

     b.Boomer.Run(taskSlice...)
@@ -113,15 +122,6 @@ func (b *HRPBoomer) ConvertTestCasesToBoomerTasks(testcases ...ITestCase) (taskS
         os.Exit(1)
     }

-    // quit all plugins
-    defer func() {
-        if len(pluginMap) > 0 {
-            for _, plugin := range pluginMap {
-                plugin.Quit()
-            }
-        }
-    }()
-
     for _, testcase := range testCases {
         rendezvousList := initRendezvous(testcase, int64(b.GetSpawnCount()))
         task := b.convertBoomerTask(testcase, rendezvousList)
diff --git a/hrp/runner.go b/hrp/runner.go
index e6662613..b9192e09 100644
--- a/hrp/runner.go
+++ b/hrp/runner.go
@@ -285,15 +285,17 @@ func (r *HRPRunner) newCaseRunner(testcase *TestCase) (*testCaseRunner, error) {
     // load plugin info to testcase config
     if plugin != nil {
         pluginPath, _ := locatePlugin(testcase.Config.Path)
-        pluginContent, err := builtin.ReadFile(pluginPath)
-        if err != nil {
-            return nil, err
-        }
-        tp := strings.Split(plugin.Path(), ".")
-        runner.parsedConfig.PluginSetting = &PluginConfig{
-            Path:    pluginPath,
-            Content: pluginContent,
-            Type:    tp[len(tp)-1],
+        if runner.parsedConfig.PluginSetting == nil {
+            pluginContent, err := builtin.ReadFile(pluginPath)
+            if err != nil {
+                return nil, err
+            }
+            tp := strings.Split(plugin.Path(), ".")
+            runner.parsedConfig.PluginSetting = &PluginConfig{
+                Path:    pluginPath,
+                Content: pluginContent,
+                Type:    tp[len(tp)-1],
+            }
         }
     }

diff --git a/hrp/testcase.go b/hrp/testcase.go
index ffedd829..e1cd4533 100644
--- a/hrp/testcase.go
+++ b/hrp/testcase.go
@@ -104,10 +104,6 @@ func (tc *TCase) ToTestCase(casePath string) (*TestCase, error) {
         return nil, errors.New("invalid testcase format, missing teststeps!")
     }

-    err := tc.MakeCompat()
-    if err != nil {
-        return nil, err
-    }
     if tc.Config == nil {
         tc.Config = &TConfig{Name: "please input testcase name"}
     }
@@ -121,6 +117,11 @@ func (tc *TCase) toTestCase() (*TestCase, error) {
         Config: tc.Config,
     }

+    err := tc.MakeCompat()
+    if err != nil {
+        return nil, err
+    }
+
     // locate project root dir by plugin path
     projectRootDir, err := GetProjectRootDirPath(tc.Config.Path)
     if err != nil {

From 3712eae7f5916a5de1bf5267900ca8bb26440311 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BE=90=E8=81=AA?=
Date: Sat, 30 Jul 2022 23:11:45 +0800
Subject: [PATCH 2/5] fix: state machine

---
 hrp/boomer.go                      |  46 +++--
 hrp/internal/boomer/boomer.go      |  36 +++-
 hrp/internal/boomer/client_grpc.go |   9 +-
 hrp/internal/boomer/message.go     |   4 +-
 hrp/internal/boomer/output.go      |  28 +--
 hrp/internal/boomer/runner.go      | 272 ++++++++++++++++-------------
 hrp/internal/boomer/runner_test.go |  10 +-
 hrp/internal/boomer/server_grpc.go |  83 +++++++--
 hrp/server.go                      |   2 +-
 hrp/step_request.go                |   5 +-
 10 files changed, 311 insertions(+), 184 deletions(-)
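Note: the central change in this patch is that the master no longer flips its state
inside the per-message clientListener handlers; a dedicated masterRunner.stateMachine
goroutine now polls aggregate worker data on a ticker (stateMachineInterval) and derives
the master state from the snapshot. The sketch below is illustrative only and NOT part
of this commit -- a minimal polling-style state machine with simplified states and a
hypothetical user counter, just to show the pattern:

    // Illustrative only: a minimal polling-style state machine.
    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    const (
        stateSpawning int32 = iota
        stateRunning
        stateStopped
    )

    type master struct {
        state        int32
        currentUsers int64 // would be aggregated from worker heartbeats
        targetUsers  int64
    }

    // stateMachine derives the master state from polled aggregate data,
    // so transient per-worker messages cannot flip it back and forth.
    func (m *master) stateMachine(interval time.Duration, quit chan struct{}) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-quit:
                return
            case <-ticker.C:
                switch atomic.LoadInt32(&m.state) {
                case stateSpawning:
                    if atomic.LoadInt64(&m.currentUsers) == m.targetUsers {
                        atomic.StoreInt32(&m.state, stateRunning)
                        fmt.Println("all users spawned, state -> running")
                    }
                case stateRunning:
                    if atomic.LoadInt64(&m.currentUsers) == 0 {
                        atomic.StoreInt32(&m.state, stateStopped)
                        fmt.Println("no running users left, state -> stopped")
                    }
                }
            }
        }
    }

    func main() {
        m := &master{state: stateSpawning, targetUsers: 10}
        quit := make(chan struct{})
        go m.stateMachine(50*time.Millisecond, quit)

        atomic.StoreInt64(&m.currentUsers, 10) // pretend workers finished spawning
        time.Sleep(150 * time.Millisecond)
        atomic.StoreInt64(&m.currentUsers, 0) // pretend workers stopped
        time.Sleep(150 * time.Millisecond)
        close(quit)
    }

Polling makes each transition depend on the aggregate snapshot rather than on message
arrival order, which is what the removed per-message transitions got wrong.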
diff --git a/hrp/boomer.go b/hrp/boomer.go
index df8e423b..c05492c7 100644
--- a/hrp/boomer.go
+++ b/hrp/boomer.go
@@ -57,7 +57,6 @@ type HRPBoomer struct {
 }

 func (b *HRPBoomer) InitBoomer() {
-    // init output
     if !b.GetProfile().DisableConsoleOutput {
         b.AddOutput(boomer.NewConsoleOutput())
     }
@@ -164,7 +163,7 @@ func (b *HRPBoomer) TestCasesToBytes(testcases ...ITestCase) []byte {
     return testCasesBytes
 }

-func (b *HRPBoomer) BytesToTestCases(testCasesBytes []byte) []*TCase {
+func (b *HRPBoomer) BytesToTCases(testCasesBytes []byte) []*TCase {
     var testcase []*TCase
     err := json.Unmarshal(testCasesBytes, &testcase)
     if err != nil {
@@ -177,8 +176,7 @@ func (b *HRPBoomer) Quit() {
     b.Boomer.Quit()
 }

-func (b *HRPBoomer) runTestCases(testCases []*TCase, profile *boomer.Profile) {
-    var testcases []ITestCase
+func (b *HRPBoomer) parseTCases(testCases []*TCase) (testcases []ITestCase) {
     for _, tc := range testCases {
         tesecase, err := tc.toTestCase()
         if err != nil {
@@ -209,7 +207,11 @@ func (b *HRPBoomer) runTestCases(testCases []*TCase, profile *boomer.Profile) {

         testcases = append(testcases, tesecase)
     }
+    return testcases
+}

+func (b *HRPBoomer) initWorker(profile *boomer.Profile) {
+    // if no IP address is specified, the default IP address is that of the master
     if profile.PrometheusPushgatewayURL != "" {
         urlSlice := strings.Split(profile.PrometheusPushgatewayURL, ":")
         if len(urlSlice) != 2 {
@@ -224,16 +226,13 @@ func (b *HRPBoomer) runTestCases(testCases []*TCase, profile *boomer.Profile) {
     b.SetProfile(profile)
     b.InitBoomer()

-    log.Info().Interface("testcases", testcases).Interface("profile", profile).Msg("run tasks successful")
-    b.Run(testcases...)
 }

-func (b *HRPBoomer) rebalanceBoomer(profile *boomer.Profile) {
-    b.SetProfile(profile)
-    b.SetSpawnCount(b.GetProfile().SpawnCount)
-    b.SetSpawnRate(b.GetProfile().SpawnRate)
+func (b *HRPBoomer) rebalanceRunner(profile *boomer.Profile) {
+    b.SetSpawnCount(profile.SpawnCount)
+    b.SetSpawnRate(profile.SpawnRate)
     b.GetRebalanceChan() <- true
-    log.Info().Interface("profile", profile).Msg("rebalance tasks successful")
+    log.Info().Interface("profile", profile).Msg("rebalance tasks successfully")
 }

 func (b *HRPBoomer) PollTasks(ctx context.Context) {
@@ -245,11 +244,17 @@ func (b *HRPBoomer) PollTasks(ctx context.Context) {
                 continue
             }
             //Todo: 过滤掉已经传输过的task
-            if task.TestCases != nil {
-                testCases := b.BytesToTestCases(task.TestCases)
-                go b.runTestCases(testCases, task.Profile)
+            if task.TestCasesBytes != nil {
+                // init boomer with profile
+                b.initWorker(task.Profile)
+                // get testcases
+                testcases := b.parseTCases(b.BytesToTCases(task.TestCasesBytes))
+                log.Info().Interface("testcases", testcases).Interface("profile", b.GetProfile()).Msg("starting to run tasks")
+                // run testcases
+                go b.Run(testcases...)
             } else {
-                go b.rebalanceBoomer(task.Profile)
+                // rebalance runner with profile
+                go b.rebalanceRunner(task.Profile)
             }

         case <-b.Boomer.GetCloseChan():
@@ -261,6 +266,15 @@
 }

 func (b *HRPBoomer) PollTestCases(ctx context.Context) {
+    // quit all plugins
+    defer func() {
+        if len(pluginMap) > 0 {
+            for _, plugin := range pluginMap {
+                plugin.Quit()
+            }
+        }
+    }()
+
     for {
         select {
         case <-b.Boomer.ParseTestCasesChan():
@@ -270,7 +284,7 @@
                 tcs = append(tcs, &tcp)
             }
             b.TestCaseBytesChan() <- b.TestCasesToBytes(tcs...)
-            log.Info().Msg("put testcase successful")
+            log.Info().Msg("put testcase successfully")
         case <-b.Boomer.GetCloseChan():
             return
         case <-ctx.Done():
diff --git a/hrp/internal/boomer/boomer.go b/hrp/internal/boomer/boomer.go
index da7ac054..44230b78 100644
--- a/hrp/internal/boomer/boomer.go
+++ b/hrp/internal/boomer/boomer.go
@@ -63,6 +63,18 @@ type Profile struct {
     DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty" mapstructure:"disable-keepalive,omitempty"`
 }

+func NewProfile() *Profile {
+    return &Profile{
+        SpawnCount:            1,
+        SpawnRate:             1,
+        MaxRPS:                -1,
+        LoopCount:             -1,
+        RequestIncreaseRate:   "-1",
+        CPUProfileDuration:    30 * time.Second,
+        MemoryProfileDuration: 30 * time.Second,
+    }
+}
+
 func (b *Boomer) GetProfile() *Profile {
     switch b.mode {
     case DistributedMasterMode:
@@ -158,7 +170,18 @@ func (b *Boomer) RunWorker() {

 // TestCaseBytesChan gets test case bytes chan
 func (b *Boomer) TestCaseBytesChan() chan []byte {
-    return b.masterRunner.testCaseBytes
+    return b.masterRunner.testCaseBytesChan
+}
+
+func (b *Boomer) GetTestCaseBytes() []byte {
+    switch b.mode {
+    case DistributedMasterMode:
+        return b.masterRunner.testCasesBytes
+    case DistributedWorkerMode:
+        return b.workerRunner.testCasesBytes
+    default:
+        return nil
+    }
 }

 func ProfileToBytes(profile *Profile) []byte {
@@ -192,7 +215,7 @@ func (b *Boomer) GetTasksChan() chan *task {
 func (b *Boomer) GetRebalanceChan() chan bool {
     switch b.mode {
     case DistributedWorkerMode:
-        return b.workerRunner.rebalance
+        return b.workerRunner.controller.getRebalanceChan()
     default:
         return nil
     }
@@ -469,6 +492,9 @@ func (b *Boomer) Start(Args *Profile) error {
     if b.masterRunner.isStopping() {
         return errors.New("Please wait for all workers to finish")
     }
+    if int(Args.SpawnCount) < b.masterRunner.server.getAvailableClientsLength() {
+        return errors.New("spawn count should be greater than available worker count")
+    }
     b.SetSpawnCount(Args.SpawnCount)
     b.SetSpawnRate(Args.SpawnRate)
     b.SetProfile(Args)
@@ -481,6 +507,9 @@ func (b *Boomer) ReBalance(Args *Profile) error {
     if !b.masterRunner.isStarting() {
         return errors.New("no start")
     }
+    if int(Args.SpawnCount) < b.masterRunner.server.getAvailableClientsLength() {
+        return errors.New("spawn count should be greater than available worker count")
+    }
     b.SetSpawnCount(Args.SpawnCount)
     b.SetSpawnRate(Args.SpawnRate)
     b.SetProfile(Args)
@@ -505,8 +534,9 @@ func (b *Boomer) GetWorkersInfo() []WorkerNode {
 func (b *Boomer) GetMasterInfo() map[string]interface{} {
     masterInfo := make(map[string]interface{})
     masterInfo["state"] = b.masterRunner.getState()
-    masterInfo["workers"] = b.masterRunner.server.getClientsLength()
+    masterInfo["workers"] = b.masterRunner.server.getAvailableClientsLength()
     masterInfo["target_users"] = b.masterRunner.getSpawnCount()
+    masterInfo["current_users"] = b.masterRunner.server.getCurrentUsers()
     return masterInfo
 }
diff --git a/hrp/internal/boomer/client_grpc.go b/hrp/internal/boomer/client_grpc.go
index 6e014aee..82d4241b 100644
--- a/hrp/internal/boomer/client_grpc.go
+++ b/hrp/internal/boomer/client_grpc.go
@@ -313,11 +313,12 @@ func (c *grpcClient) sendMessage(msg *genericMessage) {
         return
     }
     err := c.config.getBiStreamClient().Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID})
-    switch err {
-    case nil:
+    if err == nil {
         atomic.StoreInt32(&c.failCount, 0)
-    default:
-        //log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message")
+        return
+    }
+    //log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message")
+    if msg.Type == "heartbeat" {
         atomic.AddInt32(&c.failCount, 1)
     }
 }
diff --git a/hrp/internal/boomer/message.go b/hrp/internal/boomer/message.go
index 975aaef1..ab6ae062 100644
--- a/hrp/internal/boomer/message.go
+++ b/hrp/internal/boomer/message.go
@@ -19,8 +19,8 @@ type genericMessage struct {
 }

 type task struct {
-    Profile   *Profile `json:"profile,omitempty"`
-    TestCases []byte   `json:"testcases,omitempty"`
+    Profile        *Profile `json:"profile,omitempty"`
+    TestCasesBytes []byte   `json:"testcases,omitempty"`
 }

 func newGenericMessage(t string, data map[string][]byte, nodeID string) (msg *genericMessage) {
diff --git a/hrp/internal/boomer/output.go b/hrp/internal/boomer/output.go
index 55dec290..2d64ee0e 100644
--- a/hrp/internal/boomer/output.go
+++ b/hrp/internal/boomer/output.go
@@ -6,6 +6,7 @@ import (
     "os"
     "sort"
     "strconv"
+    "sync"
     "time"

     "github.com/google/uuid"
@@ -454,8 +455,8 @@ var (
 )

 var (
-    minResponseTimeMap = map[string]float64{}
-    maxResponseTimeMap = map[string]float64{}
+    minResponseTimeMap = sync.Map{}
+    maxResponseTimeMap = sync.Map{}
 )

 // NewPrometheusPusherOutput returns a PrometheusPusherOutput.
@@ -577,19 +578,19 @@ func (o *PrometheusPusherOutput) OnEvent(data map[string]interface{}) {
         }
         // every stat in total
         key := fmt.Sprintf("%v_%v", method, name)
-        if _, ok := minResponseTimeMap[key]; !ok {
-            minResponseTimeMap[key] = float64(stat.MinResponseTime)
-        } else {
-            minResponseTimeMap[key] = math.Min(float64(stat.MinResponseTime), minResponseTimeMap[key])
+        minResponseTime, loaded := minResponseTimeMap.LoadOrStore(key, float64(stat.MinResponseTime))
+        if loaded {
+            minResponseTime = math.Min(minResponseTime.(float64), float64(stat.MinResponseTime))
+            minResponseTimeMap.Store(key, minResponseTime)
         }
-        gaugeTotalMinResponseTime.WithLabelValues(method, name).Set(minResponseTimeMap[key])
+        gaugeTotalMinResponseTime.WithLabelValues(method, name).Set(minResponseTime.(float64))

-        if _, ok := maxResponseTimeMap[key]; !ok {
-            maxResponseTimeMap[key] = float64(stat.MaxResponseTime)
-        } else {
-            maxResponseTimeMap[key] = math.Max(float64(stat.MaxResponseTime), maxResponseTimeMap[key])
+        maxResponseTime, loaded := maxResponseTimeMap.LoadOrStore(key, float64(stat.MaxResponseTime))
+        if loaded {
+            maxResponseTime = math.Max(maxResponseTime.(float64), float64(stat.MaxResponseTime))
+            maxResponseTimeMap.Store(key, maxResponseTime)
         }
-        gaugeTotalMaxResponseTime.WithLabelValues(method, name).Set(maxResponseTimeMap[key])
+        gaugeTotalMaxResponseTime.WithLabelValues(method, name).Set(maxResponseTime.(float64))

         counterTotalNumRequests.WithLabelValues(method, name).Add(float64(stat.NumRequests))
         counterTotalNumFailures.WithLabelValues(method, name).Add(float64(stat.NumFailures))
@@ -639,4 +640,7 @@ func resetPrometheusMetrics() {
     gaugeTotalFailPerSec.Set(0)
     gaugeTransactionsPassed.Set(0)
     gaugeTransactionsFailed.Set(0)
+
+    minResponseTimeMap = sync.Map{}
+    maxResponseTimeMap = sync.Map{}
 }
diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go
index 503ffbaa..bb0d5fd1 100644
--- a/hrp/internal/boomer/runner.go
+++ b/hrp/internal/boomer/runner.go
@@ -49,10 +49,10 @@ func getStateName(state int32) (stateName string) {
 }

 const (
-    reportStatsInterval = 3 * time.Second
-    heartbeatInterval   = 1 * time.Second
-    heartbeatLiveness   = 3 * time.Second
-    reconnectInterval   = 3 * time.Second
+    reportStatsInterval  = 3 * time.Second
+    heartbeatInterval    = 1 * time.Second
+    heartbeatLiveness    = 3 * time.Second
+    stateMachineInterval = 1 * time.Second
 )

 type Loop struct {
@@ -86,6 +86,7 @@ type Controller struct {
     currentClientsNum int64 // current clients count
     spawnCount        int64 // target clients to spawn
     spawnRate         float64
+    rebalance         chan bool // dynamically balance boomer running parameters
     spawnDone         chan struct{}
     tasks             []*Task
 }
@@ -143,6 +144,12 @@ func (c *Controller) spawnCompete() {
     close(c.spawnDone)
 }

+func (c *Controller) getRebalanceChan() chan bool {
+    c.mutex.RLock()
+    defer c.mutex.RUnlock()
+    return c.rebalance
+}
+
 func (c *Controller) isFinished() bool {
     // return true when workers acquired
     return atomic.LoadInt64(&c.currentClientsNum) == atomic.LoadInt64(&c.spawnCount)
@@ -178,6 +185,7 @@ func (c *Controller) reset() {
     c.spawnRate = 0
     atomic.StoreInt64(&c.currentClientsNum, 0)
     c.spawnDone = make(chan struct{})
+    c.rebalance = make(chan bool)
     c.tasks = []*Task{}
     c.once = sync.Once{}
 }
@@ -199,9 +207,6 @@ type runner struct {
     controller *Controller
     loop       *Loop // specify loop count for testcase, count = loopCount * spawnCount

-    // dynamically balance boomer running parameters
-    rebalance chan bool
-
     // stop signals the run goroutine should shutdown.
     stopChan chan bool

     // all running workers(goroutines) will select on this channel.
@@ -358,7 +363,6 @@ func (r *runner) reportTestResult() {
 func (r *runner) reset() {
     r.controller.reset()
     r.stats.clearAll()
-    r.rebalance = make(chan bool)
     r.stoppingChan = make(chan bool)
     r.doneChan = make(chan bool)
     r.reportedChan = make(chan bool)
@@ -430,16 +434,18 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
             continue
         }

-        r.controller.once.Do(func() {
-            // spawning compete
-            r.controller.spawnCompete()
-            if spawnCompleteFunc != nil {
-                spawnCompleteFunc()
-            }
-            r.updateState(StateRunning)
-        })
+        r.controller.once.Do(
+            func() {
+                // spawning compete
+                r.controller.spawnCompete()
+                if spawnCompleteFunc != nil {
+                    spawnCompleteFunc()
+                }
+                r.updateState(StateRunning)
+            },
+        )

-        <-r.rebalance
+        <-r.controller.getRebalanceChan()
         if r.isStarting() {
             // rebalance spawn count
             r.controller.setSpawn(r.getSpawnCount(), r.getSpawnRate())
@@ -629,7 +635,7 @@ func (r *localRunner) start() {
     r.wgMu.Lock()
     r.updateState(StateStopping)
     close(r.stoppingChan)
-    close(r.rebalance)
+    close(r.controller.rebalance)
     r.wgMu.Unlock()

     // wait for goroutines before closing
@@ -668,7 +674,8 @@ type workerRunner struct {
     masterPort int
     client     *grpcClient

-    profile *Profile
+    profile        *Profile
+    testCasesBytes []byte

     tasksChan chan *task
@@ -714,10 +721,10 @@ func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
         log.Error().Msg("miss tasks")
     }
     r.tasksChan <- &task{
-        Profile:   profile,
-        TestCases: msg.Tasks,
+        Profile:        profile,
+        TestCasesBytes: msg.Tasks,
     }
-    log.Info().Msg("on spawn message successful")
+    log.Info().Msg("on spawn message successfully")
 }

 func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
@@ -731,7 +738,7 @@ func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
     r.tasksChan <- &task{
         Profile: profile,
     }
-    log.Info().Msg("on rebalance message successful")
+    log.Info().Msg("on rebalance message successfully")
 }

 // Runner acts as a state machine.
@@ -831,6 +838,9 @@ func (r *workerRunner) run() {
     // wait for goroutines before closing
     r.wg.Wait()

+    // notify master that worker is quitting
+    r.onQuiting()
+
     var ticker = time.NewTicker(1 * time.Second)
     if r.client != nil {
         // waitting for quit message is sent to master
@@ -879,6 +889,7 @@ func (r *workerRunner) run() {
             if !r.isStarting() && !r.isStopping() {
                 r.updateState(StateMissing)
             }
+            continue
         }
         CPUUsage := GetCurrentCPUPercent()
         MemoryUsage := GetCurrentMemoryPercent()
@@ -918,13 +929,18 @@ func (r *workerRunner) start() {
     // block concurrent waitgroup adds in GoAttach while stopping
     r.wgMu.Lock()
     r.updateState(StateStopping)
+    close(r.controller.rebalance)
     close(r.stoppingChan)
-    close(r.rebalance)
     r.wgMu.Unlock()

     // wait for goroutines before closing
     r.wg.Wait()

+    // reset loop
+    if r.loop != nil {
+        r.loop = nil
+    }
+
     close(r.doneChan)

     // wait until all stats are reported successfully
@@ -951,7 +967,6 @@ func (r *workerRunner) stop() {
 }

 func (r *workerRunner) close() {
-    r.onQuiting()
     close(r.closeChan)
 }

@@ -970,8 +985,8 @@ type masterRunner struct {
     profile *Profile

     parseTestCasesChan chan bool
-    testCaseBytes      chan []byte
-    tcb                []byte
+    testCaseBytesChan  chan []byte
+    testCasesBytes     []byte
 }

 func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
@@ -988,7 +1003,7 @@ func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
         masterBindPort:     masterBindPort,
         server:             newServer(masterBindHost, masterBindPort),
         parseTestCasesChan: make(chan bool),
-        testCaseBytes:      make(chan []byte),
+        testCaseBytesChan:  make(chan []byte),
     }
 }

@@ -1011,23 +1026,15 @@ func (r *masterRunner) heartbeatWorker() {
                 if !ok {
                     log.Error().Msg("failed to get worker information")
                 }
-                if atomic.LoadInt32(&workerInfo.Heartbeat) < 0 {
-                    if workerInfo.getState() == StateQuitting {
-                        return true
-                    }
-                    if workerInfo.getState() != StateMissing {
-                        workerInfo.setState(StateMissing)
-                    }
-                    if r.isStopping() {
-                        // all running workers missed, setting state to stopped
-                        if r.server.getClientsLength() <= 0 {
-                            r.updateState(StateStopped)
+                go func() {
+                    if atomic.LoadInt32(&workerInfo.Heartbeat) < 0 {
+                        if workerInfo.getState() != StateMissing {
+                            workerInfo.setState(StateMissing)
                         }
-                        return true
+                    } else {
+                        atomic.AddInt32(&workerInfo.Heartbeat, -1)
                     }
-                } else {
-                    atomic.AddInt32(&workerInfo.Heartbeat, -1)
-                }
+                }()
                 return true
             })
         case <-reportTicker.C:
@@ -1051,72 +1058,85 @@ func (r *masterRunner) clientListener() {
             if !ok {
                 continue
             }
-            switch msg.Type {
-            case typeClientReady:
-                workerInfo.setState(StateInit)
-                if r.getState() == StateRunning {
-                    log.Warn().Str("worker id", workerInfo.ID).Msg("worker joined, ready to rebalance the load of each worker")
+            go func() {
+                switch msg.Type {
+                case typeClientReady:
+                    workerInfo.setState(StateInit)
+                case typeClientStopped:
+                    workerInfo.setState(StateStopped)
+                case typeHeartbeat:
+                    if workerInfo.getState() == StateMissing {
+                        workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
+                    }
+                    workerInfo.updateHeartbeat(3)
+                    currentCPUUsage, ok := msg.Data["current_cpu_usage"]
+                    if ok {
+                        workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage))
+                    }
+                    currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"]
+                    if ok {
+                        workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage))
+                    }
+                    currentMemoryUsage, ok := msg.Data["current_memory_usage"]
+                    if ok {
+                        workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage))
+                    }
+                    currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"]
+                    if ok {
+                        workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage))
+                    }
+                    currentUsers, ok := msg.Data["current_users"]
+                    if ok {
+                        workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers))
+                    }
+                case typeSpawning:
+                    workerInfo.setState(StateSpawning)
+                case typeSpawningComplete:
+                    workerInfo.setState(StateRunning)
+                case typeQuit:
+                    if workerInfo.getState() == StateQuitting {
+                        break
+                    }
+                    workerInfo.setState(StateQuitting)
+                case typeException:
+                    // Todo
+                default:
+                }
+            }()
+        }
+    }
+}
+
+func (r *masterRunner) stateMachine() {
+    ticker := time.NewTicker(stateMachineInterval)
+    for {
+        select {
+        case <-r.closeChan:
+            return
+        case <-ticker.C:
+            switch r.getState() {
+            case StateSpawning:
+                if r.server.getCurrentUsers() == int(r.getSpawnCount()) {
+                    log.Warn().Msg("all workers spawn done, setting state as running")
+                    r.updateState(StateRunning)
+                }
+            case StateRunning:
+                if r.server.getStartingClientsLength() == 0 {
+                    r.updateState(StateStopped)
+                    continue
+                }
+                if r.server.getWorkersLengthByState(StateInit) != 0 {
                     err := r.rebalance()
                     if err != nil {
                         log.Error().Err(err).Msg("failed to rebalance")
                     }
                 }
-            case typeClientStopped:
-                workerInfo.setState(StateStopped)
-                if r.server.getWorkersLengthByState(StateStopped)+r.server.getWorkersLengthByState(StateInit) == r.server.getClientsLength() {
+            case StateStopping:
+                if r.server.getReadyClientsLength() == r.server.getAvailableClientsLength() {
                     r.updateState(StateStopped)
                 }
-            case typeHeartbeat:
-                if workerInfo.getState() != int32(builtin.BytesToInt64(msg.Data["state"])) {
-                    workerInfo.setState(int32(builtin.BytesToInt64(msg.Data["state"])))
-                }
-                workerInfo.updateHeartbeat(3)
-                currentCPUUsage, ok := msg.Data["current_cpu_usage"]
-                if ok {
-                    workerInfo.updateCPUUsage(builtin.ByteToFloat64(currentCPUUsage))
-                }
-                currentPidCpuUsage, ok := msg.Data["current_pid_cpu_usage"]
-                if ok {
-                    workerInfo.updateWorkerCPUUsage(builtin.ByteToFloat64(currentPidCpuUsage))
-                }
-                currentMemoryUsage, ok := msg.Data["current_memory_usage"]
-                if ok {
-                    workerInfo.updateMemoryUsage(builtin.ByteToFloat64(currentMemoryUsage))
-                }
-                currentPidMemoryUsage, ok := msg.Data["current_pid_memory_usage"]
-                if ok {
-                    workerInfo.updateWorkerMemoryUsage(builtin.ByteToFloat64(currentPidMemoryUsage))
-                }
-                currentUsers, ok := msg.Data["current_users"]
-                if ok {
-                    workerInfo.updateUserCount(builtin.BytesToInt64(currentUsers))
-                }
-            case typeSpawning:
-                workerInfo.setState(StateSpawning)
-            case typeSpawningComplete:
-                workerInfo.setState(StateRunning)
-                if r.server.getWorkersLengthByState(StateRunning) == r.server.getClientsLength() {
-                    log.Warn().Msg("all workers spawn done, setting state as running")
-                    r.updateState(StateRunning)
-                }
-            case typeQuit:
-                if workerInfo.getState() == StateQuitting {
-                    break
-                }
-                workerInfo.setState(StateQuitting)
-                if r.isStarting() {
-                    if r.server.getClientsLength() > 0 {
-                        log.Warn().Str("worker id", workerInfo.ID).Msg("worker quited, ready to rebalance the load of each worker")
-                        err := r.rebalance()
-                        if err != nil {
-                            log.Error().Err(err).Msg("failed to rebalance")
-                        }
-                    }
-                }
-            case typeException:
-                // Todo
-            default:
             }
+        }
     }
 }
@@ -1146,7 +1166,7 @@ func (r *masterRunner) run() {
         case <-r.closeChan:
             return
         case <-ticker.C:
-            c := r.server.getClientsLength()
+            c := r.server.getAvailableClientsLength()
             log.Info().Msg(fmt.Sprintf("expected worker number: %v, current worker count: %v", r.expectWorkers, c))
             if c >= r.expectWorkers {
                 err = r.start()
@@ -1165,6 +1185,9 @@ func (r *masterRunner) run() {
         }()
     }

+    // master state machine
+    r.goAttach(r.stateMachine)
+
     // listen and deal message from worker
     r.goAttach(r.clientListener)

@@ -1174,13 +1197,13 @@ func (r *masterRunner) run() {
 }

 func (r *masterRunner) start() error {
-    numWorkers := r.server.getClientsLength()
+    numWorkers := r.server.getAvailableClientsLength()
     if numWorkers == 0 {
         return errors.New("current available workers: 0")
     }

-    // fetching testcase
-    testcase, err := r.fetchTestCase()
+    // fetching testcases
+    testCasesBytes, err := r.fetchTestCases()
     if err != nil {
         return err
     }
@@ -1223,19 +1246,19 @@ func (r *masterRunner) start() error {
                 Type:    "spawn",
                 Profile: ProfileToBytes(workerProfile),
                 NodeID:  workerInfo.ID,
-                Tasks:   testcase,
+                Tasks:   testCasesBytes,
             }
             cur++
         }
         return true
     })
-    log.Warn().Interface("profile", r.profile).Msg("send spawn data to worker successful")
+    log.Warn().Interface("profile", r.profile).Msg("send spawn data to worker successfully")

     return nil
 }

 func (r *masterRunner) rebalance() error {
-    numWorkers := r.server.getClientsLength()
+    numWorkers := r.server.getAvailableClientsLength()
     if numWorkers == 0 {
         return errors.New("current available workers: 0")
     }
@@ -1276,7 +1299,7 @@ func (r *masterRunner) rebalance() error {
                 Type:    "spawn",
                 Profile: ProfileToBytes(workerProfile),
                 NodeID:  workerInfo.ID,
-                Tasks:   r.tcb,
+                Tasks:   r.testCasesBytes,
             }
         } else {
             workerInfo.getStream() <- &messager.StreamResponse{
@@ -1290,22 +1313,22 @@ func (r *masterRunner) rebalance() error {
         return true
     })
-    log.Warn().Msg("send rebalance data to worker successful")
+    log.Warn().Msg("send rebalance data to worker successfully")

     return nil
 }

-func (r *masterRunner) fetchTestCase() ([]byte, error) {
+func (r *masterRunner) fetchTestCases() ([]byte, error) {
     ticker := time.NewTicker(30 * time.Second)
-    if len(r.testCaseBytes) > 0 {
-        <-r.testCaseBytes
+    if len(r.testCaseBytesChan) > 0 {
+        <-r.testCaseBytesChan
     }
     r.parseTestCasesChan <- true
     select {
     case <-ticker.C:
         return nil, errors.New("parse testcases timeout")
-    case tcb := <-r.testCaseBytes:
-        r.tcb = tcb
-        return tcb, nil
+    case testCasesBytes := <-r.testCaseBytesChan:
+        r.testCasesBytes = testCasesBytes
+        return testCasesBytes, nil
     }
 }
@@ -1336,22 +1359,23 @@ func (r *masterRunner) close() {
 func (r *masterRunner) reportStats() {
     currentTime := time.Now()
     println()
-    println("========================= HttpRunner Master for Distributed Load Testing ========================= ")
-    println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v",
-        currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getClientsLength(), r.getSpawnCount()))
+    println("==================================== HttpRunner Master for Distributed Load Testing ==================================== ")
+    println(fmt.Sprintf("Current time: %s, State: %v, Current Available Workers: %v, Target Users: %v, Current Users: %v",
+        currentTime.Format("2006/01/02 15:04:05"), getStateName(r.getState()), r.server.getAvailableClientsLength(), r.getSpawnCount(), r.server.getCurrentUsers()))
     table := tablewriter.NewWriter(os.Stdout)
-    table.SetColMinWidth(0, 20)
+    table.SetColMinWidth(0, 40)
     table.SetColMinWidth(1, 10)
+    table.SetColMinWidth(2, 10)
     table.SetHeader([]string{"Worker ID", "IP", "State", "Current Users", "CPU Usage (%)", "Memory Usage (%)"})
     for _, worker := range r.server.getAllWorkers() {
         row := make([]string, 6)
         row[0] = worker.ID
         row[1] = worker.IP
-        row[2] = fmt.Sprintf("%v", getStateName(worker.getState()))
-        row[3] = fmt.Sprintf("%v", worker.getUserCount())
-        row[4] = fmt.Sprintf("%.2f", worker.getCPUUsage())
-        row[5] = fmt.Sprintf("%.2f", worker.getMemoryUsage())
+        row[2] = fmt.Sprintf("%v", getStateName(worker.State))
+        row[3] = fmt.Sprintf("%v", worker.UserCount)
+        row[4] = fmt.Sprintf("%.2f", worker.CPUUsage)
+        row[5] = fmt.Sprintf("%.2f", worker.MemoryUsage)
         table.Append(row)
     }
     table.Render()
diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go
index f8a9e228..62d772d5 100644
--- a/hrp/internal/boomer/runner_test.go
+++ b/hrp/internal/boomer/runner_test.go
@@ -338,6 +338,7 @@ func TestOnQuitMessage(t *testing.T) {
     go runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
     close(runner.doneChan)
     <-runner.closeChan
+    runner.onQuiting()
     if runner.getState() != StateQuitting {
         t.Error("Runner's state should be StateQuitting")
     }
@@ -348,6 +349,7 @@ func TestOnQuitMessage(t *testing.T) {
     runner.client.shutdownChan = make(chan bool)
     runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
     <-runner.closeChan
+    runner.onQuiting()
     if runner.getState() != StateQuitting {
         t.Error("Runner's state should be StateQuitting")
     }
@@ -395,7 +397,7 @@ func TestOnMessage(t *testing.T) {

     // increase goroutines while running
     runner.onMessage(newMessageToWorker("rebalance", ProfileToBytes(&Profile{SpawnCount: 15, SpawnRate: 15}), nil, nil))
-    runner.rebalance <- true
+    runner.controller.rebalance <- true
     time.Sleep(2 * time.Second)

     if runner.getState() != StateRunning {
@@ -460,6 +462,7 @@ func TestClientListener(t *testing.T) {
     runner.updateState(StateInit)
     runner.setSpawnCount(10)
     runner.setSpawnRate(10)
+    go runner.stateMachine()
     go runner.clientListener()
     runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3, stream: make(chan *messager.StreamResponse, 10)})
     runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3, stream: make(chan *messager.StreamResponse, 10)})
@@ -483,6 +486,7 @@ func TestClientListener(t *testing.T) {
         Type:   typeClientStopped,
         NodeID: "testID2",
     }
+    runner.updateState(StateRunning)
     worker2, ok := runner.server.getClients().Load("testID2")
     if !ok {
         t.Fatal("error")
@@ -515,7 +519,7 @@ func TestHeartbeatWorker(t *testing.T) {
     runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit, stream: make(chan *messager.StreamResponse, 10)})
     go runner.clientListener()
     go runner.heartbeatWorker()
-    time.Sleep(3 * time.Second)
+    time.Sleep(4 * time.Second)
     worker1, ok := runner.server.getClients().Load("testID1")
     if !ok {
         t.Fatal()
@@ -525,7 +529,7 @@ func TestHeartbeatWorker(t *testing.T) {
         t.Fatal()
     }
     if workerInfo1.getState() != StateMissing {
-        t.Error("expected state of worker runner is missing, but got", workerInfo1.getState())
+        t.Error("expected state of worker runner is missing, but got", getStateName(workerInfo1.getState()))
     }
     runner.server.recvChannel() <- &genericMessage{
         Type: typeHeartbeat,
diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go
index 5ad07267..2a9c9d00 100644
--- a/hrp/internal/boomer/server_grpc.go
+++ b/hrp/internal/boomer/server_grpc.go
@@ -53,6 +53,24 @@ func (w *WorkerNode) setState(state int32) {
     atomic.StoreInt32(&w.State, state)
 }

+func (w *WorkerNode) isStarting() bool {
+    return w.getState() == StateRunning || w.getState() == StateSpawning
+}
+
+func (w *WorkerNode) isStopping() bool {
+    return w.getState() == StateStopping
+}
+
+func (w *WorkerNode) isAvailable() bool {
+    state := w.getState()
+    return state != StateMissing && state != StateQuitting
+}
+
+func (w *WorkerNode) isReady() bool {
+    state := w.getState()
+    return state == StateInit || state == StateStopped
+}
+
 func (w *WorkerNode) updateHeartbeat(heartbeat int32) {
     atomic.StoreInt32(&w.Heartbeat, heartbeat)
 }
@@ -130,8 +148,8 @@ func (w *WorkerNode) getMemoryUsage() float64 {
 }

 func (w *WorkerNode) setStream(stream chan *messager.StreamResponse) {
-    w.mutex.RLock()
-    defer w.mutex.RUnlock()
+    w.mutex.Lock()
+    defer w.mutex.Unlock()
     w.stream = stream
 }
@@ -302,26 +320,19 @@ func (s *grpcServer) Register(ctx context.Context, req *messager.RegisterRequest
     wn := newWorkerNode(req.NodeID, clientIp, req.Os, req.Arch)
     s.clients.Store(req.NodeID, wn)
     log.Warn().Str("worker id", req.NodeID).Msg("worker joined")
-    return &messager.RegisterResponse{Code: "0", Message: "register successfully"}, nil
+    return &messager.RegisterResponse{Code: "0", Message: "register successful"}, nil
 }

 func (s *grpcServer) SignOut(_ context.Context, req *messager.SignOutRequest) (*messager.SignOutResponse, error) {
     // delete worker information
     s.clients.Delete(req.NodeID)
     log.Warn().Str("worker id", req.NodeID).Msg("worker quited")
-    return &messager.SignOutResponse{Code: "0", Message: "sign out successfully"}, nil
+    return &messager.SignOutResponse{Code: "0", Message: "sign out successful"}, nil
 }

-func (s *grpcServer) valid(token string) (isValid bool) {
-    s.clients.Range(func(key, value interface{}) bool {
-        if workerInfo, ok := value.(*WorkerNode); ok {
-            if workerInfo.ID == token {
-                isValid = true
-            }
-        }
-        return true
-    })
-    return
+func (s *grpcServer) validClientToken(token string) bool {
+    _, ok := s.clients.Load(token)
+    return ok
 }

 func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_BidirectionalStreamingMessageServer) error {
@@ -332,7 +343,7 @@ func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_Bidirect
         return status.Error(codes.Unauthenticated, "missing token header")
     }

-    ok = s.valid(token)
+    ok = s.validClientToken(token)
     if !ok {
         return status.Error(codes.Unauthenticated, "invalid token")
     }
@@ -404,7 +415,7 @@ func (s *grpcServer) sendMsg(srv messager.Message_BidirectionalStreamingMessageS
 func (s *grpcServer) sendBroadcasts(msg *genericMessage) {
     s.clients.Range(func(key, value interface{}) bool {
         if workerInfo, ok := value.(*WorkerNode); ok {
-            if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing {
+            if !workerInfo.isAvailable() {
                 return true
             }
             workerInfo.getStream() <- &messager.StreamResponse{
@@ -517,10 +528,10 @@ func (s *grpcServer) getClients() *sync.Map {
     return s.clients
 }

-func (s *grpcServer) getClientsLength() (l int) {
+func (s *grpcServer) getAvailableClientsLength() (l int) {
     s.clients.Range(func(key, value interface{}) bool {
         if workerInfo, ok := value.(*WorkerNode); ok {
-            if workerInfo.getState() != StateQuitting && workerInfo.getState() != StateMissing {
+            if workerInfo.isAvailable() {
                 l++
             }
         }
@@ -528,3 +539,39 @@ func (s *grpcServer) getClientsLength() (l int) {
         return true
     })
     return
 }
+
+func (s *grpcServer) getReadyClientsLength() (l int) {
+    s.clients.Range(func(key, value interface{}) bool {
+        if workerInfo, ok := value.(*WorkerNode); ok {
+            if workerInfo.isReady() {
+                l++
+            }
+        }
+        return true
+    })
+    return
+}
+
+func (s *grpcServer) getStartingClientsLength() (l int) {
+    s.clients.Range(func(key, value interface{}) bool {
+        if workerInfo, ok := value.(*WorkerNode); ok {
+            if workerInfo.isStarting() {
+                l++
+            }
+        }
+        return true
+    })
+    return
+}
+
+func (s *grpcServer) getCurrentUsers() (l int) {
+    s.clients.Range(func(key, value interface{}) bool {
+        if workerInfo, ok := value.(*WorkerNode); ok {
+            if workerInfo.isStarting() {
+                l += int(workerInfo.getUserCount())
+            }
+        }
+        return true
+    })
+    return
+}
diff --git a/hrp/server.go b/hrp/server.go
index c060d409..ceada927 100644
--- a/hrp/server.go
+++ b/hrp/server.go
@@ -191,7 +191,7 @@ func (api *apiHandler) Start(w http.ResponseWriter, r *http.Request) {
         return
     }
     req := StartRequestBody{
-        Profile: *api.boomer.GetProfile(),
+        Profile: *boomer.NewProfile(),
     }
     err = mapstructure.Decode(data, &req)
     if err != nil {
diff --git a/hrp/step_request.go b/hrp/step_request.go
index 651ce8fe..8abd7cdc 100644
--- a/hrp/step_request.go
+++ b/hrp/step_request.go
@@ -387,13 +387,16 @@ func runStepRequest(r *SessionRunner, step *TStep) (stepResult *StepResult, err
     if err != nil {
         return stepResult, errors.Wrap(err, "do request failed")
     }
-    defer resp.Body.Close()
+    if resp != nil {
+        defer resp.Body.Close()
+    }

     // decode response body in br/gzip/deflate formats
     err = decodeResponseBody(resp)
     if err != nil {
         return stepResult, errors.Wrap(err, "decode response body failed")
     }
+    defer resp.Body.Close()

     // log & print response
     if r.LogOn() {

From 05febd07f0cf503575c84966517757aa355e6ebf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BE=90=E8=81=AA?=
Date: Mon, 1 Aug 2022 14:39:51 +0800
Subject: [PATCH 3/5] fix: inaccurate transaction name recorded when transaction fails

---
 hrp/boomer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hrp/boomer.go b/hrp/boomer.go
index c05492c7..c139f90e 100644
--- a/hrp/boomer.go
+++ b/hrp/boomer.go
@@ -393,7 +393,7 @@ func (b *HRPBoomer) convertBoomerTask(testcase *TestCase, rendezvousList []*Rend
             // transaction
             // FIXME: support nested transactions
             if step.Struct().Transaction.Type == transactionEnd { // only record when transaction ends
-                b.RecordTransaction(stepResult.Name, transactionSuccess, stepResult.Elapsed, 0)
+                b.RecordTransaction(step.Struct().Transaction.Name, transactionSuccess, stepResult.Elapsed, 0)
                 transactionSuccess = true // reset flag for next transaction
             }
         } else if stepResult.StepType == stepTypeRendezvous {

From 44e844829d812d4c6c4c1513fcb2d3abeee8feb0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BE=90=E8=81=AA?=
Date: Mon, 1 Aug 2022 16:08:36 +0800
Subject: [PATCH 4/5] fix: failed to load .env file from master

---
 hrp/boomer.go | 37 ++++++++++++++++++++++++++-----------
 1 file changed, 26 insertions(+), 11 deletions(-)
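Note: this fix materializes the Config.Environs map received from the master into a
.env file inside the worker's temp directory (next to test-case.json), so the runner
can load environment variables the usual way. The sketch below is illustrative only
and NOT part of this commit -- a stdlib-only round trip of the KEY=VALUE format that
parseTCases writes:

    // Illustrative only: write a map as .env lines, then parse it back.
    package main

    import (
        "fmt"
        "os"
        "path/filepath"
        "strings"
    )

    func main() {
        environs := map[string]string{"USERNAME": "debugtalk", "PASSWORD": "123456"}

        // serialize one KEY=VALUE pair per line, mirroring the patch
        var sb strings.Builder
        for k, v := range environs {
            fmt.Fprintf(&sb, "%s=%s\n", k, v)
        }
        dir, err := os.MkdirTemp("", "hrp_testcases")
        if err != nil {
            panic(err)
        }
        envPath := filepath.Join(dir, ".env")
        if err := os.WriteFile(envPath, []byte(sb.String()), 0o644); err != nil {
            panic(err)
        }

        // parse it back the way a dotenv-style loader would
        content, err := os.ReadFile(envPath)
        if err != nil {
            panic(err)
        }
        for _, line := range strings.Split(string(content), "\n") {
            if kv := strings.SplitN(line, "=", 2); len(kv) == 2 {
                fmt.Printf("loaded %s=%s\n", kv[0], kv[1])
            }
        }
    }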
diff --git a/hrp/boomer.go b/hrp/boomer.go
index c139f90e..34f63fc7 100644
--- a/hrp/boomer.go
+++ b/hrp/boomer.go
@@ -178,33 +178,48 @@ func (b *HRPBoomer) Quit() {

 func (b *HRPBoomer) parseTCases(testCases []*TCase) (testcases []ITestCase) {
     for _, tc := range testCases {
-        tesecase, err := tc.toTestCase()
-        if err != nil {
-            log.Error().Err(err).Msg("failed to load testcases")
-            return
-        }
         // create temp dir to save testcase
         tempDir, err := ioutil.TempDir("", "hrp_testcases")
         if err != nil {
-            log.Error().Err(err).Msg("failed to save testcases")
+            log.Error().Err(err).Msg("failed to create hrp testcases directory")
             return
         }
-        tesecase.Config.Path = filepath.Join(tempDir, "test-case.json")
-        if tesecase.Config.PluginSetting != nil {
-            tesecase.Config.PluginSetting.Path = filepath.Join(tempDir, fmt.Sprintf("debugtalk.%s", tesecase.Config.PluginSetting.Type))
-            err = builtin.Bytes2File(tesecase.Config.PluginSetting.Content, tesecase.Config.PluginSetting.Path)
+
+        if tc.Config.PluginSetting != nil {
+            tc.Config.PluginSetting.Path = filepath.Join(tempDir, fmt.Sprintf("debugtalk.%s", tc.Config.PluginSetting.Type))
+            err = builtin.Bytes2File(tc.Config.PluginSetting.Content, tc.Config.PluginSetting.Path)
             if err != nil {
                 log.Error().Err(err).Msg("failed to save plugin file")
                 return
             }
+            tc.Config.PluginSetting.Content = nil // remove the content in testcase
         }
-        err = builtin.Dump2JSON(tesecase, tesecase.Config.Path)
+
+        if tc.Config.Environs != nil {
+            envContent := ""
+            for k, v := range tc.Config.Environs {
+                envContent += fmt.Sprintf("%s=%s\n", k, v)
+            }
+            err = os.WriteFile(filepath.Join(tempDir, ".env"), []byte(envContent), 0o644)
+            if err != nil {
+                log.Error().Err(err).Msg("failed to dump environs")
+                return
+            }
+        }
+
+        tc.Config.Path = filepath.Join(tempDir, "test-case.json")
+        err = builtin.Dump2JSON(tc, tc.Config.Path)
         if err != nil {
             log.Error().Err(err).Msg("failed to dump testcases")
             return
         }
+
+        tesecase, err := tc.toTestCase()
+        if err != nil {
+            log.Error().Err(err).Msg("failed to load testcases")
+            return
+        }
+
         testcases = append(testcases, tesecase)
     }
     return testcases

From c5a0a0fc27730607f03e65ca4d2aba7096c70547 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BE=90=E8=81=AA?=
Date: Mon, 1 Aug 2022 17:04:39 +0800
Subject: [PATCH 5/5] fix: pluginMap uses sync.Map to avoid data race

---
 hrp/boomer.go      | 14 ++++++++------
 hrp/plugin.go      |  9 +++++----
 hrp/runner.go      |  8 +++++---
 hrp/runner_test.go |  2 +-
 4 files changed, 19 insertions(+), 14 deletions(-)
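Note: pluginMap is written by initPlugin while the runners' deferred cleanup iterates
it, so concurrent testcases raced on the previous plain map. The sketch below is
illustrative only and NOT part of this commit -- it shows the sync.Map access pattern
(LoadOrStore/Range) that replaces map indexing plus delete(), with hypothetical keys:

    // Illustrative only: concurrent writes to a plain map are a data race
    // (and can panic with "concurrent map writes"); sync.Map is safe without
    // external locking -- the trade-off made for pluginMap here.
    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var plugins sync.Map // replaces: map[string]funplugin.IPlugin + no locking

        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                key := fmt.Sprintf("debugtalk-%d.bin", n)
                // LoadOrStore reuses an existing instance or registers a new one,
                // mirroring the reuse semantics initPlugin needs
                if _, loaded := plugins.LoadOrStore(key, n); loaded {
                    fmt.Println("reused", key)
                }
            }(i)
        }
        wg.Wait()

        count := 0
        plugins.Range(func(_, _ interface{}) bool {
            count++
            return true
        })
        fmt.Println("plugins stored:", count) // 10, with no race detector report
    }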
diff --git a/hrp/boomer.go b/hrp/boomer.go
index 34f63fc7..d5d71f47 100644
--- a/hrp/boomer.go
+++ b/hrp/boomer.go
@@ -101,11 +101,12 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) {

     // quit all plugins
     defer func() {
-        if len(pluginMap) > 0 {
-            for _, plugin := range pluginMap {
+        pluginMap.Range(func(key, value interface{}) bool {
+            if plugin, ok := value.(funplugin.IPlugin); ok {
                 plugin.Quit()
             }
-        }
+            return true
+        })
     }()

     taskSlice := b.ConvertTestCasesToBoomerTasks(testcases...)
@@ -283,11 +284,12 @@ func (b *HRPBoomer) PollTasks(ctx context.Context) {
 }

 func (b *HRPBoomer) PollTestCases(ctx context.Context) {
     // quit all plugins
     defer func() {
-        if len(pluginMap) > 0 {
-            for _, plugin := range pluginMap {
+        pluginMap.Range(func(key, value interface{}) bool {
+            if plugin, ok := value.(funplugin.IPlugin); ok {
                 plugin.Quit()
             }
-        }
+            return true
+        })
     }()

     for {
diff --git a/hrp/plugin.go b/hrp/plugin.go
index d98437ac..c762b6c8 100644
--- a/hrp/plugin.go
+++ b/hrp/plugin.go
@@ -5,6 +5,7 @@ import (
     "os"
     "path/filepath"
     "strings"
+    "sync"

     "github.com/httprunner/funplugin"
     "github.com/httprunner/funplugin/fungo"
@@ -24,7 +25,7 @@ const (

 const projectInfoFile = "proj.json" // used for ensuring root project

-var pluginMap = map[string]funplugin.IPlugin{} // used for reusing plugin instance
+var pluginMap = sync.Map{} // used for reusing plugin instance

 func initPlugin(path, venv string, logOn bool) (plugin funplugin.IPlugin, err error) {
     // plugin file not found
@@ -37,8 +38,8 @@ func initPlugin(path, venv string, logOn bool) (plugin funplugin.IPlugin, err er
     }

     // reuse plugin instance if it already initialized
-    if p, ok := pluginMap[pluginPath]; ok {
-        return p, nil
+    if p, ok := pluginMap.Load(pluginPath); ok {
+        return p.(funplugin.IPlugin), nil
     }

     pluginOptions := []funplugin.Option{funplugin.WithLogOn(logOn)}
@@ -74,7 +75,7 @@ func initPlugin(path, venv string, logOn bool) (plugin funplugin.IPlugin, err er
     }

     // add plugin instance to plugin map
-    pluginMap[pluginPath] = plugin
+    pluginMap.Store(pluginPath, plugin)

     // report event for initializing plugin
     event := sdk.EventTracking{
diff --git a/hrp/runner.go b/hrp/runner.go
index b9192e09..cff1091f 100644
--- a/hrp/runner.go
+++ b/hrp/runner.go
@@ -17,6 +17,7 @@ import (
     "github.com/rs/zerolog/log"
     "golang.org/x/net/http2"

+    "github.com/httprunner/funplugin"
     "github.com/httprunner/httprunner/v4/hrp/internal/builtin"
     "github.com/httprunner/httprunner/v4/hrp/internal/sdk"
 )
@@ -188,11 +189,12 @@ func (r *HRPRunner) Run(testcases ...ITestCase) error {

     // quit all plugins
     defer func() {
-        if len(pluginMap) > 0 {
-            for _, plugin := range pluginMap {
+        pluginMap.Range(func(key, value interface{}) bool {
+            if plugin, ok := value.(funplugin.IPlugin); ok {
                 plugin.Quit()
             }
-        }
+            return true
+        })
     }()

     var runErr error
diff --git a/hrp/runner_test.go b/hrp/runner_test.go
index 6b92fdf5..383cae09 100644
--- a/hrp/runner_test.go
+++ b/hrp/runner_test.go
@@ -24,7 +24,7 @@ func removeHashicorpGoPlugin() {
     log.Info().Msg("[teardown] remove hashicorp go plugin")
     os.Remove(tmpl("debugtalk.bin"))
     pluginPath, _ := filepath.Abs(tmpl("debugtalk.bin"))
-    delete(pluginMap, pluginPath)
+    pluginMap.Delete(pluginPath)
 }

 func buildHashicorpPyPlugin() {