feat: support multi-machine collaborative distributed load testing #1193

This commit is contained in:
xucong053
2022-03-29 22:10:39 +08:00
committed by 徐聪
parent ab12707fcf
commit 0ab2017f93
29 changed files with 3354 additions and 171 deletions

View File

@@ -21,13 +21,21 @@ hrp boom [flags]
### Options
```
--autostart Starts the test immediately (without disabling the web UI). Use --spawn-count and --spawn-rate to control user count and run time
--cpu-profile string Enable CPU profiling.
--cpu-profile-duration duration CPU profile duration. (default 30s)
--disable-compression Disable compression
--disable-console-output Disable console output.
--disable-keepalive Disable keepalive
--expect-workers int How many workers master should expect to connect before starting the test (only when --autostart is used) (default 1)
--expect-workers-max-wait int How long master should wait for the expected workers to connect before starting the test anyway (only when --autostart is used)
-h, --help help for boom
--loop-count int The specify running cycles for load testing (default -1)
--master master of distributed testing
--master-bind-host string Interfaces (hostname, ip) that hrp master should bind to. Only used when running with --master. Defaults to * (all available interfaces). (default "127.0.0.1")
--master-bind-port int Port that hrp master should bind to. Only used when running with --master. Defaults to 5557. (default 5557)
--master-host string Host or IP address of hrp master for distributed load testing. (default "127.0.0.1")
--master-port int The port to connect to that is used by the hrp master for distributed load testing. (default 5557)
--max-rps int Max RPS that boomer can generate, disabled by default.
--mem-profile string Enable memory profiling.
--mem-profile-duration duration Memory profile duration. (default 30s)
@@ -36,6 +44,7 @@ hrp boom [flags]
--request-increase-rate string Request increase rate, disabled by default. (default "-1")
--spawn-count int The number of users to spawn for load testing (default 1)
--spawn-rate float The rate for spawning users (default 1)
--worker worker of distributed testing
```
### SEE ALSO

11
go.mod
View File

