From ef71d614134c5e4544571cbdce7ec711bdccc409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=81=AA?= Date: Thu, 14 Jul 2022 14:26:55 +0800 Subject: [PATCH] fix: failed to evenly distribute the spawn-rate to each worker --- hrp/internal/boomer/runner.go | 31 ++++++++++++++++++++---------- hrp/internal/boomer/server_grpc.go | 1 + 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 9867a1d3..c75d6769 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -1158,24 +1158,35 @@ func (r *masterRunner) start() error { log.Error().Err(err).Msg("copy workerProfile failed") return err } - cur := 0 - ints := builtin.SplitInteger(int(r.profile.SpawnCount), numWorkers) - log.Info().Msg("send spawn data to worker") + + // spawn count + spawnCounts := builtin.SplitInteger(int(r.profile.SpawnCount), numWorkers) + + // spawn rate + spawnRate := workerProfile.SpawnRate / float64(numWorkers) + if spawnRate < 1 { + spawnRate = 1 + } + + // max RPS + maxRPSs := builtin.SplitInteger(int(workerProfile.MaxRPS), numWorkers) + r.updateState(StateSpawning) + log.Info().Msg("send spawn data to worker") + + cur := 0 r.server.clients.Range(func(key, value interface{}) bool { if workerInfo, ok := value.(*WorkerNode); ok { if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing { return true } + if workerProfile.SpawnCount > 0 { - workerProfile.SpawnCount = int64(ints[cur]) - } - if workerProfile.SpawnRate > 0 { - workerProfile.SpawnRate = workerProfile.SpawnRate / float64(numWorkers) - } - if workerProfile.MaxRPS > 0 { - workerProfile.MaxRPS = workerProfile.MaxRPS / int64(numWorkers) + workerProfile.SpawnCount = int64(spawnCounts[cur]) } + workerProfile.MaxRPS = int64(maxRPSs[cur]) + workerProfile.SpawnRate = spawnRate + workerInfo.getStream() <- &messager.StreamResponse{ Type: "spawn", Profile: ProfileToBytes(workerProfile), diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go index b9d61cc6..08323aa6 100644 --- a/hrp/internal/boomer/server_grpc.go +++ b/hrp/internal/boomer/server_grpc.go @@ -406,6 +406,7 @@ func (s *grpcServer) close() { ctx, cancel := context.WithTimeout(context.Background(), timeout) s.stopServer(ctx) cancel() + close(s.disconnectedChan) } func (s *grpcServer) recvChannel() chan *genericMessage {