feat: support dispatch profile to worker

This commit is contained in:
xucong053
2022-05-23 16:45:47 +08:00
committed by 徐聪
parent c5f482dbfa
commit 4044c15974
11 changed files with 367 additions and 270 deletions

View File

@@ -12,7 +12,7 @@ import (
"github.com/rs/zerolog/log"
)
func NewStandaloneBoomer(spawnCount int, spawnRate float64) *HRPBoomer {
func NewStandaloneBoomer(spawnCount int64, spawnRate float64) *HRPBoomer {
b := &HRPBoomer{
Boomer: boomer.NewStandaloneBoomer(spawnCount, spawnRate),
pluginsMutex: new(sync.RWMutex),
@@ -50,6 +50,27 @@ type HRPBoomer struct {
pluginsMutex *sync.RWMutex // avoid data race
}
func (b *HRPBoomer) InitBoomer() {
// init output
if !b.GetProfile().DisableConsoleOutput {
b.AddOutput(boomer.NewConsoleOutput())
}
if b.GetProfile().PrometheusPushgatewayURL != "" {
b.AddOutput(boomer.NewPrometheusPusherOutput(b.GetProfile().PrometheusPushgatewayURL, "hrp", b.GetMode()))
}
b.SetSpawnCount(b.GetProfile().SpawnCount)
b.SetSpawnRate(b.GetProfile().SpawnRate)
if b.GetProfile().LoopCount > 0 {
b.SetLoopCount(b.GetProfile().LoopCount)
}
b.SetRateLimiter(b.GetProfile().MaxRPS, b.GetProfile().RequestIncreaseRate)
b.SetDisableKeepAlive(b.GetProfile().DisableKeepalive)
b.SetDisableCompression(b.GetProfile().DisableCompression)
b.SetClientTransport()
b.EnableCPUProfile(b.GetProfile().CPUProfile, b.GetProfile().CPUProfileDuration)
b.EnableMemoryProfile(b.GetProfile().MemoryProfile, b.GetProfile().MemoryProfileDuration)
}
func (b *HRPBoomer) SetClientTransport() *HRPBoomer {
// set client transport for high concurrency load testing
b.hrpRunner.SetClientTransport(b.GetSpawnCount(), b.GetDisableKeepAlive(), b.GetDisableCompression())
@@ -104,7 +125,7 @@ func (b *HRPBoomer) ConvertTestCasesToTasks(testcases ...ITestCase) (taskSlice [
return taskSlice
}
func (b *HRPBoomer) LoopTestCases() {
func (b *HRPBoomer) PollTestCases() {
for {
select {
case <-b.Boomer.ParseTestCasesChan():
@@ -167,9 +188,7 @@ func (b *HRPBoomer) Quit() {
b.Boomer.Quit()
}
func (b *HRPBoomer) handleTasks(tcs []byte) {
//Todo: 过滤掉已经传输过的task
testCases := b.BytesToTestCases(tcs)
func (b *HRPBoomer) runTasks(testCases []*TCase, profile *boomer.Profile) {
var testcases []ITestCase
for _, tc := range testCases {
tesecase, err := tc.toTestCase()
@@ -178,22 +197,37 @@ func (b *HRPBoomer) handleTasks(tcs []byte) {
}
testcases = append(testcases, tesecase)
}
log.Info().Interface("testcases", testcases).Msg("loop tasks successful")
if b.Boomer.GetState() == boomer.StateRunning || b.Boomer.GetState() == boomer.StateSpawning {
b.Boomer.SetTasks(b.ConvertTestCasesToTasks(testcases...)...)
} else {
b.Run(testcases...)
}
b.SetProfile(profile)
b.InitBoomer()
log.Info().Interface("testcases", testcases).Interface("profile", profile).Msg("run tasks successful")
b.Run(testcases...)
}
func (b *HRPBoomer) LoopTasks() {
func (b *HRPBoomer) rebalanceTasks(profile *boomer.Profile) {
b.SetProfile(profile)
b.SetSpawnCount(b.GetProfile().SpawnCount)
b.SetSpawnRate(b.GetProfile().SpawnRate)
b.GetRebalanceChan() <- true
log.Info().Interface("profile", profile).Msg("rebalance tasks successful")
}
func (b *HRPBoomer) PollTasks() {
for {
select {
case tcs := <-b.Boomer.GetTestCaseBytesChan():
if len(b.Boomer.GetTestCaseBytesChan()) > 0 {
case tasks := <-b.Boomer.GetTasksChan():
// 清理过时测试用例任务
if len(b.Boomer.GetTasksChan()) > 0 {
continue
}
go b.handleTasks(tcs)
profile := boomer.BytesToProfile(tasks.Profile)
//Todo: 过滤掉已经传输过的task
if tasks.Tasks != nil {
testCases := b.BytesToTestCases(tasks.Tasks)
go b.runTasks(testCases, profile)
} else {
go b.rebalanceTasks(profile)
}
case <-b.Boomer.GetCloseChan():
return
}

View File

@@ -38,7 +38,7 @@ var boomCmd = &cobra.Command{
// if set profile, the priority is higher than the other commands
if boomArgs.profile != "" {
err := builtin.LoadFile(boomArgs.profile, &boomArgs)
err := builtin.LoadFile(boomArgs.profile, &boomArgs.profile)
if err != nil {
log.Error().Err(err).Msg("failed to load profile")
os.Exit(1)
@@ -54,16 +54,9 @@ var boomCmd = &cobra.Command{
} else {
hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate)
}
hrpBoomer.SetProfile(&boomArgs.Profile)
hrpBoomer.EnableGracefulQuit()
// init output
if !boomArgs.DisableConsoleOutput {
hrpBoomer.AddOutput(boomer.NewConsoleOutput())
}
if boomArgs.PrometheusPushgatewayURL != "" {
hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode()))
}
// run boomer
switch hrpBoomer.GetMode() {
case "master":
@@ -71,61 +64,41 @@ var boomCmd = &cobra.Command{
if boomArgs.autoStart {
hrpBoomer.SetAutoStart()
hrpBoomer.SetExpectWorkers(boomArgs.expectWorkers, boomArgs.expectWorkersMaxWait)
hrpBoomer.SetSpawnCount(int64(boomArgs.SpawnCount))
hrpBoomer.SetSpawnCount(boomArgs.SpawnCount)
hrpBoomer.SetSpawnRate(boomArgs.SpawnRate)
}
go hrpBoomer.StartServer()
go hrpBoomer.RunMaster()
hrpBoomer.LoopTestCases()
hrpBoomer.PollTestCases()
case "worker":
if boomArgs.ignoreQuit {
hrpBoomer.SetIgnoreQuit()
}
go hrpBoomer.RunWorker()
hrpBoomer.LoopTasks()
hrpBoomer.PollTasks()
case "standalone":
if boomArgs.LoopCount > 0 {
hrpBoomer.SetLoopCount(boomArgs.LoopCount)
}
hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate)
hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive)
hrpBoomer.SetDisableCompression(boomArgs.DisableCompression)
hrpBoomer.SetClientTransport()
if venv != "" {
hrpBoomer.SetPython3Venv(venv)
}
hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration)
hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration)
hrpBoomer.InitBoomer()
hrpBoomer.Run(paths...)
}
},
}
type BoomArgs struct {
SpawnCount int `json:"spawn-count,omitempty" yaml:"spawn-count,omitempty"`
SpawnRate float64 `json:"spawn-rate,omitempty" yaml:"spawn-rate,omitempty"`
MaxRPS int64 `json:"max-rps,omitempty" yaml:"max-rps,omitempty"`
LoopCount int64 `json:"loop-count,omitempty" yaml:"loop-count,omitempty"`
RequestIncreaseRate string `json:"request-increase-rate,omitempty" yaml:"request-increase-rate,omitempty"`
MemoryProfile string `json:"memory-profile,omitempty" yaml:"memory-profile,omitempty"`
MemoryProfileDuration time.Duration `json:"memory-profile-duration" yaml:"memory-profile-duration"`
CPUProfile string `json:"cpu-profile,omitempty" yaml:"cpu-profile,omitempty"`
CPUProfileDuration time.Duration `json:"cpu-profile-duration,omitempty" yaml:"cpu-profile-duration,omitempty"`
PrometheusPushgatewayURL string `json:"prometheus-gateway,omitempty" yaml:"prometheus-gateway,omitempty"`
DisableConsoleOutput bool `json:"disable-console-output,omitempty" yaml:"disable-console-output,omitempty"`
DisableCompression bool `json:"disable-compression,omitempty" yaml:"disable-compression,omitempty"`
DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty"`
profile string
master bool
worker bool
ignoreQuit bool
masterHost string
masterPort int
masterBindHost string
masterBindPort int
autoStart bool
expectWorkers int
expectWorkersMaxWait int
boomer.Profile
profile string
master bool
worker bool
ignoreQuit bool
masterHost string
masterPort int
masterBindHost string
masterBindPort int
autoStart bool
expectWorkers int
expectWorkersMaxWait int
}
var boomArgs BoomArgs
@@ -135,7 +108,7 @@ func init() {
boomCmd.Flags().Int64Var(&boomArgs.MaxRPS, "max-rps", 0, "Max RPS that boomer can generate, disabled by default.")
boomCmd.Flags().StringVar(&boomArgs.RequestIncreaseRate, "request-increase-rate", "-1", "Request increase rate, disabled by default.")
boomCmd.Flags().IntVar(&boomArgs.SpawnCount, "spawn-count", 1, "The number of users to spawn for load testing")
boomCmd.Flags().Int64Var(&boomArgs.SpawnCount, "spawn-count", 1, "The number of users to spawn for load testing")
boomCmd.Flags().Float64Var(&boomArgs.SpawnRate, "spawn-rate", 1, "The rate for spawning users")
boomCmd.Flags().Int64Var(&boomArgs.LoopCount, "loop-count", -1, "The specify running cycles for load testing")
boomCmd.Flags().StringVar(&boomArgs.MemoryProfile, "mem-profile", "", "Enable memory profiling.")

View File

@@ -1,16 +1,13 @@
package boomer
import (
"github.com/httprunner/httprunner/v4/hrp/internal/json"
"math"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -49,6 +46,58 @@ type Boomer struct {
disableCompression bool
}
type Profile struct {
SpawnCount int64 `json:"spawn-count,omitempty" yaml:"spawn-count,omitempty" mapstructure:"spawn-count,omitempty"`
SpawnRate float64 `json:"spawn-rate,omitempty" yaml:"spawn-rate,omitempty" mapstructure:"spawn-rate,omitempty"`
MaxRPS int64 `json:"max-rps,omitempty" yaml:"max-rps,omitempty" mapstructure:"max-rps,omitempty"`
LoopCount int64 `json:"loop-count,omitempty" yaml:"loop-count,omitempty" mapstructure:"loop-count,omitempty"`
RequestIncreaseRate string `json:"request-increase-rate,omitempty" yaml:"request-increase-rate,omitempty" mapstructure:"request-increase-rate,omitempty"`
MemoryProfile string `json:"memory-profile,omitempty" yaml:"memory-profile,omitempty" mapstructure:"memory-profile,omitempty"`
MemoryProfileDuration time.Duration `json:"memory-profile-duration,omitempty" yaml:"memory-profile-duration,omitempty" mapstructure:"memory-profile-duration,omitempty"`
CPUProfile string `json:"cpu-profile,omitempty" yaml:"cpu-profile,omitempty" mapstructure:"cpu-profile,omitempty"`
CPUProfileDuration time.Duration `json:"cpu-profile-duration,omitempty" yaml:"cpu-profile-duration,omitempty" mapstructure:"cpu-profile-duration,omitempty"`
PrometheusPushgatewayURL string `json:"prometheus-gateway,omitempty" yaml:"prometheus-gateway,omitempty" mapstructure:"prometheus-gateway,omitempty"`
DisableConsoleOutput bool `json:"disable-console-output,omitempty" yaml:"disable-console-output,omitempty" mapstructure:"disable-console-output,omitempty"`
DisableCompression bool `json:"disable-compression,omitempty" yaml:"disable-compression,omitempty" mapstructure:"disable-compression,omitempty"`
DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty" mapstructure:"disable-keepalive,omitempty"`
}
func (b *Boomer) GetProfile() *Profile {
switch b.mode {
case DistributedMasterMode:
return b.masterRunner.profile
case DistributedWorkerMode:
return b.workerRunner.profile
default:
return b.localRunner.profile
}
}
func (b *Boomer) SetProfile(profile *Profile) {
switch b.mode {
case DistributedMasterMode:
b.masterRunner.profile = profile
case DistributedWorkerMode:
b.workerRunner.profile = profile
default:
b.localRunner.profile = profile
}
}
func (p *Profile) dispatch(workers int64) *Profile {
workerProfile := *p
if p.SpawnCount > 0 {
workerProfile.SpawnCount = p.SpawnCount / workers
}
if p.SpawnRate > 0 {
workerProfile.SpawnRate = p.SpawnRate / float64(workers)
}
if p.MaxRPS > 0 {
workerProfile.MaxRPS = p.MaxRPS / workers
}
return &workerProfile
}
// SetMode only accepts boomer.DistributedMasterMode、boomer.DistributedWorkerMode and boomer.StandaloneMode.
func (b *Boomer) SetMode(mode Mode) {
switch mode {
@@ -79,7 +128,7 @@ func (b *Boomer) GetMode() string {
}
// NewStandaloneBoomer returns a new Boomer, which can run without master.
func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer {
func NewStandaloneBoomer(spawnCount int64, spawnRate float64) *Boomer {
return &Boomer{
mode: StandaloneMode,
localRunner: newLocalRunner(spawnCount, spawnRate),
@@ -125,10 +174,56 @@ func (b *Boomer) GetTestCaseBytesChan() chan []byte {
switch b.mode {
case DistributedMasterMode:
return b.masterRunner.testCaseBytes
case DistributedWorkerMode:
return b.workerRunner.testCaseBytes
default:
return nil
}
}
func ProfileToBytes(profile *Profile) []byte {
profileBytes, err := json.Marshal(profile)
if err != nil {
log.Error().Err(err).Msg("failed to marshal testcases")
return nil
}
return profileBytes
}
func BytesToProfile(profileBytes []byte) *Profile {
var profile *Profile
err := json.Unmarshal(profileBytes, &profile)
if err != nil {
log.Error().Err(err).Msg("failed to unmarshal testcases")
}
return profile
}
// GetProfileBytesChan gets profile bytes chan
func (b *Boomer) GetProfileBytesChan() chan []byte {
switch b.mode {
case DistributedMasterMode:
return b.masterRunner.profileBytes
default:
return nil
}
}
// GetTasksChan gets profile bytes chan
func (b *Boomer) GetTasksChan() chan *profileMessage {
switch b.mode {
case DistributedWorkerMode:
return b.workerRunner.tasksChan
default:
return nil
}
}
func (b *Boomer) GetRebalanceChan() chan bool {
switch b.mode {
case DistributedWorkerMode:
return b.workerRunner.rebalance
default:
return nil
}
return nil
}
func (b *Boomer) SetTestCasesPath(paths []string) {
@@ -390,66 +485,25 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc
}
// Start starts to run
func (b *Boomer) Start(Args map[string]interface{}) error {
func (b *Boomer) Start(Args *Profile) error {
if b.masterRunner.isStarted() {
return errors.New("already started")
}
spawnCount, ok := Args["spawn_count"]
if ok {
v, err := strconv.Atoi(spawnCount.(string))
if err != nil {
log.Error().Err(err).Msg("spawn_count sets error")
return err
}
b.SetSpawnCount(int64(v))
} else {
return errors.New("spawn count error")
}
spawnRate, ok := Args["spawn_rate"]
if ok {
v, err := builtin.Interface2Float64(spawnRate)
if err != nil {
log.Error().Err(err).Msg("spawn_count sets error")
return err
}
b.SetSpawnRate(v)
} else {
b.SetSpawnRate(float64(b.GetSpawnCount()))
}
path, ok := Args["path"].(string)
if ok {
paths := strings.Split(path, ",")
b.SetTestCasesPath(paths)
} else {
return errors.New("testcase path error")
}
b.SetSpawnCount(Args.SpawnCount)
b.SetSpawnRate(Args.SpawnRate)
b.SetProfile(Args)
err := b.masterRunner.start()
return err
}
// ReBalance starts to rebalance load test
func (b *Boomer) ReBalance(Args map[string]interface{}) error {
func (b *Boomer) ReBalance(Args *Profile) error {
if !b.masterRunner.isStarted() {
return errors.New("no start")
}
spawnCount, ok := Args["spawn_count"]
if ok {
v, err := strconv.Atoi(spawnCount.(string))
if err != nil {
log.Error().Err(err).Msg("spawn_count sets error")
return err
}
b.SetSpawnCount(int64(v))
}
spawnRate, ok := Args["spawn_rate"]
if ok {
v, err := builtin.Interface2Float64(spawnRate)
if err != nil {
log.Error().Err(err).Msg("spawn_count sets error")
return err
}
b.SetSpawnRate(v)
}
b.SetSpawnCount(Args.SpawnCount)
b.SetSpawnRate(Args.SpawnRate)
b.SetProfile(Args)
err := b.masterRunner.rebalance()
if err != nil {
log.Error().Err(err).Msg("failed to rebalance")

View File

@@ -80,6 +80,9 @@ func (c *grpcClient) connect() (err error) {
return err
}
go c.recv()
go c.send()
biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx)
if err != nil {
log.Error().Err(err).Msg("call bidirectional streaming message err")
@@ -87,19 +90,11 @@ func (c *grpcClient) connect() (err error) {
}
c.config.setBiStreamClient(biStream)
log.Info().Msg(fmt.Sprintf("Boomer is connected to master(%s) press Ctrl+c to quit.\n", addr))
go c.recv()
go c.send()
return nil
}
func (c *grpcClient) reConnect() (err error) {
addr := fmt.Sprintf("%v:%v", c.masterHost, c.masterPort)
c.config.conn, err = grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return
}
biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx)
if err != nil {
return
@@ -111,7 +106,7 @@ func (c *grpcClient) reConnect() (err error) {
//// tell master, I'm ready
//log.Info().Msg("send client ready signal")
//c.sendChannel() <- newClientReadyMessageToMaster(c.identity)
log.Info().Msg(fmt.Sprintf("Boomer is reConnected to master(%s) press Ctrl+c to quit.\n", addr))
log.Info().Msg(fmt.Sprintf("Boomer is reConnected to master press Ctrl+c to quit.\n"))
return
}
@@ -136,6 +131,7 @@ func (c *grpcClient) recv() {
return
default:
if c.config.getBiStreamClient() == nil {
time.Sleep(1 * time.Second)
continue
}
msg, err := c.config.getBiStreamClient().Recv()
@@ -158,10 +154,11 @@ func (c *grpcClient) recv() {
}
c.fromMaster <- &genericMessage{
Type: msg.Type,
Data: msg.Data,
NodeID: msg.NodeID,
Tasks: msg.Tasks,
Type: msg.Type,
Profile: msg.Profile,
Data: msg.Data,
NodeID: msg.NodeID,
Tasks: msg.Tasks,
}
log.Info().
@@ -204,6 +201,7 @@ func (c *grpcClient) sendMessage(msg *genericMessage) {
Interface("data", msg.Data).
Msg("send data to server")
if c.config.getBiStreamClient() == nil {
atomic.AddInt32(&c.failCount, 1)
return
}
err := c.config.getBiStreamClient().Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID})

View File

@@ -10,14 +10,17 @@ const (
typeException = "exception"
)
type message interface {
type genericMessage struct {
Type string `json:"type,omitempty"`
Profile []byte `json:"profile,omitempty"`
Data map[string]int64 `json:"data,omitempty"`
NodeID string `json:"node_id,omitempty"`
Tasks []byte `json:"tasks,omitempty"`
}
type genericMessage struct {
Type string `json:"type,omitempty"`
Data map[string]int64 `json:"data,omitempty"`
NodeID string `json:"node_id,omitempty"`
Tasks []byte `json:"tasks,omitempty"`
type profileMessage struct {
Profile []byte `json:"profile,omitempty"`
Tasks []byte `json:"tasks,omitempty"`
}
func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) {
@@ -35,11 +38,12 @@ func newQuitMessage(nodeID string) (msg *genericMessage) {
}
}
func newSpawnMessageToWorker(t string, data map[string]int64, tasks []byte) (msg *genericMessage) {
func newMessageToWorker(t string, profile []byte, data map[string]int64, tasks []byte) (msg *genericMessage) {
return &genericMessage{
Type: t,
Data: data,
Tasks: tasks,
Type: t,
Profile: profile,
Data: data,
Tasks: tasks,
}
}

View File

@@ -270,6 +270,9 @@ func (r *runner) outputOnEvent(data map[string]interface{}) {
}
func (r *runner) outputOnStop() {
defer func() {
r.outputs = make([]Output, 0)
}()
size := len(r.outputs)
if size == 0 {
return
@@ -332,6 +335,7 @@ func (r *runner) reset() {
}
func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
r.updateState(StateSpawning)
log.Info().
Int64("spawnCount", spawnCount).
Float64("spawnRate", spawnRate).
@@ -339,7 +343,6 @@ func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan boo
r.controller.setSpawn(spawnCount, spawnRate)
r.updateState(StateSpawning)
for {
select {
case <-quit:
@@ -510,14 +513,16 @@ func (r *runner) isStarted() bool {
type localRunner struct {
runner
profile *Profile
}
func newLocalRunner(spawnCount int, spawnRate float64) *localRunner {
func newLocalRunner(spawnCount int64, spawnRate float64) *localRunner {
return &localRunner{
runner: runner{
state: StateInit,
stats: newRequestStats(),
spawnCount: int64(spawnCount),
spawnCount: spawnCount,
spawnRate: spawnRate,
controller: &Controller{},
outputs: make([]Output, 0),
@@ -535,14 +540,13 @@ func (r *localRunner) start() {
if r.rateLimitEnabled {
r.rateLimiter.Start()
}
r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil)
// output setup
r.outputOnStart()
go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, nil)
// start stats report
go r.runner.statsStart()
go r.statsStart()
// stop
<-r.stopChan
@@ -582,10 +586,9 @@ type workerRunner struct {
masterPort int
client *grpcClient
// this channel will start worker for spawning.
spawnStartChan chan bool
// get testcase from master
testCaseBytes chan []byte
profile *Profile
tasksChan chan *profileMessage
ignoreQuit bool
}
@@ -594,15 +597,15 @@ func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
r = &workerRunner{
runner: runner{
stats: newRequestStats(),
outputs: make([]Output, 0),
controller: &Controller{},
closeChan: make(chan bool),
once: &sync.Once{},
},
masterHost: masterHost,
masterPort: masterPort,
nodeID: getNodeID(),
spawnStartChan: make(chan bool),
testCaseBytes: make(chan []byte, 10),
masterHost: masterHost,
masterPort: masterPort,
nodeID: getNodeID(),
tasksChan: make(chan *profileMessage, 10),
}
return r
}
@@ -615,30 +618,26 @@ func (r *workerRunner) spawnComplete() {
func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
r.client.sendChannel() <- newGenericMessage("spawning", nil, r.nodeID)
spawnCount, ok := msg.Data["spawn_count"]
if ok {
r.setSpawnCount(spawnCount)
if msg.Profile == nil {
log.Error().Msg("miss profile")
}
spawnRate, ok := msg.Data["spawn_rate"]
if ok {
r.setSpawnRate(float64(spawnRate))
if msg.Tasks == nil {
log.Error().Msg("miss tasks")
}
if msg.Tasks != nil {
r.testCaseBytes <- msg.Tasks
r.tasksChan <- &profileMessage{
Profile: msg.Profile,
Tasks: msg.Tasks,
}
log.Info().Msg("on spawn message successful")
}
func (r *workerRunner) onRebalanceMessage(msg *genericMessage) {
spawnCount, ok := msg.Data["spawn_count"]
if ok {
r.setSpawnCount(spawnCount)
if msg.Profile == nil {
log.Error().Msg("miss profile")
}
spawnRate, ok := msg.Data["spawn_rate"]
if ok {
r.setSpawnRate(float64(spawnRate))
r.tasksChan <- &profileMessage{
Profile: msg.Profile,
}
r.rebalance <- true
log.Info().Msg("on rebalance message successful")
}
@@ -705,7 +704,6 @@ func (r *workerRunner) run() {
err := r.client.connect()
if err != nil {
log.Printf("Failed to connect to master(%s:%d) with error %v\n", r.masterHost, r.masterPort, err)
return
}
// listen to master
@@ -758,7 +756,7 @@ func (r *workerRunner) start() {
r.once.Do(r.outputOnStart)
r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete)
go r.spawnWorkers(r.getSpawnCount(), r.getSpawnRate(), r.stopChan, r.spawnComplete)
// start stats report
go r.statsStart()
@@ -783,7 +781,7 @@ func (r *workerRunner) close() {
return
}
// waiting report finished
time.Sleep(3 * time.Second)
time.Sleep(1 * time.Second)
close(r.closeChan)
var ticker = time.NewTicker(1 * time.Second)
if r.client != nil {
@@ -811,8 +809,12 @@ type masterRunner struct {
expectWorkers int
expectWorkersMaxWait int
profile *Profile
parseTestCasesChan chan bool
testCaseBytes chan []byte
// set profile to worker
profileBytes chan []byte
}
func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
@@ -990,20 +992,17 @@ func (r *masterRunner) start() error {
if numWorkers == 0 {
return errors.New("current workers: 0")
}
workerSpawnRate := r.getSpawnRate() / float64(numWorkers)
workerSpawnCount := r.getSpawnCount() / int64(numWorkers)
log.Info().Msg("send spawn data to worker")
r.updateState(StateSpawning)
// waitting to fetch testcase
// fetching testcase
testcase, err := r.fetchTestCase()
if err != nil {
return err
}
r.server.sendChannel() <- newSpawnMessageToWorker("spawn", map[string]int64{
"spawn_count": workerSpawnCount,
"spawn_rate": int64(workerSpawnRate),
}, testcase)
profile := r.profile.dispatch(int64(numWorkers))
r.server.sendChannel() <- newMessageToWorker("spawn", ProfileToBytes(profile), nil, testcase)
println("send spawn data to worker successful")
log.Info().Msg("send spawn data to worker successful")
return nil
@@ -1014,13 +1013,9 @@ func (r *masterRunner) rebalance() error {
if numWorkers == 0 {
return errors.New("current workers: 0")
}
workerSpawnRate := r.getSpawnRate() / float64(numWorkers)
workerSpawnCount := r.getSpawnCount() / int64(numWorkers)
profile := r.profile.dispatch(int64(numWorkers))
r.server.sendChannel() <- newSpawnMessageToWorker("rebalance", map[string]int64{
"spawn_count": workerSpawnCount,
"spawn_rate": int64(workerSpawnRate),
}, nil)
r.server.sendChannel() <- newMessageToWorker("rebalance", ProfileToBytes(profile), nil, nil)
println("send rebalance data to worker successful")
return nil
}

View File

@@ -312,10 +312,11 @@ func (s *grpcServer) sendMessage(msg *genericMessage) {
}
err := workerInfo.messenger.Send(
&messager.StreamResponse{
Type: msg.Type,
Data: msg.Data,
NodeID: workerInfo.ID,
Tasks: msg.Tasks},
Type: msg.Type,
Profile: msg.Profile,
Data: msg.Data,
NodeID: workerInfo.ID,
Tasks: msg.Tasks},
)
switch err {
case nil:

View File

@@ -7,6 +7,8 @@
package messager
import (
context "context"
grpc "google.golang.org/grpc"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -88,10 +90,11 @@ type StreamResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
Data map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
NodeID string `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
Tasks []byte `protobuf:"bytes,4,opt,name=tasks,proto3" json:"tasks,omitempty"`
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
Profile []byte `protobuf:"bytes,2,opt,name=profile,proto3" json:"profile,omitempty"`
Data map[string]int64 `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
NodeID string `protobuf:"bytes,4,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
Tasks []byte `protobuf:"bytes,5,opt,name=tasks,proto3" json:"tasks,omitempty"`
}
func (x *StreamResponse) Reset() {
@@ -133,6 +136,13 @@ func (x *StreamResponse) GetType() string {
return ""
}
func (x *StreamResponse) GetProfile() []byte {
if x != nil {
return x.Profile
}
return nil
}
func (x *StreamResponse) GetData() map[string]int64 {
if x != nil {
return x.Data
@@ -170,27 +180,28 @@ var file_grpc_proto_messager_proto_rawDesc = []byte{
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xc2, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70,
0x01, 0x22, 0xdc, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61,
0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e,
0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12,
0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73,
0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a,
0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0x61, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x12, 0x56, 0x0a, 0x1d, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x61, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70,
0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x66,
0x69, 0x6c, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x66, 0x69,
0x6c, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e,
0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64,
0x65, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49,
0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, 0x45,
0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x32, 0x61, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x56, 0x0a, 0x1d, 0x42,
0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28,
0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -274,3 +285,7 @@ func file_grpc_proto_messager_proto_init() {
file_grpc_proto_messager_proto_goTypes = nil
file_grpc_proto_messager_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface

View File

@@ -15,12 +15,11 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
const _ = grpc.SupportPackageIsVersion6
// MessageClient is the client API for Message service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type MessageClient interface {
BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error)
}
@@ -34,7 +33,7 @@ func NewMessageClient(cc grpc.ClientConnInterface) MessageClient {
}
func (c *messageClient) BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error) {
stream, err := c.cc.NewStream(ctx, &Message_ServiceDesc.Streams[0], "/message.Message/BidirectionalStreamingMessage", opts...)
stream, err := c.cc.NewStream(ctx, &_Message_serviceDesc.Streams[0], "/message.Message/BidirectionalStreamingMessage", opts...)
if err != nil {
return nil, err
}
@@ -65,31 +64,20 @@ func (x *messageBidirectionalStreamingMessageClient) Recv() (*StreamResponse, er
}
// MessageServer is the server API for Message service.
// All implementations must embed UnimplementedMessageServer
// for forward compatibility
type MessageServer interface {
BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error
mustEmbedUnimplementedMessageServer()
}
// UnimplementedMessageServer must be embedded to have forward compatible implementations.
// UnimplementedMessageServer can be embedded to have forward compatible implementations.
type UnimplementedMessageServer struct {
}
func (UnimplementedMessageServer) BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error {
func (*UnimplementedMessageServer) BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error {
return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingMessage not implemented")
}
func (UnimplementedMessageServer) mustEmbedUnimplementedMessageServer() {}
// UnsafeMessageServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to MessageServer will
// result in compilation errors.
type UnsafeMessageServer interface {
mustEmbedUnimplementedMessageServer()
}
func RegisterMessageServer(s grpc.ServiceRegistrar, srv MessageServer) {
s.RegisterService(&Message_ServiceDesc, srv)
func RegisterMessageServer(s *grpc.Server, srv MessageServer) {
s.RegisterService(&_Message_serviceDesc, srv)
}
func _Message_BidirectionalStreamingMessage_Handler(srv interface{}, stream grpc.ServerStream) error {
@@ -118,10 +106,7 @@ func (x *messageBidirectionalStreamingMessageServer) Recv() (*StreamRequest, err
return m, nil
}
// Message_ServiceDesc is the grpc.ServiceDesc for Message service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Message_ServiceDesc = grpc.ServiceDesc{
var _Message_serviceDesc = grpc.ServiceDesc{
ServiceName: "message.Message",
HandlerType: (*MessageServer)(nil),
Methods: []grpc.MethodDesc{},

View File

@@ -10,13 +10,14 @@ service Message {
message StreamRequest{
string type = 1;
map<string, bytes> data = 2;
map<string, int64> data = 2;
string NodeID = 3;
}
message StreamResponse{
string type = 1;
map<string, bytes> data = 2;
string NodeID = 3;
bytes tasks = 4;
bytes profile = 2;
map<string, int64> data = 3;
string NodeID = 4;
bytes tasks = 5;
}

View File

@@ -6,9 +6,11 @@ import (
"io/ioutil"
"log"
"net/http"
"strings"
"github.com/httprunner/httprunner/v4/hrp/internal/boomer"
"github.com/httprunner/httprunner/v4/hrp/internal/json"
"github.com/mitchellh/mapstructure"
)
const jsonContentType = "application/json; encoding=utf-8"
@@ -43,7 +45,7 @@ func parseBody(r *http.Request) (data map[string]interface{}, err error) {
r.Body.Close()
return nil, err
}
err = json.Unmarshal(body, data)
err = json.Unmarshal(body, &data)
if err != nil {
return nil, err
}
@@ -62,10 +64,10 @@ func writeJSON(w http.ResponseWriter, body []byte, status int) {
}
type StartRequestBody struct {
Worker string `json:"worker"` // all
SpawnCount int64 `json:"spawn_count"`
SpawnRate int64 `json:"spawn_rate"`
TestCasePath string `json:"testcase_path"`
boomer.Profile `mapstructure:",squash"`
Worker string `json:"worker,omitempty" yaml:"worker,omitempty" mapstructure:"worker"` // all
TestCasePath string `json:"testcase-path" yaml:"testcase-path" mapstructure:"testcase-path"`
Other map[string]interface{} `mapstructure:",remain"`
}
type ServerCode int
@@ -118,10 +120,9 @@ func CustomAPIResponse(errCode ServerCode, errMsg string) ServerStatus {
}
type RebalanceRequestBody struct {
Worker string `json:"worker"`
SpawnCount int64 `json:"spawn_count"`
SpawnRate int64 `json:"spawn_rate"`
TestCasePath string `json:"testcase_path"`
boomer.Profile `mapstructure:",squash"`
Worker string `json:"worker,omitempty" yaml:"worker,omitempty" mapstructure:"worker"`
Other map[string]interface{} `mapstructure:",remain"`
}
type StopRequestBody struct {
@@ -167,15 +168,38 @@ func (api *apiHandler) Index(w http.ResponseWriter, r *http.Request) {
}
func (api *apiHandler) Start(w http.ResponseWriter, r *http.Request) {
data := map[string]interface{}{}
args := r.URL.Query()
for k, vs := range args {
for _, v := range vs {
data[k] = v
}
}
var resp *CommonResponseBody
err := api.boomer.Start(data)
data, err := parseBody(r)
req := StartRequestBody{
Profile: *api.boomer.GetProfile(),
}
err = mapstructure.Decode(data, &req)
if len(req.Other) > 0 {
keys := make([]string, 0, len(req.Other))
for k := range req.Other {
keys = append(keys, k)
}
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseParamError(fmt.Sprintf("failed to recognize params: %v", keys)),
}
body, _ := json.Marshal(resp)
writeJSON(w, body, http.StatusOK)
return
}
if req.TestCasePath == "" {
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseParamError(fmt.Sprint("missing testcases path")),
}
body, _ := json.Marshal(resp)
writeJSON(w, body, http.StatusOK)
return
}
paths := strings.Split(req.TestCasePath, ",")
api.boomer.SetTestCasesPath(paths)
if err == nil {
err = api.boomer.Start(&req.Profile)
}
if err != nil {
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseServerError(err.Error()),
@@ -231,15 +255,28 @@ func (api *apiHandler) Quit(w http.ResponseWriter, r *http.Request) {
}
func (api *apiHandler) ReBalance(w http.ResponseWriter, r *http.Request) {
data := map[string]interface{}{}
args := r.URL.Query()
for k, vs := range args {
for _, v := range vs {
data[k] = v
}
}
var resp *CommonResponseBody
err := api.boomer.ReBalance(data)
data, err := parseBody(r)
req := RebalanceRequestBody{
Profile: *api.boomer.GetProfile(),
}
err = mapstructure.Decode(data, &req)
if len(req.Other) > 0 {
keys := make([]string, 0, len(req.Other))
for k := range req.Other {
keys = append(keys, k)
}
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseParamError(fmt.Sprintf("failed to recognize params: %v", keys)),
}
body, _ := json.Marshal(resp)
writeJSON(w, body, http.StatusOK)
return
}
if err == nil {
err = api.boomer.ReBalance(&req.Profile)
}
if err != nil {
resp = &CommonResponseBody{
ServerStatus: EnumAPIResponseParamError(err.Error()),
@@ -267,10 +304,10 @@ func (api *apiHandler) Handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/", methods(api.Index, "GET"))
mux.HandleFunc("/start", methods(api.Start, "GET"))
mux.HandleFunc("/start", methods(api.Start, "POST"))
mux.HandleFunc("/stop", methods(api.Stop, "GET"))
mux.HandleFunc("/quit", methods(api.Quit, "GET"))
mux.HandleFunc("/rebalance", methods(api.ReBalance, "GET"))
mux.HandleFunc("/rebalance", methods(api.ReBalance, "POST"))
mux.HandleFunc("/workers", methods(api.GetWorkersInfo, "GET"))
return mux