@@ -1,12 +1,13 @@
module github.com/httprunner/httprunner/v4
go 1.16
go 1.18
require (
github.com/andybalholm/brotli v1.0.4
github.com/denisbrodbeck/machineid v1.0.1
github.com/fatih/color v1.13.0
github.com/getsentry/sentry-go v0.13.0
github.com/go-errors/errors v1.0.1
github.com/go-openapi/spec v0.20.6
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.0
@@ -17,14 +18,20 @@ require (
github.com/json-iterator/go v1.1.12
github.com/maja42/goval v1.2.1
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mitchellh/mapstructure v1.4.1
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/rs/zerolog v1.26.1
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/cobra v1.2.1
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
gopkg.in/yaml.v3 v3.0.0
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
// replace github.com/httprunner/funplugin => ../funplugin

16
go.sum
View File

@@ -132,6 +132,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@@ -352,6 +354,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
@@ -421,6 +424,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
@@ -452,6 +457,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
@@ -474,6 +483,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
@@ -624,6 +635,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -668,6 +680,7 @@ golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 h1:y/woIyUBFbpQGKS0u1aHF/40WUDnek3fPOyD08H5Vng=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -877,9 +890,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@@ -6,13 +6,13 @@ import (
"time"
"github.com/httprunner/funplugin"
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/boomer"
"github.com/httprunner/httprunner/v4/hrp/internal/json"
"github.com/httprunner/httprunner/v4/hrp/internal/sdk"
"github.com/rs/zerolog/log"
)
func NewBoomer(spawnCount int, spawnRate float64) *HRPBoomer {
func NewStandaloneBoomer(spawnCount int, spawnRate float64) *HRPBoomer {
b := &HRPBoomer{
Boomer: boomer.NewStandaloneBoomer(spawnCount, spawnRate),
pluginsMutex: new(sync.RWMutex),
@@ -22,6 +22,27 @@ func NewBoomer(spawnCount int, spawnRate float64) *HRPBoomer {
return b
}
// NewMasterBoomer creates an HRPBoomer running in distributed master mode,
// bound to the given host and port.
func NewMasterBoomer(masterBindHost string, masterBindPort int) *HRPBoomer {
	boom := &HRPBoomer{
		Boomer:       boomer.NewMasterBoomer(masterBindHost, masterBindPort),
		pluginsMutex: new(sync.RWMutex),
	}
	boom.hrpRunner = NewRunner(nil)
	return boom
}
// NewWorkerBoomer creates an HRPBoomer running in distributed worker mode,
// connected to the master at the given host and port.
func NewWorkerBoomer(masterHost string, masterPort int) *HRPBoomer {
	boom := &HRPBoomer{
		Boomer:       boomer.NewWorkerBoomer(masterHost, masterPort),
		pluginsMutex: new(sync.RWMutex),
	}
	boom.hrpRunner = NewRunner(nil)
	// set client transport for high concurrency load testing
	boom.hrpRunner.SetClientTransport(boom.GetSpawnCount(), boom.GetDisableKeepAlive(), boom.GetDisableCompression())
	return boom
}
type HRPBoomer struct {
*boomer.Boomer
hrpRunner *HRPRunner
@@ -52,8 +73,12 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) {
// report execution timing event
defer sdk.SendEvent(event.StartTiming("execution"))
var taskSlice []*boomer.Task
taskSlice := b.ConvertTestCasesToTasks(testcases...)
b.Boomer.Run(taskSlice...)
}
func (b *HRPBoomer) ConvertTestCasesToTasks(testcases ...ITestCase) (taskSlice []*boomer.Task) {
// load all testcases
testCases, err := LoadTestCases(testcases...)
if err != nil {
@@ -74,15 +99,107 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) {
rendezvousList := initRendezvous(testcase, int64(b.GetSpawnCount()))
task := b.convertBoomerTask(testcase, rendezvousList)
taskSlice = append(taskSlice, task)
waitRendezvous(rendezvousList)
waitRendezvous(rendezvousList, b)
}
b.Boomer.Run(taskSlice...)
return taskSlice
}
// LoopTestCases serves testcase requests on the master side: whenever the
// master runner signals on ParseTestCasesChan, the configured testcase paths
// are serialized via TestCasesToBytes and pushed onto the testcase-bytes
// channel. Blocks until the close channel fires.
func (b *HRPBoomer) LoopTestCases() {
	for {
		select {
		case <-b.Boomer.ParseTestCasesChan():
			var tcs []ITestCase
			for _, tc := range b.GetTestCasesPath() {
				tcp := TestCasePath(tc)
				tcs = append(tcs, &tcp)
			}
			b.GetTestCaseBytesChan() <- b.TestCasesToBytes(tcs...)
			log.Info().Msg("put testcase successful")
		case <-b.Boomer.GetCloseChan():
			return
		}
	}
}
// OutTestCases converts loaded TestCases into their serializable TCase form,
// materializing the parameters produced by each case runner's iterator.
// Exits the process when a case runner cannot be created.
func (b *HRPBoomer) OutTestCases(testCases []*TestCase) []*TCase {
	var outTestCases []*TCase
	for _, tc := range testCases {
		caseRunner, err := b.hrpRunner.newCaseRunner(tc)
		if err != nil {
			log.Error().Err(err).Msg("failed to create runner")
			os.Exit(1)
		}
		// copy the iterator's parameters into the parsed config so they
		// survive serialization to workers
		caseRunner.parsedConfig.Parameters = caseRunner.parametersIterator.outParameters()
		outTestCases = append(outTestCases, &TCase{
			Config:    caseRunner.parsedConfig,
			TestSteps: caseRunner.testCase.ToTCase().TestSteps,
		})
	}
	return outTestCases
}
// TestCasesToBytes loads the given testcases and marshals them to JSON bytes
// for transfer from master to workers. Exits the process if loading fails;
// returns nil if marshaling fails.
func (b *HRPBoomer) TestCasesToBytes(testcases ...ITestCase) []byte {
	// load all testcases
	testCases, err := LoadTestCases(testcases...)
	if err != nil {
		log.Error().Err(err).Msg("failed to load testcases")
		os.Exit(1)
	}
	tcs := b.OutTestCases(testCases)
	testCasesBytes, err := json.Marshal(tcs)
	if err != nil {
		log.Error().Err(err).Msg("failed to marshal testcases")
		return nil
	}
	return testCasesBytes
}
// BytesToTestCases unmarshals JSON bytes (produced by TestCasesToBytes) into
// a slice of TCase. On unmarshal failure the error is logged and whatever was
// decoded so far (possibly nil) is returned.
func (b *HRPBoomer) BytesToTestCases(testCasesBytes []byte) []*TCase {
	var decoded []*TCase
	if err := json.Unmarshal(testCasesBytes, &decoded); err != nil {
		log.Error().Err(err).Msg("failed to unmarshal testcases")
	}
	return decoded
}
// Quit delegates to the embedded Boomer's Quit to stop the load test.
func (b *HRPBoomer) Quit() {
	b.Boomer.Quit()
}
// handleTasks deserializes the testcase bytes received from the master and
// either hot-swaps the tasks on the running boomer (when already spawning or
// running) or kicks off a fresh run.
func (b *HRPBoomer) handleTasks(tcs []byte) {
	// TODO: filter out tasks that have already been transferred
	testCases := b.BytesToTestCases(tcs)
	var testcases []ITestCase
	for _, tc := range testCases {
		testcase, err := tc.toTestCase()
		if err != nil {
			log.Error().Err(err).Msg("failed to load testcases")
			// skip the broken testcase instead of appending a nil entry
			continue
		}
		testcases = append(testcases, testcase)
	}
	log.Info().Interface("testcases", testcases).Msg("loop tasks successful")
	if b.Boomer.GetState() == boomer.StateRunning || b.Boomer.GetState() == boomer.StateSpawning {
		b.Boomer.SetTasks(b.ConvertTestCasesToTasks(testcases...)...)
	} else {
		b.Run(testcases...)
	}
}
// LoopTasks runs on the worker side: it consumes serialized testcase payloads
// pushed by the master and hands each batch to handleTasks in a goroutine.
// A payload is dropped when newer payloads are already buffered, so only the
// latest set of testcases gets executed. Blocks until the close channel fires.
func (b *HRPBoomer) LoopTasks() {
	for {
		select {
		case tcs := <-b.Boomer.GetTestCaseBytesChan():
			// skip stale payloads when newer ones are already queued
			if len(b.Boomer.GetTestCaseBytesChan()) > 0 {
				continue
			}
			go b.handleTasks(tcs)
		case <-b.Boomer.GetCloseChan():
			return
		}
	}
}
func (b *HRPBoomer) convertBoomerTask(testcase *TestCase, rendezvousList []*Rendezvous) *boomer.Task {
// init runner for testcase
// this runner is shared by multiple session runners

View File

@@ -27,7 +27,7 @@ func TestBoomerStandaloneRun(t *testing.T) {
}
testcase2 := TestCasePath(demoTestCaseWithPluginJSONPath)
b := NewBoomer(2, 1)
b := NewStandaloneBoomer(2, 1)
go b.Run(testcase1, &testcase2)
time.Sleep(5 * time.Second)
b.Quit()

View File

@@ -21,7 +21,7 @@ var boomCmd = &cobra.Command{
Example: ` $ hrp boom demo.json # run specified json testcase file
$ hrp boom demo.yaml # run specified yaml testcase file
$ hrp boom examples/ # run testcases in specified folder`,
Args: cobra.MinimumNArgs(1),
Args: cobra.MinimumNArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
boomer.SetUlimit(10240) // ulimit -n 10240
if !strings.EqualFold(logLevel, "DEBUG") {
@@ -35,8 +35,65 @@ var boomCmd = &cobra.Command{
path := hrp.TestCasePath(arg)
paths = append(paths, &path)
}
hrpBoomer := makeHRPBoomer()
hrpBoomer.Run(paths...)
// if set profile, the priority is higher than the other commands
if boomArgs.profile != "" {
err := builtin.LoadFile(boomArgs.profile, &boomArgs)
if err != nil {
log.Error().Err(err).Msg("failed to load profile")
os.Exit(1)
}
}
var hrpBoomer *hrp.HRPBoomer
if boomArgs.master {
hrpBoomer = hrp.NewMasterBoomer(boomArgs.masterBindHost, boomArgs.masterBindPort)
hrpBoomer.SetTestCasesPath(args)
if boomArgs.autoStart {
hrpBoomer.SetAutoStart()
hrpBoomer.SetExpectWorkers(boomArgs.expectWorkers, boomArgs.expectWorkersMaxWait)
hrpBoomer.SetSpawnCount(boomArgs.SpawnCount)
hrpBoomer.SetSpawnRate(boomArgs.SpawnRate)
}
hrpBoomer.EnableGracefulQuit()
go hrpBoomer.StartServer()
go hrpBoomer.RunMaster()
hrpBoomer.LoopTestCases()
return
} else if boomArgs.worker {
hrpBoomer = hrp.NewWorkerBoomer(boomArgs.masterHost, boomArgs.masterPort)
if boomArgs.ignoreQuit {
hrpBoomer.SetIgnoreQuit()
}
go hrpBoomer.RunWorker()
} else {
hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate)
if boomArgs.LoopCount > 0 {
hrpBoomer.SetLoopCount(boomArgs.LoopCount)
}
}
hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate)
if !boomArgs.DisableConsoleOutput {
hrpBoomer.AddOutput(boomer.NewConsoleOutput())
}
if boomArgs.PrometheusPushgatewayURL != "" {
hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode()))
}
hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive)
hrpBoomer.SetDisableCompression(boomArgs.DisableCompression)
hrpBoomer.SetClientTransport()
if venv != "" {
hrpBoomer.SetPython3Venv(venv)
}
hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration)
hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration)
hrpBoomer.EnableGracefulQuit()
if boomArgs.worker {
hrpBoomer.LoopTasks()
} else {
hrpBoomer.Run(paths...)
}
},
}
@@ -55,6 +112,16 @@ type BoomArgs struct {
DisableCompression bool `json:"disable-compression,omitempty" yaml:"disable-compression,omitempty"`
DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty"`
profile string
master bool
worker bool
ignoreQuit bool
masterHost string
masterPort int
masterBindHost string
masterBindPort int
autoStart bool
expectWorkers int
expectWorkersMaxWait int
}
var boomArgs BoomArgs
@@ -76,6 +143,16 @@ func init() {
boomCmd.Flags().BoolVar(&boomArgs.DisableCompression, "disable-compression", false, "Disable compression")
boomCmd.Flags().BoolVar(&boomArgs.DisableKeepalive, "disable-keepalive", false, "Disable keepalive")
boomCmd.Flags().StringVar(&boomArgs.profile, "profile", "", "profile for load testing")
boomCmd.Flags().BoolVar(&boomArgs.master, "master", false, "master of distributed testing")
boomCmd.Flags().StringVar(&boomArgs.masterBindHost, "master-bind-host", "127.0.0.1", "Interfaces (hostname, ip) that hrp master should bind to. Only used when running with --master. Defaults to * (all available interfaces).")
boomCmd.Flags().IntVar(&boomArgs.masterBindPort, "master-bind-port", 5557, "Port that hrp master should bind to. Only used when running with --master. Defaults to 5557.")
boomCmd.Flags().BoolVar(&boomArgs.worker, "worker", false, "worker of distributed testing")
boomCmd.Flags().BoolVar(&boomArgs.ignoreQuit, "ignore-quit", false, "ignores quit from master (only when --worker is used)")
boomCmd.Flags().StringVar(&boomArgs.masterHost, "master-host", "127.0.0.1", "Host or IP address of hrp master for distributed load testing.")
boomCmd.Flags().IntVar(&boomArgs.masterPort, "master-port", 5557, "The port to connect to that is used by the hrp master for distributed load testing.")
boomCmd.Flags().BoolVar(&boomArgs.autoStart, "autostart", false, "Starts the test immediately (without disabling the web UI). Use --spawn-count and --spawn-rate to control user count and run time")
boomCmd.Flags().IntVar(&boomArgs.expectWorkers, "expect-workers", 1, "How many workers master should expect to connect before starting the test (only when --autostart is used)")
boomCmd.Flags().IntVar(&boomArgs.expectWorkersMaxWait, "expect-workers-max-wait", 0, "How many workers master should expect to connect before starting the test (only when --autostart is used")
}
func makeHRPBoomer() *hrp.HRPBoomer {
@@ -88,7 +165,7 @@ func makeHRPBoomer() *hrp.HRPBoomer {
}
}
hrpBoomer := hrp.NewBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate)
hrpBoomer := hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate)
hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate)
if boomArgs.LoopCount > 0 {
hrpBoomer.SetLoopCount(boomArgs.LoopCount)

View File

@@ -98,7 +98,7 @@ func (c *TConfig) SetWebSocket(times, interval, timeout, size int64) {
}
type ThinkTimeConfig struct {
Strategy thinkTimeStrategy `json:"strategy,omitempty" yaml:"strategy,omitempty"` // default、random、limit、multiply、ignore
Strategy thinkTimeStrategy `json:"strategy,omitempty" yaml:"strategy,omitempty"` // default、random、multiply、ignore
Setting interface{} `json:"setting,omitempty" yaml:"setting,omitempty"` // random(map): {"min_percentage": 0.5, "max_percentage": 1.5}; 10、multiply(float64): 1.5
Limit float64 `json:"limit,omitempty" yaml:"limit,omitempty"` // limit think time no more than specific time, ignore if value <= 0
}

View File

@@ -4,9 +4,13 @@ import (
"math"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -25,9 +29,18 @@ const (
// A Boomer is used to run tasks.
type Boomer struct {
mode Mode
masterHost string
masterPort int
mode Mode
localRunner *localRunner
localRunner *localRunner
workerRunner *workerRunner
masterRunner *masterRunner
testcasePath []string
spawnCount int // target clients to spawn
spawnRate float64
cpuProfile string
cpuProfileDuration time.Duration
@@ -73,9 +86,101 @@ func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer {
return &Boomer{
mode: StandaloneMode,
localRunner: newLocalRunner(spawnCount, spawnRate),
spawnCount: spawnCount,
spawnRate: spawnRate,
}
}
// NewMasterBoomer returns a new Boomer in distributed master mode, with a
// master runner bound to the given host and port.
func NewMasterBoomer(masterBindHost string, masterBindPort int) *Boomer {
	boom := &Boomer{
		mode:         DistributedMasterMode,
		masterRunner: newMasterRunner(masterBindHost, masterBindPort),
	}
	return boom
}
// NewWorkerBoomer returns a new Boomer in distributed worker mode, with a
// worker runner that connects to the master at the given host and port.
func NewWorkerBoomer(masterHost string, masterPort int) *Boomer {
	boom := &Boomer{
		mode:         DistributedWorkerMode,
		workerRunner: newWorkerRunner(masterHost, masterPort),
		masterHost:   masterHost,
		masterPort:   masterPort,
	}
	return boom
}
// SetAutoStart makes the master start the load test automatically.
// Master mode only: masterRunner is nil in other modes.
func (b *Boomer) SetAutoStart() {
	b.masterRunner.autoStart = true
}
// RunMaster starts the master runner loop (master mode only).
func (b *Boomer) RunMaster() {
	b.masterRunner.run()
}
// RunWorker starts the worker runner loop (worker mode only).
func (b *Boomer) RunWorker() {
	b.workerRunner.run()
}
// GetTestCaseBytesChan returns the channel carrying serialized testcase
// payloads for the current mode, or nil in standalone mode.
func (b *Boomer) GetTestCaseBytesChan() chan []byte {
	if b.mode == DistributedMasterMode {
		return b.masterRunner.testCaseBytes
	}
	if b.mode == DistributedWorkerMode {
		return b.workerRunner.testCaseBytes
	}
	return nil
}
// SetTestCasesPath records the testcase paths to be distributed to workers.
func (b *Boomer) SetTestCasesPath(paths []string) {
	b.testcasePath = paths
}
// GetTestCasesPath returns the recorded testcase paths.
func (b *Boomer) GetTestCasesPath() []string {
	return b.testcasePath
}
// ParseTestCasesChan returns the master runner's channel that signals when
// testcases should be (re)parsed. NOTE(review): no mode check — calling this
// in worker/standalone mode would dereference a nil masterRunner; confirm
// callers only use it in master mode.
func (b *Boomer) ParseTestCasesChan() chan bool {
	return b.masterRunner.parseTestCasesChan
}
// GetState returns the state of the runner that is active for the current
// mode (worker, master, or local/standalone).
func (b *Boomer) GetState() int32 {
	if b.mode == DistributedWorkerMode {
		return b.workerRunner.getState()
	}
	if b.mode == DistributedMasterMode {
		return b.masterRunner.getState()
	}
	return b.localRunner.getState()
}
// SetSpawnCount sets the target number of users to spawn. In master mode the
// value is also propagated to the spawn info shared with workers; the -1
// argument leaves the spawn rate unchanged.
func (b *Boomer) SetSpawnCount(spawnCount int) {
	b.spawnCount = spawnCount
	if b.mode == DistributedMasterMode {
		b.masterRunner.spawn.setSpawn(int64(spawnCount), -1)
	}
}
// SetSpawnRate sets the user spawn rate. In master mode the value is also
// propagated to the spawn info shared with workers; the -1 argument leaves
// the spawn count unchanged.
func (b *Boomer) SetSpawnRate(spawnRate float64) {
	b.spawnRate = spawnRate
	if b.mode == DistributedMasterMode {
		b.masterRunner.spawn.setSpawn(-1, spawnRate)
	}
}
// SetExpectWorkers sets how many workers the master expects to connect before
// starting an auto-started test, and the maximum wait. Master mode only:
// masterRunner is nil in other modes.
func (b *Boomer) SetExpectWorkers(expectWorkers int, expectWorkersMaxWait int) {
	b.masterRunner.setExpectWorkers(expectWorkers, expectWorkersMaxWait)
}
// SetRateLimiter creates rate limiter with the given limit and burst.
func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
var rateLimiter RateLimiter
@@ -98,8 +203,14 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) {
}
if rateLimiter != nil {
b.localRunner.rateLimitEnabled = true
b.localRunner.rateLimiter = rateLimiter
switch b.mode {
case DistributedWorkerMode:
b.workerRunner.rateLimitEnabled = true
b.workerRunner.rateLimiter = rateLimiter
case StandaloneMode:
b.localRunner.rateLimitEnabled = true
b.localRunner.rateLimiter = rateLimiter
}
}
}
@@ -108,6 +219,11 @@ func (b *Boomer) SetDisableKeepAlive(disableKeepalive bool) {
b.disableKeepalive = disableKeepalive
}
// SetIgnoreQuit keeps the worker running when the master quits.
// Worker mode only: workerRunner is nil in other modes.
func (b *Boomer) SetIgnoreQuit() {
	b.workerRunner.ignoreQuit = true
}
// SetDisableCompression disable compression to prevent the Transport from requesting compression with an "Accept-Encoding: gzip"
func (b *Boomer) SetDisableCompression(disableCompression bool) {
b.disableCompression = disableCompression
@@ -124,12 +240,26 @@ func (b *Boomer) GetDisableCompression() bool {
// SetLoopCount set loop count for test.
func (b *Boomer) SetLoopCount(loopCount int64) {
// total loop count for testcase, it will be evenly distributed to each worker
b.localRunner.loop = &Loop{loopCount: loopCount * int64(b.localRunner.spawnCount)}
switch b.mode {
case DistributedWorkerMode:
b.workerRunner.loop = &Loop{loopCount: loopCount * b.workerRunner.spawn.getSpawnCount()}
case DistributedMasterMode:
b.masterRunner.loop = &Loop{loopCount: loopCount * b.masterRunner.spawn.getSpawnCount()}
case StandaloneMode:
b.localRunner.loop = &Loop{loopCount: loopCount * b.localRunner.spawn.getSpawnCount()}
}
}
// AddOutput accepts outputs which implements the boomer.Output interface.
func (b *Boomer) AddOutput(o Output) {
b.localRunner.addOutput(o)
switch b.mode {
case DistributedWorkerMode:
b.workerRunner.addOutput(o)
case DistributedMasterMode:
b.masterRunner.addOutput(o)
case StandaloneMode:
b.localRunner.addOutput(o)
}
}
// EnableCPUProfile will start cpu profiling after run.
@@ -150,6 +280,9 @@ func (b *Boomer) EnableGracefulQuit() {
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-c
if b.mode == DistributedWorkerMode {
b.workerRunner.ignoreQuit = false
}
b.Quit()
}()
}
@@ -169,13 +302,45 @@ func (b *Boomer) Run(tasks ...*Task) {
}
}
b.localRunner.setTasks(tasks)
b.localRunner.start()
switch b.mode {
case DistributedWorkerMode:
log.Info().Msg("running in worker mode")
b.workerRunner.setTasks(tasks)
b.workerRunner.start()
case StandaloneMode:
log.Info().Msg("running in standalone mode")
b.localRunner.setTasks(tasks)
b.localRunner.start()
default:
log.Error().Err(errors.New("Invalid mode, expected boomer.DistributedMode or boomer.StandaloneMode"))
}
}
// SetTasks replaces the task list on the runner that actually executes tasks
// (worker or standalone). Master mode has no task runner and is rejected.
func (b *Boomer) SetTasks(tasks ...*Task) {
	switch b.mode {
	case DistributedWorkerMode:
		log.Info().Msg("set tasks to worker")
		b.workerRunner.setTasks(tasks)
	case StandaloneMode:
		log.Info().Msg("set tasks to standalone")
		b.localRunner.setTasks(tasks)
	default:
		// zerolog events are no-ops until Msg/Send is called; the original
		// built the event but never emitted it
		log.Error().Err(errors.New("Invalid mode, expected boomer.DistributedMode or boomer.StandaloneMode")).Send()
	}
}
// RecordTransaction reports a transaction stat.
func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, contentSize int64) {
b.localRunner.stats.transactionChan <- &transaction{
var runnerStats *requestStats
switch b.mode {
case DistributedWorkerMode:
runnerStats = b.workerRunner.stats
case DistributedMasterMode:
runnerStats = b.masterRunner.stats
case StandaloneMode:
runnerStats = b.localRunner.stats
}
runnerStats.transactionChan <- &transaction{
name: name,
success: success,
elapsedTime: elapsedTime,
@@ -185,7 +350,16 @@ func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64,
// RecordSuccess reports a success.
func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) {
b.localRunner.stats.requestSuccessChan <- &requestSuccess{
var runnerStats *requestStats
switch b.mode {
case DistributedWorkerMode:
runnerStats = b.workerRunner.stats
case DistributedMasterMode:
runnerStats = b.masterRunner.stats
case StandaloneMode:
runnerStats = b.localRunner.stats
}
runnerStats.requestSuccessChan <- &requestSuccess{
requestType: requestType,
name: name,
responseTime: responseTime,
@@ -195,7 +369,16 @@ func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, res
// RecordFailure reports a failure.
func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) {
b.localRunner.stats.requestFailureChan <- &requestFailure{
var runnerStats *requestStats
switch b.mode {
case DistributedWorkerMode:
runnerStats = b.workerRunner.stats
case DistributedMasterMode:
runnerStats = b.masterRunner.stats
case StandaloneMode:
runnerStats = b.localRunner.stats
}
runnerStats.requestFailureChan <- &requestFailure{
requestType: requestType,
name: name,
responseTime: responseTime,
@@ -203,19 +386,139 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc
}
}
// Start begins a distributed load test on the master runner.
// Args must contain "spawn_count" (numeric string) and "path" (comma-separated
// testcase paths); "spawn_rate" is optional and defaults to the spawn count.
// Master mode only: masterRunner is nil in other modes.
func (b *Boomer) Start(Args map[string]interface{}) error {
	spawnCount, ok := Args["spawn_count"]
	if !ok {
		return errors.New("spawn count error")
	}
	// guard the type assertion so a malformed argument returns an error
	// instead of panicking
	spawnCountStr, ok := spawnCount.(string)
	if !ok {
		return errors.New("spawn count error")
	}
	v, err := strconv.Atoi(spawnCountStr)
	if err != nil {
		log.Error().Err(err).Msg("spawn_count sets error")
		return err
	}
	b.SetSpawnCount(v)
	spawnRate, ok := Args["spawn_rate"]
	if ok {
		r, err := builtin.Interface2Float64(spawnRate)
		if err != nil {
			// fixed: this branch previously logged "spawn_count sets error"
			log.Error().Err(err).Msg("spawn_rate sets error")
			return err
		}
		b.SetSpawnRate(r)
	} else {
		// default: spawn all users in one second
		b.SetSpawnRate(float64(b.GetSpawnCount()))
	}
	path, ok := Args["path"].(string)
	if !ok {
		return errors.New("testcase path error")
	}
	b.SetTestCasesPath(strings.Split(path, ","))
	return b.masterRunner.start()
}
// ReBalance reconfigures spawn count, spawn rate, and testcase paths on the
// fly, then asks the master runner to redistribute load across workers.
// All keys in Args are optional; absent keys leave the current setting
// untouched. Master mode only: masterRunner is nil in other modes.
func (b *Boomer) ReBalance(Args map[string]interface{}) error {
	if spawnCount, ok := Args["spawn_count"]; ok {
		// guard the type assertion so a malformed argument returns an error
		// instead of panicking
		spawnCountStr, isStr := spawnCount.(string)
		if !isStr {
			return errors.New("spawn count error")
		}
		v, err := strconv.Atoi(spawnCountStr)
		if err != nil {
			log.Error().Err(err).Msg("spawn_count sets error")
			return err
		}
		b.SetSpawnCount(v)
	}
	if spawnRate, ok := Args["spawn_rate"]; ok {
		v, err := builtin.Interface2Float64(spawnRate)
		if err != nil {
			// fixed: this branch previously logged "spawn_count sets error"
			log.Error().Err(err).Msg("spawn_rate sets error")
			return err
		}
		b.SetSpawnRate(v)
	}
	if path, ok := Args["path"].(string); ok {
		b.SetTestCasesPath(strings.Split(path, ","))
	}
	err := b.masterRunner.rebalance()
	if err != nil {
		log.Error().Err(err).Msg("failed to rebalance")
	}
	return err
}
// Stop stops the load test on the master runner; it is a no-op in worker and
// standalone modes.
func (b *Boomer) Stop() {
	switch b.mode {
	case DistributedMasterMode:
		b.masterRunner.stop()
	default:
	}
}
// GetWorkersInfo returns information about all workers known to the master's
// server. NOTE(review): no mode check — calling this in worker/standalone
// mode would dereference a nil masterRunner; confirm callers only use it in
// master mode.
func (b *Boomer) GetWorkersInfo() []WorkerNode {
	return b.masterRunner.server.getAllWorkers()
}
// GetCloseChan returns the close channel of the runner that is active for
// the current mode.
func (b *Boomer) GetCloseChan() chan bool {
	if b.mode == DistributedWorkerMode {
		return b.workerRunner.closeChan
	}
	if b.mode == DistributedMasterMode {
		return b.masterRunner.closeChan
	}
	return b.localRunner.closeChan
}
// Quit will send a quit message to the master.
func (b *Boomer) Quit() {
b.localRunner.stop()
switch b.mode {
case DistributedWorkerMode:
b.workerRunner.close()
case DistributedMasterMode:
b.masterRunner.close()
case StandaloneMode:
b.localRunner.stop()
}
}
func (b *Boomer) GetSpawnDoneChan() chan struct{} {
return b.localRunner.spawnDone
switch b.mode {
case DistributedWorkerMode:
return b.workerRunner.spawn.getSpawnDone()
case DistributedMasterMode:
return b.masterRunner.spawn.getSpawnDone()
default:
return b.localRunner.spawn.getSpawnDone()
}
}
// GetSpawnCount returns the target spawn count of the runner matching the
// current mode.
//
// The leftover `return b.localRunner.spawnCount` that preceded the switch
// made the mode dispatch unreachable; it has been removed.
func (b *Boomer) GetSpawnCount() int {
	switch b.mode {
	case DistributedWorkerMode:
		return int(b.workerRunner.spawn.getSpawnCount())
	case DistributedMasterMode:
		return int(b.masterRunner.spawn.getSpawnCount())
	default:
		return int(b.localRunner.spawn.getSpawnCount())
	}
}
// ResetStartTime resets the aggregated stats start time of the runner
// matching the current mode.
//
// The leftover unconditional call on the local runner that preceded the
// switch caused a double reset in StandaloneMode and touched the local
// runner in distributed modes; it has been removed.
func (b *Boomer) ResetStartTime() {
	switch b.mode {
	case DistributedWorkerMode:
		b.workerRunner.stats.total.resetStartTime()
	case DistributedMasterMode:
		b.masterRunner.stats.total.resetStartTime()
	default:
		b.localRunner.stats.total.resetStartTime()
	}
}

View File

@@ -12,11 +12,11 @@ import (
func TestNewStandaloneBoomer(t *testing.T) {
b := NewStandaloneBoomer(100, 10)
if b.localRunner.spawnCount != 100 {
if b.localRunner.spawn.spawnCount != 100 {
t.Error("spawnCount should be 100")
}
if b.localRunner.spawnRate != 10 {
if b.localRunner.spawn.spawnRate != 10 {
t.Error("spawnRate should be 10")
}
}

View File

@@ -0,0 +1,9 @@
package boomer
// client abstracts the worker-side transport to the master.
// grpcClient is the only implementation in this package.
type client interface {
	// connect dials the master and starts the send/recv pumps.
	connect() (err error)
	// close tears down the connection and stops the pumps.
	close()
	// recvChannel yields messages received from the master.
	recvChannel() chan *genericMessage
	// sendChannel accepts messages to forward to the master.
	sendChannel() chan *genericMessage
	// disconnectedChannel signals that the quit message was delivered.
	disconnectedChannel() chan bool
}

View File

@@ -0,0 +1,224 @@
package boomer
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/httprunner/httprunner/v4/hrp/internal/grpc/messager"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
)
// grpcClient is the worker-side grpc transport to the master.
type grpcClient struct {
	masterHost string
	masterPort int
	identity   string // nodeID
	config     *grpcClientConfig
	// buffered message pipes between the pumps and the runner
	fromMaster             chan *genericMessage
	toMaster               chan *genericMessage
	disconnectedFromMaster chan bool
	shutdownChan           chan bool
	// consecutive send failures; reset to 0 on success (read by the
	// worker's heartbeat loop to detect a lost master)
	failCount int32
	wg        sync.WaitGroup
}
// grpcClientConfig holds the mutable connection state of a grpcClient.
// biStream is replaced on reconnect, hence the RWMutex guard.
type grpcClientConfig struct {
	ctx      context.Context
	cancel   context.CancelFunc // use cancel() to stop client
	conn     *grpc.ClientConn
	biStream messager.Message_BidirectionalStreamingMessageClient
	mutex    sync.RWMutex
}
// getBiStreamClient returns the current bidirectional stream under a read lock.
func (c *grpcClientConfig) getBiStreamClient() messager.Message_BidirectionalStreamingMessageClient {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	return c.biStream
}
// setBiStreamClient swaps in a new bidirectional stream (used on (re)connect).
func (c *grpcClientConfig) setBiStreamClient(s messager.Message_BidirectionalStreamingMessageClient) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.biStream = s
}
// newClient builds a grpcClient bound to the given master address and
// worker identity. The connection itself is established later by connect().
func newClient(masterHost string, masterPort int, identity string) (client *grpcClient) {
	log.Info().Msg("Boomer is built with grpc support.")
	// A cancellable context lets close() terminate the stream.
	ctx, cancel := context.WithCancel(context.Background())
	cfg := &grpcClientConfig{
		ctx:    ctx,
		cancel: cancel,
		mutex:  sync.RWMutex{},
	}
	client = &grpcClient{
		masterHost:             masterHost,
		masterPort:             masterPort,
		identity:               identity,
		config:                 cfg,
		fromMaster:             make(chan *genericMessage, 100),
		toMaster:               make(chan *genericMessage, 100),
		disconnectedFromMaster: make(chan bool),
		shutdownChan:           make(chan bool),
	}
	return
}
// connect dials the master, opens the bidirectional stream, and starts
// the recv/send pump goroutines. Returns the dial or stream error.
// NOTE(review): the pumps call wg.Add(1) inside their own goroutines,
// after the `go` statement here — a caller that Waits immediately could
// miss them; confirm whether wg is ever Waited on.
func (c *grpcClient) connect() (err error) {
	addr := fmt.Sprintf("%v:%v", c.masterHost, c.masterPort)
	c.config.conn, err = grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		log.Error().Err(err).Msg("failed to connect")
		return err
	}
	biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx)
	if err != nil {
		log.Error().Err(err).Msg("call bidirectional streaming message err")
		return err
	}
	c.config.setBiStreamClient(biStream)
	log.Info().Msg(fmt.Sprintf("Boomer is connected to master(%s) press Ctrl+c to quit.\n", addr))
	// start the message pumps
	go c.recv()
	go c.send()
	return nil
}
// reConnect re-dials the master after a detected disconnect, swaps in a
// fresh bidirectional stream, and re-registers this worker.
// The stale connection is closed first so repeated reconnects do not
// leak sockets (grpc.Dial always allocates a new ClientConn).
func (c *grpcClient) reConnect() (err error) {
	addr := fmt.Sprintf("%v:%v", c.masterHost, c.masterPort)
	if c.config.conn != nil {
		// release the dead connection before dialing again
		c.config.conn.Close()
	}
	c.config.conn, err = grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return
	}
	biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx)
	if err != nil {
		return
	}
	c.config.setBiStreamClient(biStream)
	// register worker information to master
	c.sendChannel() <- newGenericMessage("register", nil, c.identity)
	log.Info().Msg(fmt.Sprintf("Boomer is reConnected to master(%s) press Ctrl+c to quit.\n", addr))
	return
}
// close stops both pumps (via shutdownChan), cancels the stream context,
// and closes the underlying connection if one was established.
func (c *grpcClient) close() {
	close(c.shutdownChan)
	c.config.cancel()
	if c.config.conn != nil {
		c.config.conn.Close()
	}
}
// recvChannel exposes messages received from the master.
func (c *grpcClient) recvChannel() chan *genericMessage {
	return c.fromMaster
}
// recv pumps messages from the master's stream into fromMaster until
// shutdownChan is closed. Messages addressed to another node are logged
// and dropped.
func (c *grpcClient) recv() {
	c.wg.Add(1)
	defer c.wg.Done()
	for {
		select {
		case <-c.shutdownChan:
			return
		default:
			if c.config.getBiStreamClient() == nil {
				// stream not (re)established yet; back off instead of
				// busy-spinning a full core
				time.Sleep(100 * time.Millisecond)
				continue
			}
			msg, err := c.config.getBiStreamClient().Recv()
			if err != nil {
				// transient stream error; retry after a pause
				time.Sleep(1 * time.Second)
				continue
			}
			if msg == nil {
				continue
			}
			if msg.NodeID != c.identity {
				log.Warn().
					Str("nodeID", msg.NodeID).
					Str("type", msg.Type).
					Interface("data", msg.Data).
					Msg(fmt.Sprintf("not for me(%s)", c.identity))
				continue
			}
			c.fromMaster <- &genericMessage{
				Type:   msg.Type,
				Data:   msg.Data,
				NodeID: msg.NodeID,
				Tasks:  msg.Tasks,
			}
			log.Info().
				Str("nodeID", msg.NodeID).
				Str("type", msg.Type).
				Interface("data", msg.Data).
				Interface("tasks", msg.Tasks).
				Msg("receive data from master")
		}
	}
}
// sendChannel accepts messages to forward to the master.
func (c *grpcClient) sendChannel() chan *genericMessage {
	return c.toMaster
}
// send pumps messages from toMaster onto the stream until shutdownChan
// is closed. After a "quit" message is sent, disconnectedFromMaster is
// signalled so close() can proceed.
func (c *grpcClient) send() {
	c.wg.Add(1)
	defer c.wg.Done()
	for {
		select {
		case <-c.shutdownChan:
			return
		case msg := <-c.toMaster:
			c.sendMessage(msg)
			// We may send genericMessage to master.
			switch msg.Type {
			case "quit":
				c.disconnectedFromMaster <- true
			}
		}
	}
}
// sendMessage writes one message onto the stream. failCount tracks
// consecutive failures (including io.EOF from a closed stream) and is
// reset on success; the worker's heartbeat loop uses it to detect a
// lost master. A nil stream (not yet connected) is silently skipped.
func (c *grpcClient) sendMessage(msg *genericMessage) {
	log.Info().
		Str("nodeID", msg.NodeID).
		Str("type", msg.Type).
		Interface("data", msg.Data).
		Msg("send data to server")
	stream := c.config.getBiStreamClient()
	if stream == nil {
		return
	}
	err := stream.Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID})
	if err != nil {
		// covers io.EOF and any other stream failure
		atomic.AddInt32(&c.failCount, 1)
		return
	}
	atomic.StoreInt32(&c.failCount, 0)
}
// disconnectedChannel signals that the quit message reached the master.
func (c *grpcClient) disconnectedChannel() chan bool {
	return c.disconnectedFromMaster
}

