From 4044c15974fa663832e911a0cb65f4e6a661d77d Mon Sep 17 00:00:00 2001 From: xucong053 Date: Mon, 23 May 2022 16:45:47 +0800 Subject: [PATCH] feat: support dispatch profile to worker --- hrp/boomer.go | 64 +++++-- hrp/cmd/boom.go | 65 ++----- hrp/internal/boomer/boomer.go | 168 ++++++++++++------ hrp/internal/boomer/client_grpc.go | 24 ++- hrp/internal/boomer/message.go | 24 +-- hrp/internal/boomer/runner.go | 89 +++++----- hrp/internal/boomer/server_grpc.go | 9 +- hrp/internal/grpc/messager/messager.pb.go | 63 ++++--- .../grpc/messager/messager_grpc.pb.go | 31 +--- hrp/internal/grpc/proto/messager.proto | 9 +- hrp/server.go | 91 +++++++--- 11 files changed, 367 insertions(+), 270 deletions(-) diff --git a/hrp/boomer.go b/hrp/boomer.go index 57d3695f..5bb37f4c 100644 --- a/hrp/boomer.go +++ b/hrp/boomer.go @@ -12,7 +12,7 @@ import ( "github.com/rs/zerolog/log" ) -func NewStandaloneBoomer(spawnCount int, spawnRate float64) *HRPBoomer { +func NewStandaloneBoomer(spawnCount int64, spawnRate float64) *HRPBoomer { b := &HRPBoomer{ Boomer: boomer.NewStandaloneBoomer(spawnCount, spawnRate), pluginsMutex: new(sync.RWMutex), @@ -50,6 +50,27 @@ type HRPBoomer struct { pluginsMutex *sync.RWMutex // avoid data race } +func (b *HRPBoomer) InitBoomer() { + // init output + if !b.GetProfile().DisableConsoleOutput { + b.AddOutput(boomer.NewConsoleOutput()) + } + if b.GetProfile().PrometheusPushgatewayURL != "" { + b.AddOutput(boomer.NewPrometheusPusherOutput(b.GetProfile().PrometheusPushgatewayURL, "hrp", b.GetMode())) + } + b.SetSpawnCount(b.GetProfile().SpawnCount) + b.SetSpawnRate(b.GetProfile().SpawnRate) + if b.GetProfile().LoopCount > 0 { + b.SetLoopCount(b.GetProfile().LoopCount) + } + b.SetRateLimiter(b.GetProfile().MaxRPS, b.GetProfile().RequestIncreaseRate) + b.SetDisableKeepAlive(b.GetProfile().DisableKeepalive) + b.SetDisableCompression(b.GetProfile().DisableCompression) + b.SetClientTransport() + b.EnableCPUProfile(b.GetProfile().CPUProfile, b.GetProfile().CPUProfileDuration) + b.EnableMemoryProfile(b.GetProfile().MemoryProfile, b.GetProfile().MemoryProfileDuration) +} + func (b *HRPBoomer) SetClientTransport() *HRPBoomer { // set client transport for high concurrency load testing b.hrpRunner.SetClientTransport(b.GetSpawnCount(), b.GetDisableKeepAlive(), b.GetDisableCompression()) @@ -104,7 +125,7 @@ func (b *HRPBoomer) ConvertTestCasesToTasks(testcases ...ITestCase) (taskSlice [ return taskSlice } -func (b *HRPBoomer) LoopTestCases() { +func (b *HRPBoomer) PollTestCases() { for { select { case <-b.Boomer.ParseTestCasesChan(): @@ -167,9 +188,7 @@ func (b *HRPBoomer) Quit() { b.Boomer.Quit() } -func (b *HRPBoomer) handleTasks(tcs []byte) { - //Todo: 过滤掉已经传输过的task - testCases := b.BytesToTestCases(tcs) +func (b *HRPBoomer) runTasks(testCases []*TCase, profile *boomer.Profile) { var testcases []ITestCase for _, tc := range testCases { tesecase, err := tc.toTestCase() @@ -178,22 +197,37 @@ func (b *HRPBoomer) handleTasks(tcs []byte) { } testcases = append(testcases, tesecase) } - log.Info().Interface("testcases", testcases).Msg("loop tasks successful") - if b.Boomer.GetState() == boomer.StateRunning || b.Boomer.GetState() == boomer.StateSpawning { - b.Boomer.SetTasks(b.ConvertTestCasesToTasks(testcases...)...) - } else { - b.Run(testcases...) - } + b.SetProfile(profile) + b.InitBoomer() + log.Info().Interface("testcases", testcases).Interface("profile", profile).Msg("run tasks successful") + b.Run(testcases...) } -func (b *HRPBoomer) LoopTasks() { +func (b *HRPBoomer) rebalanceTasks(profile *boomer.Profile) { + b.SetProfile(profile) + b.SetSpawnCount(b.GetProfile().SpawnCount) + b.SetSpawnRate(b.GetProfile().SpawnRate) + b.GetRebalanceChan() <- true + log.Info().Interface("profile", profile).Msg("rebalance tasks successful") +} + +func (b *HRPBoomer) PollTasks() { for { select { - case tcs := <-b.Boomer.GetTestCaseBytesChan(): - if len(b.Boomer.GetTestCaseBytesChan()) > 0 { + case tasks := <-b.Boomer.GetTasksChan(): + // 清理过时测试用例任务 + if len(b.Boomer.GetTasksChan()) > 0 { continue } - go b.handleTasks(tcs) + profile := boomer.BytesToProfile(tasks.Profile) + //Todo: 过滤掉已经传输过的task + if tasks.Tasks != nil { + testCases := b.BytesToTestCases(tasks.Tasks) + go b.runTasks(testCases, profile) + } else { + go b.rebalanceTasks(profile) + } + case <-b.Boomer.GetCloseChan(): return } diff --git a/hrp/cmd/boom.go b/hrp/cmd/boom.go index a0be404c..7fe7c93a 100644 --- a/hrp/cmd/boom.go +++ b/hrp/cmd/boom.go @@ -38,7 +38,7 @@ var boomCmd = &cobra.Command{ // if set profile, the priority is higher than the other commands if boomArgs.profile != "" { - err := builtin.LoadFile(boomArgs.profile, &boomArgs) + err := builtin.LoadFile(boomArgs.profile, &boomArgs.profile) if err != nil { log.Error().Err(err).Msg("failed to load profile") os.Exit(1) @@ -54,16 +54,9 @@ var boomCmd = &cobra.Command{ } else { hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate) } + hrpBoomer.SetProfile(&boomArgs.Profile) hrpBoomer.EnableGracefulQuit() - // init output - if !boomArgs.DisableConsoleOutput { - hrpBoomer.AddOutput(boomer.NewConsoleOutput()) - } - if boomArgs.PrometheusPushgatewayURL != "" { - hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode())) - } - // run boomer switch hrpBoomer.GetMode() { case "master": @@ -71,61 +64,41 @@ var boomCmd = &cobra.Command{ if boomArgs.autoStart { hrpBoomer.SetAutoStart() hrpBoomer.SetExpectWorkers(boomArgs.expectWorkers, boomArgs.expectWorkersMaxWait) - hrpBoomer.SetSpawnCount(int64(boomArgs.SpawnCount)) + hrpBoomer.SetSpawnCount(boomArgs.SpawnCount) hrpBoomer.SetSpawnRate(boomArgs.SpawnRate) } go hrpBoomer.StartServer() go hrpBoomer.RunMaster() - hrpBoomer.LoopTestCases() + hrpBoomer.PollTestCases() case "worker": if boomArgs.ignoreQuit { hrpBoomer.SetIgnoreQuit() } go hrpBoomer.RunWorker() - hrpBoomer.LoopTasks() + hrpBoomer.PollTasks() case "standalone": - if boomArgs.LoopCount > 0 { - hrpBoomer.SetLoopCount(boomArgs.LoopCount) - } - hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate) - hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive) - hrpBoomer.SetDisableCompression(boomArgs.DisableCompression) - hrpBoomer.SetClientTransport() if venv != "" { hrpBoomer.SetPython3Venv(venv) } - hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration) - hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration) + hrpBoomer.InitBoomer() hrpBoomer.Run(paths...) } }, } type BoomArgs struct { - SpawnCount int `json:"spawn-count,omitempty" yaml:"spawn-count,omitempty"` - SpawnRate float64 `json:"spawn-rate,omitempty" yaml:"spawn-rate,omitempty"` - MaxRPS int64 `json:"max-rps,omitempty" yaml:"max-rps,omitempty"` - LoopCount int64 `json:"loop-count,omitempty" yaml:"loop-count,omitempty"` - RequestIncreaseRate string `json:"request-increase-rate,omitempty" yaml:"request-increase-rate,omitempty"` - MemoryProfile string `json:"memory-profile,omitempty" yaml:"memory-profile,omitempty"` - MemoryProfileDuration time.Duration `json:"memory-profile-duration" yaml:"memory-profile-duration"` - CPUProfile string `json:"cpu-profile,omitempty" yaml:"cpu-profile,omitempty"` - CPUProfileDuration time.Duration `json:"cpu-profile-duration,omitempty" yaml:"cpu-profile-duration,omitempty"` - PrometheusPushgatewayURL string `json:"prometheus-gateway,omitempty" yaml:"prometheus-gateway,omitempty"` - DisableConsoleOutput bool `json:"disable-console-output,omitempty" yaml:"disable-console-output,omitempty"` - DisableCompression bool `json:"disable-compression,omitempty" yaml:"disable-compression,omitempty"` - DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty"` - profile string - master bool - worker bool - ignoreQuit bool - masterHost string - masterPort int - masterBindHost string - masterBindPort int - autoStart bool - expectWorkers int - expectWorkersMaxWait int + boomer.Profile + profile string + master bool + worker bool + ignoreQuit bool + masterHost string + masterPort int + masterBindHost string + masterBindPort int + autoStart bool + expectWorkers int + expectWorkersMaxWait int } var boomArgs BoomArgs @@ -135,7 +108,7 @@ func init() { boomCmd.Flags().Int64Var(&boomArgs.MaxRPS, "max-rps", 0, "Max RPS that boomer can generate, disabled by default.") boomCmd.Flags().StringVar(&boomArgs.RequestIncreaseRate, "request-increase-rate", "-1", "Request increase rate, disabled by default.") - boomCmd.Flags().IntVar(&boomArgs.SpawnCount, "spawn-count", 1, "The number of users to spawn for load testing") + boomCmd.Flags().Int64Var(&boomArgs.SpawnCount, "spawn-count", 1, "The number of users to spawn for load testing") boomCmd.Flags().Float64Var(&boomArgs.SpawnRate, "spawn-rate", 1, "The rate for spawning users") boomCmd.Flags().Int64Var(&boomArgs.LoopCount, "loop-count", -1, "The specify running cycles for load testing") boomCmd.Flags().StringVar(&boomArgs.MemoryProfile, "mem-profile", "", "Enable memory profiling.") diff --git a/hrp/internal/boomer/boomer.go b/hrp/internal/boomer/boomer.go index 16cb0e5c..ed314249 100644 --- a/hrp/internal/boomer/boomer.go +++ b/hrp/internal/boomer/boomer.go @@ -1,16 +1,13 @@ package boomer import ( + "github.com/httprunner/httprunner/v4/hrp/internal/json" "math" "os" "os/signal" - "strconv" - "strings" "syscall" "time" - "github.com/httprunner/httprunner/v4/hrp/internal/builtin" - "github.com/pkg/errors" "github.com/rs/zerolog/log" ) @@ -49,6 +46,58 @@ type Boomer struct { disableCompression bool } +type Profile struct { + SpawnCount int64 `json:"spawn-count,omitempty" yaml:"spawn-count,omitempty" mapstructure:"spawn-count,omitempty"` + SpawnRate float64 `json:"spawn-rate,omitempty" yaml:"spawn-rate,omitempty" mapstructure:"spawn-rate,omitempty"` + MaxRPS int64 `json:"max-rps,omitempty" yaml:"max-rps,omitempty" mapstructure:"max-rps,omitempty"` + LoopCount int64 `json:"loop-count,omitempty" yaml:"loop-count,omitempty" mapstructure:"loop-count,omitempty"` + RequestIncreaseRate string `json:"request-increase-rate,omitempty" yaml:"request-increase-rate,omitempty" mapstructure:"request-increase-rate,omitempty"` + MemoryProfile string `json:"memory-profile,omitempty" yaml:"memory-profile,omitempty" mapstructure:"memory-profile,omitempty"` + MemoryProfileDuration time.Duration `json:"memory-profile-duration,omitempty" yaml:"memory-profile-duration,omitempty" mapstructure:"memory-profile-duration,omitempty"` + CPUProfile string `json:"cpu-profile,omitempty" yaml:"cpu-profile,omitempty" mapstructure:"cpu-profile,omitempty"` + CPUProfileDuration time.Duration `json:"cpu-profile-duration,omitempty" yaml:"cpu-profile-duration,omitempty" mapstructure:"cpu-profile-duration,omitempty"` + PrometheusPushgatewayURL string `json:"prometheus-gateway,omitempty" yaml:"prometheus-gateway,omitempty" mapstructure:"prometheus-gateway,omitempty"` + DisableConsoleOutput bool `json:"disable-console-output,omitempty" yaml:"disable-console-output,omitempty" mapstructure:"disable-console-output,omitempty"` + DisableCompression bool `json:"disable-compression,omitempty" yaml:"disable-compression,omitempty" mapstructure:"disable-compression,omitempty"` + DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty" mapstructure:"disable-keepalive,omitempty"` +} + +func (b *Boomer) GetProfile() *Profile { + switch b.mode { + case DistributedMasterMode: + return b.masterRunner.profile + case DistributedWorkerMode: + return b.workerRunner.profile + default: + return b.localRunner.profile + } +} + +func (b *Boomer) SetProfile(profile *Profile) { + switch b.mode { + case DistributedMasterMode: + b.masterRunner.profile = profile + case DistributedWorkerMode: + b.workerRunner.profile = profile + default: + b.localRunner.profile = profile + } +} + +func (p *Profile) dispatch(workers int64) *Profile { + workerProfile := *p + if p.SpawnCount > 0 { + workerProfile.SpawnCount = p.SpawnCount / workers + } + if p.SpawnRate > 0 { + workerProfile.SpawnRate = p.SpawnRate / float64(workers) + } + if p.MaxRPS > 0 { + workerProfile.MaxRPS = p.MaxRPS / workers + } + return &workerProfile +} + // SetMode only accepts boomer.DistributedMasterMode、boomer.DistributedWorkerMode and boomer.StandaloneMode. func (b *Boomer) SetMode(mode Mode) { switch mode { @@ -79,7 +128,7 @@ func (b *Boomer) GetMode() string { } // NewStandaloneBoomer returns a new Boomer, which can run without master. -func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer { +func NewStandaloneBoomer(spawnCount int64, spawnRate float64) *Boomer { return &Boomer{ mode: StandaloneMode, localRunner: newLocalRunner(spawnCount, spawnRate), @@ -125,10 +174,56 @@ func (b *Boomer) GetTestCaseBytesChan() chan []byte { switch b.mode { case DistributedMasterMode: return b.masterRunner.testCaseBytes - case DistributedWorkerMode: - return b.workerRunner.testCaseBytes + default: + return nil + } +} + +func ProfileToBytes(profile *Profile) []byte { + profileBytes, err := json.Marshal(profile) + if err != nil { + log.Error().Err(err).Msg("failed to marshal testcases") + return nil + } + return profileBytes +} + +func BytesToProfile(profileBytes []byte) *Profile { + var profile *Profile + err := json.Unmarshal(profileBytes, &profile) + if err != nil { + log.Error().Err(err).Msg("failed to unmarshal testcases") + } + return profile +} + +// GetProfileBytesChan gets profile bytes chan +func (b *Boomer) GetProfileBytesChan() chan []byte { + switch b.mode { + case DistributedMasterMode: + return b.masterRunner.profileBytes + default: + return nil + } +} + +// GetTasksChan gets profile bytes chan +func (b *Boomer) GetTasksChan() chan *profileMessage { + switch b.mode { + case DistributedWorkerMode: + return b.workerRunner.tasksChan + default: + return nil + } +} + +func (b *Boomer) GetRebalanceChan() chan bool { + switch b.mode { + case DistributedWorkerMode: + return b.workerRunner.rebalance + default: + return nil } - return nil } func (b *Boomer) SetTestCasesPath(paths []string) { @@ -390,66 +485,25 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc } // Start starts to run -func (b *Boomer) Start(Args map[string]interface{}) error { +func (b *Boomer) Start(Args *Profile) error { if b.masterRunner.isStarted() { return errors.New("already started") } - spawnCount, ok := Args["spawn_count"] - if ok { - v, err := strconv.Atoi(spawnCount.(string)) - if err != nil { - log.Error().Err(err).Msg("spawn_count sets error") - return err - } - b.SetSpawnCount(int64(v)) - } else { - return errors.New("spawn count error") - } - spawnRate, ok := Args["spawn_rate"] - if ok { - v, err := builtin.Interface2Float64(spawnRate) - if err != nil { - log.Error().Err(err).Msg("spawn_count sets error") - return err - } - b.SetSpawnRate(v) - } else { - b.SetSpawnRate(float64(b.GetSpawnCount())) - } - path, ok := Args["path"].(string) - if ok { - paths := strings.Split(path, ",") - b.SetTestCasesPath(paths) - } else { - return errors.New("testcase path error") - } + b.SetSpawnCount(Args.SpawnCount) + b.SetSpawnRate(Args.SpawnRate) + b.SetProfile(Args) err := b.masterRunner.start() return err } // ReBalance starts to rebalance load test -func (b *Boomer) ReBalance(Args map[string]interface{}) error { +func (b *Boomer) ReBalance(Args *Profile) error { if !b.masterRunner.isStarted() { return errors.New("no start") } - spawnCount, ok := Args["spawn_count"] - if ok { - v, err := strconv.Atoi(spawnCount.(string)) - if err != nil { - log.Error().Err(err).Msg("spawn_count sets error") - return err - } - b.SetSpawnCount(int64(v)) - } - spawnRate, ok := Args["spawn_rate"] - if ok { - v, err := builtin.Interface2Float64(spawnRate) - if err != nil { - log.Error().Err(err).Msg("spawn_count sets error") - return err - } - b.SetSpawnRate(v) - } + b.SetSpawnCount(Args.SpawnCount) + b.SetSpawnRate(Args.SpawnRate) + b.SetProfile(Args) err := b.masterRunner.rebalance() if err != nil { log.Error().Err(err).Msg("failed to rebalance") diff --git a/hrp/internal/boomer/client_grpc.go b/hrp/internal/boomer/client_grpc.go index 5fa33cc1..8082074d 100644 --- a/hrp/internal/boomer/client_grpc.go +++ b/hrp/internal/boomer/client_grpc.go @@ -80,6 +80,9 @@ func (c *grpcClient) connect() (err error) { return err } + go c.recv() + go c.send() + biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx) if err != nil { log.Error().Err(err).Msg("call bidirectional streaming message err") @@ -87,19 +90,11 @@ func (c *grpcClient) connect() (err error) { } c.config.setBiStreamClient(biStream) log.Info().Msg(fmt.Sprintf("Boomer is connected to master(%s) press Ctrl+c to quit.\n", addr)) - go c.recv() - go c.send() return nil } func (c *grpcClient) reConnect() (err error) { - addr := fmt.Sprintf("%v:%v", c.masterHost, c.masterPort) - c.config.conn, err = grpc.Dial(addr, grpc.WithInsecure()) - if err != nil { - return - } - biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx) if err != nil { return @@ -111,7 +106,7 @@ func (c *grpcClient) reConnect() (err error) { //// tell master, I'm ready //log.Info().Msg("send client ready signal") //c.sendChannel() <- newClientReadyMessageToMaster(c.identity) - log.Info().Msg(fmt.Sprintf("Boomer is reConnected to master(%s) press Ctrl+c to quit.\n", addr)) + log.Info().Msg(fmt.Sprintf("Boomer is reConnected to master press Ctrl+c to quit.\n")) return } @@ -136,6 +131,7 @@ func (c *grpcClient) recv() { return default: if c.config.getBiStreamClient() == nil { + time.Sleep(1 * time.Second) continue } msg, err := c.config.getBiStreamClient().Recv() @@ -158,10 +154,11 @@ func (c *grpcClient) recv() { } c.fromMaster <- &genericMessage{ - Type: msg.Type, - Data: msg.Data, - NodeID: msg.NodeID, - Tasks: msg.Tasks, + Type: msg.Type, + Profile: msg.Profile, + Data: msg.Data, + NodeID: msg.NodeID, + Tasks: msg.Tasks, } log.Info(). @@ -204,6 +201,7 @@ func (c *grpcClient) sendMessage(msg *genericMessage) { Interface("data", msg.Data). Msg("send data to server") if c.config.getBiStreamClient() == nil { + atomic.AddInt32(&c.failCount, 1) return } err := c.config.getBiStreamClient().Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID}) diff --git a/hrp/internal/boomer/message.go b/hrp/internal/boomer/message.go index 93b9a0b3..69819854 100644 --- a/hrp/internal/boomer/message.go +++ b/hrp/internal/boomer/message.go @@ -10,14 +10,17 @@ const ( typeException = "exception" ) -type message interface { +type genericMessage struct { + Type string `json:"type,omitempty"` + Profile []byte `json:"profile,omitempty"` + Data map[string]int64 `json:"data,omitempty"` + NodeID string `json:"node_id,omitempty"` + Tasks []byte `json:"tasks,omitempty"` } -type genericMessage struct { - Type string `json:"type,omitempty"` - Data map[string]int64 `json:"data,omitempty"` - NodeID string `json:"node_id,omitempty"` - Tasks []byte `json:"tasks,omitempty"` +type profileMessage struct { + Profile []byte `json:"profile,omitempty"` + Tasks []byte `json:"tasks,omitempty"` } func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) { @@ -35,11 +38,12 @@ func newQuitMessage(nodeID string) (msg *genericMessage) { } } -func newSpawnMessageToWorker(t string, data map[string]int64, tasks []byte) (msg *genericMessage) { +func newMessageToWorker(t string, profile []byte, data map[string]int64, tasks []byte) (msg *genericMessage) { return &genericMessage{ - Type: t, - Data: data, - Tasks: tasks, + Type: t, + Profile: profile, + Data: data, + Tasks: tasks, } } diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 38c37b7c..61506db2 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -270,6 +270,9 @@ func (r *runner) outputOnEvent(data map[string]interface{}) { } func (r *runner) outputOnStop() { + defer func() { + r.outputs = make([]Output, 0) + }() size := len(r.outputs) if size == 0 { return @@ -332,6 +335,7 @@ func (r *runner) reset() { } func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { + r.updateState(StateSpawning) log.Info(). Int64("spawnCount", spawnCount). Float64("spawnRate", spawnRate). @@ -339,7 +343,6 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo r.controller.setSpawn(spawnCount, spawnRate) - r.updateState(StateSpawning) for { select { case <-quit: @@ -510,14 +513,16 @@ func (r *runner) isStarted() bool { type localRunner struct { runner + + profile *Profile } -func newLocalRunner(spawnCount int, spawnRate float64) *localRunner { +func newLocalRunner(spawnCount int64, spawnRate float64) *localRunner { return &localRunner{ runner: runner{ state: StateInit, stats: newRequestStats(), - spawnCount: int64(spawnCount), + spawnCount: spawnCount, spawnRate: spawnRate, controller: &Controller{}, outputs: make([]Output, 0), @@ -535,14 +540,13 @@ func (r *localRunner) start() { if r.rateLimitEnabled { r.rateLimiter.Start() } - - r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil) - // output setup r.outputOnStart() + go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil) + // start stats report - go r.runner.statsStart() + go r.statsStart() // stop <-r.stopChan @@ -582,10 +586,9 @@ type workerRunner struct { masterPort int client *grpcClient - // this channel will start worker for spawning. - spawnStartChan chan bool - // get testcase from master - testCaseBytes chan []byte + profile *Profile + + tasksChan chan *profileMessage ignoreQuit bool } @@ -594,15 +597,15 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { r = &workerRunner{ runner: runner{ stats: newRequestStats(), + outputs: make([]Output, 0), controller: &Controller{}, closeChan: make(chan bool), once: &sync.Once{}, }, - masterHost: masterHost, - masterPort: masterPort, - nodeID: getNodeID(), - spawnStartChan: make(chan bool), - testCaseBytes: make(chan []byte, 10), + masterHost: masterHost, + masterPort: masterPort, + nodeID: getNodeID(), + tasksChan: make(chan *profileMessage, 10), } return r } @@ -615,30 +618,26 @@ func (r *workerRunner) spawnComplete() { func (r *workerRunner) onSpawnMessage(msg *genericMessage) { r.client.sendChannel() <- newGenericMessage("spawning", nil, r.nodeID) - spawnCount, ok := msg.Data["spawn_count"] - if ok { - r.setSpawnCount(spawnCount) + if msg.Profile == nil { + log.Error().Msg("miss profile") } - spawnRate, ok := msg.Data["spawn_rate"] - if ok { - r.setSpawnRate(float64(spawnRate)) + if msg.Tasks == nil { + log.Error().Msg("miss tasks") } - if msg.Tasks != nil { - r.testCaseBytes <- msg.Tasks + r.tasksChan <- &profileMessage{ + Profile: msg.Profile, + Tasks: msg.Tasks, } log.Info().Msg("on spawn message successful") } func (r *workerRunner) onRebalanceMessage(msg *genericMessage) { - spawnCount, ok := msg.Data["spawn_count"] - if ok { - r.setSpawnCount(spawnCount) + if msg.Profile == nil { + log.Error().Msg("miss profile") } - spawnRate, ok := msg.Data["spawn_rate"] - if ok { - r.setSpawnRate(float64(spawnRate)) + r.tasksChan <- &profileMessage{ + Profile: msg.Profile, } - r.rebalance <- true log.Info().Msg("on rebalance message successful") } @@ -705,7 +704,6 @@ func (r *workerRunner) run() { err := r.client.connect() if err != nil { log.Printf("Failed to connect to master(%s:%d) with error %v\n", r.masterHost, r.masterPort, err) - return } // listen to master @@ -758,7 +756,7 @@ func (r *workerRunner) start() { r.once.Do(r.outputOnStart) - r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete) + go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete) // start stats report go r.statsStart() @@ -783,7 +781,7 @@ func (r *workerRunner) close() { return } // waiting report finished - time.Sleep(3 * time.Second) + time.Sleep(1 * time.Second) close(r.closeChan) var ticker = time.NewTicker(1 * time.Second) if r.client != nil { @@ -811,8 +809,12 @@ type masterRunner struct { expectWorkers int expectWorkersMaxWait int + profile *Profile + parseTestCasesChan chan bool testCaseBytes chan []byte + // set profile to worker + profileBytes chan []byte } func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner { @@ -990,20 +992,17 @@ func (r *masterRunner) start() error { if numWorkers == 0 { return errors.New("current workers: 0") } - workerSpawnRate := r.getSpawnRate() / float64(numWorkers) - workerSpawnCount := r.getSpawnCount() / int64(numWorkers) log.Info().Msg("send spawn data to worker") r.updateState(StateSpawning) - // waitting to fetch testcase + // fetching testcase testcase, err := r.fetchTestCase() if err != nil { return err } - r.server.sendChannel() <- newSpawnMessageToWorker("spawn", map[string]int64{ - "spawn_count": workerSpawnCount, - "spawn_rate": int64(workerSpawnRate), - }, testcase) + profile := r.profile.dispatch(int64(numWorkers)) + + r.server.sendChannel() <- newMessageToWorker("spawn", ProfileToBytes(profile), nil, testcase) println("send spawn data to worker successful") log.Info().Msg("send spawn data to worker successful") return nil @@ -1014,13 +1013,9 @@ func (r *masterRunner) rebalance() error { if numWorkers == 0 { return errors.New("current workers: 0") } - workerSpawnRate := r.getSpawnRate() / float64(numWorkers) - workerSpawnCount := r.getSpawnCount() / int64(numWorkers) + profile := r.profile.dispatch(int64(numWorkers)) - r.server.sendChannel() <- newSpawnMessageToWorker("rebalance", map[string]int64{ - "spawn_count": workerSpawnCount, - "spawn_rate": int64(workerSpawnRate), - }, nil) + r.server.sendChannel() <- newMessageToWorker("rebalance", ProfileToBytes(profile), nil, nil) println("send rebalance data to worker successful") return nil } diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go index 7eb92104..bcd85001 100644 --- a/hrp/internal/boomer/server_grpc.go +++ b/hrp/internal/boomer/server_grpc.go @@ -312,10 +312,11 @@ func (s *grpcServer) sendMessage(msg *genericMessage) { } err := workerInfo.messenger.Send( &messager.StreamResponse{ - Type: msg.Type, - Data: msg.Data, - NodeID: workerInfo.ID, - Tasks: msg.Tasks}, + Type: msg.Type, + Profile: msg.Profile, + Data: msg.Data, + NodeID: workerInfo.ID, + Tasks: msg.Tasks}, ) switch err { case nil: diff --git a/hrp/internal/grpc/messager/messager.pb.go b/hrp/internal/grpc/messager/messager.pb.go index bb389289..a9d2efde 100644 --- a/hrp/internal/grpc/messager/messager.pb.go +++ b/hrp/internal/grpc/messager/messager.pb.go @@ -7,6 +7,8 @@ package messager import ( + context "context" + grpc "google.golang.org/grpc" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -88,10 +90,11 @@ type StreamResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` - Data map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` - NodeID string `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"` - Tasks []byte `protobuf:"bytes,4,opt,name=tasks,proto3" json:"tasks,omitempty"` + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Profile []byte `protobuf:"bytes,2,opt,name=profile,proto3" json:"profile,omitempty"` + Data map[string]int64 `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + NodeID string `protobuf:"bytes,4,opt,name=NodeID,proto3" json:"NodeID,omitempty"` + Tasks []byte `protobuf:"bytes,5,opt,name=tasks,proto3" json:"tasks,omitempty"` } func (x *StreamResponse) Reset() { @@ -133,6 +136,13 @@ func (x *StreamResponse) GetType() string { return "" } +func (x *StreamResponse) GetProfile() []byte { + if x != nil { + return x.Profile + } + return nil +} + func (x *StreamResponse) GetData() map[string]int64 { if x != nil { return x.Data @@ -170,27 +180,28 @@ var file_grpc_proto_messager_proto_rawDesc = []byte{ 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0xc2, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, + 0x01, 0x22, 0xdc, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, - 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, - 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a, - 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0x61, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x12, 0x56, 0x0a, 0x1d, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x61, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70, - 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x66, + 0x69, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x66, 0x69, + 0x6c, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, + 0x65, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, + 0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x32, 0x61, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x56, 0x0a, 0x1d, 0x42, + 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, + 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -274,3 +285,7 @@ func file_grpc_proto_messager_proto_init() { file_grpc_proto_messager_proto_goTypes = nil file_grpc_proto_messager_proto_depIdxs = nil } + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConnInterface diff --git a/hrp/internal/grpc/messager/messager_grpc.pb.go b/hrp/internal/grpc/messager/messager_grpc.pb.go index 8237aa3c..d59a25e8 100644 --- a/hrp/internal/grpc/messager/messager_grpc.pb.go +++ b/hrp/internal/grpc/messager/messager_grpc.pb.go @@ -15,12 +15,11 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +const _ = grpc.SupportPackageIsVersion6 // MessageClient is the client API for Message service. // -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type MessageClient interface { BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error) } @@ -34,7 +33,7 @@ func NewMessageClient(cc grpc.ClientConnInterface) MessageClient { } func (c *messageClient) BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error) { - stream, err := c.cc.NewStream(ctx, &Message_ServiceDesc.Streams[0], "/message.Message/BidirectionalStreamingMessage", opts...) + stream, err := c.cc.NewStream(ctx, &_Message_serviceDesc.Streams[0], "/message.Message/BidirectionalStreamingMessage", opts...) if err != nil { return nil, err } @@ -65,31 +64,20 @@ func (x *messageBidirectionalStreamingMessageClient) Recv() (*StreamResponse, er } // MessageServer is the server API for Message service. -// All implementations must embed UnimplementedMessageServer -// for forward compatibility type MessageServer interface { BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error - mustEmbedUnimplementedMessageServer() } -// UnimplementedMessageServer must be embedded to have forward compatible implementations. +// UnimplementedMessageServer can be embedded to have forward compatible implementations. type UnimplementedMessageServer struct { } -func (UnimplementedMessageServer) BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error { +func (*UnimplementedMessageServer) BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error { return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingMessage not implemented") } -func (UnimplementedMessageServer) mustEmbedUnimplementedMessageServer() {} -// UnsafeMessageServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to MessageServer will -// result in compilation errors. -type UnsafeMessageServer interface { - mustEmbedUnimplementedMessageServer() -} - -func RegisterMessageServer(s grpc.ServiceRegistrar, srv MessageServer) { - s.RegisterService(&Message_ServiceDesc, srv) +func RegisterMessageServer(s *grpc.Server, srv MessageServer) { + s.RegisterService(&_Message_serviceDesc, srv) } func _Message_BidirectionalStreamingMessage_Handler(srv interface{}, stream grpc.ServerStream) error { @@ -118,10 +106,7 @@ func (x *messageBidirectionalStreamingMessageServer) Recv() (*StreamRequest, err return m, nil } -// Message_ServiceDesc is the grpc.ServiceDesc for Message service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Message_ServiceDesc = grpc.ServiceDesc{ +var _Message_serviceDesc = grpc.ServiceDesc{ ServiceName: "message.Message", HandlerType: (*MessageServer)(nil), Methods: []grpc.MethodDesc{}, diff --git a/hrp/internal/grpc/proto/messager.proto b/hrp/internal/grpc/proto/messager.proto index ef311339..fa6bfd49 100644 --- a/hrp/internal/grpc/proto/messager.proto +++ b/hrp/internal/grpc/proto/messager.proto @@ -10,13 +10,14 @@ service Message { message StreamRequest{ string type = 1; - map data = 2; + map data = 2; string NodeID = 3; } message StreamResponse{ string type = 1; - map data = 2; - string NodeID = 3; - bytes tasks = 4; + bytes profile = 2; + map data = 3; + string NodeID = 4; + bytes tasks = 5; } \ No newline at end of file diff --git a/hrp/server.go b/hrp/server.go index 68db4815..faacfb95 100644 --- a/hrp/server.go +++ b/hrp/server.go @@ -6,9 +6,11 @@ import ( "io/ioutil" "log" "net/http" + "strings" "github.com/httprunner/httprunner/v4/hrp/internal/boomer" "github.com/httprunner/httprunner/v4/hrp/internal/json" + "github.com/mitchellh/mapstructure" ) const jsonContentType = "application/json; encoding=utf-8" @@ -43,7 +45,7 @@ func parseBody(r *http.Request) (data map[string]interface{}, err error) { r.Body.Close() return nil, err } - err = json.Unmarshal(body, data) + err = json.Unmarshal(body, &data) if err != nil { return nil, err } @@ -62,10 +64,10 @@ func writeJSON(w http.ResponseWriter, body []byte, status int) { } type StartRequestBody struct { - Worker string `json:"worker"` // all - SpawnCount int64 `json:"spawn_count"` - SpawnRate int64 `json:"spawn_rate"` - TestCasePath string `json:"testcase_path"` + boomer.Profile `mapstructure:",squash"` + Worker string `json:"worker,omitempty" yaml:"worker,omitempty" mapstructure:"worker"` // all + TestCasePath string `json:"testcase-path" yaml:"testcase-path" mapstructure:"testcase-path"` + Other map[string]interface{} `mapstructure:",remain"` } type ServerCode int @@ -118,10 +120,9 @@ func CustomAPIResponse(errCode ServerCode, errMsg string) ServerStatus { } type RebalanceRequestBody struct { - Worker string `json:"worker"` - SpawnCount int64 `json:"spawn_count"` - SpawnRate int64 `json:"spawn_rate"` - TestCasePath string `json:"testcase_path"` + boomer.Profile `mapstructure:",squash"` + Worker string `json:"worker,omitempty" yaml:"worker,omitempty" mapstructure:"worker"` + Other map[string]interface{} `mapstructure:",remain"` } type StopRequestBody struct { @@ -167,15 +168,38 @@ func (api *apiHandler) Index(w http.ResponseWriter, r *http.Request) { } func (api *apiHandler) Start(w http.ResponseWriter, r *http.Request) { - data := map[string]interface{}{} - args := r.URL.Query() - for k, vs := range args { - for _, v := range vs { - data[k] = v - } - } var resp *CommonResponseBody - err := api.boomer.Start(data) + data, err := parseBody(r) + + req := StartRequestBody{ + Profile: *api.boomer.GetProfile(), + } + err = mapstructure.Decode(data, &req) + if len(req.Other) > 0 { + keys := make([]string, 0, len(req.Other)) + for k := range req.Other { + keys = append(keys, k) + } + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseParamError(fmt.Sprintf("failed to recognize params: %v", keys)), + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) + return + } + if req.TestCasePath == "" { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseParamError(fmt.Sprint("missing testcases path")), + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) + return + } + paths := strings.Split(req.TestCasePath, ",") + api.boomer.SetTestCasesPath(paths) + if err == nil { + err = api.boomer.Start(&req.Profile) + } if err != nil { resp = &CommonResponseBody{ ServerStatus: EnumAPIResponseServerError(err.Error()), @@ -231,15 +255,28 @@ func (api *apiHandler) Quit(w http.ResponseWriter, r *http.Request) { } func (api *apiHandler) ReBalance(w http.ResponseWriter, r *http.Request) { - data := map[string]interface{}{} - args := r.URL.Query() - for k, vs := range args { - for _, v := range vs { - data[k] = v - } - } var resp *CommonResponseBody - err := api.boomer.ReBalance(data) + data, err := parseBody(r) + + req := RebalanceRequestBody{ + Profile: *api.boomer.GetProfile(), + } + err = mapstructure.Decode(data, &req) + if len(req.Other) > 0 { + keys := make([]string, 0, len(req.Other)) + for k := range req.Other { + keys = append(keys, k) + } + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseParamError(fmt.Sprintf("failed to recognize params: %v", keys)), + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) + return + } + if err == nil { + err = api.boomer.ReBalance(&req.Profile) + } if err != nil { resp = &CommonResponseBody{ ServerStatus: EnumAPIResponseParamError(err.Error()), @@ -267,10 +304,10 @@ func (api *apiHandler) Handler() http.Handler { mux := http.NewServeMux() mux.HandleFunc("/", methods(api.Index, "GET")) - mux.HandleFunc("/start", methods(api.Start, "GET")) + mux.HandleFunc("/start", methods(api.Start, "POST")) mux.HandleFunc("/stop", methods(api.Stop, "GET")) mux.HandleFunc("/quit", methods(api.Quit, "GET")) - mux.HandleFunc("/rebalance", methods(api.ReBalance, "GET")) + mux.HandleFunc("/rebalance", methods(api.ReBalance, "POST")) mux.HandleFunc("/workers", methods(api.GetWorkersInfo, "GET")) return mux