diff --git a/docs/cmd/hrp_boom.md b/docs/cmd/hrp_boom.md index 85ceb90e..d79d7f14 100644 --- a/docs/cmd/hrp_boom.md +++ b/docs/cmd/hrp_boom.md @@ -21,13 +21,21 @@ hrp boom [flags] ### Options ``` + --autostart Starts the test immediately (without disabling the web UI). Use --spawn-count and --spawn-rate to control user count and run time --cpu-profile string Enable CPU profiling. --cpu-profile-duration duration CPU profile duration. (default 30s) --disable-compression Disable compression --disable-console-output Disable console output. --disable-keepalive Disable keepalive + --expect-workers int How many workers master should expect to connect before starting the test (only when --autostart is used (default 1) + --expect-workers-max-wait int How many workers master should expect to connect before starting the test (only when --autostart is used -h, --help help for boom --loop-count int The specify running cycles for load testing (default -1) + --master master of distributed testing + --master-bind-host string Interfaces (hostname, ip) that hrp master should bind to. Only used when running with --master. Defaults to * (all available interfaces). (default "127.0.0.1") + --master-bind-port int Port that hrp master should bind to. Only used when running with --master. Defaults to 5557. (default 5557) + --master-host string Host or IP address of hrp master for distributed load testing. (default "127.0.0.1") + --master-port int The port to connect to that is used by the hrp master for distributed load testing. (default 5557) --max-rps int Max RPS that boomer can generate, disabled by default. --mem-profile string Enable memory profiling. --mem-profile-duration duration Memory profile duration. (default 30s) @@ -36,6 +44,7 @@ hrp boom [flags] --request-increase-rate string Request increase rate, disabled by default. (default "-1") --spawn-count int The number of users to spawn for load testing (default 1) --spawn-rate float The rate for spawning users (default 1) + --worker worker of distributed testing ``` ### SEE ALSO diff --git a/go.mod b/go.mod index f4d02e95..1bc11398 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,13 @@ module github.com/httprunner/httprunner/v4 -go 1.16 +go 1.18 require ( github.com/andybalholm/brotli v1.0.4 github.com/denisbrodbeck/machineid v1.0.1 github.com/fatih/color v1.13.0 github.com/getsentry/sentry-go v0.13.0 + github.com/go-errors/errors v1.0.1 github.com/go-openapi/spec v0.20.6 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/google/uuid v1.3.0 @@ -17,14 +18,20 @@ require ( github.com/json-iterator/go v1.1.12 github.com/maja42/goval v1.2.1 github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/mitchellh/mapstructure v1.4.1 github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/rs/zerolog v1.26.1 + github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/cobra v1.2.1 github.com/stretchr/testify v1.7.0 + github.com/tklauser/go-sysconf v0.3.10 // indirect + github.com/yusufpapurcu/wmi v1.2.2 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f - gopkg.in/yaml.v3 v3.0.0 + google.golang.org/grpc v1.45.0 + google.golang.org/protobuf v1.27.1 + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) // replace github.com/httprunner/funplugin => ../funplugin diff --git a/go.sum b/go.sum index b7292409..a36ed4ad 100644 --- a/go.sum +++ b/go.sum @@ -132,6 +132,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -352,6 +354,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -421,6 +424,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -452,6 +457,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= +github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= +github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o= +github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= @@ -474,6 +483,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= @@ -624,6 +635,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -668,6 +680,7 @@ golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 h1:y/woIyUBFbpQGKS0u1aHF/40WUDnek3fPOyD08H5Vng= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -877,9 +890,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/hrp/boomer.go b/hrp/boomer.go index c44448f2..57d3695f 100644 --- a/hrp/boomer.go +++ b/hrp/boomer.go @@ -6,13 +6,13 @@ import ( "time" "github.com/httprunner/funplugin" - "github.com/rs/zerolog/log" - "github.com/httprunner/httprunner/v4/hrp/internal/boomer" + "github.com/httprunner/httprunner/v4/hrp/internal/json" "github.com/httprunner/httprunner/v4/hrp/internal/sdk" + "github.com/rs/zerolog/log" ) -func NewBoomer(spawnCount int, spawnRate float64) *HRPBoomer { +func NewStandaloneBoomer(spawnCount int, spawnRate float64) *HRPBoomer { b := &HRPBoomer{ Boomer: boomer.NewStandaloneBoomer(spawnCount, spawnRate), pluginsMutex: new(sync.RWMutex), @@ -22,6 +22,27 @@ func NewBoomer(spawnCount int, spawnRate float64) *HRPBoomer { return b } +func NewMasterBoomer(masterBindHost string, masterBindPort int) *HRPBoomer { + b := &HRPBoomer{ + Boomer: boomer.NewMasterBoomer(masterBindHost, masterBindPort), + pluginsMutex: new(sync.RWMutex), + } + b.hrpRunner = NewRunner(nil) + return b +} + +func NewWorkerBoomer(masterHost string, masterPort int) *HRPBoomer { + b := &HRPBoomer{ + Boomer: boomer.NewWorkerBoomer(masterHost, masterPort), + pluginsMutex: new(sync.RWMutex), + } + + b.hrpRunner = NewRunner(nil) + // set client transport for high concurrency load testing + b.hrpRunner.SetClientTransport(b.GetSpawnCount(), b.GetDisableKeepAlive(), b.GetDisableCompression()) + return b +} + type HRPBoomer struct { *boomer.Boomer hrpRunner *HRPRunner @@ -52,8 +73,12 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) { // report execution timing event defer sdk.SendEvent(event.StartTiming("execution")) - var taskSlice []*boomer.Task + taskSlice := b.ConvertTestCasesToTasks(testcases...) + b.Boomer.Run(taskSlice...) +} + +func (b *HRPBoomer) ConvertTestCasesToTasks(testcases ...ITestCase) (taskSlice []*boomer.Task) { // load all testcases testCases, err := LoadTestCases(testcases...) if err != nil { @@ -74,15 +99,107 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) { rendezvousList := initRendezvous(testcase, int64(b.GetSpawnCount())) task := b.convertBoomerTask(testcase, rendezvousList) taskSlice = append(taskSlice, task) - waitRendezvous(rendezvousList) + waitRendezvous(rendezvousList, b) } - b.Boomer.Run(taskSlice...) + return taskSlice +} + +func (b *HRPBoomer) LoopTestCases() { + for { + select { + case <-b.Boomer.ParseTestCasesChan(): + var tcs []ITestCase + for _, tc := range b.GetTestCasesPath() { + tcp := TestCasePath(tc) + tcs = append(tcs, &tcp) + } + b.GetTestCaseBytesChan() <- b.TestCasesToBytes(tcs...) + log.Info().Msg("put testcase successful") + case <-b.Boomer.GetCloseChan(): + return + } + } +} + +func (b *HRPBoomer) OutTestCases(testCases []*TestCase) []*TCase { + var outTestCases []*TCase + for _, tc := range testCases { + caseRunner, err := b.hrpRunner.newCaseRunner(tc) + if err != nil { + log.Error().Err(err).Msg("failed to create runner") + os.Exit(1) + } + caseRunner.parsedConfig.Parameters = caseRunner.parametersIterator.outParameters() + outTestCases = append(outTestCases, &TCase{ + Config: caseRunner.parsedConfig, + TestSteps: caseRunner.testCase.ToTCase().TestSteps, + }) + } + return outTestCases +} + +func (b *HRPBoomer) TestCasesToBytes(testcases ...ITestCase) []byte { + // load all testcases + testCases, err := LoadTestCases(testcases...) + if err != nil { + log.Error().Err(err).Msg("failed to load testcases") + os.Exit(1) + } + tcs := b.OutTestCases(testCases) + testCasesBytes, err := json.Marshal(tcs) + if err != nil { + log.Error().Err(err).Msg("failed to marshal testcases") + return nil + } + return testCasesBytes +} + +func (b *HRPBoomer) BytesToTestCases(testCasesBytes []byte) []*TCase { + var testcase []*TCase + err := json.Unmarshal(testCasesBytes, &testcase) + if err != nil { + log.Error().Err(err).Msg("failed to unmarshal testcases") + } + return testcase } func (b *HRPBoomer) Quit() { b.Boomer.Quit() } +func (b *HRPBoomer) handleTasks(tcs []byte) { + //Todo: 过滤掉已经传输过的task + testCases := b.BytesToTestCases(tcs) + var testcases []ITestCase + for _, tc := range testCases { + tesecase, err := tc.toTestCase() + if err != nil { + log.Error().Err(err).Msg("failed to load testcases") + } + testcases = append(testcases, tesecase) + } + log.Info().Interface("testcases", testcases).Msg("loop tasks successful") + if b.Boomer.GetState() == boomer.StateRunning || b.Boomer.GetState() == boomer.StateSpawning { + b.Boomer.SetTasks(b.ConvertTestCasesToTasks(testcases...)...) + } else { + b.Run(testcases...) + } +} + +func (b *HRPBoomer) LoopTasks() { + for { + select { + case tcs := <-b.Boomer.GetTestCaseBytesChan(): + if len(b.Boomer.GetTestCaseBytesChan()) > 0 { + continue + } + go b.handleTasks(tcs) + case <-b.Boomer.GetCloseChan(): + return + } + } +} + func (b *HRPBoomer) convertBoomerTask(testcase *TestCase, rendezvousList []*Rendezvous) *boomer.Task { // init runner for testcase // this runner is shared by multiple session runners diff --git a/hrp/boomer_test.go b/hrp/boomer_test.go index 547a4618..83151b5e 100644 --- a/hrp/boomer_test.go +++ b/hrp/boomer_test.go @@ -27,7 +27,7 @@ func TestBoomerStandaloneRun(t *testing.T) { } testcase2 := TestCasePath(demoTestCaseWithPluginJSONPath) - b := NewBoomer(2, 1) + b := NewStandaloneBoomer(2, 1) go b.Run(testcase1, &testcase2) time.Sleep(5 * time.Second) b.Quit() diff --git a/hrp/cmd/boom.go b/hrp/cmd/boom.go index 120109e0..09d87f2a 100644 --- a/hrp/cmd/boom.go +++ b/hrp/cmd/boom.go @@ -21,7 +21,7 @@ var boomCmd = &cobra.Command{ Example: ` $ hrp boom demo.json # run specified json testcase file $ hrp boom demo.yaml # run specified yaml testcase file $ hrp boom examples/ # run testcases in specified folder`, - Args: cobra.MinimumNArgs(1), + Args: cobra.MinimumNArgs(0), PreRun: func(cmd *cobra.Command, args []string) { boomer.SetUlimit(10240) // ulimit -n 10240 if !strings.EqualFold(logLevel, "DEBUG") { @@ -35,8 +35,65 @@ var boomCmd = &cobra.Command{ path := hrp.TestCasePath(arg) paths = append(paths, &path) } - hrpBoomer := makeHRPBoomer() - hrpBoomer.Run(paths...) + + // if set profile, the priority is higher than the other commands + if boomArgs.profile != "" { + err := builtin.LoadFile(boomArgs.profile, &boomArgs) + if err != nil { + log.Error().Err(err).Msg("failed to load profile") + os.Exit(1) + } + } + + var hrpBoomer *hrp.HRPBoomer + if boomArgs.master { + hrpBoomer = hrp.NewMasterBoomer(boomArgs.masterBindHost, boomArgs.masterBindPort) + hrpBoomer.SetTestCasesPath(args) + if boomArgs.autoStart { + hrpBoomer.SetAutoStart() + hrpBoomer.SetExpectWorkers(boomArgs.expectWorkers, boomArgs.expectWorkersMaxWait) + hrpBoomer.SetSpawnCount(boomArgs.SpawnCount) + hrpBoomer.SetSpawnRate(boomArgs.SpawnRate) + } + hrpBoomer.EnableGracefulQuit() + go hrpBoomer.StartServer() + go hrpBoomer.RunMaster() + hrpBoomer.LoopTestCases() + return + } else if boomArgs.worker { + hrpBoomer = hrp.NewWorkerBoomer(boomArgs.masterHost, boomArgs.masterPort) + if boomArgs.ignoreQuit { + hrpBoomer.SetIgnoreQuit() + } + go hrpBoomer.RunWorker() + } else { + hrpBoomer = hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate) + if boomArgs.LoopCount > 0 { + hrpBoomer.SetLoopCount(boomArgs.LoopCount) + } + } + hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate) + if !boomArgs.DisableConsoleOutput { + + hrpBoomer.AddOutput(boomer.NewConsoleOutput()) + } + if boomArgs.PrometheusPushgatewayURL != "" { + hrpBoomer.AddOutput(boomer.NewPrometheusPusherOutput(boomArgs.PrometheusPushgatewayURL, "hrp", hrpBoomer.GetMode())) + } + hrpBoomer.SetDisableKeepAlive(boomArgs.DisableKeepalive) + hrpBoomer.SetDisableCompression(boomArgs.DisableCompression) + hrpBoomer.SetClientTransport() + if venv != "" { + hrpBoomer.SetPython3Venv(venv) + } + hrpBoomer.EnableCPUProfile(boomArgs.CPUProfile, boomArgs.CPUProfileDuration) + hrpBoomer.EnableMemoryProfile(boomArgs.MemoryProfile, boomArgs.MemoryProfileDuration) + hrpBoomer.EnableGracefulQuit() + if boomArgs.worker { + hrpBoomer.LoopTasks() + } else { + hrpBoomer.Run(paths...) + } }, } @@ -55,6 +112,16 @@ type BoomArgs struct { DisableCompression bool `json:"disable-compression,omitempty" yaml:"disable-compression,omitempty"` DisableKeepalive bool `json:"disable-keepalive,omitempty" yaml:"disable-keepalive,omitempty"` profile string + master bool + worker bool + ignoreQuit bool + masterHost string + masterPort int + masterBindHost string + masterBindPort int + autoStart bool + expectWorkers int + expectWorkersMaxWait int } var boomArgs BoomArgs @@ -76,6 +143,16 @@ func init() { boomCmd.Flags().BoolVar(&boomArgs.DisableCompression, "disable-compression", false, "Disable compression") boomCmd.Flags().BoolVar(&boomArgs.DisableKeepalive, "disable-keepalive", false, "Disable keepalive") boomCmd.Flags().StringVar(&boomArgs.profile, "profile", "", "profile for load testing") + boomCmd.Flags().BoolVar(&boomArgs.master, "master", false, "master of distributed testing") + boomCmd.Flags().StringVar(&boomArgs.masterBindHost, "master-bind-host", "127.0.0.1", "Interfaces (hostname, ip) that hrp master should bind to. Only used when running with --master. Defaults to * (all available interfaces).") + boomCmd.Flags().IntVar(&boomArgs.masterBindPort, "master-bind-port", 5557, "Port that hrp master should bind to. Only used when running with --master. Defaults to 5557.") + boomCmd.Flags().BoolVar(&boomArgs.worker, "worker", false, "worker of distributed testing") + boomCmd.Flags().BoolVar(&boomArgs.ignoreQuit, "ignore-quit", false, "ignores quit from master (only when --worker is used)") + boomCmd.Flags().StringVar(&boomArgs.masterHost, "master-host", "127.0.0.1", "Host or IP address of hrp master for distributed load testing.") + boomCmd.Flags().IntVar(&boomArgs.masterPort, "master-port", 5557, "The port to connect to that is used by the hrp master for distributed load testing.") + boomCmd.Flags().BoolVar(&boomArgs.autoStart, "autostart", false, "Starts the test immediately (without disabling the web UI). Use --spawn-count and --spawn-rate to control user count and run time") + boomCmd.Flags().IntVar(&boomArgs.expectWorkers, "expect-workers", 1, "How many workers master should expect to connect before starting the test (only when --autostart is used)") + boomCmd.Flags().IntVar(&boomArgs.expectWorkersMaxWait, "expect-workers-max-wait", 0, "How many workers master should expect to connect before starting the test (only when --autostart is used") } func makeHRPBoomer() *hrp.HRPBoomer { @@ -88,7 +165,7 @@ func makeHRPBoomer() *hrp.HRPBoomer { } } - hrpBoomer := hrp.NewBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate) + hrpBoomer := hrp.NewStandaloneBoomer(boomArgs.SpawnCount, boomArgs.SpawnRate) hrpBoomer.SetRateLimiter(boomArgs.MaxRPS, boomArgs.RequestIncreaseRate) if boomArgs.LoopCount > 0 { hrpBoomer.SetLoopCount(boomArgs.LoopCount) diff --git a/hrp/config.go b/hrp/config.go index 8d50c690..06930564 100644 --- a/hrp/config.go +++ b/hrp/config.go @@ -98,7 +98,7 @@ func (c *TConfig) SetWebSocket(times, interval, timeout, size int64) { } type ThinkTimeConfig struct { - Strategy thinkTimeStrategy `json:"strategy,omitempty" yaml:"strategy,omitempty"` // default、random、limit、multiply、ignore + Strategy thinkTimeStrategy `json:"strategy,omitempty" yaml:"strategy,omitempty"` // default、random、multiply、ignore Setting interface{} `json:"setting,omitempty" yaml:"setting,omitempty"` // random(map): {"min_percentage": 0.5, "max_percentage": 1.5}; 10、multiply(float64): 1.5 Limit float64 `json:"limit,omitempty" yaml:"limit,omitempty"` // limit think time no more than specific time, ignore if value <= 0 } diff --git a/hrp/internal/boomer/boomer.go b/hrp/internal/boomer/boomer.go index cc424b12..a4e9bb72 100644 --- a/hrp/internal/boomer/boomer.go +++ b/hrp/internal/boomer/boomer.go @@ -4,9 +4,13 @@ import ( "math" "os" "os/signal" + "strconv" + "strings" "syscall" "time" + "github.com/httprunner/httprunner/v4/hrp/internal/builtin" + "github.com/pkg/errors" "github.com/rs/zerolog/log" ) @@ -25,9 +29,18 @@ const ( // A Boomer is used to run tasks. type Boomer struct { - mode Mode + masterHost string + masterPort int + mode Mode - localRunner *localRunner + localRunner *localRunner + workerRunner *workerRunner + masterRunner *masterRunner + + testcasePath []string + + spawnCount int // target clients to spawn + spawnRate float64 cpuProfile string cpuProfileDuration time.Duration @@ -73,9 +86,101 @@ func NewStandaloneBoomer(spawnCount int, spawnRate float64) *Boomer { return &Boomer{ mode: StandaloneMode, localRunner: newLocalRunner(spawnCount, spawnRate), + spawnCount: spawnCount, + spawnRate: spawnRate, } } +// NewMasterBoomer returns a new Boomer. +func NewMasterBoomer(masterBindHost string, masterBindPort int) *Boomer { + return &Boomer{ + masterRunner: newMasterRunner(masterBindHost, masterBindPort), + mode: DistributedMasterMode, + } +} + +// NewWorkerBoomer returns a new Boomer. +func NewWorkerBoomer(masterHost string, masterPort int) *Boomer { + return &Boomer{ + workerRunner: newWorkerRunner(masterHost, masterPort), + masterHost: masterHost, + masterPort: masterPort, + mode: DistributedWorkerMode, + } +} + +// SetAutoStart auto start to load testing +func (b *Boomer) SetAutoStart() { + b.masterRunner.autoStart = true + +} + +// RunMaster start to run master runner +func (b *Boomer) RunMaster() { + b.masterRunner.run() +} + +// RunWorker start to run worker runner +func (b *Boomer) RunWorker() { + b.workerRunner.run() +} + +// GetTestCaseBytesChan gets test case bytes chan +func (b *Boomer) GetTestCaseBytesChan() chan []byte { + switch b.mode { + case DistributedMasterMode: + return b.masterRunner.testCaseBytes + case DistributedWorkerMode: + return b.workerRunner.testCaseBytes + } + return nil +} + +func (b *Boomer) SetTestCasesPath(paths []string) { + b.testcasePath = paths +} + +func (b *Boomer) GetTestCasesPath() []string { + return b.testcasePath +} + +func (b *Boomer) ParseTestCasesChan() chan bool { + return b.masterRunner.parseTestCasesChan +} + +// GetState gets worker state +func (b *Boomer) GetState() int32 { + switch b.mode { + case DistributedWorkerMode: + return b.workerRunner.getState() + case DistributedMasterMode: + return b.masterRunner.getState() + default: + return b.localRunner.getState() + } +} + +// SetSpawnCount sets spawn count +func (b *Boomer) SetSpawnCount(spawnCount int) { + b.spawnCount = spawnCount + if b.mode == DistributedMasterMode { + b.masterRunner.spawn.setSpawn(int64(spawnCount), -1) + } +} + +// SetSpawnRate sets spawn rate +func (b *Boomer) SetSpawnRate(spawnRate float64) { + b.spawnRate = spawnRate + if b.mode == DistributedMasterMode { + b.masterRunner.spawn.setSpawn(-1, spawnRate) + } +} + +// SetExpectWorkers sets expect workers while load testing +func (b *Boomer) SetExpectWorkers(expectWorkers int, expectWorkersMaxWait int) { + b.masterRunner.setExpectWorkers(expectWorkers, expectWorkersMaxWait) +} + // SetRateLimiter creates rate limiter with the given limit and burst. func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) { var rateLimiter RateLimiter @@ -98,8 +203,14 @@ func (b *Boomer) SetRateLimiter(maxRPS int64, requestIncreaseRate string) { } if rateLimiter != nil { - b.localRunner.rateLimitEnabled = true - b.localRunner.rateLimiter = rateLimiter + switch b.mode { + case DistributedWorkerMode: + b.workerRunner.rateLimitEnabled = true + b.workerRunner.rateLimiter = rateLimiter + case StandaloneMode: + b.localRunner.rateLimitEnabled = true + b.localRunner.rateLimiter = rateLimiter + } } } @@ -108,6 +219,11 @@ func (b *Boomer) SetDisableKeepAlive(disableKeepalive bool) { b.disableKeepalive = disableKeepalive } +// SetIgnoreQuit not quit while master quit +func (b *Boomer) SetIgnoreQuit() { + b.workerRunner.ignoreQuit = true +} + // SetDisableCompression disable compression to prevent the Transport from requesting compression with an "Accept-Encoding: gzip" func (b *Boomer) SetDisableCompression(disableCompression bool) { b.disableCompression = disableCompression @@ -124,12 +240,26 @@ func (b *Boomer) GetDisableCompression() bool { // SetLoopCount set loop count for test. func (b *Boomer) SetLoopCount(loopCount int64) { // total loop count for testcase, it will be evenly distributed to each worker - b.localRunner.loop = &Loop{loopCount: loopCount * int64(b.localRunner.spawnCount)} + switch b.mode { + case DistributedWorkerMode: + b.workerRunner.loop = &Loop{loopCount: loopCount * b.workerRunner.spawn.getSpawnCount()} + case DistributedMasterMode: + b.masterRunner.loop = &Loop{loopCount: loopCount * b.masterRunner.spawn.getSpawnCount()} + case StandaloneMode: + b.localRunner.loop = &Loop{loopCount: loopCount * b.localRunner.spawn.getSpawnCount()} + } } // AddOutput accepts outputs which implements the boomer.Output interface. func (b *Boomer) AddOutput(o Output) { - b.localRunner.addOutput(o) + switch b.mode { + case DistributedWorkerMode: + b.workerRunner.addOutput(o) + case DistributedMasterMode: + b.masterRunner.addOutput(o) + case StandaloneMode: + b.localRunner.addOutput(o) + } } // EnableCPUProfile will start cpu profiling after run. @@ -150,6 +280,9 @@ func (b *Boomer) EnableGracefulQuit() { signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) go func() { <-c + if b.mode == DistributedWorkerMode { + b.workerRunner.ignoreQuit = false + } b.Quit() }() } @@ -169,13 +302,45 @@ func (b *Boomer) Run(tasks ...*Task) { } } - b.localRunner.setTasks(tasks) - b.localRunner.start() + switch b.mode { + case DistributedWorkerMode: + log.Info().Msg("running in worker mode") + b.workerRunner.setTasks(tasks) + b.workerRunner.start() + case StandaloneMode: + log.Info().Msg("running in standalone mode") + b.localRunner.setTasks(tasks) + b.localRunner.start() + default: + log.Error().Err(errors.New("Invalid mode, expected boomer.DistributedMode or boomer.StandaloneMode")) + } +} + +func (b *Boomer) SetTasks(tasks ...*Task) { + switch b.mode { + case DistributedWorkerMode: + log.Info().Msg("set tasks to worker") + b.workerRunner.setTasks(tasks) + case StandaloneMode: + log.Info().Msg("set tasks to standalone") + b.localRunner.setTasks(tasks) + default: + log.Error().Err(errors.New("Invalid mode, expected boomer.DistributedMode or boomer.StandaloneMode")) + } } // RecordTransaction reports a transaction stat. func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, contentSize int64) { - b.localRunner.stats.transactionChan <- &transaction{ + var runnerStats *requestStats + switch b.mode { + case DistributedWorkerMode: + runnerStats = b.workerRunner.stats + case DistributedMasterMode: + runnerStats = b.masterRunner.stats + case StandaloneMode: + runnerStats = b.localRunner.stats + } + runnerStats.transactionChan <- &transaction{ name: name, success: success, elapsedTime: elapsedTime, @@ -185,7 +350,16 @@ func (b *Boomer) RecordTransaction(name string, success bool, elapsedTime int64, // RecordSuccess reports a success. func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) { - b.localRunner.stats.requestSuccessChan <- &requestSuccess{ + var runnerStats *requestStats + switch b.mode { + case DistributedWorkerMode: + runnerStats = b.workerRunner.stats + case DistributedMasterMode: + runnerStats = b.masterRunner.stats + case StandaloneMode: + runnerStats = b.localRunner.stats + } + runnerStats.requestSuccessChan <- &requestSuccess{ requestType: requestType, name: name, responseTime: responseTime, @@ -195,7 +369,16 @@ func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, res // RecordFailure reports a failure. func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) { - b.localRunner.stats.requestFailureChan <- &requestFailure{ + var runnerStats *requestStats + switch b.mode { + case DistributedWorkerMode: + runnerStats = b.workerRunner.stats + case DistributedMasterMode: + runnerStats = b.masterRunner.stats + case StandaloneMode: + runnerStats = b.localRunner.stats + } + runnerStats.requestFailureChan <- &requestFailure{ requestType: requestType, name: name, responseTime: responseTime, @@ -203,19 +386,139 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc } } +// Start starts to run +func (b *Boomer) Start(Args map[string]interface{}) error { + spawnCount, ok := Args["spawn_count"] + if ok { + v, err := strconv.Atoi(spawnCount.(string)) + if err != nil { + log.Error().Err(err).Msg("spawn_count sets error") + return err + } + b.SetSpawnCount(v) + } else { + return errors.New("spawn count error") + } + spawnRate, ok := Args["spawn_rate"] + if ok { + v, err := builtin.Interface2Float64(spawnRate) + if err != nil { + log.Error().Err(err).Msg("spawn_count sets error") + return err + } + b.SetSpawnRate(v) + } else { + b.SetSpawnRate(float64(b.GetSpawnCount())) + } + path, ok := Args["path"].(string) + if ok { + paths := strings.Split(path, ",") + b.SetTestCasesPath(paths) + } else { + return errors.New("testcase path error") + } + err := b.masterRunner.start() + return err +} + +// ReBalance starts to rebalance load test +func (b *Boomer) ReBalance(Args map[string]interface{}) error { + spawnCount, ok := Args["spawn_count"] + if ok { + v, err := strconv.Atoi(spawnCount.(string)) + if err != nil { + log.Error().Err(err).Msg("spawn_count sets error") + return err + } + b.SetSpawnCount(v) + } + spawnRate, ok := Args["spawn_rate"] + if ok { + v, err := builtin.Interface2Float64(spawnRate) + if err != nil { + log.Error().Err(err).Msg("spawn_count sets error") + return err + } + b.SetSpawnRate(v) + } + path, ok := Args["path"].(string) + if ok { + paths := strings.Split(path, ",") + b.SetTestCasesPath(paths) + } + err := b.masterRunner.rebalance() + if err != nil { + log.Error().Err(err).Msg("failed to rebalance") + } + return err +} + +// Stop stops to load test +func (b *Boomer) Stop() { + switch b.mode { + case DistributedMasterMode: + b.masterRunner.stop() + default: + } +} + +// GetWorkersInfo gets workers +func (b *Boomer) GetWorkersInfo() []WorkerNode { + return b.masterRunner.server.getAllWorkers() +} + +func (b *Boomer) GetCloseChan() chan bool { + switch b.mode { + case DistributedWorkerMode: + return b.workerRunner.closeChan + case DistributedMasterMode: + return b.masterRunner.closeChan + default: + return b.localRunner.closeChan + } +} + // Quit will send a quit message to the master. func (b *Boomer) Quit() { - b.localRunner.stop() + switch b.mode { + case DistributedWorkerMode: + b.workerRunner.close() + case DistributedMasterMode: + b.masterRunner.close() + case StandaloneMode: + b.localRunner.stop() + } } func (b *Boomer) GetSpawnDoneChan() chan struct{} { - return b.localRunner.spawnDone + switch b.mode { + case DistributedWorkerMode: + return b.workerRunner.spawn.getSpawnDone() + case DistributedMasterMode: + return b.masterRunner.spawn.getSpawnDone() + default: + return b.localRunner.spawn.getSpawnDone() + } } func (b *Boomer) GetSpawnCount() int { - return b.localRunner.spawnCount + switch b.mode { + case DistributedWorkerMode: + return int(b.workerRunner.spawn.getSpawnCount()) + case DistributedMasterMode: + return int(b.masterRunner.spawn.getSpawnCount()) + default: + return int(b.localRunner.spawn.getSpawnCount()) + } } func (b *Boomer) ResetStartTime() { - b.localRunner.stats.total.resetStartTime() + switch b.mode { + case DistributedWorkerMode: + b.workerRunner.stats.total.resetStartTime() + case DistributedMasterMode: + b.masterRunner.stats.total.resetStartTime() + default: + b.localRunner.stats.total.resetStartTime() + } } diff --git a/hrp/internal/boomer/boomer_test.go b/hrp/internal/boomer/boomer_test.go index 7f113f87..fde9b37b 100644 --- a/hrp/internal/boomer/boomer_test.go +++ b/hrp/internal/boomer/boomer_test.go @@ -12,11 +12,11 @@ import ( func TestNewStandaloneBoomer(t *testing.T) { b := NewStandaloneBoomer(100, 10) - if b.localRunner.spawnCount != 100 { + if b.localRunner.spawn.spawnCount != 100 { t.Error("spawnCount should be 100") } - if b.localRunner.spawnRate != 10 { + if b.localRunner.spawn.spawnRate != 10 { t.Error("spawnRate should be 10") } } diff --git a/hrp/internal/boomer/client.go b/hrp/internal/boomer/client.go new file mode 100644 index 00000000..b3bf6def --- /dev/null +++ b/hrp/internal/boomer/client.go @@ -0,0 +1,9 @@ +package boomer + +type client interface { + connect() (err error) + close() + recvChannel() chan *genericMessage + sendChannel() chan *genericMessage + disconnectedChannel() chan bool +} diff --git a/hrp/internal/boomer/client_grpc.go b/hrp/internal/boomer/client_grpc.go new file mode 100644 index 00000000..5fa33cc1 --- /dev/null +++ b/hrp/internal/boomer/client_grpc.go @@ -0,0 +1,224 @@ +package boomer + +import ( + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/httprunner/httprunner/v4/hrp/internal/grpc/messager" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" +) + +type grpcClient struct { + masterHost string + masterPort int + identity string // nodeID + + config *grpcClientConfig + + fromMaster chan *genericMessage + toMaster chan *genericMessage + disconnectedFromMaster chan bool + shutdownChan chan bool + + failCount int32 + + wg sync.WaitGroup +} + +type grpcClientConfig struct { + ctx context.Context + cancel context.CancelFunc // use cancel() to stop client + conn *grpc.ClientConn + biStream messager.Message_BidirectionalStreamingMessageClient + + mutex sync.RWMutex +} + +func (c *grpcClientConfig) getBiStreamClient() messager.Message_BidirectionalStreamingMessageClient { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.biStream +} + +func (c *grpcClientConfig) setBiStreamClient(s messager.Message_BidirectionalStreamingMessageClient) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.biStream = s +} + +func newClient(masterHost string, masterPort int, identity string) (client *grpcClient) { + log.Info().Msg("Boomer is built with grpc support.") + // Initiate the stream with a context that supports cancellation. + ctx, cancel := context.WithCancel(context.Background()) + client = &grpcClient{ + masterHost: masterHost, + masterPort: masterPort, + identity: identity, + fromMaster: make(chan *genericMessage, 100), + toMaster: make(chan *genericMessage, 100), + disconnectedFromMaster: make(chan bool), + shutdownChan: make(chan bool), + config: &grpcClientConfig{ + ctx: ctx, + cancel: cancel, + mutex: sync.RWMutex{}, + }, + } + return client +} + +func (c *grpcClient) connect() (err error) { + addr := fmt.Sprintf("%v:%v", c.masterHost, c.masterPort) + c.config.conn, err = grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + log.Error().Err(err).Msg("failed to connect") + return err + } + + biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx) + if err != nil { + log.Error().Err(err).Msg("call bidirectional streaming message err") + return err + } + c.config.setBiStreamClient(biStream) + log.Info().Msg(fmt.Sprintf("Boomer is connected to master(%s) press Ctrl+c to quit.\n", addr)) + go c.recv() + go c.send() + + return nil +} + +func (c *grpcClient) reConnect() (err error) { + addr := fmt.Sprintf("%v:%v", c.masterHost, c.masterPort) + c.config.conn, err = grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + return + } + + biStream, err := messager.NewMessageClient(c.config.conn).BidirectionalStreamingMessage(c.config.ctx) + if err != nil { + return + } + c.config.setBiStreamClient(biStream) + + // register worker information to master + c.sendChannel() <- newGenericMessage("register", nil, c.identity) + //// tell master, I'm ready + //log.Info().Msg("send client ready signal") + //c.sendChannel() <- newClientReadyMessageToMaster(c.identity) + log.Info().Msg(fmt.Sprintf("Boomer is reConnected to master(%s) press Ctrl+c to quit.\n", addr)) + return +} + +func (c *grpcClient) close() { + close(c.shutdownChan) + c.config.cancel() + if c.config.conn != nil { + c.config.conn.Close() + } +} + +func (c *grpcClient) recvChannel() chan *genericMessage { + return c.fromMaster +} + +func (c *grpcClient) recv() { + c.wg.Add(1) + defer c.wg.Done() + for { + select { + case <-c.shutdownChan: + return + default: + if c.config.getBiStreamClient() == nil { + continue + } + msg, err := c.config.getBiStreamClient().Recv() + if err != nil { + time.Sleep(1 * time.Second) + //log.Error().Err(err).Msg("failed to get message") + continue + } + if msg == nil { + continue + } + + if msg.NodeID != c.identity { + log.Warn(). + Str("nodeID", msg.NodeID). + Str("type", msg.Type). + Interface("data", msg.Data). + Msg(fmt.Sprintf("not for me(%s)", c.identity)) + continue + } + + c.fromMaster <- &genericMessage{ + Type: msg.Type, + Data: msg.Data, + NodeID: msg.NodeID, + Tasks: msg.Tasks, + } + + log.Info(). + Str("nodeID", msg.NodeID). + Str("type", msg.Type). + Interface("data", msg.Data). + Interface("tasks", msg.Tasks). + Msg("receive data from master") + } + } +} + +func (c *grpcClient) sendChannel() chan *genericMessage { + return c.toMaster +} + +func (c *grpcClient) send() { + c.wg.Add(1) + defer c.wg.Done() + for { + select { + case <-c.shutdownChan: + return + case msg := <-c.toMaster: + c.sendMessage(msg) + + // We may send genericMessage to master. + switch msg.Type { + case "quit": + c.disconnectedFromMaster <- true + } + } + } +} + +func (c *grpcClient) sendMessage(msg *genericMessage) { + log.Info(). + Str("nodeID", msg.NodeID). + Str("type", msg.Type). + Interface("data", msg.Data). + Msg("send data to server") + if c.config.getBiStreamClient() == nil { + return + } + err := c.config.getBiStreamClient().Send(&messager.StreamRequest{Type: msg.Type, Data: msg.Data, NodeID: msg.NodeID}) + switch err { + case nil: + atomic.StoreInt32(&c.failCount, 0) + break + case io.EOF: + fallthrough + default: + //log.Error().Err(err).Interface("genericMessage", *msg).Msg("failed to send message") + atomic.AddInt32(&c.failCount, 1) + } +} + +func (c *grpcClient) disconnectedChannel() chan bool { + return c.disconnectedFromMaster +} diff --git a/hrp/internal/boomer/client_grpc_test.go b/hrp/internal/boomer/client_grpc_test.go new file mode 100644 index 00000000..853e847e --- /dev/null +++ b/hrp/internal/boomer/client_grpc_test.go @@ -0,0 +1 @@ +package boomer diff --git a/hrp/internal/boomer/message.go b/hrp/internal/boomer/message.go new file mode 100644 index 00000000..93b9a0b3 --- /dev/null +++ b/hrp/internal/boomer/message.go @@ -0,0 +1,51 @@ +package boomer + +const ( + typeClientReady = "client_ready" + typeClientStopped = "client_stopped" + typeHeartbeat = "heartbeat" + typeSpawning = "spawning" + typeSpawningComplete = "spawning_complete" + typeQuit = "quit" + typeException = "exception" +) + +type message interface { +} + +type genericMessage struct { + Type string `json:"type,omitempty"` + Data map[string]int64 `json:"data,omitempty"` + NodeID string `json:"node_id,omitempty"` + Tasks []byte `json:"tasks,omitempty"` +} + +func newGenericMessage(t string, data map[string]int64, nodeID string) (msg *genericMessage) { + return &genericMessage{ + Type: t, + Data: data, + NodeID: nodeID, + } +} + +func newQuitMessage(nodeID string) (msg *genericMessage) { + return &genericMessage{ + Type: "quit", + NodeID: nodeID, + } +} + +func newSpawnMessageToWorker(t string, data map[string]int64, tasks []byte) (msg *genericMessage) { + return &genericMessage{ + Type: t, + Data: data, + Tasks: tasks, + } +} + +func newClientReadyMessageToMaster(nodeID string) (msg *genericMessage) { + return &genericMessage{ + Type: "client_ready", + NodeID: nodeID, + } +} diff --git a/hrp/internal/boomer/message_test.go b/hrp/internal/boomer/message_test.go new file mode 100644 index 00000000..853e847e --- /dev/null +++ b/hrp/internal/boomer/message_test.go @@ -0,0 +1 @@ +package boomer diff --git a/hrp/internal/boomer/output.go b/hrp/internal/boomer/output.go index db77b053..3ef8bdc9 100644 --- a/hrp/internal/boomer/output.go +++ b/hrp/internal/boomer/output.go @@ -118,15 +118,15 @@ func (o *ConsoleOutput) OnEvent(data map[string]interface{}) { var state string switch output.State { - case stateInit: + case StateInit: state = "initializing" - case stateSpawning: + case StateSpawning: state = "spawning" - case stateRunning: + case StateRunning: state = "running" - case stateQuitting: + case StateQuitting: state = "quitting" - case stateStopped: + case StateStopped: state = "stopped" } @@ -525,7 +525,7 @@ func (o *PrometheusPusherOutput) OnStart() { // OnStop of PrometheusPusherOutput has nothing to do. func (o *PrometheusPusherOutput) OnStop() { // update runner state: stopped - gaugeState.Set(float64(stateStopped)) + gaugeState.Set(float64(StateStopped)) if err := o.pusher.Push(); err != nil { log.Error().Err(err).Msg("push to Pushgateway failed") } diff --git a/hrp/internal/boomer/runner.go b/hrp/internal/boomer/runner.go index 8419f6ab..1181d6c2 100644 --- a/hrp/internal/boomer/runner.go +++ b/hrp/internal/boomer/runner.go @@ -10,20 +10,26 @@ import ( "sync/atomic" "time" + "github.com/go-errors/errors" + "github.com/olekukonko/tablewriter" "github.com/rs/zerolog/log" ) const ( - stateInit = iota + 1 // initializing - stateSpawning // spawning - stateRunning // running - stateQuitting // quitting - stateStopped // stopped + StateInit = iota + 1 // initializing + StateSpawning // spawning + StateRunning // running + StateStopping // stopping + StateStopped // stopped + StateQuitting // quitting + StateMissing // missing ) const ( reportStatsInterval = 3 * time.Second + heartbeatInterval = 1 * time.Second + heartbeatLiveness = 3 * time.Second ) type Loop struct { @@ -51,23 +57,113 @@ func (l *Loop) increaseFinishedCount() { atomic.AddInt64(&l.finishedCount, 1) } +type SpawnInfo struct { + spawnCount int64 // target clients to spawn + acquiredCount int64 // count acquired of workers + spawnRate float64 + spawnDone chan struct{} + + mutex sync.RWMutex +} + +func (s *SpawnInfo) setSpawn(spawnCount int64, spawnRate float64) { + s.mutex.Lock() + defer s.mutex.Unlock() + if spawnCount > 0 { + atomic.StoreInt64(&s.spawnCount, spawnCount) + } + if spawnRate > 0 { + s.spawnRate = spawnRate + } +} + +func (s *SpawnInfo) getSpawnCount() int64 { + s.mutex.RLock() + defer s.mutex.RUnlock() + return atomic.LoadInt64(&s.spawnCount) +} + +func (s *SpawnInfo) getSpawnRate() float64 { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.spawnRate +} + +func (s *SpawnInfo) getSpawnDone() chan struct{} { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.spawnDone +} + +func (s *SpawnInfo) done() { + close(s.spawnDone) +} + +func (s *SpawnInfo) isFinished() bool { + // return true when workers acquired + return atomic.LoadInt64(&s.acquiredCount) == atomic.LoadInt64(&s.spawnCount) +} + +func (s *SpawnInfo) acquire() bool { + // get one ticket when there are still remaining spawn count to test + // return true when getting ticket successfully + if atomic.LoadInt64(&s.acquiredCount) < atomic.LoadInt64(&s.spawnCount) { + atomic.AddInt64(&s.acquiredCount, 1) + return true + } + return false +} + +func (s *SpawnInfo) erase() bool { + // return true if acquiredCount > spawnCount + if atomic.LoadInt64(&s.acquiredCount) > atomic.LoadInt64(&s.spawnCount) { + atomic.AddInt64(&s.acquiredCount, -1) + return true + } + return false +} + +func (s *SpawnInfo) increaseFinishedCount() { + atomic.AddInt64(&s.acquiredCount, -1) +} + +func (s *SpawnInfo) reset() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.spawnCount = 0 + s.spawnRate = 0 + s.acquiredCount = 0 + s.spawnDone = make(chan struct{}) +} + type runner struct { state int32 tasks []*Task totalTaskWeight int + mutex sync.RWMutex rateLimiter RateLimiter rateLimitEnabled bool stats *requestStats currentClientsNum int32 // current clients count - spawnCount int // target clients to spawn - spawnRate float64 + spawn *SpawnInfo loop *Loop // specify loop count for testcase, count = loopCount * spawnCount - spawnDone chan struct{} + + // when this channel is closed, all statistics are reported successfully + reportedChan chan bool + + // all running workers(goroutines) will select on this channel. + // close this channel will stop all running workers. + stopChan chan bool + + // close this channel will stop all goroutines used in runner. + closeChan chan bool outputs []Output + + once *sync.Once } // safeRun runs fn and recovers from unexpected panics. @@ -176,75 +272,104 @@ func (r *runner) reportTestResult() { println() } -func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { +func (r *runner) startSpawning(spawnCount int64, spawnRate float64, spawnCompleteFunc func()) { + r.stopChan = make(chan bool) + r.reportedChan = make(chan bool) + r.spawn.reset() + + r.spawn.setSpawn(spawnCount, spawnRate) + + atomic.StoreInt32(&r.currentClientsNum, 0) + + go r.spawnWorkers(spawnCount, spawnRate, r.stopChan, spawnCompleteFunc) +} + +func (r *runner) spawnWorkers(spawnCount int64, spawnRate float64, quit chan bool, spawnCompleteFunc func()) { log.Info(). - Int("spawnCount", spawnCount). + Int64("spawnCount", spawnCount). Float64("spawnRate", spawnRate). Msg("Spawning workers") - atomic.StoreInt32(&r.state, stateSpawning) - for i := 1; i <= spawnCount; i++ { - // spawn workers with rate limit - sleepTime := time.Duration(1000000/r.spawnRate) * time.Microsecond - time.Sleep(sleepTime) - - // loop count per worker - var workerLoop *Loop - if r.loop != nil { - workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / int64(r.spawnCount)} - } - + r.updateState(StateSpawning) + for { select { case <-quit: // quit spawning goroutine log.Info().Msg("Quitting spawning workers") return default: - atomic.AddInt32(&r.currentClientsNum, 1) - go func() { - for { - select { - case <-quit: - return - default: - if workerLoop != nil && !workerLoop.acquire() { + if r.isStarted() && r.spawn.acquire() { + // spawn workers with rate limit + sleepTime := time.Duration(1000000/r.spawn.getSpawnRate()) * time.Microsecond + time.Sleep(sleepTime) + + // loop count per worker + var workerLoop *Loop + if r.loop != nil { + workerLoop = &Loop{loopCount: atomic.LoadInt64(&r.loop.loopCount) / int64(r.spawn.spawnCount)} + } + atomic.AddInt32(&r.currentClientsNum, 1) + go func() { + for { + select { + case <-quit: + atomic.AddInt64(&r.spawn.acquiredCount, -1) + atomic.AddInt32(&r.currentClientsNum, -1) return - } - if r.rateLimitEnabled { - blocked := r.rateLimiter.Acquire() - if !blocked { + default: + if workerLoop != nil && !workerLoop.acquire() { + return + } + if r.rateLimitEnabled { + blocked := r.rateLimiter.Acquire() + if !blocked { + task := r.getTask() + r.safeRun(task.Fn) + } + } else { task := r.getTask() r.safeRun(task.Fn) } - } else { - task := r.getTask() - r.safeRun(task.Fn) - } - if workerLoop != nil { - // finished count of total - r.loop.increaseFinishedCount() - // finished count of single worker - workerLoop.increaseFinishedCount() - if r.loop.isFinished() { - r.stop() + if workerLoop != nil { + // finished count of total + r.loop.increaseFinishedCount() + // finished count of single worker + workerLoop.increaseFinishedCount() + if r.loop.isFinished() { + r.stop() + } + } + if r.spawn.erase() { + atomic.AddInt32(&r.currentClientsNum, -1) + return + } + if !r.isStarted() { + atomic.AddInt64(&r.spawn.acquiredCount, -1) + atomic.AddInt32(&r.currentClientsNum, -1) + return } } } + }() + } else { + if r.getState() == StateSpawning { + r.spawn.done() + if spawnCompleteFunc != nil { + spawnCompleteFunc() + } + r.updateState(StateRunning) } - }() + time.Sleep(1 * time.Second) + } } } - - close(r.spawnDone) - if spawnCompleteFunc != nil { - spawnCompleteFunc() - } - atomic.StoreInt32(&r.state, stateRunning) } // setTasks will set the runner's task list AND the total task weight // which is used to get a random task later func (r *runner) setTasks(t []*Task) { + r.mutex.Lock() + defer r.mutex.Unlock() r.tasks = t weightSum := 0 @@ -255,6 +380,8 @@ func (r *runner) setTasks(t []*Task) { } func (r *runner) getTask() *Task { + r.mutex.RLock() + defer r.mutex.RUnlock() tasksCount := len(r.tasks) if tasksCount == 0 { log.Error().Msg("no valid testcase found") @@ -285,30 +412,78 @@ func (r *runner) getTask() *Task { return nil } +func (r *runner) statsStart() { + var ticker = time.NewTicker(reportStatsInterval) + for { + select { + // record stats + case t := <-r.stats.transactionChan: + r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) + case m := <-r.stats.requestSuccessChan: + r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) + case n := <-r.stats.requestFailureChan: + r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) + r.stats.logError(n.requestType, n.name, n.errMsg) + // report stats + case <-ticker.C: + r.reportStats() + // close reportedChan and return if the last stats is reported successfully + if !r.isStarted() { + close(r.reportedChan) + return + } + } + } +} + +func (r *runner) stop() { + // stop previous goroutines without blocking + // those goroutines will exit when r.safeRun returns + close(r.stopChan) + if r.rateLimitEnabled { + r.rateLimiter.Stop() + } +} + +func (r *runner) getState() int32 { + return atomic.LoadInt32(&r.state) +} + +func (r *runner) updateState(state int32) { + log.Debug().Int32("from", atomic.LoadInt32(&r.state)).Int32("to", state).Msg("update runner state") + atomic.StoreInt32(&r.state, state) +} + +func (r *runner) isStarted() bool { + return r.getState() == StateRunning || r.getState() == StateSpawning +} + type localRunner struct { runner - - // close this channel will stop all goroutines used in runner. - stopChan chan bool } func newLocalRunner(spawnCount int, spawnRate float64) *localRunner { return &localRunner{ runner: runner{ - state: stateInit, - spawnRate: spawnRate, - spawnCount: spawnCount, - stats: newRequestStats(), - outputs: make([]Output, 0), - spawnDone: make(chan struct{}), + state: StateInit, + stats: newRequestStats(), + outputs: make([]Output, 0), + spawn: &SpawnInfo{ + spawnCount: int64(spawnCount), + spawnRate: spawnRate, + spawnDone: make(chan struct{}), + }, + reportedChan: make(chan bool), + stopChan: make(chan bool), + closeChan: make(chan bool), + once: &sync.Once{}, }, - stopChan: make(chan bool), } } func (r *localRunner) start() { // init state - atomic.StoreInt32(&r.state, stateInit) + r.updateState(StateInit) atomic.StoreInt32(&r.currentClientsNum, 0) r.stats.clearAll() @@ -317,51 +492,20 @@ func (r *localRunner) start() { r.rateLimiter.Start() } - // all running workers(goroutines) will select on this channel. - // close this channel will stop all running workers. - quitChan := make(chan bool) - // when this channel is closed, all statistics are reported successfully - reportedChan := make(chan bool) - go r.spawnWorkers(r.spawnCount, r.spawnRate, quitChan, nil) + go r.spawnWorkers(r.spawn.spawnCount, r.spawn.spawnRate, r.stopChan, nil) // output setup r.outputOnStart() - // start running - go func() { - ticker := time.NewTicker(reportStatsInterval) - for { - select { - // record stats - case t := <-r.stats.transactionChan: - r.stats.logTransaction(t.name, t.success, t.elapsedTime, t.contentSize) - case m := <-r.stats.requestSuccessChan: - r.stats.logRequest(m.requestType, m.name, m.responseTime, m.responseLength) - case n := <-r.stats.requestFailureChan: - r.stats.logRequest(n.requestType, n.name, n.responseTime, 0) - r.stats.logError(n.requestType, n.name, n.errMsg) - // report stats - case <-ticker.C: - r.reportStats() - // close reportedChan and return if the last stats is reported successfully - if atomic.LoadInt32(&r.state) == stateQuitting { - close(reportedChan) - return - } - } - } - }() + // start stats report + go r.runner.statsStart() // stop <-r.stopChan - atomic.StoreInt32(&r.state, stateQuitting) - - // stop previous goroutines without blocking - // those goroutines will exit when r.safeRun returns - close(quitChan) + r.updateState(StateStopped) // wait until all stats are reported successfully - <-reportedChan + <-r.reportedChan // stop rate limiter if r.rateLimitEnabled { @@ -374,10 +518,494 @@ func (r *localRunner) start() { // output teardown r.outputOnStop() - atomic.StoreInt32(&r.state, stateStopped) + r.updateState(StateQuitting) return } func (r *localRunner) stop() { - close(r.stopChan) + if r.runner.isStarted() { + r.runner.stop() + } +} + +// workerRunner connects to the master, spawns goroutines and collects stats. +type workerRunner struct { + runner + + nodeID string + masterHost string + masterPort int + client *grpcClient + + // this channel will start worker for spawning. + spawnStartChan chan bool + // get testcase from master + testCaseBytes chan []byte + + startFlag bool + + ignoreQuit bool +} + +func newWorkerRunner(masterHost string, masterPort int) (r *workerRunner) { + r = &workerRunner{ + runner: runner{ + stats: newRequestStats(), + spawn: &SpawnInfo{ + spawnDone: make(chan struct{}), + }, + stopChan: make(chan bool), + reportedChan: make(chan bool), + closeChan: make(chan bool), + once: &sync.Once{}, + }, + masterHost: masterHost, + masterPort: masterPort, + nodeID: getNodeID(), + spawnStartChan: make(chan bool), + testCaseBytes: make(chan []byte, 10), + } + return r +} + +func (r *workerRunner) spawnComplete() { + data := make(map[string]int64) + data["count"] = r.spawn.getSpawnCount() + r.client.sendChannel() <- newGenericMessage("spawning_complete", data, r.nodeID) + r.updateState(StateRunning) +} + +func (r *workerRunner) onSpawnMessage(msg *genericMessage) { + r.client.sendChannel() <- newGenericMessage("spawning", nil, r.nodeID) + spawnCount, ok := msg.Data["spawn_count"] + if ok { + r.spawn.setSpawn(spawnCount, -1) + } + spawnRate, ok := msg.Data["spawn_rate"] + if ok { + r.spawn.setSpawn(-1, float64(spawnRate)) + } + if msg.Tasks != nil { + r.testCaseBytes <- msg.Tasks + } + log.Info().Msg("on spawn message successful") +} + +// Runner acts as a state machine. +func (r *workerRunner) onMessage(msg *genericMessage) { + switch r.getState() { + case StateInit: + switch msg.Type { + case "spawn": + r.onSpawnMessage(msg) + case "quit": + r.close() + } + case StateSpawning: + fallthrough + case StateRunning: + switch msg.Type { + case "spawn": + r.onSpawnMessage(msg) + case "stop": + r.stop() + log.Info().Msg("Recv stop message from master, all the goroutines are stopped") + r.client.sendChannel() <- newGenericMessage("client_stopped", nil, r.nodeID) + case "quit": + r.close() + log.Info().Msg("Recv quit message from master, all the goroutines are stopped") + } + case StateStopped: + switch msg.Type { + case "spawn": + r.onSpawnMessage(msg) + go r.start() + case "quit": + r.close() + } + } +} + +func (r *workerRunner) onQuiting() { + if r.getState() != StateQuitting { + r.client.sendChannel() <- newQuitMessage(r.nodeID) + } + r.updateState(StateQuitting) +} + +func (r *workerRunner) startListener() { + for { + select { + case msg := <-r.client.recvChannel(): + r.onMessage(msg) + case <-r.closeChan: + return + } + } +} + +// run starts service +func (r *workerRunner) run() { + r.updateState(StateInit) + r.client = newClient(r.masterHost, r.masterPort, r.nodeID) + + err := r.client.connect() + if err != nil { + log.Printf("Failed to connect to master(%s:%d) with error %v\n", r.masterHost, r.masterPort, err) + return + } + + // listen to master + go r.startListener() + + // register worker information to master + r.client.sendChannel() <- newGenericMessage("register", nil, r.nodeID) + // tell master, I'm ready + log.Info().Msg("send client ready signal") + r.client.sendChannel() <- newClientReadyMessageToMaster(r.nodeID) + + // heartbeat + // See: https://github.com/locustio/locust/commit/a8c0d7d8c588f3980303358298870f2ea394ab93 + go func() { + var ticker = time.NewTicker(heartbeatInterval) + for { + select { + case <-ticker.C: + if atomic.LoadInt32(&r.client.failCount) > 2 { + r.updateState(StateMissing) + } + if r.getState() == StateMissing { + if r.client.reConnect() == nil { + r.updateState(StateInit) + } + } + CPUUsage := GetCurrentCPUUsage() + data := map[string]int64{ + "state": int64(r.getState()), + "current_cpu_usage": int64(CPUUsage), + "spawn_count": int64(atomic.LoadInt32(&r.currentClientsNum)), + } + r.client.sendChannel() <- newGenericMessage("heartbeat", data, r.nodeID) + case <-r.closeChan: + return + } + } + }() + <-r.closeChan +} + +func (r *workerRunner) start() { + r.startFlag = true + defer func() { + r.startFlag = false + }() + r.stats.clearAll() + + // start rate limiter + if r.rateLimitEnabled { + r.rateLimiter.Start() + } + + r.once.Do(r.outputOnStart) + + r.startSpawning(r.spawn.getSpawnCount(), r.spawn.getSpawnRate(), r.spawnComplete) + + // start stats report + go r.runner.statsStart() + + <-r.reportedChan + + r.reportTestResult() + r.outputOnStop() +} + +func (r *workerRunner) stop() { + if r.isStarted() { + close(r.stopChan) + // stop rate limiter + if r.rateLimitEnabled { + r.rateLimiter.Stop() + } + r.updateState(StateStopped) + } +} + +func (r *workerRunner) close() { + r.stop() + if r.ignoreQuit { + return + } + for r.startFlag == true { + time.Sleep(1 * time.Second) + } + close(r.closeChan) + var ticker = time.NewTicker(1 * time.Second) + if r.client != nil { + // waitting for quit message is sent to master + select { + case <-r.client.disconnectedChannel(): + break + case <-ticker.C: + log.Warn().Msg("Timeout waiting for sending quit message to master, boomer will quit any way.") + r.onQuiting() + } + r.client.close() + } +} + +// masterRunner controls worker to spawn goroutines and collect stats. +type masterRunner struct { + runner + + masterBindHost string + masterBindPort int + server *grpcServer + + autoStart bool + expectWorkers int + expectWorkersMaxWait int + + parseTestCasesChan chan bool + startFlag bool + testCaseBytes chan []byte + + mutex sync.Mutex +} + +func newMasterRunner(masterBindHost string, masterBindPort int) *masterRunner { + return &masterRunner{ + runner: runner{ + state: StateInit, + spawn: &SpawnInfo{ + spawnDone: make(chan struct{}), + }, + closeChan: make(chan bool), + }, + masterBindHost: masterBindHost, + masterBindPort: masterBindPort, + server: newServer(masterBindHost, masterBindPort), + parseTestCasesChan: make(chan bool), + startFlag: false, + testCaseBytes: make(chan []byte), + } +} + +func (r *masterRunner) setExpectWorkers(expectWorkers int, expectWorkersMaxWait int) { + r.expectWorkers = expectWorkers + r.expectWorkersMaxWait = expectWorkersMaxWait +} + +func (r *masterRunner) heartbeatWorker() { + log.Info().Msg("heartbeatWorker, listen and record heartbeat from worker") + var ticker = time.NewTicker(heartbeatInterval) + for { + select { + case <-r.closeChan: + return + case <-ticker.C: + r.server.clients.Range(func(key, value interface{}) bool { + workerInfo, ok := value.(*WorkerNode) + if !ok { + log.Error().Msg("failed to get worker information") + } + if atomic.LoadInt32(&workerInfo.Heartbeat) <= 0 && workerInfo.getState() != StateMissing { + workerInfo.setState(StateMissing) + if r.getState() == StateRunning { + // all running workers missed, stopping runner + if r.server.getClientsLength() <= 0 { + r.updateState(StateStopped) + } + } + } else { + atomic.AddInt32(&workerInfo.Heartbeat, -1) + } + return true + }) + } + } +} + +func (r *masterRunner) clientListener() { + log.Info().Msg("clientListener, start to deal message from worker") + for { + select { + case <-r.closeChan: + return + case msg := <-r.server.recvChannel(): + worker, ok := r.server.getClients().Load(msg.NodeID) + if !ok { + continue + } + workerInfo, ok := worker.(*WorkerNode) + if !ok { + continue + } + switch msg.Type { + case typeClientReady: + if workerInfo.getState() == StateInit { + break + } + workerInfo.setState(StateInit) + if r.getState() == StateRunning { + println(fmt.Sprintf("worker(%s) joined, ready to rebalance the load of each worker", workerInfo.ID)) + err := r.rebalance() + if err != nil { + log.Error().Err(err).Msg("failed to rebalance") + } + } + case typeClientStopped: + workerInfo.setState(StateStopped) + if r.server.getWorkersLengthByState(StateStopped)+r.server.getWorkersLengthByState(StateInit) == r.server.getClientsLength() { + r.updateState(StateStopped) + } + case typeHeartbeat: + if workerInfo.getState() != int32(msg.Data["state"]) { + workerInfo.setState(int32(msg.Data["state"])) + } + workerInfo.updateHeartbeat(3) + if workerInfo.getCPUUsage() != float64(msg.Data["current_cpu_usage"]) { + workerInfo.updateCPUUsage(float64(msg.Data["current_cpu_usage"])) + } + if workerInfo.getSpawnCount() != msg.Data["spawn_count"] { + workerInfo.updateSpawnCount(msg.Data["spawn_count"]) + } + case typeSpawning: + workerInfo.setState(StateSpawning) + case typeSpawningComplete: + workerInfo.setState(StateRunning) + if r.server.getWorkersLengthByState(StateRunning) == r.server.getClientsLength() { + println(fmt.Sprintf("all(%v) workers spawn done, setting state as running", r.server.getClientsLength())) + r.updateState(StateRunning) + } + case typeQuit: + if workerInfo.getState() == StateQuitting { + break + } + workerInfo.setState(StateQuitting) + if r.isStarted() { + if r.server.getClientsLength() > 0 { + println(fmt.Sprintf("worker(%s) quited, ready to rebalance the load of each worker", workerInfo.ID)) + err := r.rebalance() + if err != nil { + log.Error().Err(err).Msg("failed to rebalance") + } + } + } + case typeException: + // Todo + default: + } + } + } +} + +func (r *masterRunner) run() { + r.updateState(StateInit) + + // start grpc server + err := r.server.start() + if err != nil { + log.Error().Err(err).Msg("failed to start grpc server") + return + } + + // listen and deal message from worker + go r.clientListener() + // listen and record heartbeat from worker + go r.heartbeatWorker() + + if r.autoStart { + log.Info().Msg("auto start, waiting expected workers joined") + var ticker = time.NewTicker(1 * time.Second) + var tickerMaxWait = time.NewTicker(time.Duration(r.expectWorkersMaxWait) * time.Second) + FOR: + for { + select { + case <-r.closeChan: + return + case <-ticker.C: + c := r.server.getClientsLength() + log.Info().Msg(fmt.Sprintf("expected worker number: %v, current worker count: %v", r.expectWorkers, c)) + if c >= r.expectWorkers { + go func() { + err = r.start() + if err != nil { + log.Error().Err(err).Msg("failed to run") + os.Exit(1) + } + }() + break FOR + } + case <-tickerMaxWait.C: + log.Warn().Msg("reached max wait time, quiting") + r.onQuiting() + os.Exit(1) + } + } + } + <-r.closeChan +} + +func (r *masterRunner) start() error { + numWorkers := r.server.getClientsLength() + if numWorkers == 0 { + return errors.New("current workers: 0") + } + workerSpawnRate := r.spawn.spawnRate / float64(numWorkers) + workerSpawnCount := r.spawn.getSpawnCount() / int64(numWorkers) + + log.Info().Msg("send spawn data to worker") + r.updateState(StateSpawning) + // waitting to fetch testcase + testcase, ok := r.fetchTestCase() + if !ok { + return errors.New("starting, do not retry frequently") + } + r.server.sendChannel() <- newSpawnMessageToWorker("spawn", map[string]int64{ + "spawn_count": workerSpawnCount, + "spawn_rate": int64(workerSpawnRate), + }, testcase) + println("send spawn data to worker successful") + log.Info().Msg("send spawn data to worker successful") + return nil +} + +func (r *masterRunner) fetchTestCase() ([]byte, bool) { + if r.startFlag { + return nil, false + } + r.startFlag = true + defer func() { + r.startFlag = false + }() + r.parseTestCasesChan <- true + return <-r.testCaseBytes, true +} + +func (r *masterRunner) rebalance() error { + return r.start() +} + +func (r *masterRunner) stop() { + if r.isStarted() { + r.updateState(StateStopping) + r.server.sendChannel() <- &genericMessage{Type: "stop", Data: map[string]int64{}} + r.updateState(StateStopped) + } +} + +func (r *masterRunner) onQuiting() { + if r.getState() != StateQuitting { + r.server.sendChannel() <- &genericMessage{ + Type: "quit", + } + } + r.updateState(StateQuitting) +} + +func (r *masterRunner) close() { + r.onQuiting() + r.server.wg.Wait() + close(r.closeChan) + r.server.close() } diff --git a/hrp/internal/boomer/runner_test.go b/hrp/internal/boomer/runner_test.go index 549980c9..28305752 100644 --- a/hrp/internal/boomer/runner_test.go +++ b/hrp/internal/boomer/runner_test.go @@ -1,6 +1,7 @@ package boomer import ( + "sync" "sync/atomic" "testing" "time" @@ -112,3 +113,416 @@ func TestLoopCount(t *testing.T) { t.Fatal() } } + +func TestSpawnWorkers(t *testing.T) { + taskA := &Task{ + Weight: 10, + Fn: func() { + time.Sleep(time.Second) + }, + Name: "TaskA", + } + tasks := []*Task{taskA} + + runner := newWorkerRunner("localhost", 5557) + defer runner.close() + + runner.client = newClient("localhost", 5557, runner.nodeID) + runner.setTasks(tasks) + go runner.spawnWorkers(10, 10, runner.stopChan, runner.spawnComplete) + time.Sleep(10 * time.Millisecond) + + currentClients := atomic.LoadInt32(&runner.currentClientsNum) + if currentClients != 10 { + t.Error("Unexpected count", currentClients) + } +} + +func TestSpawnWorkersWithManyTasks(t *testing.T) { + var lock sync.Mutex + taskCalls := map[string]int{} + + createTask := func(name string, weight int) *Task { + return &Task{ + Name: name, + Weight: weight, + Fn: func() { + lock.Lock() + taskCalls[name]++ + lock.Unlock() + }, + } + } + tasks := []*Task{ + createTask("one hundred", 100), + createTask("ten", 10), + createTask("one", 1), + } + + runner := newWorkerRunner("localhost", 5557) + defer runner.close() + + runner.setTasks(tasks) + runner.client = newClient("localhost", 5557, runner.nodeID) + + const numToSpawn int64 = 30 + + runner.spawnWorkers(numToSpawn, float64(numToSpawn), runner.stopChan, runner.spawnComplete) + time.Sleep(2 * time.Second) + + currentClients := atomic.LoadInt32(&runner.currentClientsNum) + + assert.Equal(t, numToSpawn, int(currentClients)) + lock.Lock() + hundreds := taskCalls["one hundred"] + tens := taskCalls["ten"] + ones := taskCalls["one"] + lock.Unlock() + + total := hundreds + tens + ones + t.Logf("total tasks run: %d\n", total) + + assert.True(t, total > 111) + + assert.True(t, ones > 1) + actPercentage := float64(ones) / float64(total) + expectedPercentage := 1.0 / 111.0 + if actPercentage > 2*expectedPercentage || actPercentage < 0.5*expectedPercentage { + t.Errorf("Unexpected percentage of ones task: exp %v, act %v", expectedPercentage, actPercentage) + } + + assert.True(t, tens > 10) + actPercentage = float64(tens) / float64(total) + expectedPercentage = 10.0 / 111.0 + if actPercentage > 2*expectedPercentage || actPercentage < 0.5*expectedPercentage { + t.Errorf("Unexpected percentage of tens task: exp %v, act %v", expectedPercentage, actPercentage) + } + + assert.True(t, hundreds > 100) + actPercentage = float64(hundreds) / float64(total) + expectedPercentage = 100.0 / 111.0 + if actPercentage > 2*expectedPercentage || actPercentage < 0.5*expectedPercentage { + t.Errorf("Unexpected percentage of hundreds task: exp %v, act %v", expectedPercentage, actPercentage) + } +} + +func TestSpawnAndStop(t *testing.T) { + taskA := &Task{ + Fn: func() { + time.Sleep(time.Second) + }, + } + taskB := &Task{ + Fn: func() { + time.Sleep(2 * time.Second) + }, + } + tasks := []*Task{taskA, taskB} + runner := newWorkerRunner("localhost", 5557) + defer runner.close() + runner.client = newClient("localhost", 5557, runner.nodeID) + + runner.setTasks(tasks) + runner.spawn.setSpawn(10, 10) + runner.updateState(StateSpawning) + + go runner.start() + + // wait for spawning goroutines + time.Sleep(2 * time.Second) + if atomic.LoadInt32(&runner.currentClientsNum) != 10 { + t.Error("Number of goroutines mismatches, expected: 10, current count", atomic.LoadInt32(&runner.currentClientsNum)) + } + + msg := <-runner.client.sendChannel() + if msg.Type != "spawning_complete" { + t.Error("Runner should send spawning_complete message when spawning completed, got", msg.Type) + } + runner.stop() + + runner.onQuiting() + msg = <-runner.client.sendChannel() + if msg.Type != "quit" { + t.Error("Runner should send quit message on quitting, got", msg.Type) + } +} + +func TestStop(t *testing.T) { + taskA := &Task{ + Fn: func() { + time.Sleep(time.Second) + }, + } + tasks := []*Task{taskA} + runner := newWorkerRunner("localhost", 5557) + runner.setTasks(tasks) + runner.spawn.setSpawn(10, 10) + runner.updateState(StateSpawning) + + runner.stop() + + if runner.getState() != StateStopped { + t.Error("Expected runner state to be 5, was", runner.getState()) + } +} + +func TestOnSpawnMessage(t *testing.T) { + taskA := &Task{ + Fn: func() { + time.Sleep(time.Second) + }, + } + runner := newWorkerRunner("localhost", 5557) + defer runner.close() + runner.client = newClient("localhost", 5557, runner.nodeID) + runner.updateState(StateInit) + runner.setTasks([]*Task{taskA}) + runner.spawn.spawnCount = 100 + runner.spawn.spawnRate = 100 + + runner.onSpawnMessage(newGenericMessage("spawn", map[string]int64{ + "spawn_count": 20, + "spawn_rate": 20, + }, runner.nodeID)) + + if runner.spawn.spawnCount != 20 { + t.Error("workers should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnCount) + } + if runner.spawn.spawnRate != 20 { + t.Error("spawnRate should be overwrote by onSpawnMessage, expected: 20, was:", runner.spawn.spawnRate) + } + + runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) +} + +func TestOnQuitMessage(t *testing.T) { + runner := newWorkerRunner("localhost", 5557) + runner.client = newClient("localhost", 5557, "test") + runner.updateState(StateInit) + + runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) + <-runner.closeChan + + runner.updateState(StateRunning) + runner.closeChan = make(chan bool) + runner.stopChan = make(chan bool) + runner.client.shutdownChan = make(chan bool) + runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) + <-runner.closeChan + if runner.getState() != StateQuitting { + t.Error("Runner's state should be StateQuitting") + } + + runner.updateState(StateStopped) + runner.closeChan = make(chan bool) + runner.stopChan = make(chan bool) + runner.client.shutdownChan = make(chan bool) + runner.onMessage(newGenericMessage("quit", nil, runner.nodeID)) + <-runner.closeChan + if runner.getState() != StateQuitting { + t.Error("Runner's state should be StateQuitting") + } +} + +func TestOnMessage(t *testing.T) { + taskA := &Task{ + Fn: func() { + time.Sleep(time.Second) + }, + } + taskB := &Task{ + Fn: func() { + time.Sleep(2 * time.Second) + }, + } + tasks := []*Task{taskA, taskB} + + runner := newWorkerRunner("localhost", 5557) + defer runner.close() + runner.client = newClient("localhost", 5557, runner.nodeID) + runner.updateState(StateInit) + runner.setTasks(tasks) + + go runner.start() + + // start spawning + runner.onMessage(newGenericMessage("spawn", map[string]int64{ + "spawn_count": 10, + "spawn_rate": 10, + }, runner.nodeID)) + + msg := <-runner.client.sendChannel() + if msg.Type != "spawning" { + t.Error("Runner should send spawning message when starting spawn, got", msg.Type) + } + + // spawn complete and running + time.Sleep(2 * time.Second) + if runner.getState() != StateRunning { + t.Error("State of runner is not running after spawn, got", runner.getState()) + } + if atomic.LoadInt32(&runner.currentClientsNum) != 10 { + t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum)) + } + msg = <-runner.client.sendChannel() + if msg.Type != "spawning_complete" { + t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type) + } + + // increase goroutines while running + runner.onMessage(newGenericMessage("spawn", map[string]int64{ + "spawn_count": 15, + "spawn_rate": 15, + }, runner.nodeID)) + + msg = <-runner.client.sendChannel() + if msg.Type != "spawning" { + t.Error("Runner should send spawning message when starting spawn, got", msg.Type) + } + + time.Sleep(2 * time.Second) + msg = <-runner.client.sendChannel() + if msg.Type != "spawning_complete" { + t.Error("Runner should send spawning_complete message, got", msg.Type) + } + if runner.getState() != StateRunning { + t.Error("State of runner is not running after spawn, got", runner.getState()) + } + if atomic.LoadInt32(&runner.currentClientsNum) != 15 { + t.Error("Number of goroutines mismatches, expected: 20, current count:", atomic.LoadInt32(&runner.currentClientsNum)) + } + + // stop all the workers + runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) + if runner.getState() != StateStopped { + t.Error("State of runner is not stopped, got", runner.getState()) + } + msg = <-runner.client.sendChannel() + if msg.Type != "client_stopped" { + t.Error("Runner should send client_stopped message, got", msg.Type) + } + + // spawn again + runner.onMessage(newGenericMessage("spawn", map[string]int64{ + "spawn_count": 10, + "spawn_rate": 10, + }, runner.nodeID)) + + msg = <-runner.client.sendChannel() + if msg.Type != "spawning" { + t.Error("Runner should send spawning message when starting spawn, got", msg.Type) + } + + // spawn complete and running + time.Sleep(2 * time.Second) + if runner.getState() != StateRunning { + t.Error("State of runner is not running after spawn, got", runner.getState()) + } + if atomic.LoadInt32(&runner.currentClientsNum) != 10 { + t.Error("Number of goroutines mismatches, expected: 10, current count:", atomic.LoadInt32(&runner.currentClientsNum)) + } + msg = <-runner.client.sendChannel() + if msg.Type != "spawning_complete" { + t.Error("Runner should send spawning_complete message when spawn completed, got", msg.Type) + } + + // stop all the workers + runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) + if runner.getState() != StateStopped { + t.Error("State of runner is not stopped, got", runner.getState()) + } + msg = <-runner.client.sendChannel() + if msg.Type != "client_stopped" { + t.Error("Runner should send client_stopped message, got", msg.Type) + } +} + +func TestClientListener(t *testing.T) { + runner := newMasterRunner("localhost", 5557) + defer runner.close() + runner.updateState(StateInit) + runner.spawn.setSpawn(10, 10) + go runner.clientListener() + runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 3}) + runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 3}) + runner.server.recvChannel() <- &genericMessage{ + Type: typeClientReady, + NodeID: "testID1", + } + worker1, ok := runner.server.getClients().Load("testID1") + if !ok { + t.Fatal("error") + } + workerInfo1, ok := worker1.(*WorkerNode) + if !ok { + t.Fatal("error") + } + time.Sleep(time.Second) + if workerInfo1.getState() != StateInit { + t.Error("State of worker runner is not init, got", workerInfo1.getState()) + } + runner.server.recvChannel() <- &genericMessage{ + Type: typeClientStopped, + NodeID: "testID2", + } + worker2, ok := runner.server.getClients().Load("testID2") + if !ok { + t.Fatal("error") + } + workerInfo2, ok := worker2.(*WorkerNode) + if !ok { + t.Fatal("error") + } + time.Sleep(time.Second) + if workerInfo2.getState() != StateStopped { + t.Error("State of worker runner is not stopped, got", workerInfo2.getState()) + } + runner.server.recvChannel() <- &genericMessage{ + Type: typeClientStopped, + NodeID: "testID1", + } + time.Sleep(time.Second) + if runner.getState() != StateStopped { + t.Error("State of master runner is not stopped, got", runner.getState()) + } +} + +func TestHeartbeatWorker(t *testing.T) { + runner := newMasterRunner("localhost", 5557) + defer runner.close() + runner.updateState(StateInit) + runner.spawn.setSpawn(10, 10) + runner.server.clients.Store("testID1", &WorkerNode{ID: "testID1", Heartbeat: 1, State: StateInit}) + runner.server.clients.Store("testID2", &WorkerNode{ID: "testID2", Heartbeat: 1, State: StateInit}) + go runner.clientListener() + go runner.heartbeatWorker() + time.Sleep(3 * time.Second) + worker1, ok := runner.server.getClients().Load("testID1") + if !ok { + t.Fatal() + } + workerInfo1, ok := worker1.(*WorkerNode) + if !ok { + t.Fatal() + } + if workerInfo1.getState() != StateMissing { + t.Error("expected state of worker runner is missing, but got", workerInfo1.getState()) + } + runner.server.recvChannel() <- &genericMessage{ + Type: typeHeartbeat, + NodeID: "testID2", + Data: map[string]int64{"state": 3}, + } + worker2, ok := runner.server.getClients().Load("testID2") + if !ok { + t.Fatal() + } + workerInfo2, ok := worker2.(*WorkerNode) + if !ok { + t.Fatal() + } + time.Sleep(time.Second) + if workerInfo2.getState() == StateMissing { + t.Error("expected state of worker runner is not missing, but got missing") + } +} diff --git a/hrp/internal/boomer/server.go b/hrp/internal/boomer/server.go new file mode 100644 index 00000000..853e847e --- /dev/null +++ b/hrp/internal/boomer/server.go @@ -0,0 +1 @@ +package boomer diff --git a/hrp/internal/boomer/server_grpc.go b/hrp/internal/boomer/server_grpc.go new file mode 100644 index 00000000..7eb92104 --- /dev/null +++ b/hrp/internal/boomer/server_grpc.go @@ -0,0 +1,343 @@ +package boomer + +import ( + "context" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" + + "github.com/httprunner/httprunner/v4/hrp/internal/grpc/messager" + "github.com/rs/zerolog/log" +) + +func (s *grpcServer) BidirectionalStreamingMessage(srv messager.Message_BidirectionalStreamingMessageServer) error { + s.wg.Add(1) + defer s.wg.Done() + req, err := srv.Recv() + switch err { + case nil: + break + case io.EOF: + return nil + default: + if err.Error() == status.Error(codes.Canceled, context.Canceled.Error()).Error() { + return nil + } + log.Error().Err(err).Msg("failed to get stream from client") + return err + } + wn := &WorkerNode{messenger: srv, ID: req.NodeID, Heartbeat: 3} + s.clients.Store(req.NodeID, wn) + println(fmt.Sprintf("worker(%v) joined, current worker count: %v", req.NodeID, s.getClientsLength())) + <-s.disconnectedChannel() + s.clients.Delete(req.NodeID) + println(fmt.Sprintf("worker(%v) quited, current worker count: %v", req.NodeID, s.getClientsLength())) + return nil +} + +type WorkerNode struct { + ID string `json:"id"` + State int32 `json:"state"` + Heartbeat int32 `json:"heartbeat"` + SpawnCount int64 `json:"spawn_count"` + CPUUsage float64 `json:"cpu_usage"` + CPUWarningEmitted bool `json:"cpu_warning_emitted"` + MemoryUsage float64 `json:"memory_usage"` + messenger messager.Message_BidirectionalStreamingMessageServer + mutex sync.RWMutex +} + +func (w *WorkerNode) getState() int32 { + return atomic.LoadInt32(&w.State) +} + +func (w *WorkerNode) setState(state int32) { + atomic.StoreInt32(&w.State, state) +} + +func (w *WorkerNode) updateHeartbeat(heartbeat int32) { + atomic.StoreInt32(&w.Heartbeat, heartbeat) +} + +func (w *WorkerNode) getHeartbeat() int32 { + return atomic.LoadInt32(&w.Heartbeat) +} + +func (w *WorkerNode) updateSpawnCount(spawnCount int64) { + atomic.StoreInt64(&w.SpawnCount, spawnCount) +} + +func (w *WorkerNode) getSpawnCount() int64 { + return atomic.LoadInt64(&w.SpawnCount) +} + +func (w *WorkerNode) updateCPUUsage(cpuUsage float64) { + w.mutex.Lock() + defer w.mutex.Unlock() + w.CPUUsage = cpuUsage +} + +func (w *WorkerNode) getCPUUsage() float64 { + w.mutex.RLock() + defer w.mutex.RUnlock() + return w.CPUUsage +} + +func (w *WorkerNode) updateCPUWarningEmitted(cpuWarningEmitted bool) { + w.mutex.Lock() + defer w.mutex.Unlock() + w.CPUWarningEmitted = cpuWarningEmitted +} + +func (w *WorkerNode) getCPUWarningEmitted() bool { + w.mutex.RLock() + defer w.mutex.RUnlock() + return w.CPUWarningEmitted +} + +func (w *WorkerNode) updateMemoryUsage(memoryUsage float64) { + w.mutex.Lock() + defer w.mutex.Unlock() + w.MemoryUsage = memoryUsage +} + +func (w *WorkerNode) getMemoryUsage() float64 { + w.mutex.RLock() + defer w.mutex.RUnlock() + return w.MemoryUsage +} + +func (w *WorkerNode) getWorkerInfo() WorkerNode { + w.mutex.RLock() + defer w.mutex.RUnlock() + return WorkerNode{ + ID: w.ID, + State: w.getState(), + Heartbeat: w.getHeartbeat(), + SpawnCount: w.getSpawnCount(), + CPUUsage: w.getCPUUsage(), + CPUWarningEmitted: w.getCPUWarningEmitted(), + MemoryUsage: w.getMemoryUsage(), + } +} + +type grpcServer struct { + messager.UnimplementedMessageServer + masterHost string + masterPort int + server *grpc.Server + clients *sync.Map + + fromWorker chan *genericMessage + toWorker chan *genericMessage + disconnectedToWorker chan bool + shutdownChan chan bool + wg sync.WaitGroup +} + +func newServer(masterHost string, masterPort int) (server *grpcServer) { + log.Info().Msg("Boomer is built with grpc support.") + server = &grpcServer{ + masterHost: masterHost, + masterPort: masterPort, + clients: &sync.Map{}, + fromWorker: make(chan *genericMessage, 100), + toWorker: make(chan *genericMessage, 100), + disconnectedToWorker: make(chan bool), + shutdownChan: make(chan bool), + } + return server +} + +func (s *grpcServer) start() (err error) { + addr := fmt.Sprintf("%v:%v", s.masterHost, s.masterPort) + lis, err := net.Listen("tcp", addr) + if err != nil { + log.Error().Err(err).Msg("failed to listen") + return + } + // create gRPC server + serv := grpc.NewServer() + // register message server + messager.RegisterMessageServer(serv, s) + reflection.Register(serv) + // start grpc server + go func() { + err = serv.Serve(lis) + if err != nil { + log.Error().Err(err).Msg("failed to serve") + return + } + }() + + go s.recv() + go s.send() + + return nil +} + +func (s *grpcServer) getWorkersByState(state int32) (wns []*WorkerNode) { + s.clients.Range(func(key, value interface{}) bool { + if workerInfo, ok := value.(*WorkerNode); ok { + if workerInfo.getState() == state { + wns = append(wns, workerInfo) + } + } + return true + }) + return wns +} + +func (s *grpcServer) getWorkersLengthByState(state int32) (l int) { + s.clients.Range(func(key, value interface{}) bool { + if workerInfo, ok := value.(*WorkerNode); ok { + if workerInfo.getState() == state { + l++ + } + } + return true + }) + return +} + +func (s *grpcServer) getAllWorkers() (wns []WorkerNode) { + s.clients.Range(func(key, value interface{}) bool { + if workerInfo, ok := value.(*WorkerNode); ok { + wns = append(wns, workerInfo.getWorkerInfo()) + } + return true + }) + return wns +} + +func (s *grpcServer) getClients() *sync.Map { + return s.clients +} + +func (s *grpcServer) getClientsLength() (l int) { + s.clients.Range(func(key, value interface{}) bool { + if workerInfo, ok := value.(*WorkerNode); ok { + if workerInfo.getState() != StateQuitting && workerInfo.getState() != StateMissing { + l++ + } + } + return true + }) + return +} + +func (s *grpcServer) close() { + close(s.shutdownChan) +} + +func (s *grpcServer) recvChannel() chan *genericMessage { + return s.fromWorker +} + +func (s *grpcServer) shutdownChannel() chan bool { + return s.shutdownChan +} + +func (s *grpcServer) recv() { + for { + select { + case <-s.shutdownChan: + return + default: + s.clients.Range(func(key, value interface{}) bool { + if workerInfo, ok := value.(*WorkerNode); ok { + if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing { + return true + } + msg, err := workerInfo.messenger.Recv() + switch err { + case nil: + if msg == nil { + return true + } + s.fromWorker <- newGenericMessage(msg.Type, msg.Data, msg.NodeID) + log.Info(). + Str("nodeID", msg.NodeID). + Str("type", msg.Type). + Interface("data", msg.Data). + Msg("receive data from worker") + case io.EOF: + s.fromWorker <- newQuitMessage(workerInfo.ID) + default: + if err.Error() == status.Error(codes.Canceled, context.Canceled.Error()).Error() { + s.fromWorker <- newQuitMessage(workerInfo.ID) + return true + } + log.Error().Err(err).Msg("failed to get stream from client") + } + } + return true + }) + } + } +} + +func (s *grpcServer) sendChannel() chan *genericMessage { + return s.toWorker +} + +func (s *grpcServer) send() { + for { + select { + case <-s.shutdownChan: + return + case msg := <-s.toWorker: + s.sendMessage(msg) + + // We may send genericMessage to Worker. + if msg.Type == "quit" { + close(s.disconnectedToWorker) + } + } + } +} + +func (s *grpcServer) sendMessage(msg *genericMessage) { + s.clients.Range(func(key, value interface{}) bool { + if workerInfo, ok := value.(*WorkerNode); ok { + if workerInfo.getState() == StateQuitting || workerInfo.getState() == StateMissing { + return true + } + err := workerInfo.messenger.Send( + &messager.StreamResponse{ + Type: msg.Type, + Data: msg.Data, + NodeID: workerInfo.ID, + Tasks: msg.Tasks}, + ) + switch err { + case nil: + break + case io.EOF: + fallthrough + default: + s.fromWorker <- newQuitMessage(workerInfo.ID) + log.Error().Err(err).Msg("failed to send message") + return true + } + log.Info(). + Str("nodeID", workerInfo.ID). + Str("type", msg.Type). + Interface("data", msg.Data). + Int32("state", workerInfo.getState()). + Msg("send data to worker") + } + return true + }) +} + +func (s *grpcServer) disconnectedChannel() chan bool { + return s.disconnectedToWorker +} diff --git a/hrp/internal/boomer/server_grpc_test.go b/hrp/internal/boomer/server_grpc_test.go new file mode 100644 index 00000000..853e847e --- /dev/null +++ b/hrp/internal/boomer/server_grpc_test.go @@ -0,0 +1 @@ +package boomer diff --git a/hrp/internal/boomer/utils.go b/hrp/internal/boomer/utils.go index 9a6f3fef..bc376ca6 100644 --- a/hrp/internal/boomer/utils.go +++ b/hrp/internal/boomer/utils.go @@ -6,10 +6,15 @@ import ( "io" "math" "os" + "runtime" "runtime/pprof" + "strings" "time" + "github.com/google/uuid" + "github.com/rs/zerolog/log" + "github.com/shirou/gopsutil/process" ) func round(val float64, roundOn float64, places int) (newVal float64) { @@ -75,3 +80,27 @@ func startCPUProfile(file string, duration time.Duration) (err error) { }) return nil } + +// generate a random nodeID like locust does, using the same algorithm. +func getNodeID() (nodeID string) { + hostname, _ := os.Hostname() + id := strings.Replace(uuid.New().String(), "-", "", -1) + nodeID = fmt.Sprintf("%s_%s", hostname, id) + return +} + +// GetCurrentCPUUsage get current CPU usage +func GetCurrentCPUUsage() float64 { + currentPid := os.Getpid() + p, err := process.NewProcess(int32(currentPid)) + if err != nil { + log.Printf("Fail to get CPU percent, %v\n", err) + return 0.0 + } + percent, err := p.CPUPercent() + if err != nil { + log.Printf("Fail to get CPU percent, %v\n", err) + return 0.0 + } + return percent / float64(runtime.NumCPU()) +} diff --git a/hrp/internal/builtin/utils.go b/hrp/internal/builtin/utils.go index abea592e..876eaff3 100644 --- a/hrp/internal/builtin/utils.go +++ b/hrp/internal/builtin/utils.go @@ -1,11 +1,13 @@ package builtin import ( + "archive/zip" "bufio" "bytes" "encoding/csv" builtinJSON "encoding/json" "fmt" + "io" "math/rand" "os" "os/exec" @@ -490,3 +492,168 @@ func GetFileNameWithoutExtension(path string) string { ext := filepath.Ext(base) return base[0 : len(base)-len(ext)] } + +func ZipDir(filename string, root string) error { + p, err := os.Getwd() + if err != nil { + return err + } + if strings.Contains(root, p) { + root, err = filepath.Rel(p, root) + if err != nil { + return err + } + } + err = os.RemoveAll(filename) + if err != nil { + return err + } + var files []string + err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + files = append(files, path) + return nil + }) + if err != nil { + return err + } + err = ZipFiles(filename, files) + return err +} + +// ZipFiles compresses one or many files into a single zip archive file. +// Param 1: filename is the output zip file's name. +// Param 2: files is a list of files to add to the zip. +func ZipFiles(filename string, files []string) error { + newZipFile, err := os.Create(filename) + if err != nil { + return err + } + defer newZipFile.Close() + + zipWriter := zip.NewWriter(newZipFile) + defer zipWriter.Close() + + // Add files to zip + for _, file := range files { + if err = AddFileToZip(zipWriter, file); err != nil { + return err + } + } + return nil +} + +func AddFileToZip(zipWriter *zip.Writer, filename string) error { + fileToZip, err := os.Open(filename) + if err != nil { + return err + } + defer fileToZip.Close() + + // Get the file information + info, err := fileToZip.Stat() + if err != nil { + return err + } + + header, err := zip.FileInfoHeader(info) + if err != nil { + return err + } + + // Using FileInfoHeader() above only uses the basename of the file. If we want + // to preserve the folder structure we can overwrite this with the full path. + header.Name = filename + + // if dir + if info.IsDir() { + header.Name += `/` + } else { + // Change to deflate to gain better compression + // see http://golang.org/pkg/archive/zip/#pkg-constants + header.Method = zip.Deflate + } + + writer, err := zipWriter.CreateHeader(header) + if err != nil { + return err + } + if !info.IsDir() { + _, err = io.Copy(writer, fileToZip) + } + return err +} + +func UnZip(dst, src string) (err error) { + zr, err := zip.OpenReader(src) + defer zr.Close() + if err != nil { + return + } + if dst != "" { + if err := os.MkdirAll(dst, 0755); err != nil { + return err + } + } + for _, file := range zr.File { + path := filepath.Join(dst, file.Name) + if file.FileInfo().IsDir() { + if err := os.MkdirAll(path, file.Mode()); err != nil { + return err + } + continue + } + fr, err := file.Open() + if err != nil { + return err + } + fw, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, file.Mode()) + if err != nil { + return err + } + _, err = io.Copy(fw, fr) + if err != nil { + return err + } + log.Info().Msg(fmt.Sprintf("unzip %s successful\n", path)) + _ = fw.Close() + _ = fr.Close() + } + return nil +} + +func File2Bytes(filename string) ([]byte, error) { + file, err := os.Open(filename) + if err != nil { + return nil, err + } + defer file.Close() + + stats, err := file.Stat() + if err != nil { + return nil, err + } + + data := make([]byte, stats.Size()) + count, err := file.Read(data) + if err != nil { + return nil, err + } + log.Info().Msg(fmt.Sprintf("read file %s len: %d \n", filename, count)) + + return data, nil +} + +func Bytes2File(data []byte, filename string) error { + file, err := os.Create(filename) + if err != nil { + return err + } + defer file.Close() + + count, err := file.Write(data) + if err != nil { + return err + } + log.Info().Msg(fmt.Sprintf("write file %s len: %d \n", filename, count)) + return nil +} diff --git a/hrp/internal/grpc/messager/messager.pb.go b/hrp/internal/grpc/messager/messager.pb.go new file mode 100644 index 00000000..bb389289 --- /dev/null +++ b/hrp/internal/grpc/messager/messager.pb.go @@ -0,0 +1,276 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.20.0 +// source: grpc/proto/messager.proto + +package messager + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Data map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + NodeID string `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"` +} + +func (x *StreamRequest) Reset() { + *x = StreamRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_proto_messager_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamRequest) ProtoMessage() {} + +func (x *StreamRequest) ProtoReflect() protoreflect.Message { + mi := &file_grpc_proto_messager_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamRequest.ProtoReflect.Descriptor instead. +func (*StreamRequest) Descriptor() ([]byte, []int) { + return file_grpc_proto_messager_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamRequest) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *StreamRequest) GetData() map[string]int64 { + if x != nil { + return x.Data + } + return nil +} + +func (x *StreamRequest) GetNodeID() string { + if x != nil { + return x.NodeID + } + return "" +} + +type StreamResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Data map[string]int64 `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + NodeID string `protobuf:"bytes,3,opt,name=NodeID,proto3" json:"NodeID,omitempty"` + Tasks []byte `protobuf:"bytes,4,opt,name=tasks,proto3" json:"tasks,omitempty"` +} + +func (x *StreamResponse) Reset() { + *x = StreamResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_proto_messager_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamResponse) ProtoMessage() {} + +func (x *StreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_grpc_proto_messager_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamResponse.ProtoReflect.Descriptor instead. +func (*StreamResponse) Descriptor() ([]byte, []int) { + return file_grpc_proto_messager_proto_rawDescGZIP(), []int{1} +} + +func (x *StreamResponse) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *StreamResponse) GetData() map[string]int64 { + if x != nil { + return x.Data + } + return nil +} + +func (x *StreamResponse) GetNodeID() string { + if x != nil { + return x.NodeID + } + return "" +} + +func (x *StreamResponse) GetTasks() []byte { + if x != nil { + return x.Tasks + } + return nil +} + +var File_grpc_proto_messager_proto protoreflect.FileDescriptor + +var file_grpc_proto_messager_proto_rawDesc = []byte{ + 0x0a, 0x19, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x22, 0xaa, 0x01, 0x0a, 0x0d, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x34, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x12, 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x1a, 0x37, 0x0a, 0x09, 0x44, 0x61, 0x74, 0x61, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0xc2, 0x01, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, + 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, + 0x16, 0x0a, 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x1a, 0x37, 0x0a, + 0x09, 0x44, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0x61, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x12, 0x56, 0x0a, 0x1d, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x61, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x12, 0x16, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x67, 0x72, 0x70, + 0x63, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_grpc_proto_messager_proto_rawDescOnce sync.Once + file_grpc_proto_messager_proto_rawDescData = file_grpc_proto_messager_proto_rawDesc +) + +func file_grpc_proto_messager_proto_rawDescGZIP() []byte { + file_grpc_proto_messager_proto_rawDescOnce.Do(func() { + file_grpc_proto_messager_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_proto_messager_proto_rawDescData) + }) + return file_grpc_proto_messager_proto_rawDescData +} + +var file_grpc_proto_messager_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_grpc_proto_messager_proto_goTypes = []interface{}{ + (*StreamRequest)(nil), // 0: message.StreamRequest + (*StreamResponse)(nil), // 1: message.StreamResponse + nil, // 2: message.StreamRequest.DataEntry + nil, // 3: message.StreamResponse.DataEntry +} +var file_grpc_proto_messager_proto_depIdxs = []int32{ + 2, // 0: message.StreamRequest.data:type_name -> message.StreamRequest.DataEntry + 3, // 1: message.StreamResponse.data:type_name -> message.StreamResponse.DataEntry + 0, // 2: message.Message.BidirectionalStreamingMessage:input_type -> message.StreamRequest + 1, // 3: message.Message.BidirectionalStreamingMessage:output_type -> message.StreamResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_grpc_proto_messager_proto_init() } +func file_grpc_proto_messager_proto_init() { + if File_grpc_proto_messager_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_grpc_proto_messager_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_proto_messager_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_grpc_proto_messager_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_grpc_proto_messager_proto_goTypes, + DependencyIndexes: file_grpc_proto_messager_proto_depIdxs, + MessageInfos: file_grpc_proto_messager_proto_msgTypes, + }.Build() + File_grpc_proto_messager_proto = out.File + file_grpc_proto_messager_proto_rawDesc = nil + file_grpc_proto_messager_proto_goTypes = nil + file_grpc_proto_messager_proto_depIdxs = nil +} diff --git a/hrp/internal/grpc/messager/messager_grpc.pb.go b/hrp/internal/grpc/messager/messager_grpc.pb.go new file mode 100644 index 00000000..8237aa3c --- /dev/null +++ b/hrp/internal/grpc/messager/messager_grpc.pb.go @@ -0,0 +1,137 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.20.0 +// source: grpc/proto/messager.proto + +package messager + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// MessageClient is the client API for Message service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MessageClient interface { + BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error) +} + +type messageClient struct { + cc grpc.ClientConnInterface +} + +func NewMessageClient(cc grpc.ClientConnInterface) MessageClient { + return &messageClient{cc} +} + +func (c *messageClient) BidirectionalStreamingMessage(ctx context.Context, opts ...grpc.CallOption) (Message_BidirectionalStreamingMessageClient, error) { + stream, err := c.cc.NewStream(ctx, &Message_ServiceDesc.Streams[0], "/message.Message/BidirectionalStreamingMessage", opts...) + if err != nil { + return nil, err + } + x := &messageBidirectionalStreamingMessageClient{stream} + return x, nil +} + +type Message_BidirectionalStreamingMessageClient interface { + Send(*StreamRequest) error + Recv() (*StreamResponse, error) + grpc.ClientStream +} + +type messageBidirectionalStreamingMessageClient struct { + grpc.ClientStream +} + +func (x *messageBidirectionalStreamingMessageClient) Send(m *StreamRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *messageBidirectionalStreamingMessageClient) Recv() (*StreamResponse, error) { + m := new(StreamResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MessageServer is the server API for Message service. +// All implementations must embed UnimplementedMessageServer +// for forward compatibility +type MessageServer interface { + BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error + mustEmbedUnimplementedMessageServer() +} + +// UnimplementedMessageServer must be embedded to have forward compatible implementations. +type UnimplementedMessageServer struct { +} + +func (UnimplementedMessageServer) BidirectionalStreamingMessage(Message_BidirectionalStreamingMessageServer) error { + return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingMessage not implemented") +} +func (UnimplementedMessageServer) mustEmbedUnimplementedMessageServer() {} + +// UnsafeMessageServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MessageServer will +// result in compilation errors. +type UnsafeMessageServer interface { + mustEmbedUnimplementedMessageServer() +} + +func RegisterMessageServer(s grpc.ServiceRegistrar, srv MessageServer) { + s.RegisterService(&Message_ServiceDesc, srv) +} + +func _Message_BidirectionalStreamingMessage_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MessageServer).BidirectionalStreamingMessage(&messageBidirectionalStreamingMessageServer{stream}) +} + +type Message_BidirectionalStreamingMessageServer interface { + Send(*StreamResponse) error + Recv() (*StreamRequest, error) + grpc.ServerStream +} + +type messageBidirectionalStreamingMessageServer struct { + grpc.ServerStream +} + +func (x *messageBidirectionalStreamingMessageServer) Send(m *StreamResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *messageBidirectionalStreamingMessageServer) Recv() (*StreamRequest, error) { + m := new(StreamRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Message_ServiceDesc is the grpc.ServiceDesc for Message service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Message_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "message.Message", + HandlerType: (*MessageServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "BidirectionalStreamingMessage", + Handler: _Message_BidirectionalStreamingMessage_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "grpc/proto/messager.proto", +} diff --git a/hrp/internal/grpc/proto/messager.proto b/hrp/internal/grpc/proto/messager.proto new file mode 100644 index 00000000..ef311339 --- /dev/null +++ b/hrp/internal/grpc/proto/messager.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package message; + +option go_package = "grpc/messager"; + +service Message { + rpc BidirectionalStreamingMessage(stream StreamRequest) returns (stream StreamResponse){}; +} + +message StreamRequest{ + string type = 1; + map data = 2; + string NodeID = 3; +} + +message StreamResponse{ + string type = 1; + map data = 2; + string NodeID = 3; + bytes tasks = 4; +} \ No newline at end of file diff --git a/hrp/parameters.go b/hrp/parameters.go index ae8e1b9f..af54afa0 100644 --- a/hrp/parameters.go +++ b/hrp/parameters.go @@ -178,6 +178,14 @@ func (iter *ParametersIterator) Next() map[string]interface{} { return selectedParameters } +func (iter *ParametersIterator) outParameters() map[string]interface{} { + res := map[string]interface{}{} + for key, params := range iter.data { + res[key] = params + } + return res +} + func genCartesianProduct(multiParameters []Parameters) Parameters { if len(multiParameters) == 0 { return nil diff --git a/hrp/server.go b/hrp/server.go new file mode 100644 index 00000000..dc51e8c4 --- /dev/null +++ b/hrp/server.go @@ -0,0 +1,299 @@ +package hrp + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "net/http" + + "github.com/httprunner/httprunner/v4/hrp/internal/boomer" + "github.com/httprunner/httprunner/v4/hrp/internal/json" +) + +const jsonContentType = "application/json; encoding=utf-8" + +func methods(h http.HandlerFunc, methods ...string) http.HandlerFunc { + methodMap := make(map[string]struct{}, len(methods)) + for _, m := range methods { + methodMap[m] = struct{}{} + // GET implies support for HEAD + if m == "GET" { + methodMap["HEAD"] = struct{}{} + } + } + return func(w http.ResponseWriter, r *http.Request) { + if _, ok := methodMap[r.Method]; !ok { + http.Error(w, fmt.Sprintf("method %s not allowed", r.Method), http.StatusMethodNotAllowed) + return + } + h.ServeHTTP(w, r) + } +} + +func parseBody(r *http.Request) (data map[string]interface{}, err error) { + if r.Body == nil { + return nil, nil + } + + // Always set resp.Data to the incoming request body, in case we don't know + // how to handle the content type + body, err := ioutil.ReadAll(r.Body) + if err != nil { + r.Body.Close() + return nil, err + } + err = json.Unmarshal(body, data) + if err != nil { + return nil, err + } + return data, nil +} + +func writeResponse(w http.ResponseWriter, status int, contentType string, body []byte) { + w.Header().Set("Content-Type", contentType) + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(body))) + w.WriteHeader(status) + w.Write(body) +} + +func writeJSON(w http.ResponseWriter, body []byte, status int) { + writeResponse(w, status, jsonContentType, body) +} + +type StartRequestBody struct { + Worker string `json:"worker"` // all + SpawnCount int64 `json:"spawn_count"` + SpawnRate int64 `json:"spawn_rate"` + TestCasePath string `json:"testcase_path"` +} + +type ServerCode int + +// server response code +const ( + Success ServerCode = iota + ParamsError + ServerError + StopError +) + +// ServerStatus stores http response code and message +type ServerStatus struct { + Code ServerCode `json:"code"` + Message string `json:"message"` +} + +var EnumAPIResponseSuccess = ServerStatus{ + Code: Success, + Message: "success", +} + +func EnumAPIResponseParamError(errMsg string) ServerStatus { + return ServerStatus{ + Code: ParamsError, + Message: errMsg, + } +} + +func EnumAPIResponseServerError(errMsg string) ServerStatus { + return ServerStatus{ + Code: ServerError, + Message: errMsg, + } +} + +func EnumAPIResponseStopError(errMsg string) ServerStatus { + return ServerStatus{ + Code: StopError, + Message: errMsg, + } +} + +func CustomAPIResponse(errCode ServerCode, errMsg string) ServerStatus { + return ServerStatus{ + Code: errCode, + Message: errMsg, + } +} + +type RebalanceRequestBody struct { + Worker string `json:"worker"` + SpawnCount int64 `json:"spawn_count"` + SpawnRate int64 `json:"spawn_rate"` + TestCasePath string `json:"testcase_path"` +} + +type StopRequestBody struct { + Worker string `json:"worker"` +} + +type QuitRequestBody struct { + Worker string `json:"worker"` +} + +type CommonResponseBody struct { + ServerStatus +} + +type APIGetWorkersRequestBody struct { + ID string `json:"id"` + State int32 `json:"state"` + CPUUsage float64 `json:"cpu_usage"` + MemoryUsage float64 `json:"memory_usage"` +} + +type APIGetWorkersResponseBody struct { + ServerStatus + Data []boomer.WorkerNode `json:"data"` +} + +type apiHandler struct { + boomer *HRPBoomer +} + +func (b *HRPBoomer) NewAPIHandler() *apiHandler { + return &apiHandler{boomer: b} +} + +// Index renders an HTML index page +func (api *apiHandler) Index(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + w.Header().Set("Content-Security-Policy", "default-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' camo.githubusercontent.com") + fmt.Fprintf(w, "Welcome to httprunner page!") +} + +func (api *apiHandler) Start(w http.ResponseWriter, r *http.Request) { + data := map[string]interface{}{} + args := r.URL.Query() + for k, vs := range args { + for _, v := range vs { + data[k] = v + } + } + var resp *CommonResponseBody + err := api.boomer.Start(data) + if err != nil { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseServerError(err.Error()), + } + } else { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + } + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) +} + +func (api *apiHandler) Stop(w http.ResponseWriter, r *http.Request) { + data := map[string]interface{}{} + args := r.URL.Query() + for k, vs := range args { + for _, v := range vs { + data[k] = v + } + } + + api.boomer.Stop() + resp := &CommonResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) +} + +func (api *apiHandler) Quit(w http.ResponseWriter, r *http.Request) { + data := map[string]interface{}{} + args := r.URL.Query() + for k, vs := range args { + for _, v := range vs { + data[k] = v + } + } + + resp := &CommonResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) + api.boomer.Quit() +} + +func (api *apiHandler) ReBalance(w http.ResponseWriter, r *http.Request) { + data := map[string]interface{}{} + args := r.URL.Query() + for k, vs := range args { + for _, v := range vs { + data[k] = v + } + } + var resp *CommonResponseBody + err := api.boomer.ReBalance(data) + if err != nil { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseParamError(err.Error()), + } + } else { + resp = &CommonResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + } + } + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) +} + +func (api *apiHandler) GetWorkersInfo(w http.ResponseWriter, r *http.Request) { + resp := &APIGetWorkersResponseBody{ + ServerStatus: EnumAPIResponseSuccess, + Data: api.boomer.GetWorkersInfo(), + } + + body, _ := json.Marshal(resp) + writeJSON(w, body, http.StatusOK) +} + +func (api *apiHandler) Handler() http.Handler { + mux := http.NewServeMux() + + mux.HandleFunc("/", methods(api.Index, "GET")) + mux.HandleFunc("/start", methods(api.Start, "GET")) + mux.HandleFunc("/stop", methods(api.Stop, "GET")) + mux.HandleFunc("/quit", methods(api.Quit, "GET")) + mux.HandleFunc("/rebalance", methods(api.ReBalance, "GET")) + mux.HandleFunc("/workers", methods(api.GetWorkersInfo, "GET")) + + return mux +} + +func (apiHandler) ServeHTTP(http.ResponseWriter, *http.Request) {} + +func (b *HRPBoomer) StartServer() { + h := b.NewAPIHandler() + mux := h.Handler() + + server := &http.Server{ + Addr: ":9771", + Handler: mux, + } + + go func() { + <-b.GetCloseChan() + if err := server.Shutdown(context.Background()); err != nil { + log.Fatal("shutdown server:", err) + } + }() + + log.Println("Starting HTTP server...") + err := server.ListenAndServe() + if err != nil { + if err == http.ErrServerClosed { + log.Print("server closed under request") + } else { + log.Fatal("server closed unexpected") + } + } +} diff --git a/hrp/step_rendezvous.go b/hrp/step_rendezvous.go index 77291a36..edd9cf84 100644 --- a/hrp/step_rendezvous.go +++ b/hrp/step_rendezvous.go @@ -155,9 +155,9 @@ func (r *Rendezvous) setReleased() { } func initRendezvous(testcase *TestCase, total int64) []*Rendezvous { - tCase := testcase.ToTCase() var rendezvousList []*Rendezvous - for _, step := range tCase.TestSteps { + for _, s := range testcase.TestSteps { + step := s.Struct() if step.Rendezvous == nil { continue } @@ -188,16 +188,20 @@ func initRendezvous(testcase *TestCase, total int64) []*Rendezvous { return rendezvousList } -func waitRendezvous(rendezvousList []*Rendezvous) { +func (r *Rendezvous) updateRendezvousNumber(number int64) { + atomic.StoreInt64(&r.Number, int64(float32(number)*r.Percent)) +} + +func waitRendezvous(rendezvousList []*Rendezvous, b *HRPBoomer) { if rendezvousList != nil { lastRendezvous := rendezvousList[len(rendezvousList)-1] for _, rendezvous := range rendezvousList { - go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous) + go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous, b) } } } -func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous) { +func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous, b *HRPBoomer) { for { // cycle start: block current checking until current rendezvous activated <-rendezvous.activateChan @@ -241,6 +245,8 @@ func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, if rendezvous == lastRendezvous { for _, r := range rendezvousList { r.reset() + // dynamic adjustment based on the number of concurrent users + r.updateRendezvousNumber(int64(b.GetSpawnCount())) } } else { <-lastRendezvous.releaseChan diff --git a/hrp/testcase.go b/hrp/testcase.go index afe03713..ffedd829 100644 --- a/hrp/testcase.go +++ b/hrp/testcase.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog/log" "github.com/httprunner/httprunner/v4/hrp/internal/builtin" + "github.com/mitchellh/mapstructure" ) // ITestCase represents interface for testcases, @@ -40,6 +41,11 @@ func (tc *TestCase) ToTCase() *TCase { Config: tc.Config, } for _, step := range tc.TestSteps { + if step.Type() == stepTypeTestCase { + if testcase, ok := step.Struct().TestCase.(*TestCase); ok { + step.Struct().TestCase = testcase.ToTCase() + } + } tCase.TestSteps = append(tCase.TestSteps, step.Struct()) } return tCase @@ -106,13 +112,17 @@ func (tc *TCase) ToTestCase(casePath string) (*TestCase, error) { tc.Config = &TConfig{Name: "please input testcase name"} } tc.Config.Path = casePath + return tc.toTestCase() +} +// toTestCase converts *TCase to *TestCase +func (tc *TCase) toTestCase() (*TestCase, error) { testCase := &TestCase{ Config: tc.Config, } // locate project root dir by plugin path - projectRootDir, err := GetProjectRootDirPath(casePath) + projectRootDir, err := GetProjectRootDirPath(tc.Config.Path) if err != nil { return nil, errors.Wrap(err, "failed to get project root dir") } @@ -139,40 +149,71 @@ func (tc *TCase) ToTestCase(casePath string) (*TestCase, error) { for _, step := range tc.TestSteps { if step.API != nil { apiPath, ok := step.API.(string) + if ok { + path := filepath.Join(projectRootDir, apiPath) + if !builtin.IsFilePathExists(path) { + return nil, errors.New("referenced api file not found: " + path) + } + + refAPI := APIPath(path) + apiContent, err := refAPI.ToAPI() + if err != nil { + return nil, err + } + step.API = apiContent + } else { + apiMap, ok := step.API.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("referenced api should be map or path(string), got %v", step.API) + } + api := &API{} + err = mapstructure.Decode(apiMap, api) + if err != nil { + return nil, err + } + step.API = api + } + _, ok = step.API.(*API) if !ok { - return nil, fmt.Errorf("referenced api path should be string, got %v", step.API) + return nil, fmt.Errorf("failed to handle referenced API, got %v", step.TestCase) } - path := filepath.Join(projectRootDir, apiPath) - if !builtin.IsFilePathExists(path) { - return nil, errors.New("referenced api file not found: " + path) - } - - refAPI := APIPath(path) - apiContent, err := refAPI.ToAPI() - if err != nil { - return nil, err - } - step.API = apiContent - testCase.TestSteps = append(testCase.TestSteps, &StepAPIWithOptionalArgs{ step: step, }) } else if step.TestCase != nil { casePath, ok := step.TestCase.(string) - if !ok { - return nil, fmt.Errorf("referenced testcase path should be string, got %v", step.TestCase) - } - path := filepath.Join(projectRootDir, casePath) - if !builtin.IsFilePathExists(path) { - return nil, errors.New("referenced testcase file not found: " + path) - } + if ok { + path := filepath.Join(projectRootDir, casePath) + if !builtin.IsFilePathExists(path) { + return nil, errors.New("referenced testcase file not found: " + path) + } - refTestCase := TestCasePath(path) - tc, err := refTestCase.ToTestCase() - if err != nil { - return nil, err + refTestCase := TestCasePath(path) + tc, err := refTestCase.ToTestCase() + if err != nil { + return nil, err + } + step.TestCase = tc + } else { + testCaseMap, ok := step.TestCase.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("referenced testcase should be map or path(string), got %v", step.TestCase) + } + tCase := &TCase{} + err = mapstructure.Decode(testCaseMap, tCase) + if err != nil { + return nil, err + } + tc, err := tCase.toTestCase() + if err != nil { + return nil, err + } + step.TestCase = tc + } + _, ok = step.TestCase.(*TestCase) + if !ok { + return nil, fmt.Errorf("failed to handle referenced testcase, got %v", step.TestCase) } - step.TestCase = tc testCase.TestSteps = append(testCase.TestSteps, &StepTestCaseWithOptionalArgs{ step: step, })