View File

@@ -0,0 +1 @@
package boomer

View File

@@ -0,0 +1,51 @@
package boomer
// message types exchanged between master and workers, mirroring the
// locust protocol vocabulary.
const (
	typeClientReady      = "client_ready"
	typeClientStopped    = "client_stopped"
	typeHeartbeat        = "heartbeat"
	typeSpawning         = "spawning"
	typeSpawningComplete = "spawning_complete"
	typeQuit             = "quit"
	typeException        = "exception"
)
// message is a marker interface for protocol messages; any value
// satisfies it.
type message interface {
}
// genericMessage is the single wire format exchanged between master and
// workers: a type tag, a numeric payload, the addressed node, and an
// optional serialized testcase blob.
type genericMessage struct {
	Type   string           `json:"type,omitempty"`
	Data   map[string]int64 `json:"data,omitempty"`
	NodeID string           `json:"node_id,omitempty"`
	Tasks  []byte           `json:"tasks,omitempty"`
}
// newGenericMessage builds a message of type t with the given payload,
// addressed to nodeID.
func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) {
	msg = &genericMessage{
		Type:   t,
		Data:   data,
		NodeID: nodeID,
	}
	return
}
// newQuitMessage builds a quit notification from the given node.
// Uses the declared typeQuit constant instead of a duplicate literal.
func newQuitMessage(nodeID string) (msg *genericMessage) {
	return &genericMessage{
		Type:   typeQuit,
		NodeID: nodeID,
	}
}
// newSpawnMessageToWorker builds a spawn instruction carrying the spawn
// parameters and the serialized testcases; NodeID is left empty because
// the master broadcasts it.
func newSpawnMessageToWorker(t string, data map[string]int64, tasks []byte) (msg *genericMessage) {
	msg = &genericMessage{
		Type:  t,
		Data:  data,
		Tasks: tasks,
	}
	return
}
// newClientReadyMessageToMaster announces that this worker is ready.
// Uses the declared typeClientReady constant instead of a duplicate literal.
func newClientReadyMessageToMaster(nodeID string) (msg *genericMessage) {
	return &genericMessage{
		Type:   typeClientReady,
		NodeID: nodeID,
	}
}

View File

@@ -0,0 +1 @@
package boomer

View File

