diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 9867a1d3..c75d6769 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -1158,24 +1158,35 @@ func (r *masterRunner) start() error { log.Error().Err(err).Msg("copy workerProfile failed") return err } - cur := 0 - ints := builtin.SplitInteger(int(r.profile.SpawnCount), numWorkers) - log.Info().Msg("send spawn data to worker") + + // spawn count + spawnCounts := builtin.SplitInteger(int(r.profile.SpawnCount), numWorkers) + + // spawn rate + spawnRate := workerProfile.SpawnRate / float64(numWorkers) + if spawnRate < 1 { + spawnRate = 1 + } + + // max RPS + maxRPSs := builtin.SplitInteger(int(workerProfile.MaxRPS), numWorkers) + r.updateState(StateSpawning) + log.Info().Msg("send spawn data to worker") + + cur := 0 r.server.clients.Range(func(key, value interface{}) bool { if workerInfo, ok := value.(*WorkerNode); ok { if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing { return true } + if workerProfile.SpawnCount > 0 { - workerProfile.SpawnCount = int64(ints[cur]) - } - if workerProfile.SpawnRate > 0 { - workerProfile.SpawnRate = workerProfile.SpawnRate / float64(numWorkers) - } - if workerProfile.MaxRPS > 0 { - workerProfile.MaxRPS = workerProfile.MaxRPS / int64(numWorkers) + workerProfile.SpawnCount = int64(spawnCounts[cur]) } + workerProfile.MaxRPS = int64(maxRPSs[cur]) + workerProfile.SpawnRate = spawnRate + workerInfo.getStream() <- &messager.StreamResponse{ Type: "spawn", Profile: ProfileToBytes(workerProfile), diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go index b9d61cc6..08323aa6 100644 --- a/hrp/internal/boomer/server_grpc.go +++ b/hrp/internal/boomer/server_grpc.go @@ -406,6 +406,7 @@ func (s *grpcServer) close() { ctx, cancel := context.WithTimeout(context.Background(), timeout) s.stopServer(ctx) cancel() + close(s.disconnectedChan) } func (s *grpcServer) recvChannel() chan *genericMessage {