@@ -118,15 +118,15 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) {
var state string
switch output.State {
case stateInit:
case StateInit:
state = "initializing"
case stateSpawning:
case StateSpawning:
state = "spawning"
case stateRunning:
case StateRunning:
state = "running"
case stateQuitting:
case StateQuitting:
state = "quitting"
case stateStopped:
case StateStopped:
state = "stopped"
}
@@ -525,7 +525,7 @@ func (o *PrometheusPusherOutput) OnStart() {
// OnStop of PrometheusPusherOutput has nothing to do.
func (o *PrometheusPusherOutput) OnStop() {
// update runner state: stopped
gaugeState.Set(float64(stateStopped))
gaugeState.Set(float64(StateStopped))
if err := o.pusher.Push(); err != nil {
log.Error().Err(err).Msg("push to Pushgateway failed")
}

View File

@@ -10,20 +10,26 @@ import (
"sync/atomic"
"time"
"github.com/go-errors/errors"
"github.com/olekukonko/tablewriter"
"github.com/rs/zerolog/log"
)
const (
stateInit = iota + 1 // initializing
stateSpawning // spawning
stateRunning // running
stateQuitting // quitting
stateStopped // stopped
StateInit = iota + 1 // initializing
StateSpawning // spawning
StateRunning // running
StateStopping // stopping
StateStopped // stopped
StateQuitting // quitting
StateMissing // missing
)
const (
reportStatsInterval = 3 * time.Second
heartbeatInterval = 1 * time.Second
heartbeatLiveness = 3 * time.Second
)
type Loop struct {
@@ -51,23 +57,113 @@ func (l *Loop) increaseFinishedCount() {
atomic.AddInt64(&l.finishedCount, 1)
}
// SpawnInfo tracks the spawn target and the number of worker goroutines
// currently holding a "ticket". spawnCount/acquiredCount are accessed
// atomically; spawnRate and spawnDone are guarded by mutex.
type SpawnInfo struct {
	spawnCount    int64 // target clients to spawn
	acquiredCount int64 // count acquired of workers
	spawnRate     float64
	spawnDone     chan struct{}
	mutex         sync.RWMutex
}
// setSpawn updates the spawn target. Non-positive values leave the
// corresponding field unchanged, so callers can override one of the two
// (e.g. setSpawn(n, -1)).
func (s *SpawnInfo) setSpawn(spawnCount int64, spawnRate float64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if spawnCount > 0 {
		atomic.StoreInt64(&s.spawnCount, spawnCount)
	}
	if spawnRate > 0 {
		s.spawnRate = spawnRate
	}
}
// getSpawnCount returns the current spawn target.
func (s *SpawnInfo) getSpawnCount() int64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return atomic.LoadInt64(&s.spawnCount)
}
// getSpawnRate returns the current spawn rate (clients per second).
func (s *SpawnInfo) getSpawnRate() float64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.spawnRate
}
// getSpawnDone returns the channel closed when spawning completes.
func (s *SpawnInfo) getSpawnDone() chan struct{} {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.spawnDone
}
// done marks spawning complete by closing spawnDone. Must be called at
// most once per reset() cycle (a second close would panic).
func (s *SpawnInfo) done() {
	close(s.spawnDone)
}
// isFinished reports whether the acquired tickets have reached the
// spawn target.
func (s *SpawnInfo) isFinished() bool {
	// return true when workers acquired
	return atomic.LoadInt64(&s.acquiredCount) == atomic.LoadInt64(&s.spawnCount)
}
// acquire takes one spawn ticket if any remain, returning true on
// success. A CAS loop replaces the previous load-then-add, which could
// let concurrent callers overshoot the spawn target.
func (s *SpawnInfo) acquire() bool {
	for {
		current := atomic.LoadInt64(&s.acquiredCount)
		if current >= atomic.LoadInt64(&s.spawnCount) {
			return false
		}
		if atomic.CompareAndSwapInt64(&s.acquiredCount, current, current+1) {
			return true
		}
	}
}
// erase releases one surplus ticket when more goroutines are running
// than the (possibly lowered) spawn target allows; returns true if a
// ticket was released. A CAS loop replaces the previous load-then-add,
// which could let concurrent callers undershoot.
func (s *SpawnInfo) erase() bool {
	for {
		current := atomic.LoadInt64(&s.acquiredCount)
		if current <= atomic.LoadInt64(&s.spawnCount) {
			return false
		}
		if atomic.CompareAndSwapInt64(&s.acquiredCount, current, current-1) {
			return true
		}
	}
}
// increaseFinishedCount releases one ticket when a worker goroutine
// finishes. NOTE(review): despite the name this decrements
// acquiredCount — confirm the intent with callers.
func (s *SpawnInfo) increaseFinishedCount() {
	atomic.AddInt64(&s.acquiredCount, -1)
}
// reset clears the spawn state and arms a fresh spawnDone channel for
// the next run. spawnCount and acquiredCount are written atomically
// because other goroutines read them with atomic loads outside the
// mutex (plain writes here would be a data race).
func (s *SpawnInfo) reset() {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	atomic.StoreInt64(&s.spawnCount, 0)
	s.spawnRate = 0
	atomic.StoreInt64(&s.acquiredCount, 0)
	s.spawnDone = make(chan struct{})
}
type runner struct {
state int32
tasks []*Task
totalTaskWeight int
mutex sync.RWMutex
rateLimiter RateLimiter
rateLimitEnabled bool
stats *requestStats
currentClientsNum int32 // current clients count
spawnCount int // target clients to spawn
spawnRate float64
spawn *SpawnInfo
loop *Loop // specify loop count for testcase, count = loopCount * spawnCount
spawnDone chan struct{}
// when this channel is closed, all statistics are reported successfully
reportedChan chan bool
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// close this channel will stop all goroutines used in runner.
closeChan chan bool
outputs []Output
once *sync.Once
}
// safeRun runs fn and recovers from unexpected panics.
@@ -176,75 +272,104 @@ func (r *runner) reportTestResult() {
println()
}
func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
// startSpawning re-arms the runner's control channels and spawn state,
// then launches the spawn loop in a goroutine.
func (r *runner) startSpawning(spawnCount int64, spawnRate float64, spawnCompleteFunc func()) {
	r.stopChan = make(chan bool)
	r.reportedChan = make(chan bool)
	r.spawn.reset()
	r.spawn.setSpawn(spawnCount, spawnRate)
	atomic.StoreInt32(&r.currentClientsNum, 0)
	go r.spawnWorkers(spawnCount, spawnRate, r.stopChan, spawnCompleteFunc)
}
func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) {
log.Info().
Int("spawnCount", spawnCount).
Int64("spawnCount", spawnCount).
Float64("spawnRate", spawnRate).
Msg("Spawning workers")
atomic.StoreInt32(&r.state, stateSpawning)
for i := 1; i <= spawnCount; i++ {
// spawn workers with rate limit
sleepTime := time.Duration(1000000/r.spawnRate) * time.Microsecond
time.Sleep(sleepTime)
// loop count per worker
var workerLoop *Loop
if r.loop != nil {
workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / int64(r.spawnCount)}
}
r.updateState(StateSpawning)
for {
select {
case <-quit:
// quit spawning goroutine
log.Info().Msg("Quitting spawning workers")
return
default:
atomic.AddInt32(&r.currentClientsNum, 1)
go func() {
for {
select {
case <-quit:
return
default:
if workerLoop != nil && !workerLoop.acquire() {
if r.isStarted() && r.spawn.acquire() {
// spawn workers with rate limit
sleepTime := time.Duration(1000000/r.spawn.getSpawnRate()) * time.Microsecond
time.Sleep(sleepTime)
// loop count per worker
var workerLoop *Loop
if r.loop != nil {
workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / int64(r.spawn.spawnCount)}
}
atomic.AddInt32(&r.currentClientsNum, 1)
go func() {
for {
select {
case <-quit:
atomic.AddInt64(&r.spawn.acquiredCount, -1)
atomic.AddInt32(&r.currentClientsNum, -1)
return
}
if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire()
if !blocked {
default:
if workerLoop != nil && !workerLoop.acquire() {
return
}
if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire()
if !blocked {
task := r.getTask()
r.safeRun(task.Fn)
}
} else {
task := r.getTask()
r.safeRun(task.Fn)
}
} else {
task := r.getTask()
r.safeRun(task.Fn)
}
if workerLoop != nil {
// finished count of total
r.loop.increaseFinishedCount()
// finished count of single worker
workerLoop.increaseFinishedCount()
if r.loop.isFinished() {
r.stop()
if workerLoop != nil {
// finished count of total
r.loop.increaseFinishedCount()
// finished count of single worker
workerLoop.increaseFinishedCount()
if r.loop.isFinished() {
r.stop()
}
}
if r.spawn.erase() {
atomic.AddInt32(&r.currentClientsNum, -1)
return
}
if !r.isStarted() {
atomic.AddInt64(&r.spawn.acquiredCount, -1)
atomic.AddInt32(&r.currentClientsNum, -1)
return
}
}
}
}()
} else {
if r.getState() == StateSpawning {
r.spawn.done()
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
r.updateState(StateRunning)
}
}()
time.Sleep(1 * time.Second)
}
}
}
close(r.spawnDone)
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}
atomic.StoreInt32(&r.state, stateRunning)
}
// setTasks will set the runner's task list AND the total task weight
// which is used to get a random task later
func (r *runner) setTasks(t []*Task) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.tasks = t
weightSum := 0
@@ -255,6 +380,8 @@ func (r *runner) setTasks(t []*Task) {
}
func (r *runner) getTask() *Task {
r.mutex.RLock()
defer r.mutex.RUnlock()
tasksCount := len(r.tasks)
if tasksCount == 0 {
log.Error().Msg("no valid testcase found")
@@ -285,30 +412,78 @@ func (r *runner) getTask() *Task {
return nil
}
// statsStart consumes transaction/request events into the stats
// aggregator and reports them every reportStatsInterval. It returns
// (closing reportedChan) after the final report once the runner has
// stopped. The ticker is stopped on return so its resources are freed.
func (r *runner) statsStart() {
	ticker := time.NewTicker(reportStatsInterval)
	defer ticker.Stop()
	for {
		select {
		// record stats
		case t := <-r.stats.transactionChan:
			r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
		case m := <-r.stats.requestSuccessChan:
			r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
		case n := <-r.stats.requestFailureChan:
			r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
			r.stats.logError(n.requestType, n.name, n.errMsg)
		// report stats
		case <-ticker.C:
			r.reportStats()
			// close reportedChan and return if the last stats is reported successfully
			if !r.isStarted() {
				close(r.reportedChan)
				return
			}
		}
	}
}
// stop signals all worker goroutines to exit by closing stopChan and
// halts the rate limiter if one is active. Non-blocking: workers drain
// when their current safeRun returns.
func (r *runner) stop() {
	// stop previous goroutines without blocking
	// those goroutines will exit when r.safeRun returns
	close(r.stopChan)
	if r.rateLimitEnabled {
		r.rateLimiter.Stop()
	}
}
// getState atomically reads the runner state (one of the State* constants).
func (r *runner) getState() int32 {
	return atomic.LoadInt32(&r.state)
}
// updateState atomically transitions the runner state, logging the change.
func (r *runner) updateState(state int32) {
	log.Debug().Int32("from", atomic.LoadInt32(&r.state)).Int32("to", state).Msg("update runner state")
	atomic.StoreInt32(&r.state, state)
}
// isStarted reports whether the runner is actively spawning or running.
func (r *runner) isStarted() bool {
	return r.getState() == StateRunning || r.getState() == StateSpawning
}
// localRunner drives a standalone (non-distributed) load test.
type localRunner struct {
	runner
	// close this channel will stop all goroutines used in runner.
	stopChan chan bool
}
func newLocalRunner(spawnCount int, spawnRate float64) *localRunner {
return &localRunner{
runner: runner{
state: stateInit,
spawnRate: spawnRate,
spawnCount: spawnCount,
stats: newRequestStats(),
outputs: make([]Output, 0),
spawnDone: make(chan struct{}),
state: StateInit,
stats: newRequestStats(),
outputs: make([]Output, 0),
spawn: &SpawnInfo{
spawnCount: int64(spawnCount),
spawnRate: spawnRate,
spawnDone: make(chan struct{}),
},
reportedChan: make(chan bool),
stopChan: make(chan bool),
closeChan: make(chan bool),
once: &sync.Once{},
},
stopChan: make(chan bool),
}
}
func (r *localRunner) start() {
// init state
atomic.StoreInt32(&r.state, stateInit)
r.updateState(StateInit)
atomic.StoreInt32(&r.currentClientsNum, 0)
r.stats.clearAll()
@@ -317,51 +492,20 @@ func (r *localRunner) start() {
r.rateLimiter.Start()
}
// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
quitChan := make(chan bool)
// when this channel is closed, all statistics are reported successfully
reportedChan := make(chan bool)
go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil)
go r.spawnWorkers(r.spawn.spawnCount, r.spawn.spawnRate, r.stopChan, nil)
// output setup
r.outputOnStart()
// start running
go func() {
ticker := time.NewTicker(reportStatsInterval)
for {
select {
// record stats
case t := <-r.stats.transactionChan:
r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize)
case m := <-r.stats.requestSuccessChan:
r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength)
case n := <-r.stats.requestFailureChan:
r.stats.logRequest(n.requestType, n.name, n.responseTime, 0)
r.stats.logError(n.requestType, n.name, n.errMsg)
// report stats
case <-ticker.C:
r.reportStats()
// close reportedChan and return if the last stats is reported successfully
if atomic.LoadInt32(&r.state) == stateQuitting {
close(reportedChan)
return
}
}
}
}()
// start stats report
go r.runner.statsStart()
// stop
<-r.stopChan
atomic.StoreInt32(&r.state, stateQuitting)
// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(quitChan)
r.updateState(StateStopped)
// wait until all stats are reported successfully
<-reportedChan
<-r.reportedChan
// stop rate limiter
if r.rateLimitEnabled {
@@ -374,10 +518,494 @@ func (r *localRunner) start() {
// output teardown
r.outputOnStop()
atomic.StoreInt32(&r.state, stateStopped)
r.updateState(StateQuitting)
return
}
func (r *localRunner) stop() {
close(r.stopChan)
if r.runner.isStarted() {
r.runner.stop()
}
}
// workerRunner connects to the master, spawns goroutines and collects stats.
// workerRunner connects to the master, spawns goroutines and collects stats.
type workerRunner struct {
	runner
	nodeID     string
	masterHost string
	masterPort int
	client     *grpcClient
	// this channel will start worker for spawning.
	spawnStartChan chan bool
	// get testcase from master
	testCaseBytes chan []byte
	// true while start() is executing a test run
	// NOTE(review): read/written from multiple goroutines without
	// synchronization — confirm whether this needs an atomic.
	startFlag bool
	// when set, quit messages from the master are ignored in close()
	ignoreQuit bool
}
// newWorkerRunner builds a worker runner targeting the given master
// address; the grpc client is created later in run().
func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) {
	r = &workerRunner{
		runner: runner{
			stats: newRequestStats(),
			spawn: &SpawnInfo{
				spawnDone: make(chan struct{}),
			},
			stopChan:     make(chan bool),
			reportedChan: make(chan bool),
			closeChan:    make(chan bool),
			once:         &sync.Once{},
		},
		masterHost:     masterHost,
		masterPort:     masterPort,
		nodeID:         getNodeID(),
		spawnStartChan: make(chan bool),
		testCaseBytes:  make(chan []byte, 10),
	}
	return r
}
// spawnComplete notifies the master that all goroutines were spawned
// (reporting the achieved count) and moves this worker to StateRunning.
// Uses the declared typeSpawningComplete constant instead of a literal.
func (r *workerRunner) spawnComplete() {
	data := make(map[string]int64)
	data["count"] = r.spawn.getSpawnCount()
	r.client.sendChannel() <- newGenericMessage(typeSpawningComplete, data, r.nodeID)
	r.updateState(StateRunning)
}
// onSpawnMessage acknowledges a spawn instruction, applies the optional
// spawn_count / spawn_rate overrides (negative sentinel skips the other
// field), and forwards the serialized testcases, if any, to the test
// loop. Uses the declared typeSpawning constant instead of a literal.
func (r *workerRunner) onSpawnMessage(msg *genericMessage) {
	r.client.sendChannel() <- newGenericMessage(typeSpawning, nil, r.nodeID)
	spawnCount, ok := msg.Data["spawn_count"]
	if ok {
		r.spawn.setSpawn(spawnCount, -1)
	}
	spawnRate, ok := msg.Data["spawn_rate"]
	if ok {
		r.spawn.setSpawn(-1, float64(spawnRate))
	}
	if msg.Tasks != nil {
		r.testCaseBytes <- msg.Tasks
	}
	log.Info().Msg("on spawn message successful")
}
// Runner acts as a state machine.
// Runner acts as a state machine.
// onMessage dispatches one master message according to the worker's
// current state: spawn is accepted in Init/Spawning/Running/Stopped
// (restarting the test loop from Stopped), stop only while started,
// and quit in every state.
func (r *workerRunner) onMessage(msg *genericMessage) {
	switch r.getState() {
	case StateInit:
		switch msg.Type {
		case "spawn":
			r.onSpawnMessage(msg)
		case "quit":
			r.close()
		}
	case StateSpawning:
		fallthrough
	case StateRunning:
		switch msg.Type {
		case "spawn":
			// rebalance while running: just apply the new parameters
			r.onSpawnMessage(msg)
		case "stop":
			r.stop()
			log.Info().Msg("Recv stop message from master, all the goroutines are stopped")
			r.client.sendChannel() <- newGenericMessage("client_stopped", nil, r.nodeID)
		case "quit":
			r.close()
			log.Info().Msg("Recv quit message from master, all the goroutines are stopped")
		}
	case StateStopped:
		switch msg.Type {
		case "spawn":
			// restart the test loop with the new spawn parameters
			r.onSpawnMessage(msg)
			go r.start()
		case "quit":
			r.close()
		}
	}
}
// onQuiting notifies the master that this worker is quitting (unless it
// already did) and records the quitting state.
func (r *workerRunner) onQuiting() {
	state := r.getState()
	if state != StateQuitting {
		r.client.sendChannel() <- newQuitMessage(r.nodeID)
	}
	r.updateState(StateQuitting)
}
// startListener feeds messages from the master into onMessage until
// closeChan is closed.
func (r *workerRunner) startListener() {
	for {
		select {
		case msg := <-r.client.recvChannel():
			r.onMessage(msg)
		case <-r.closeChan:
			return
		}
	}
}
// run starts service
func (r *workerRunner) run() {
r.updateState(StateInit)
r.client = newClient(r.masterHost, r.masterPort, r.nodeID)
err := r.client.connect()
if err != nil {
log.Printf("Failed to connect to master(%s:%d) with error %v\n", r.masterHost, r.masterPort, err)
return
}
// listen to master
go r.startListener()
// register worker information to master
r.client.sendChannel() <- newGenericMessage("register", nil, r.nodeID)
// tell master, I'm ready
log.Info().Msg("send client ready signal")
r.client.sendChannel() <- newClientReadyMessageToMaster(r.nodeID)
// heartbeat
// See: https://github.com/locustio/locust/commit/a8c0d7d8c588f3980303358298870f2ea394ab93
go func() {
var ticker = time.NewTicker(heartbeatInterval)
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&r.client.failCount) > 2 {
r.updateState(StateMissing)
}
if r.getState() == StateMissing {
if r.client.reConnect() == nil {
r.updateState(StateInit)
}
}
CPUUsage := GetCurrentCPUUsage()
data := map[string]int64{
"state": int64(r.getState()),
"current_cpu_usage": int64(CPUUsage),
"spawn_count": int64(atomic.LoadInt32(&r.currentClientsNum)),
}
r.client.sendChannel() <- newGenericMessage("heartbeat", data, r.nodeID)
case <-r.closeChan:
return
}
}
}()
<-r.closeChan
}
// start executes one test run: clears stats, (re)starts the rate
// limiter, spawns workers with the current spawn parameters, and blocks
// until the final stats report before emitting the test result.
// startFlag brackets the run so close() can wait for completion.
func (r *workerRunner) start() {
	r.startFlag = true
	defer func() {
		r.startFlag = false
	}()
	r.stats.clearAll()
	// start rate limiter
	if r.rateLimitEnabled {
		r.rateLimiter.Start()
	}
	// outputs are started only once across restarts
	r.once.Do(r.outputOnStart)
	r.startSpawning(r.spawn.getSpawnCount(), r.spawn.getSpawnRate(), r.spawnComplete)
	// start stats report
	go r.runner.statsStart()
	<-r.reportedChan
	r.reportTestResult()
	r.outputOnStop()
}
// stop halts an active run: signals the worker goroutines, stops the
// rate limiter, and records the stopped state. No-op unless started.
func (r *workerRunner) stop() {
	if r.isStarted() {
		close(r.stopChan)
		// stop rate limiter
		if r.rateLimitEnabled {
			r.rateLimiter.Stop()
		}
		r.updateState(StateStopped)
	}
}
// close stops the current run (honoring ignoreQuit), waits for start()
// to finish, then shuts down the runner goroutines and gives the client
// one second to deliver the quit message before closing the connection.
// The timeout ticker is now stopped to free its resources.
func (r *workerRunner) close() {
	r.stop()
	if r.ignoreQuit {
		return
	}
	// wait until the in-flight test run has fully finished
	for r.startFlag {
		time.Sleep(1 * time.Second)
	}
	close(r.closeChan)
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	if r.client != nil {
		// waiting for the quit message to be sent to the master
		select {
		case <-r.client.disconnectedChannel():
		case <-ticker.C:
			log.Warn().Msg("Timeout waiting for sending quit message to master, boomer will quit any way.")
			r.onQuiting()
		}
		r.client.close()
	}
}
// masterRunner controls worker to spawn goroutines and collect stats.
// masterRunner controls worker to spawn goroutines and collect stats.
type masterRunner struct {
	runner
	masterBindHost string
	masterBindPort int
	server         *grpcServer
	// when set, the test starts automatically once expectWorkers joined
	autoStart            bool
	expectWorkers        int
	expectWorkersMaxWait int
	// signals the embedding process to parse and serialize testcases
	parseTestCasesChan chan bool
	// true while a start/fetch is in flight
	startFlag bool
	// receives the serialized testcases requested via parseTestCasesChan
	testCaseBytes chan []byte
	mutex         sync.Mutex
}
// newMasterRunner builds a master runner with its grpc server bound to
// the given host/port (the server is started later in run()).
func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner {
	return &masterRunner{
		runner: runner{
			state: StateInit,
			spawn: &SpawnInfo{
				spawnDone: make(chan struct{}),
			},
			closeChan: make(chan bool),
		},
		masterBindHost:     masterBindHost,
		masterBindPort:     masterBindPort,
		server:             newServer(masterBindHost, masterBindPort),
		parseTestCasesChan: make(chan bool),
		startFlag:          false,
		testCaseBytes:      make(chan []byte),
	}
}
// setExpectWorkers configures how many workers must join before an
// auto-started test begins, and how long (seconds) to wait for them.
func (r *masterRunner) setExpectWorkers(expectWorkers int, expectWorkersMaxWait int) {
	r.expectWorkers = expectWorkers
	r.expectWorkersMaxWait = expectWorkersMaxWait
}
// heartbeatWorker decays each worker's heartbeat budget once per
// heartbeatInterval; a worker that reaches zero is marked missing, and
// if every worker has gone missing while running, the test is stopped.
// Fixes: a failed *WorkerNode type assertion previously fell through and
// dereferenced the nil workerInfo; the ticker is now stopped on return.
func (r *masterRunner) heartbeatWorker() {
	log.Info().Msg("heartbeatWorker, listen and record heartbeat from worker")
	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()
	for {
		select {
		case <-r.closeChan:
			return
		case <-ticker.C:
			r.server.clients.Range(func(key, value interface{}) bool {
				workerInfo, ok := value.(*WorkerNode)
				if !ok {
					log.Error().Msg("failed to get worker information")
					// skip this malformed entry instead of dereferencing nil
					return true
				}
				if atomic.LoadInt32(&workerInfo.Heartbeat) <= 0 && workerInfo.getState() != StateMissing {
					workerInfo.setState(StateMissing)
					if r.getState() == StateRunning {
						// all running workers missed, stopping runner
						if r.server.getClientsLength() <= 0 {
							r.updateState(StateStopped)
						}
					}
				} else {
					atomic.AddInt32(&workerInfo.Heartbeat, -1)
				}
				return true
			})
		}
	}
}
// clientListener consumes worker messages until closeChan is closed,
// updating per-worker state and the master's aggregate state:
// re-joins and quits trigger a rebalance while running; all workers
// stopped/running flips the master to Stopped/Running respectively.
func (r *masterRunner) clientListener() {
	log.Info().Msg("clientListener, start to deal message from worker")
	for {
		select {
		case <-r.closeChan:
			return
		case msg := <-r.server.recvChannel():
			worker, ok := r.server.getClients().Load(msg.NodeID)
			if !ok {
				// message from an unregistered node; drop it
				continue
			}
			workerInfo, ok := worker.(*WorkerNode)
			if !ok {
				continue
			}
			switch msg.Type {
			case typeClientReady:
				if workerInfo.getState() == StateInit {
					break
				}
				workerInfo.setState(StateInit)
				if r.getState() == StateRunning {
					// a worker (re)joined mid-test: spread the load again
					println(fmt.Sprintf("worker(%s) joined, ready to rebalance the load of each worker", workerInfo.ID))
					err := r.rebalance()
					if err != nil {
						log.Error().Err(err).Msg("failed to rebalance")
					}
				}
			case typeClientStopped:
				workerInfo.setState(StateStopped)
				if r.server.getWorkersLengthByState(StateStopped)+r.server.getWorkersLengthByState(StateInit) == r.server.getClientsLength() {
					r.updateState(StateStopped)
				}
			case typeHeartbeat:
				if workerInfo.getState() != int32(msg.Data["state"]) {
					workerInfo.setState(int32(msg.Data["state"]))
				}
				// refill the heartbeat budget decayed by heartbeatWorker
				workerInfo.updateHeartbeat(3)
				if workerInfo.getCPUUsage() != float64(msg.Data["current_cpu_usage"]) {
					workerInfo.updateCPUUsage(float64(msg.Data["current_cpu_usage"]))
				}
				if workerInfo.getSpawnCount() != msg.Data["spawn_count"] {
					workerInfo.updateSpawnCount(msg.Data["spawn_count"])
				}
			case typeSpawning:
				workerInfo.setState(StateSpawning)
			case typeSpawningComplete:
				workerInfo.setState(StateRunning)
				if r.server.getWorkersLengthByState(StateRunning) == r.server.getClientsLength() {
					println(fmt.Sprintf("all(%v) workers spawn done, setting state as running", r.server.getClientsLength()))
					r.updateState(StateRunning)
				}
			case typeQuit:
				if workerInfo.getState() == StateQuitting {
					break
				}
				workerInfo.setState(StateQuitting)
				if r.isStarted() {
					if r.server.getClientsLength() > 0 {
						// a worker left mid-test: spread the load again
						println(fmt.Sprintf("worker(%s) quited, ready to rebalance the load of each worker", workerInfo.ID))
						err := r.rebalance()
						if err != nil {
							log.Error().Err(err).Msg("failed to rebalance")
						}
					}
				}
			case typeException:
				// Todo
			default:
			}
		}
	}
}
// run starts the grpc server and the worker-facing goroutines, then
// blocks until closeChan is closed. With autoStart, it polls once per
// second for expectWorkers joined (kicking off the test in a goroutine)
// and aborts the process after expectWorkersMaxWait seconds.
// Fixes: both tickers are now stopped via defer, and the start goroutine
// uses its own error variable instead of racing on the outer err.
func (r *masterRunner) run() {
	r.updateState(StateInit)
	// start grpc server
	err := r.server.start()
	if err != nil {
		log.Error().Err(err).Msg("failed to start grpc server")
		return
	}
	// listen and deal message from worker
	go r.clientListener()
	// listen and record heartbeat from worker
	go r.heartbeatWorker()
	if r.autoStart {
		log.Info().Msg("auto start, waiting expected workers joined")
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		tickerMaxWait := time.NewTicker(time.Duration(r.expectWorkersMaxWait) * time.Second)
		defer tickerMaxWait.Stop()
	FOR:
		for {
			select {
			case <-r.closeChan:
				return
			case <-ticker.C:
				c := r.server.getClientsLength()
				log.Info().Msg(fmt.Sprintf("expected worker number: %v, current worker count: %v", r.expectWorkers, c))
				if c >= r.expectWorkers {
					go func() {
						if startErr := r.start(); startErr != nil {
							log.Error().Err(startErr).Msg("failed to run")
							os.Exit(1)
						}
					}()
					break FOR
				}
			case <-tickerMaxWait.C:
				log.Warn().Msg("reached max wait time, quiting")
				r.onQuiting()
				os.Exit(1)
			}
		}
	}
	<-r.closeChan
}
// start divides the configured spawn count/rate across the connected
// workers, fetches the serialized testcases, and broadcasts the spawn
// instruction. Errors if no workers are connected or a start is already
// in flight. NOTE(review): the per-worker rate is truncated to int64 on
// the wire, losing fractional rates — confirm whether this is intended.
func (r *masterRunner) start() error {
	numWorkers := r.server.getClientsLength()
	if numWorkers == 0 {
		return errors.New("current workers: 0")
	}
	workerSpawnRate := r.spawn.spawnRate / float64(numWorkers)
	workerSpawnCount := r.spawn.getSpawnCount() / int64(numWorkers)
	log.Info().Msg("send spawn data to worker")
	r.updateState(StateSpawning)
	// waiting to fetch testcase
	testcase, ok := r.fetchTestCase()
	if !ok {
		return errors.New("starting, do not retry frequently")
	}
	r.server.sendChannel() <- newSpawnMessageToWorker("spawn", map[string]int64{
		"spawn_count": workerSpawnCount,
		"spawn_rate":  int64(workerSpawnRate),
	}, testcase)
	println("send spawn data to worker successful")
	log.Info().Msg("send spawn data to worker successful")
	return nil
}
// fetchTestCase asks the embedding process (via parseTestCasesChan) to
// serialize the testcases and returns the bytes. Returns (nil, false)
// when a fetch is already in flight. The previously unsynchronized
// check-then-set of startFlag raced between concurrent callers; it is
// now guarded by the struct's mutex.
func (r *masterRunner) fetchTestCase() ([]byte, bool) {
	r.mutex.Lock()
	if r.startFlag {
		r.mutex.Unlock()
		return nil, false
	}
	r.startFlag = true
	r.mutex.Unlock()
	defer func() {
		r.mutex.Lock()
		r.startFlag = false
		r.mutex.Unlock()
	}()
	r.parseTestCasesChan <- true
	return <-r.testCaseBytes, true
}
// rebalance re-runs start(), re-splitting the spawn target across the
// currently connected workers.
func (r *masterRunner) rebalance() error {
	return r.start()
}
// stop broadcasts a stop message to all workers. No-op unless a test is
// started. NOTE(review): the state moves Stopping -> Stopped immediately
// after the send, without waiting for worker acks — confirm intent.
func (r *masterRunner) stop() {
	if r.isStarted() {
		r.updateState(StateStopping)
		r.server.sendChannel() <- &genericMessage{Type: "stop", Data: map[string]int64{}}
		r.updateState(StateStopped)
	}
}
// onQuiting broadcasts a quit message to the workers (unless the master
// is already quitting) and records the quitting state.
func (r *masterRunner) onQuiting() {
	state := r.getState()
	if state != StateQuitting {
		msg := &genericMessage{
			Type: "quit",
		}
		r.server.sendChannel() <- msg
	}
	r.updateState(StateQuitting)
}
// close broadcasts quit to the workers, waits for the server's in-flight
// sends to drain, then stops the master goroutines and the grpc server.
func (r *masterRunner) close() {
	r.onQuiting()
	r.server.wg.Wait()
	close(r.closeChan)
	r.server.close()
}

View File

@@ -1,6 +1,7 @@
package boomer
import (
"sync"
"sync/atomic"
"testing"
"time"
@@ -112,3 +113,416 @@ func TestLoopCount(t *testing.T) {
t.Fatal()
}
}
// TestSpawnWorkers verifies that spawnWorkers brings up the requested
// number of client goroutines shortly after being started.
func TestSpawnWorkers(t *testing.T) {
	taskA := &Task{
		Weight: 10,
		Fn: func() {
			time.Sleep(time.Second)
		},
		Name: "TaskA",
	}
	tasks := []*Task{taskA}
	runner := newWorkerRunner("localhost", 5557)
	defer runner.close()
	runner.client = newClient("localhost", 5557, runner.nodeID)
	runner.setTasks(tasks)
	go runner.spawnWorkers(10, 10, runner.stopChan, runner.spawnComplete)
	// give the spawn loop a moment to create all goroutines
	time.Sleep(10 * time.Millisecond)
	currentClients := atomic.LoadInt32(&runner.currentClientsNum)
	if currentClients != 10 {
		t.Error("Unexpected count", currentClients)
	}
}
// TestSpawnWorkersWithManyTasks verifies that weighted tasks are picked
// roughly in proportion to their weights (100:10:1) once the workers
// are running. Fixed: assert.Equal compared an int64 with an int, which
// always fails under testify's strict type-sensitive equality; both
// sides are now the same type.
func TestSpawnWorkersWithManyTasks(t *testing.T) {
	var lock sync.Mutex
	taskCalls := map[string]int{}
	createTask := func(name string, weight int) *Task {
		return &Task{
			Name:   name,
			Weight: weight,
			Fn: func() {
				lock.Lock()
				taskCalls[name]++
				lock.Unlock()
			},
		}
	}
	tasks := []*Task{
		createTask("one hundred", 100),
		createTask("ten", 10),
		createTask("one", 1),
	}
	runner := newWorkerRunner("localhost", 5557)
	defer runner.close()
	runner.setTasks(tasks)
	runner.client = newClient("localhost", 5557, runner.nodeID)
	const numToSpawn int64 = 30
	runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete)
	time.Sleep(2 * time.Second)
	currentClients := atomic.LoadInt32(&runner.currentClientsNum)
	assert.Equal(t, int(numToSpawn), int(currentClients))
	lock.Lock()
	hundreds := taskCalls["one hundred"]
	tens := taskCalls["ten"]
	ones := taskCalls["one"]
	lock.Unlock()
	total := hundreds + tens + ones
	t.Logf("total tasks run: %d\n", total)
	assert.True(t, total > 111)
	assert.True(t, ones > 1)
	actPercentage := float64(ones) / float64(total)
	expectedPercentage := 1.0 / 111.0
	if actPercentage > 2*expectedPercentage || actPercentage < 0.5*expectedPercentage {
		t.Errorf("Unexpected percentage of ones task: exp %v, act %v", expectedPercentage, actPercentage)
	}
	assert.True(t, tens > 10)
	actPercentage = float64(tens) / float64(total)
	expectedPercentage = 10.0 / 111.0
	if actPercentage > 2*expectedPercentage || actPercentage < 0.5*expectedPercentage {
		t.Errorf("Unexpected percentage of tens task: exp %v, act %v", expectedPercentage, actPercentage)
	}
	assert.True(t, hundreds > 100)
	actPercentage = float64(hundreds) / float64(total)
	expectedPercentage = 100.0 / 111.0
	if actPercentage > 2*expectedPercentage || actPercentage < 0.5*expectedPercentage {
		t.Errorf("Unexpected percentage of hundreds task: exp %v, act %v", expectedPercentage, actPercentage)
	}
}
// TestSpawnAndStop drives a worker runner through a full spawn/stop/quit
// cycle and checks the control messages it reports to the master. The
// channel reads are order-sensitive: spawning_complete must be consumed
// before stop/quit is issued.
func TestSpawnAndStop(t *testing.T) {
	// two tasks with different sleep durations so some goroutines are
	// mid-task when the runner is stopped
	taskA := &Task{
		Fn: func() {
			time.Sleep(time.Second)
		},
	}
	taskB := &Task{
		Fn: func() {
			time.Sleep(2 * time.Second)
		},
	}
	tasks := []*Task{taskA, taskB}
	runner := newWorkerRunner("localhost", 5557)
	defer runner.close()
	runner.client = newClient("localhost", 5557, runner.nodeID)
	runner.setTasks(tasks)
	runner.spawn.setSpawn(10, 10)
	runner.updateState(StateSpawning)
	go runner.start()
	// wait for spawning goroutines
	time.Sleep(2 * time.Second)
	if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
		t.Error("Number of goroutines mismatches, expected: 10, current count", atomic.LoadInt32(&runner.currentClientsNum))
	}
	// the runner must announce completion once all 10 workers are up
	msg := <-runner.client.sendChannel()
	if msg.Type != "spawning_complete" {
		t.Error("Runner should send spawning_complete message when spawning completed, got", msg.Type)
	}
	runner.stop()
	runner.onQuiting()
	// quitting must be reported to the master with a "quit" message
	msg = <-runner.client.sendChannel()
	if msg.Type != "quit" {
		t.Error("Runner should send quit message on quitting, got", msg.Type)
	}
}
// TestStop verifies that stopping a runner that is mid-spawn transitions it
// to StateStopped.
func TestStop(t *testing.T) {
	sleepTask := &Task{
		Fn: func() {
			time.Sleep(time.Second)
		},
	}
	runner := newWorkerRunner("localhost", 5557)
	runner.setTasks([]*Task{sleepTask})
	runner.spawn.setSpawn(10, 10)
	runner.updateState(StateSpawning)
	runner.stop()
	if state := runner.getState(); state != StateStopped {
		t.Error("Expected runner state to be 5, was", state)
	}
}
// TestOnSpawnMessage verifies that a spawn message from the master overrides
// the runner's locally configured spawn count and spawn rate.
func TestOnSpawnMessage(t *testing.T) {
	sleepTask := &Task{
		Fn: func() {
			time.Sleep(time.Second)
		},
	}
	runner := newWorkerRunner("localhost", 5557)
	defer runner.close()
	runner.client = newClient("localhost", 5557, runner.nodeID)
	runner.updateState(StateInit)
	runner.setTasks([]*Task{sleepTask})
	// pre-set values that the incoming spawn message must override
	runner.spawn.spawnCount = 100
	runner.spawn.spawnRate = 100
	spawnMsg := newGenericMessage("spawn", map[string]int64{
		"spawn_count": 20,
		"spawn_rate":  20,
	}, runner.nodeID)
	runner.onSpawnMessage(spawnMsg)
	if runner.spawn.spawnCount != 20 {
		t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnCount)
	}
	if runner.spawn.spawnRate != 20 {
		t.Error("spawnRate should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnRate)
	}
	runner.onMessage(newGenericMessage("stop", nil, runner.nodeID))
}
// TestOnQuitMessage checks that a "quit" message closes the runner's
// closeChan from every starting state (init, running, stopped) and leaves
// the runner in StateQuitting for the non-init states. The channels are
// re-created between cases because each quit closes them.
func TestOnQuitMessage(t *testing.T) {
	runner := newWorkerRunner("localhost", 5557)
	runner.client = newClient("localhost", 5557, "test")
	runner.updateState(StateInit)
	runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
	<-runner.closeChan
	// quit while running
	runner.updateState(StateRunning)
	runner.closeChan = make(chan bool)
	runner.stopChan = make(chan bool)
	runner.client.shutdownChan = make(chan bool)
	runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
	<-runner.closeChan
	if runner.getState() != StateQuitting {
		t.Error("Runner's state should be StateQuitting")
	}
	// quit while stopped
	runner.updateState(StateStopped)
	runner.closeChan = make(chan bool)
	runner.stopChan = make(chan bool)
	runner.client.shutdownChan = make(chan bool)
	runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
	<-runner.closeChan
	if runner.getState() != StateQuitting {
		t.Error("Runner's state should be StateQuitting")
	}
}
// TestOnMessage exercises the worker runner's full message-driven lifecycle:
// spawn, increase worker count while running, stop, respawn, and stop again,
// checking the state transitions and the control messages sent to the master
// at each step. The channel reads are order-sensitive.
func TestOnMessage(t *testing.T) {
	taskA := &Task{
		Fn: func() {
			time.Sleep(time.Second)
		},
	}
	taskB := &Task{
		Fn: func() {
			time.Sleep(2 * time.Second)
		},
	}
	tasks := []*Task{taskA, taskB}
	runner := newWorkerRunner("localhost", 5557)
	defer runner.close()
	runner.client = newClient("localhost", 5557, runner.nodeID)
	runner.updateState(StateInit)
	runner.setTasks(tasks)
	go runner.start()
	// start spawning
	runner.onMessage(newGenericMessage("spawn", map[string]int64{
		"spawn_count": 10,
		"spawn_rate":  10,
	}, runner.nodeID))
	msg := <-runner.client.sendChannel()
	if msg.Type != "spawning" {
		t.Error("Runner should send spawning message when starting spawn, got", msg.Type)
	}
	// spawn complete and running
	time.Sleep(2 * time.Second)
	if runner.getState() != StateRunning {
		t.Error("State of runner is not running after spawn, got", runner.getState())
	}
	if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
		t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum))
	}
	msg = <-runner.client.sendChannel()
	if msg.Type != "spawning_complete" {
		t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type)
	}
	// increase goroutines while running
	runner.onMessage(newGenericMessage("spawn", map[string]int64{
		"spawn_count": 15,
		"spawn_rate":  15,
	}, runner.nodeID))
	msg = <-runner.client.sendChannel()
	if msg.Type != "spawning" {
		t.Error("Runner should send spawning message when starting spawn, got", msg.Type)
	}
	time.Sleep(2 * time.Second)
	msg = <-runner.client.sendChannel()
	if msg.Type != "spawning_complete" {
		t.Error("Runner should send spawning_complete message, got", msg.Type)
	}
	if runner.getState() != StateRunning {
		t.Error("State of runner is not running after spawn, got", runner.getState())
	}
	if atomic.LoadInt32(&runner.currentClientsNum) != 15 {
		// message fixed: the check above expects 15 workers, not 20
		t.Error("Number of goroutines mismatches, expected: 15, current count:", atomic.LoadInt32(&runner.currentClientsNum))
	}
	// stop all the workers
	runner.onMessage(newGenericMessage("stop", nil, runner.nodeID))
	if runner.getState() != StateStopped {
		t.Error("State of runner is not stopped, got", runner.getState())
	}
	msg = <-runner.client.sendChannel()
	if msg.Type != "client_stopped" {
		t.Error("Runner should send client_stopped message, got", msg.Type)
	}
	// spawn again
	runner.onMessage(newGenericMessage("spawn", map[string]int64{
		"spawn_count": 10,
		"spawn_rate":  10,
	}, runner.nodeID))
	msg = <-runner.client.sendChannel()
	if msg.Type != "spawning" {
		t.Error("Runner should send spawning message when starting spawn, got", msg.Type)
	}
	// spawn complete and running
	time.Sleep(2 * time.Second)
	if runner.getState() != StateRunning {
		t.Error("State of runner is not running after spawn, got", runner.getState())
	}
	if atomic.LoadInt32(&runner.currentClientsNum) != 10 {
		t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum))
	}
	msg = <-runner.client.sendChannel()
	if msg.Type != "spawning_complete" {
		t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type)
	}
	// stop all the workers
	runner.onMessage(newGenericMessage("stop", nil, runner.nodeID))
	if runner.getState() != StateStopped {
		t.Error("State of runner is not stopped, got", runner.getState())
	}
	msg = <-runner.client.sendChannel()
	if msg.Type != "client_stopped" {
		t.Error("Runner should send client_stopped message, got", msg.Type)
	}
}
// TestClientListener feeds client_ready / client_stopped messages into the
// master's receive channel and checks that per-worker state is updated, and
// that the master itself moves to StateStopped once every registered worker
// has reported stopped.
func TestClientListener(t *testing.T) {
	runner := newMasterRunner("localhost", 5557)
	defer runner.close()
	runner.updateState(StateInit)
	runner.spawn.setSpawn(10, 10)
	go runner.clientListener()
	// pre-register two workers as if they had connected
	runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3})
	runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3})
	runner.server.recvChannel() <- &genericMessage{
		Type:   typeClientReady,
		NodeID: "testID1",
	}
	worker1, ok := runner.server.getClients().Load("testID1")
	if !ok {
		t.Fatal("error")
	}
	workerInfo1, ok := worker1.(*WorkerNode)
	if !ok {
		t.Fatal("error")
	}
	// give clientListener time to process the message
	time.Sleep(time.Second)
	if workerInfo1.getState() != StateInit {
		t.Error("State of worker runner is not init, got", workerInfo1.getState())
	}
	runner.server.recvChannel() <- &genericMessage{
		Type:   typeClientStopped,
		NodeID: "testID2",
	}
	worker2, ok := runner.server.getClients().Load("testID2")
	if !ok {
		t.Fatal("error")
	}
	workerInfo2, ok := worker2.(*WorkerNode)
	if !ok {
		t.Fatal("error")
	}
	time.Sleep(time.Second)
	if workerInfo2.getState() != StateStopped {
		t.Error("State of worker runner is not stopped, got", workerInfo2.getState())
	}
	// once the second (last) worker stops, the master should stop too
	runner.server.recvChannel() <- &genericMessage{
		Type:   typeClientStopped,
		NodeID: "testID1",
	}
	time.Sleep(time.Second)
	if runner.getState() != StateStopped {
		t.Error("State of master runner is not stopped, got", runner.getState())
	}
}
// TestHeartbeatWorker checks the master's heartbeat bookkeeping: a worker
// whose heartbeat counter runs out is marked StateMissing, while a worker
// that keeps sending heartbeat messages is not.
func TestHeartbeatWorker(t *testing.T) {
	runner := newMasterRunner("localhost", 5557)
	defer runner.close()
	runner.updateState(StateInit)
	runner.spawn.setSpawn(10, 10)
	// both workers start with only 1 heartbeat credit left
	runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 1, State: StateInit})
	runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit})
	go runner.clientListener()
	go runner.heartbeatWorker()
	// wait long enough for the heartbeat worker to decrement the credit and
	// declare testID1 missing
	time.Sleep(3 * time.Second)
	worker1, ok := runner.server.getClients().Load("testID1")
	if !ok {
		t.Fatal()
	}
	workerInfo1, ok := worker1.(*WorkerNode)
	if !ok {
		t.Fatal()
	}
	if workerInfo1.getState() != StateMissing {
		t.Error("expected state of worker runner is missing, but got", workerInfo1.getState())
	}
	// testID2 sends a heartbeat, which should restore/keep it non-missing
	runner.server.recvChannel() <- &genericMessage{
		Type:   typeHeartbeat,
		NodeID: "testID2",
		Data:   map[string]int64{"state": 3},
	}
	worker2, ok := runner.server.getClients().Load("testID2")
	if !ok {
		t.Fatal()
	}
	workerInfo2, ok := worker2.(*WorkerNode)
	if !ok {
		t.Fatal()
	}
	time.Sleep(time.Second)
	if workerInfo2.getState() == StateMissing {
		t.Error("expected state of worker runner is not missing, but got missing")
	}
}

View File

@@ -0,0 +1 @@
package boomer

View File

@@ -0,0 +1,343 @@
package boomer
import (
"context"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"github.com/httprunner/httprunner/v4/hrp/internal/grpc/messager"
"github.com/rs/zerolog/log"
)
// BidirectionalStreamingMessage implements the gRPC bidirectional stream
// between master and workers. The first message received on the stream
// registers the worker in the clients map; the stream is then held open
// until the master signals disconnection, at which point the worker is
// unregistered.
func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_BidirectionalStreamingMessageServer) error {
	s.wg.Add(1)
	defer s.wg.Done()
	req, err := srv.Recv()
	switch err {
	case nil:
		break
	case io.EOF:
		return nil
	default:
		// a canceled context is a normal client disconnection, not an error
		if err.Error() == status.Error(codes.Canceled, context.Canceled.Error()).Error() {
			return nil
		}
		log.Error().Err(err).Msg("failed to get stream from client")
		return err
	}
	wn := &WorkerNode{messenger: srv, ID: req.NodeID, Heartbeat: 3}
	s.clients.Store(req.NodeID, wn)
	// use the structured logger instead of println, consistent with the rest
	// of this file
	log.Info().Msgf("worker(%v) joined, current worker count: %v", req.NodeID, s.getClientsLength())
	// block until the master broadcasts a quit, then unregister the worker
	<-s.disconnectedChannel()
	s.clients.Delete(req.NodeID)
	log.Info().Msgf("worker(%v) quit, current worker count: %v", req.NodeID, s.getClientsLength())
	return nil
}
// WorkerNode is the master's view of one connected worker. The integer
// fields (State, Heartbeat, SpawnCount) are accessed with sync/atomic; the
// float/bool fields are guarded by mutex.
type WorkerNode struct {
	ID                string  `json:"id"`
	State             int32   `json:"state"`
	Heartbeat         int32   `json:"heartbeat"`
	SpawnCount        int64   `json:"spawn_count"`
	CPUUsage          float64 `json:"cpu_usage"`
	CPUWarningEmitted bool    `json:"cpu_warning_emitted"`
	MemoryUsage       float64 `json:"memory_usage"`
	messenger         messager.Message_BidirectionalStreamingMessageServer
	mutex             sync.RWMutex
}

// getState atomically reads the worker's state.
func (w *WorkerNode) getState() int32 {
	return atomic.LoadInt32(&w.State)
}

// setState atomically updates the worker's state.
func (w *WorkerNode) setState(state int32) {
	atomic.StoreInt32(&w.State, state)
}

// updateHeartbeat atomically resets the heartbeat credit counter.
func (w *WorkerNode) updateHeartbeat(heartbeat int32) {
	atomic.StoreInt32(&w.Heartbeat, heartbeat)
}

// getHeartbeat atomically reads the heartbeat credit counter.
func (w *WorkerNode) getHeartbeat() int32 {
	return atomic.LoadInt32(&w.Heartbeat)
}

// updateSpawnCount atomically updates the worker's spawn count.
func (w *WorkerNode) updateSpawnCount(spawnCount int64) {
	atomic.StoreInt64(&w.SpawnCount, spawnCount)
}

// getSpawnCount atomically reads the worker's spawn count.
func (w *WorkerNode) getSpawnCount() int64 {
	return atomic.LoadInt64(&w.SpawnCount)
}

// updateCPUUsage sets the last reported CPU usage under the write lock.
func (w *WorkerNode) updateCPUUsage(cpuUsage float64) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	w.CPUUsage = cpuUsage
}

// getCPUUsage reads the last reported CPU usage under the read lock.
func (w *WorkerNode) getCPUUsage() float64 {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	return w.CPUUsage
}

// updateCPUWarningEmitted records whether a CPU warning was already emitted.
func (w *WorkerNode) updateCPUWarningEmitted(cpuWarningEmitted bool) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	w.CPUWarningEmitted = cpuWarningEmitted
}

// getCPUWarningEmitted reports whether a CPU warning was already emitted.
func (w *WorkerNode) getCPUWarningEmitted() bool {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	return w.CPUWarningEmitted
}

// updateMemoryUsage sets the last reported memory usage under the write lock.
func (w *WorkerNode) updateMemoryUsage(memoryUsage float64) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	w.MemoryUsage = memoryUsage
}

// getMemoryUsage reads the last reported memory usage under the read lock.
func (w *WorkerNode) getMemoryUsage() float64 {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	return w.MemoryUsage
}

// getWorkerInfo returns a consistent snapshot copy of the worker's public
// fields (the returned value has a fresh zero mutex and no messenger).
func (w *WorkerNode) getWorkerInfo() WorkerNode {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	// Read the mutex-guarded fields directly: calling getCPUUsage() etc.
	// here would re-acquire the RLock, and a recursive RLock can deadlock
	// when a writer queues between the two acquisitions. The atomic getters
	// are safe to call since they do not touch the mutex.
	return WorkerNode{
		ID:                w.ID,
		State:             w.getState(),
		Heartbeat:         w.getHeartbeat(),
		SpawnCount:        w.getSpawnCount(),
		CPUUsage:          w.CPUUsage,
		CPUWarningEmitted: w.CPUWarningEmitted,
		MemoryUsage:       w.MemoryUsage,
	}
}
// grpcServer is the master-side gRPC endpoint that workers connect to.
type grpcServer struct {
	messager.UnimplementedMessageServer
	// masterHost/masterPort is the listen address.
	masterHost string
	masterPort int
	// NOTE(review): server is never assigned in the visible code (start()
	// keeps its grpc.Server in a local) — confirm whether it should be set.
	server *grpc.Server
	// clients maps worker nodeID -> *WorkerNode.
	clients *sync.Map
	// fromWorker buffers messages received from workers; toWorker buffers
	// messages to broadcast to all workers.
	fromWorker chan *genericMessage
	toWorker   chan *genericMessage
	// disconnectedToWorker is closed when a quit is broadcast, releasing
	// every blocked stream handler.
	disconnectedToWorker chan bool
	// shutdownChan is closed by close() to stop the send/recv pumps.
	shutdownChan chan bool
	// wg tracks in-flight stream handlers.
	wg sync.WaitGroup
}
// newServer builds a master-side grpc server bound to the given host and
// port. The message channels are buffered (100) so the send/recv pumps can
// absorb short bursts without blocking.
func newServer(masterHost string, masterPort int) *grpcServer {
	log.Info().Msg("Boomer is built with grpc support.")
	srv := &grpcServer{
		masterHost:           masterHost,
		masterPort:           masterPort,
		clients:              &sync.Map{},
		fromWorker:           make(chan *genericMessage, 100),
		toWorker:             make(chan *genericMessage, 100),
		disconnectedToWorker: make(chan bool),
		shutdownChan:         make(chan bool),
	}
	return srv
}
// start listens on masterHost:masterPort, registers the message service and
// launches the serve, recv and send goroutines. It returns immediately; only
// listener-setup errors are reported to the caller.
func (s *grpcServer) start() error {
	addr := fmt.Sprintf("%v:%v", s.masterHost, s.masterPort)
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Error().Err(err).Msg("failed to listen")
		return err
	}
	// create gRPC server
	serv := grpc.NewServer()
	// register message server
	messager.RegisterMessageServer(serv, s)
	reflection.Register(serv)
	// start grpc server
	go func() {
		// use a goroutine-local error: the original wrote to the function's
		// named return value after start() had already returned, which is a
		// data race and a dead store
		if serveErr := serv.Serve(lis); serveErr != nil {
			log.Error().Err(serveErr).Msg("failed to serve")
		}
	}()
	go s.recv()
	go s.send()
	return nil
}
// getWorkersByState returns the workers currently in the given state.
func (s *grpcServer) getWorkersByState(state int32) []*WorkerNode {
	var matched []*WorkerNode
	s.clients.Range(func(_, value interface{}) bool {
		worker, ok := value.(*WorkerNode)
		if ok && worker.getState() == state {
			matched = append(matched, worker)
		}
		return true
	})
	return matched
}
// getWorkersLengthByState counts the workers currently in the given state.
func (s *grpcServer) getWorkersLengthByState(state int32) int {
	count := 0
	s.clients.Range(func(_, value interface{}) bool {
		worker, ok := value.(*WorkerNode)
		if ok && worker.getState() == state {
			count++
		}
		return true
	})
	return count
}
// getAllWorkers returns a point-in-time snapshot (by value) of every
// registered worker.
func (s *grpcServer) getAllWorkers() []WorkerNode {
	var snapshots []WorkerNode
	s.clients.Range(func(_, value interface{}) bool {
		if worker, ok := value.(*WorkerNode); ok {
			snapshots = append(snapshots, worker.getWorkerInfo())
		}
		return true
	})
	return snapshots
}
// getClients returns the live nodeID -> *WorkerNode map shared with the
// stream handler (callers must use sync.Map semantics).
func (s *grpcServer) getClients() *sync.Map {
	return s.clients
}
// getClientsLength counts workers that are still active, i.e. neither
// quitting nor missing.
func (s *grpcServer) getClientsLength() int {
	active := 0
	s.clients.Range(func(_, value interface{}) bool {
		worker, ok := value.(*WorkerNode)
		if !ok {
			return true
		}
		if state := worker.getState(); state != StateQuitting && state != StateMissing {
			active++
		}
		return true
	})
	return active
}
// close signals every goroutine selecting on shutdownChan to exit.
func (s *grpcServer) close() {
	close(s.shutdownChan)
}

// recvChannel exposes the stream of messages received from workers.
func (s *grpcServer) recvChannel() chan *genericMessage {
	return s.fromWorker
}

// shutdownChannel exposes the channel that close() closes.
func (s *grpcServer) shutdownChannel() chan bool {
	return s.shutdownChan
}
// recv continuously iterates over every registered worker's stream and
// forwards incoming messages onto fromWorker until shutdownChan is closed.
// Workers that are quitting or missing are skipped.
//
// NOTE(review): messenger.Recv() blocks inside clients.Range, so a silent
// worker can stall receiving from the others, and the outer loop busy-spins
// when no clients are connected — confirm whether a per-worker receive
// goroutine was intended.
func (s *grpcServer) recv() {
	for {
		select {
		case <-s.shutdownChan:
			return
		default:
			s.clients.Range(func(key, value interface{}) bool {
				if workerInfo, ok := value.(*WorkerNode); ok {
					if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing {
						return true
					}
					msg, err := workerInfo.messenger.Recv()
					switch err {
					case nil:
						if msg == nil {
							return true
						}
						s.fromWorker <- newGenericMessage(msg.Type, msg.Data, msg.NodeID)
						log.Info().
							Str("nodeID", msg.NodeID).
							Str("type", msg.Type).
							Interface("data", msg.Data).
							Msg("receive data from worker")
					case io.EOF:
						// stream closed by the worker: treat as a quit
						s.fromWorker <- newQuitMessage(workerInfo.ID)
					default:
						// a canceled context is a normal disconnection
						if err.Error() == status.Error(codes.Canceled, context.Canceled.Error()).Error() {
							s.fromWorker <- newQuitMessage(workerInfo.ID)
							return true
						}
						log.Error().Err(err).Msg("failed to get stream from client")
					}
				}
				return true
			})
		}
	}
}
// sendChannel exposes the queue of messages to broadcast to workers.
func (s *grpcServer) sendChannel() chan *genericMessage {
	return s.toWorker
}

// send drains toWorker and broadcasts each message to all active workers
// until shutdownChan is closed. A "quit" message additionally closes
// disconnectedToWorker, releasing every blocked stream handler.
func (s *grpcServer) send() {
	for {
		select {
		case <-s.shutdownChan:
			return
		case msg := <-s.toWorker:
			s.sendMessage(msg)
			// We may send genericMessage to Worker.
			// NOTE(review): a second "quit" message would close
			// disconnectedToWorker twice and panic — confirm quit is
			// broadcast at most once per server lifetime.
			if msg.Type == "quit" {
				close(s.disconnectedToWorker)
			}
		}
	}
}
// sendMessage broadcasts one message to every active worker (quitting and
// missing workers are skipped). A failed send enqueues a quit message for
// that worker so the master drops it, and the broadcast continues with the
// remaining workers.
func (s *grpcServer) sendMessage(msg *genericMessage) {
	s.clients.Range(func(key, value interface{}) bool {
		if workerInfo, ok := value.(*WorkerNode); ok {
			if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing {
				return true
			}
			err := workerInfo.messenger.Send(
				&messager.StreamResponse{
					Type:   msg.Type,
					Data:   msg.Data,
					NodeID: workerInfo.ID,
					Tasks:  msg.Tasks},
			)
			switch err {
			case nil:
				break
			case io.EOF:
				// EOF and any other send error are handled identically
				fallthrough
			default:
				s.fromWorker <- newQuitMessage(workerInfo.ID)
				log.Error().Err(err).Msg("failed to send message")
				return true
			}
			log.Info().
				Str("nodeID", workerInfo.ID).
				Str("type", msg.Type).
				Interface("data", msg.Data).
				Int32("state", workerInfo.getState()).
				Msg("send data to worker")
		}
		return true
	})
}
// disconnectedChannel exposes the channel closed when a quit is broadcast;
// stream handlers block on it until the master shuts the workers down.
func (s *grpcServer) disconnectedChannel() chan bool {
	return s.disconnectedToWorker
}

View File

@@ -0,0 +1 @@
package boomer

View File

@@ -6,10 +6,15 @@ import (
"io"
"math"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/shirou/gopsutil/process"
)
func round(val float64, roundOn float64, places int) (newVal float64) {
@@ -75,3 +80,27 @@ func startCPUProfile(file string, duration time.Duration) (err error) {
})
return nil
}
// getNodeID generates a random nodeID like locust does, using the same
// algorithm: "<hostname>_<32-char hex uuid>".
func getNodeID() string {
	// best effort: an empty hostname still yields a usable ID
	hostname, _ := os.Hostname()
	// strings.ReplaceAll is the idiomatic form of strings.Replace(..., -1)
	id := strings.ReplaceAll(uuid.New().String(), "-", "")
	return fmt.Sprintf("%s_%s", hostname, id)
}
// GetCurrentCPUUsage get current CPU usage
func GetCurrentCPUUsage() float64 {
	proc, err := process.NewProcess(int32(os.Getpid()))
	if err != nil {
		log.Printf("Fail to get CPU percent, %v\n", err)
		return 0.0
	}
	percent, err := proc.CPUPercent()
	if err != nil {
		log.Printf("Fail to get CPU percent, %v\n", err)
		return 0.0
	}
	// normalise the process percentage to a per-core figure
	return percent / float64(runtime.NumCPU())
}

View File

@@ -1,11 +1,13 @@
package builtin
import (
"archive/zip"
"bufio"
"bytes"
"encoding/csv"
builtinJSON "encoding/json"
"fmt"
"io"
"math/rand"
"os"
"os/exec"
@@ -490,3 +492,168 @@ func GetFileNameWithoutExtension(path string) string {
ext := filepath.Ext(base)
return base[0 : len(base)-len(ext)]
}
// ZipDir compresses the whole directory tree rooted at root into filename.
// If root is an absolute path under the current working directory it is
// converted to a relative path first, so archived entry names stay relative.
func ZipDir(filename string, root string) error {
	cwd, err := os.Getwd()
	if err != nil {
		return err
	}
	// use HasPrefix, not Contains: the intent is "root lives under cwd",
	// not "cwd appears anywhere inside root"
	if strings.HasPrefix(root, cwd) {
		root, err = filepath.Rel(cwd, root)
		if err != nil {
			return err
		}
	}
	// remove a stale archive so we never add files to an old one
	err = os.RemoveAll(filename)
	if err != nil {
		return err
	}
	var files []string
	err = filepath.Walk(root, func(path string, info os.FileInfo, walkErr error) error {
		// propagate traversal errors instead of silently collecting a path
		// that could not be stat'ed
		if walkErr != nil {
			return walkErr
		}
		files = append(files, path)
		return nil
	})
	if err != nil {
		return err
	}
	return ZipFiles(filename, files)
}
// ZipFiles compresses one or many files into a single zip archive file.
// Param 1: filename is the output zip file's name.
// Param 2: files is a list of files to add to the zip.
func ZipFiles(filename string, files []string) error {
	archive, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer archive.Close()
	writer := zip.NewWriter(archive)
	defer writer.Close()
	// append every requested file to the archive
	for _, path := range files {
		if err := AddFileToZip(writer, path); err != nil {
			return err
		}
	}
	return nil
}
func AddFileToZip(zipWriter *zip.Writer, filename string) error {
fileToZip, err := os.Open(filename)
if err != nil {
return err
}
defer fileToZip.Close()
// Get the file information
info, err := fileToZip.Stat()
if err != nil {
return err
}
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
// Using FileInfoHeader() above only uses the basename of the file. If we want
// to preserve the folder structure we can overwrite this with the full path.
header.Name = filename
// if dir
if info.IsDir() {
header.Name += `/`
} else {
// Change to deflate to gain better compression
// see http://golang.org/pkg/archive/zip/#pkg-constants
header.Method = zip.Deflate
}
writer, err := zipWriter.CreateHeader(header)
if err != nil {
return err
}
if !info.IsDir() {
_, err = io.Copy(writer, fileToZip)
}
return err
}
// UnZip extracts the archive src into the directory dst. An empty dst
// extracts relative to the current directory.
func UnZip(dst, src string) (err error) {
	zr, err := zip.OpenReader(src)
	if err != nil {
		// check the error BEFORE deferring Close: on failure zr is nil and
		// a deferred zr.Close() would panic
		return
	}
	defer zr.Close()
	if dst != "" {
		if err := os.MkdirAll(dst, 0755); err != nil {
			return err
		}
	}
	for _, file := range zr.File {
		// guard against ZipSlip: refuse entries whose cleaned name escapes
		// the destination via ".." (archives are untrusted input)
		cleaned := filepath.Clean(file.Name)
		if cleaned == ".." || strings.HasPrefix(cleaned, ".."+string(os.PathSeparator)) {
			return fmt.Errorf("illegal file path in archive: %s", file.Name)
		}
		path := filepath.Join(dst, cleaned)
		if file.FileInfo().IsDir() {
			if err := os.MkdirAll(path, file.Mode()); err != nil {
				return err
			}
			continue
		}
		fr, err := file.Open()
		if err != nil {
			return err
		}
		fw, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, file.Mode())
		if err != nil {
			// close the entry reader before bailing out (the original
			// leaked it on this path)
			fr.Close()
			return err
		}
		_, err = io.Copy(fw, fr)
		// close both ends regardless of the copy outcome
		fw.Close()
		fr.Close()
		if err != nil {
			return err
		}
		log.Info().Msg(fmt.Sprintf("unzip %s successful\n", path))
	}
	return nil
}
// File2Bytes reads the whole file into memory and returns its content.
func File2Bytes(filename string) ([]byte, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	// io.ReadAll reads to EOF; a single file.Read into a pre-sized buffer is
	// not guaranteed to return the whole content in one call (short reads)
	data, err := io.ReadAll(file)
	if err != nil {
		return nil, err
	}
	log.Info().Msg(fmt.Sprintf("read file %s len: %d \n", filename, len(data)))
	return data, nil
}
// Bytes2File writes data to filename, creating or truncating the file.
func Bytes2File(data []byte, filename string) error {
	// os.WriteFile creates/truncates with mode 0666 before umask, matching
	// the previous os.Create behaviour, and — unlike the deferred Close in
	// the original — reports write AND close errors to the caller
	if err := os.WriteFile(filename, data, 0666); err != nil {
		return err
	}
	log.Info().Msg(fmt.Sprintf("write file %s len: %d \n", filename, len(data)))
	return nil
}

View File

@@ -0,0 +1,276 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.20.0
// source: grpc/proto/messager.proto
package messager
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// StreamRequest is the worker-to-master message (type + numeric payload +
// sender nodeID).
//
// NOTE(review): this file is generated by protoc-gen-go from
// grpc/proto/messager.proto — regenerate with protoc instead of hand-editing.
type StreamRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Type   string           `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
	Data   map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
	NodeID string           `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
}

func (x *StreamRequest) Reset() {
	*x = StreamRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_grpc_proto_messager_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *StreamRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*StreamRequest) ProtoMessage() {}

func (x *StreamRequest) ProtoReflect() protoreflect.Message {
	mi := &file_grpc_proto_messager_proto_msgTypes[0]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use StreamRequest.ProtoReflect.Descriptor instead.
func (*StreamRequest) Descriptor() ([]byte, []int) {
	return file_grpc_proto_messager_proto_rawDescGZIP(), []int{0}
}

func (x *StreamRequest) GetType() string {
	if x != nil {
		return x.Type
	}
	return ""
}

func (x *StreamRequest) GetData() map[string]int64 {
	if x != nil {
		return x.Data
	}
	return nil
}

func (x *StreamRequest) GetNodeID() string {
	if x != nil {
		return x.NodeID
	}
	return ""
}
// StreamResponse is the master-to-worker message; it extends the request
// shape with a serialized Tasks payload.
//
// NOTE(review): this file is generated by protoc-gen-go from
// grpc/proto/messager.proto — regenerate with protoc instead of hand-editing.
type StreamResponse struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	Type   string           `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
	Data   map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
	NodeID string           `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
	Tasks  []byte           `protobuf:"bytes,4,opt,name=tasks,proto3" json:"tasks,omitempty"`
}

func (x *StreamResponse) Reset() {
	*x = StreamResponse{}
	if protoimpl.UnsafeEnabled {
		mi := &file_grpc_proto_messager_proto_msgTypes[1]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *StreamResponse) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*StreamResponse) ProtoMessage() {}

func (x *StreamResponse) ProtoReflect() protoreflect.Message {
	mi := &file_grpc_proto_messager_proto_msgTypes[1]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use StreamResponse.ProtoReflect.Descriptor instead.
func (*StreamResponse) Descriptor() ([]byte, []int) {
	return file_grpc_proto_messager_proto_rawDescGZIP(), []int{1}
}

func (x *StreamResponse) GetType() string {
	if x != nil {
		return x.Type
	}
	return ""
}

func (x *StreamResponse) GetData() map[string]int64 {
	if x != nil {
		return x.Data
	}
	return nil
}

func (x *StreamResponse) GetNodeID() string {
	if x != nil {
		return x.NodeID
	}
	return ""
}

func (x *StreamResponse) GetTasks() []byte {
	if x != nil {
		return x.Tasks
	}
	return nil
}
// File descriptor plumbing generated by protoc-gen-go.
// NOTE(review): do not hand-edit; regenerate from grpc/proto/messager.proto.
var File_grpc_proto_messager_proto protoreflect.FileDescriptor

var file_grpc_proto_messager_proto_rawDesc = []byte{
	0x0a, 0x19, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73,
	0x73, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6d, 0x65, 0x73,
	0x73, 0x61, 0x67, 0x65, 0x22, 0xaa, 0x01, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52,
	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01,
	0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x34, 0x0a, 0x04, 0x64, 0x61,
	0x74, 0x61, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
	0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
	0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
	0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
	0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61,
	0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
	0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
	0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
	0x01, 0x22, 0xc2, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70,
	0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
	0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61,
	0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
	0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e,
	0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12,
	0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
	0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73,
	0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a,
	0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
	0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
	0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c,
	0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0x61, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
	0x65, 0x12, 0x56, 0x0a, 0x1d, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
	0x61, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61,
	0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72,
	0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73,
	0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f,
	0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70,
	0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
	0x6f, 0x33,
}

var (
	file_grpc_proto_messager_proto_rawDescOnce sync.Once
	file_grpc_proto_messager_proto_rawDescData = file_grpc_proto_messager_proto_rawDesc
)

func file_grpc_proto_messager_proto_rawDescGZIP() []byte {
	file_grpc_proto_messager_proto_rawDescOnce.Do(func() {
		file_grpc_proto_messager_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_proto_messager_proto_rawDescData)
	})
	return file_grpc_proto_messager_proto_rawDescData
}

var file_grpc_proto_messager_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_grpc_proto_messager_proto_goTypes = []interface{}{
	(*StreamRequest)(nil),  // 0: message.StreamRequest
	(*StreamResponse)(nil), // 1: message.StreamResponse
	nil,                    // 2: message.StreamRequest.DataEntry
	nil,                    // 3: message.StreamResponse.DataEntry
}
var file_grpc_proto_messager_proto_depIdxs = []int32{
	2, // 0: message.StreamRequest.data:type_name -> message.StreamRequest.DataEntry
	3, // 1: message.StreamResponse.data:type_name -> message.StreamResponse.DataEntry
	0, // 2: message.Message.BidirectionalStreamingMessage:input_type -> message.StreamRequest
	1, // 3: message.Message.BidirectionalStreamingMessage:output_type -> message.StreamResponse
	3, // [3:4] is the sub-list for method output_type
	2, // [2:3] is the sub-list for method input_type
	2, // [2:2] is the sub-list for extension type_name
	2, // [2:2] is the sub-list for extension extendee
	0, // [0:2] is the sub-list for field type_name
}

func init() { file_grpc_proto_messager_proto_init() }
func file_grpc_proto_messager_proto_init() {
	if File_grpc_proto_messager_proto != nil {
		return
	}
	if !protoimpl.UnsafeEnabled {
		file_grpc_proto_messager_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*StreamRequest); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
		file_grpc_proto_messager_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
			switch v := v.(*StreamResponse); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: file_grpc_proto_messager_proto_rawDesc,
			NumEnums:      0,
			NumMessages:   4,
			NumExtensions: 0,
			NumServices:   1,
		},
		GoTypes:           file_grpc_proto_messager_proto_goTypes,
		DependencyIndexes: file_grpc_proto_messager_proto_depIdxs,
		MessageInfos:      file_grpc_proto_messager_proto_msgTypes,
	}.Build()
	File_grpc_proto_messager_proto = out.File
	file_grpc_proto_messager_proto_rawDesc = nil
	file_grpc_proto_messager_proto_goTypes = nil
	file_grpc_proto_messager_proto_depIdxs = nil
}

View File

@@ -0,0 +1,137 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.20.0
// source: grpc/proto/messager.proto
package messager
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// MessageClient is the client API for Message service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type MessageClient interface {
	BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error)
}

// messageClient is the concrete MessageClient backed by a gRPC client
// connection. NOTE(review): generated code — regenerate via protoc
// rather than hand-editing.
type messageClient struct {
	cc grpc.ClientConnInterface
}

// NewMessageClient wraps cc in the generated Message service client.
func NewMessageClient(cc grpc.ClientConnInterface) MessageClient {
	return &messageClient{cc}
}

// BidirectionalStreamingMessage opens the bidirectional stream declared
// in messager.proto and returns a typed wrapper around it.
func (c *messageClient) BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error) {
	stream, err := c.cc.NewStream(ctx, &Message_ServiceDesc.Streams[0], "/message.Message/BidirectionalStreamingMessage", opts...)
	if err != nil {
		return nil, err
	}
	x := &messageBidirectionalStreamingMessageClient{stream}
	return x, nil
}
// Message_BidirectionalStreamingMessageClient is the client-side view of
// the stream: Send pushes StreamRequests, Recv blocks for the next
// StreamResponse.
type Message_BidirectionalStreamingMessageClient interface {
	Send(*StreamRequest) error
	Recv() (*StreamResponse, error)
	grpc.ClientStream
}

type messageBidirectionalStreamingMessageClient struct {
	grpc.ClientStream
}

// Send marshals m onto the underlying client stream.
func (x *messageBidirectionalStreamingMessageClient) Send(m *StreamRequest) error {
	return x.ClientStream.SendMsg(m)
}

// Recv blocks until the next StreamResponse arrives or the stream ends.
func (x *messageBidirectionalStreamingMessageClient) Recv() (*StreamResponse, error) {
	m := new(StreamResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}
// MessageServer is the server API for Message service.
// All implementations must embed UnimplementedMessageServer
// for forward compatibility
type MessageServer interface {
	BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error
	mustEmbedUnimplementedMessageServer()
}

// UnimplementedMessageServer must be embedded to have forward compatible implementations.
type UnimplementedMessageServer struct {
}

// BidirectionalStreamingMessage rejects the call with codes.Unimplemented
// until a real implementation overrides it.
func (UnimplementedMessageServer) BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error {
	return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingMessage not implemented")
}
func (UnimplementedMessageServer) mustEmbedUnimplementedMessageServer() {}

// UnsafeMessageServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to MessageServer will
// result in compilation errors.
type UnsafeMessageServer interface {
	mustEmbedUnimplementedMessageServer()
}

// RegisterMessageServer attaches srv to s under the Message service name.
func RegisterMessageServer(s grpc.ServiceRegistrar, srv MessageServer) {
	s.RegisterService(&Message_ServiceDesc, srv)
}
// _Message_BidirectionalStreamingMessage_Handler adapts a raw gRPC
// server stream to the typed MessageServer method.
func _Message_BidirectionalStreamingMessage_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(MessageServer).BidirectionalStreamingMessage(&messageBidirectionalStreamingMessageServer{stream})
}

// Message_BidirectionalStreamingMessageServer is the server-side view of
// the stream: Send pushes StreamResponses, Recv blocks for the next
// StreamRequest.
type Message_BidirectionalStreamingMessageServer interface {
	Send(*StreamResponse) error
	Recv() (*StreamRequest, error)
	grpc.ServerStream
}

type messageBidirectionalStreamingMessageServer struct {
	grpc.ServerStream
}

// Send marshals m onto the underlying server stream.
func (x *messageBidirectionalStreamingMessageServer) Send(m *StreamResponse) error {
	return x.ServerStream.SendMsg(m)
}

// Recv blocks until the next StreamRequest arrives or the stream ends.
func (x *messageBidirectionalStreamingMessageServer) Recv() (*StreamRequest, error) {
	m := new(StreamRequest)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}
// Message_ServiceDesc is the grpc.ServiceDesc for Message service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Message_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "message.Message",
	HandlerType: (*MessageServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName: "BidirectionalStreamingMessage",
			Handler:    _Message_BidirectionalStreamingMessage_Handler,
			// both directions stream: the master pushes tasks while
			// workers push status updates on the same RPC
			ServerStreams: true,
			ClientStreams: true,
		},
	},
	Metadata: "grpc/proto/messager.proto",
}

View File

@@ -0,0 +1,22 @@
// Messager service definition for distributed load testing: master and
// workers exchange control traffic over one bidirectional gRPC stream.
syntax = "proto3";
package message;
option go_package = "grpc/messager";
// Message exposes a single long-lived bidirectional streaming RPC.
service Message {
rpc BidirectionalStreamingMessage(stream StreamRequest) returns (stream StreamResponse){};
}
// StreamRequest is a worker-to-master control message.
// NOTE(review): proto style prefers snake_case field names (node_id);
// renaming now would change the generated Go API, so it is left as-is.
message StreamRequest{
// message kind discriminator
string type = 1;
// arbitrary payload keyed by name
map<string, bytes> data = 2;
// unique ID of the sending worker node
string NodeID = 3;
}
// StreamResponse is a master-to-worker control message.
message StreamResponse{
string type = 1;
map<string, bytes> data = 2;
string NodeID = 3;
// serialized task set for the worker to execute
bytes tasks = 4;
}

View File

@@ -178,6 +178,14 @@ func (iter *ParametersIterator) Next() map[string]interface{} {
return selectedParameters
}
// outParameters returns a fresh map holding a shallow copy of the
// iterator's parameter data, so callers cannot mutate iter.data itself.
func (iter *ParametersIterator) outParameters() map[string]interface{} {
	out := make(map[string]interface{}, len(iter.data))
	for name, params := range iter.data {
		out[name] = params
	}
	return out
}
func genCartesianProduct(multiParameters []Parameters) Parameters {
if len(multiParameters) == 0 {
return nil

299
hrp/server.go Normal file
View File

@@ -0,0 +1,299 @@
package hrp
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/httprunner/httprunner/v4/hrp/internal/boomer"
"github.com/httprunner/httprunner/v4/hrp/internal/json"
)
// jsonContentType is the Content-Type value for every JSON response of
// the control API. "charset" is the registered MIME parameter name;
// the previous "encoding=utf-8" is non-standard and ignored by clients.
const jsonContentType = "application/json; charset=utf-8"
func methods(h http.HandlerFunc, methods ...string) http.HandlerFunc {
methodMap := make(map[string]struct{}, len(methods))
for _, m := range methods {
methodMap[m] = struct{}{}
// GET implies support for HEAD
if m == "GET" {
methodMap["HEAD"] = struct{}{}
}
}
return func(w http.ResponseWriter, r *http.Request) {
if _, ok := methodMap[r.Method]; !ok {
http.Error(w, fmt.Sprintf("method %s not allowed", r.Method), http.StatusMethodNotAllowed)
return
}
h.ServeHTTP(w, r)
}
}
func parseBody(r *http.Request) (data map[string]interface{}, err error) {
if r.Body == nil {
return nil, nil
}
// Always set resp.Data to the incoming request body, in case we don't know
// how to handle the content type
body, err := ioutil.ReadAll(r.Body)
if err != nil {
r.Body.Close()
return nil, err
}
err = json.Unmarshal(body, data)
if err != nil {
return nil, err
}
return data, nil
}
func writeResponse(w http.ResponseWriter, status int, contentType string, body []byte) {
w.Header().Set("Content-Type", contentType)
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(body)))
w.WriteHeader(status)
w.Write(body)
}
// writeJSON sends body as a JSON response with the given status code.
// Note the (body, status) parameter order is reversed from writeResponse.
func writeJSON(w http.ResponseWriter, body []byte, status int) {
	writeResponse(w, status, jsonContentType, body)
}
// StartRequestBody is the JSON payload accepted by the /start endpoint.
type StartRequestBody struct {
	Worker       string `json:"worker"` // target worker ID, or "all"
	SpawnCount   int64  `json:"spawn_count"`   // total users to spawn
	SpawnRate    int64  `json:"spawn_rate"`    // users spawned per second
	TestCasePath string `json:"testcase_path"` // path of the testcase to run
}
// ServerCode enumerates API result codes carried in ServerStatus.
type ServerCode int

// server response code
const (
	Success     ServerCode = iota // request handled successfully
	ParamsError                   // client supplied invalid parameters
	ServerError                   // internal failure while handling the request
	StopError                     // failed to stop the load test
)

// ServerStatus stores http response code and message
type ServerStatus struct {
	Code    ServerCode `json:"code"`
	Message string     `json:"message"`
}
// EnumAPIResponseSuccess is the shared "request succeeded" status.
var EnumAPIResponseSuccess = ServerStatus{
	Code:    Success,
	Message: "success",
}
// EnumAPIResponseParamError builds a ServerStatus flagging invalid
// client parameters, carrying errMsg as the detail text.
func EnumAPIResponseParamError(errMsg string) ServerStatus {
	return ServerStatus{Code: ParamsError, Message: errMsg}
}
// EnumAPIResponseServerError builds a ServerStatus reporting an
// internal server failure, carrying errMsg as the detail text.
func EnumAPIResponseServerError(errMsg string) ServerStatus {
	return ServerStatus{Code: ServerError, Message: errMsg}
}
// EnumAPIResponseStopError builds a ServerStatus reporting a failure to
// stop the load test, carrying errMsg as the detail text.
func EnumAPIResponseStopError(errMsg string) ServerStatus {
	return ServerStatus{Code: StopError, Message: errMsg}
}
// CustomAPIResponse builds a ServerStatus with an arbitrary code and
// message, for cases not covered by the Enum* helpers above.
func CustomAPIResponse(errCode ServerCode, errMsg string) ServerStatus {
	return ServerStatus{Code: errCode, Message: errMsg}
}
// RebalanceRequestBody is the JSON payload accepted by /rebalance.
type RebalanceRequestBody struct {
	Worker       string `json:"worker"`
	SpawnCount   int64  `json:"spawn_count"`
	SpawnRate    int64  `json:"spawn_rate"`
	TestCasePath string `json:"testcase_path"`
}

// StopRequestBody is the JSON payload accepted by /stop.
type StopRequestBody struct {
	Worker string `json:"worker"`
}

// QuitRequestBody is the JSON payload accepted by /quit.
type QuitRequestBody struct {
	Worker string `json:"worker"`
}

// CommonResponseBody is the minimal JSON reply: just a status.
type CommonResponseBody struct {
	ServerStatus
}

// APIGetWorkersRequestBody mirrors a single worker's reported state.
// NOTE(review): not referenced in this file — presumably used by
// callers elsewhere; confirm before removing.
type APIGetWorkersRequestBody struct {
	ID          string  `json:"id"`
	State       int32   `json:"state"`
	CPUUsage    float64 `json:"cpu_usage"`
	MemoryUsage float64 `json:"memory_usage"`
}

// APIGetWorkersResponseBody is the JSON reply of /workers: a status
// plus the list of connected worker nodes.
type APIGetWorkersResponseBody struct {
	ServerStatus
	Data []boomer.WorkerNode `json:"data"`
}
// apiHandler bundles the HTTP endpoints that control a HRPBoomer.
type apiHandler struct {
	boomer *HRPBoomer
}

// NewAPIHandler returns an apiHandler bound to this boomer instance.
func (b *HRPBoomer) NewAPIHandler() *apiHandler {
	return &apiHandler{boomer: b}
}
// Index handles "/" and replies with a plain-text welcome message;
// every other path gets 404. (The previous comment claimed HTML was
// rendered, but only plain text is written.)
func (api *apiHandler) Index(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Security-Policy", "default-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' camo.githubusercontent.com")
	// Fprint, not Fprintf: the message is a fixed string, not a format
	// string (go vet flags Fprintf calls without arguments).
	fmt.Fprint(w, "Welcome to httprunner page!")
}
// Start launches a load test using parameters taken from the URL query
// string (for repeated keys the last value wins) and reports the
// outcome as a JSON status body.
func (api *apiHandler) Start(w http.ResponseWriter, r *http.Request) {
	data := map[string]interface{}{}
	for key, values := range r.URL.Query() {
		for _, value := range values {
			data[key] = value
		}
	}
	resp := &CommonResponseBody{ServerStatus: EnumAPIResponseSuccess}
	if err := api.boomer.Start(data); err != nil {
		resp = &CommonResponseBody{ServerStatus: EnumAPIResponseServerError(err.Error())}
	}
	body, _ := json.Marshal(resp)
	writeJSON(w, body, http.StatusOK)
}
// Stop halts the running load test and always reports success, since
// boomer.Stop surfaces no failure here.
// NOTE(review): the query string used to be collected into a map that
// was never read; boomer.Stop takes no arguments, so that dead parsing
// has been removed.
func (api *apiHandler) Stop(w http.ResponseWriter, r *http.Request) {
	api.boomer.Stop()
	resp := &CommonResponseBody{
		ServerStatus: EnumAPIResponseSuccess,
	}
	body, _ := json.Marshal(resp)
	writeJSON(w, body, http.StatusOK)
}
// Quit shuts the service down. The success response is written BEFORE
// boomer.Quit is invoked so the client receives a reply before the
// process begins to exit.
// NOTE(review): the query string used to be collected into a map that
// was never read; boomer.Quit takes no arguments, so that dead parsing
// has been removed.
func (api *apiHandler) Quit(w http.ResponseWriter, r *http.Request) {
	resp := &CommonResponseBody{
		ServerStatus: EnumAPIResponseSuccess,
	}
	body, _ := json.Marshal(resp)
	writeJSON(w, body, http.StatusOK)
	api.boomer.Quit()
}
// ReBalance redistributes load-test users across connected workers
// using parameters from the URL query string (last value wins for
// repeated keys) and replies with a JSON status body.
func (api *apiHandler) ReBalance(w http.ResponseWriter, r *http.Request) {
	data := map[string]interface{}{}
	for key, values := range r.URL.Query() {
		for _, value := range values {
			data[key] = value
		}
	}
	resp := &CommonResponseBody{ServerStatus: EnumAPIResponseSuccess}
	if err := api.boomer.ReBalance(data); err != nil {
		resp = &CommonResponseBody{ServerStatus: EnumAPIResponseParamError(err.Error())}
	}
	body, _ := json.Marshal(resp)
	writeJSON(w, body, http.StatusOK)
}
// GetWorkersInfo replies with the current list of connected worker
// nodes as JSON.
func (api *apiHandler) GetWorkersInfo(w http.ResponseWriter, r *http.Request) {
	resp := &APIGetWorkersResponseBody{
		ServerStatus: EnumAPIResponseSuccess,
		Data:         api.boomer.GetWorkersInfo(),
	}
	body, _ := json.Marshal(resp)
	writeJSON(w, body, http.StatusOK)
}
// Handler builds the route table for the control API. All endpoints are
// registered GET-only (HEAD implied by the methods wrapper), including
// the state-changing actions, which take parameters via query string.
func (api *apiHandler) Handler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/", methods(api.Index, "GET"))
	mux.HandleFunc("/start", methods(api.Start, "GET"))
	mux.HandleFunc("/stop", methods(api.Stop, "GET"))
	mux.HandleFunc("/quit", methods(api.Quit, "GET"))
	mux.HandleFunc("/rebalance", methods(api.ReBalance, "GET"))
	mux.HandleFunc("/workers", methods(api.GetWorkersInfo, "GET"))
	return mux
}
// ServeHTTP is a no-op stub; actual routing is done by the mux returned
// from Handler. NOTE(review): appears unused — confirm nothing relies
// on apiHandler satisfying http.Handler directly before removing.
func (apiHandler) ServeHTTP(http.ResponseWriter, *http.Request) {}
// StartServer exposes the boomer control API over HTTP on port 9771 and
// blocks until the server stops. When b's close channel fires, the
// server is shut down gracefully, letting in-flight requests finish.
func (b *HRPBoomer) StartServer() {
	h := b.NewAPIHandler()
	mux := h.Handler()
	server := &http.Server{
		Addr:    ":9771",
		Handler: mux,
	}
	go func() {
		// wait for the boomer shutdown signal, then stop accepting
		// new connections
		<-b.GetCloseChan()
		if err := server.Shutdown(context.Background()); err != nil {
			log.Fatal("shutdown server:", err)
		}
	}()
	log.Println("Starting HTTP server...")
	err := server.ListenAndServe()
	if err != nil {
		// ErrServerClosed is the sentinel returned after Shutdown —
		// a normal, requested stop
		if err == http.ErrServerClosed {
			log.Print("server closed under request")
		} else {
			// include the underlying error instead of discarding it
			log.Fatal("server closed unexpected: ", err)
		}
	}
}

View File

@@ -155,9 +155,9 @@ func (r *Rendezvous) setReleased() {
}
func initRendezvous(testcase *TestCase, total int64) []*Rendezvous {
tCase := testcase.ToTCase()
var rendezvousList []*Rendezvous
for _, step := range tCase.TestSteps {
for _, s := range testcase.TestSteps {
step := s.Struct()
if step.Rendezvous == nil {
continue
}
@@ -188,16 +188,20 @@ func initRendezvous(testcase *TestCase, total int64) []*Rendezvous {
return rendezvousList
}
func waitRendezvous(rendezvousList []*Rendezvous) {
// updateRendezvousNumber rescales this rendezvous' trigger threshold to
// its Percent share of the given total user number. The write is atomic
// because Number is read concurrently while the test is running.
func (r *Rendezvous) updateRendezvousNumber(number int64) {
	atomic.StoreInt64(&r.Number, int64(float32(number)*r.Percent))
}
func waitRendezvous(rendezvousList []*Rendezvous, b *HRPBoomer) {
if rendezvousList != nil {
lastRendezvous := rendezvousList[len(rendezvousList)-1]
for _, rendezvous := range rendezvousList {
go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous)
go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous, b)
}
}
}
func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous) {
func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous, b *HRPBoomer) {
for {
// cycle start: block current checking until current rendezvous activated
<-rendezvous.activateChan
@@ -241,6 +245,8 @@ func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous,
if rendezvous == lastRendezvous {
for _, r := range rendezvousList {
r.reset()
// dynamic adjustment based on the number of concurrent users
r.updateRendezvousNumber(int64(b.GetSpawnCount()))
}
} else {
<-lastRendezvous.releaseChan

View File

@@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/httprunner/httprunner/v4/hrp/internal/builtin"
"github.com/mitchellh/mapstructure"
)
// ITestCase represents interface for testcases,
@@ -40,6 +41,11 @@ func (tc *TestCase) ToTCase() *TCase {
Config: tc.Config,
}
for _, step := range tc.TestSteps {
if step.Type() == stepTypeTestCase {
if testcase, ok := step.Struct().TestCase.(*TestCase); ok {
step.Struct().TestCase = testcase.ToTCase()
}
}
tCase.TestSteps = append(tCase.TestSteps, step.Struct())
}
return tCase
@@ -106,13 +112,17 @@ func (tc *TCase) ToTestCase(casePath string) (*TestCase, error) {
tc.Config = &TConfig{Name: "please input testcase name"}
}
tc.Config.Path = casePath
return tc.toTestCase()
}
// toTestCase converts *TCase to *TestCase
func (tc *TCase) toTestCase() (*TestCase, error) {
testCase := &TestCase{
Config: tc.Config,
}
// locate project root dir by plugin path
projectRootDir, err := GetProjectRootDirPath(casePath)
projectRootDir, err := GetProjectRootDirPath(tc.Config.Path)
if err != nil {
return nil, errors.Wrap(err, "failed to get project root dir")
}
@@ -139,40 +149,71 @@ func (tc *TCase) ToTestCase(casePath string) (*TestCase, error) {
for _, step := range tc.TestSteps {
if step.API != nil {
apiPath, ok := step.API.(string)
if ok {
path := filepath.Join(projectRootDir, apiPath)
if !builtin.IsFilePathExists(path) {
return nil, errors.New("referenced api file not found: " + path)
}
refAPI := APIPath(path)
apiContent, err := refAPI.ToAPI()
if err != nil {
return nil, err
}
step.API = apiContent
} else {
apiMap, ok := step.API.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("referenced api should be map or path(string), got %v", step.API)
}
api := &API{}
err = mapstructure.Decode(apiMap, api)
if err != nil {
return nil, err
}
step.API = api
}
_, ok = step.API.(*API)
if !ok {
return nil, fmt.Errorf("referenced api path should be string, got %v", step.API)
return nil, fmt.Errorf("failed to handle referenced API, got %v", step.TestCase)
}
path := filepath.Join(projectRootDir, apiPath)
if !builtin.IsFilePathExists(path) {
return nil, errors.New("referenced api file not found: " + path)
}
refAPI := APIPath(path)
apiContent, err := refAPI.ToAPI()
if err != nil {
return nil, err
}
step.API = apiContent
testCase.TestSteps = append(testCase.TestSteps, &StepAPIWithOptionalArgs{
step: step,
})
} else if step.TestCase != nil {
casePath, ok := step.TestCase.(string)
if !ok {
return nil, fmt.Errorf("referenced testcase path should be string, got %v", step.TestCase)
}
path := filepath.Join(projectRootDir, casePath)
if !builtin.IsFilePathExists(path) {
return nil, errors.New("referenced testcase file not found: " + path)
}
if ok {
path := filepath.Join(projectRootDir, casePath)
if !builtin.IsFilePathExists(path) {
return nil, errors.New("referenced testcase file not found: " + path)
}
refTestCase := TestCasePath(path)
tc, err := refTestCase.ToTestCase()
if err != nil {
return nil, err
refTestCase := TestCasePath(path)
tc, err := refTestCase.ToTestCase()
if err != nil {
return nil, err
}
step.TestCase = tc
} else {
testCaseMap, ok := step.TestCase.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("referenced testcase should be map or path(string), got %v", step.TestCase)
}
tCase := &TCase{}
err = mapstructure.Decode(testCaseMap, tCase)
if err != nil {
return nil, err
}
tc, err := tCase.toTestCase()
if err != nil {
return nil, err
}
step.TestCase = tc
}
_, ok = step.TestCase.(*TestCase)
if !ok {
return nil, fmt.Errorf("failed to handle referenced testcase, got %v", step.TestCase)
}
step.TestCase = tc
testCase.TestSteps = append(testCase.TestSteps, &StepTestCaseWithOptionalArgs{
step: step,
})