diff --git a/boomer.go b/boomer.go index 8ec0ff67..292178e6 100644 --- a/boomer.go +++ b/boomer.go @@ -56,8 +56,10 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) { if err != nil { panic(err) } + rendezvousList := initRendezvous(testcase, int64(b.GetSpawnCount())) task := b.convertBoomerTask(testcase) taskSlice = append(taskSlice, task) + waitRendezvous(rendezvousList) } b.Boomer.Run(taskSlice...) } @@ -147,6 +149,10 @@ func (b *HRPBoomer) convertBoomerTask(testcase *TestCase) *boomer.Task { } else if stepData.stepType == stepTypeRendezvous { // rendezvous // TODO: implement rendezvous in boomer + rendezvous := step.ToStruct().Rendezvous + if !rendezvous.isSpawnDone() && b.IsSpawnDone() { + rendezvous.setSpawnDone() + } } else { // request or testcase step b.RecordSuccess(step.Type(), step.Name(), stepData.elapsed, stepData.contentSize) diff --git a/examples/rendezvous_test.go b/examples/rendezvous_test.go new file mode 100644 index 00000000..c9a1c365 --- /dev/null +++ b/examples/rendezvous_test.go @@ -0,0 +1,65 @@ +package examples + +import ( + "testing" + + "github.com/httprunner/hrp" +) + +var rendezvousTestcase = &hrp.TestCase{ + Config: hrp.NewConfig("run request with functions"). + SetBaseURL("https://postman-echo.com"). + WithVariables(map[string]interface{}{ + "n": 5, + "a": 12.3, + "b": 3.45, + }), + TestSteps: []hrp.IStep{ + hrp.NewStep("waiting for all users in the beginning"). + Rendezvous("rendezvous0"), + hrp.NewStep("rendezvous before get"). + Rendezvous("rendezvous1"). + WithUserNumber(50). + WithTimeout(3000), + hrp.NewStep("get with params"). + GET("/get"). + WithParams(map[string]interface{}{"foo1": "foo1", "foo2": "foo2"}). + WithHeaders(map[string]string{"User-Agent": "HttpRunnerPlus"}). + Extract(). + WithJmesPath("body.args.foo1", "varFoo1"). + Validate(). + AssertEqual("status_code", 200, "check status code"), + hrp.NewStep("rendezvous before post"). + Rendezvous("rendezvous2"). + WithUserNumber(20). + WithTimeout(2000), + hrp.NewStep("post json data with functions"). + POST("/post"). + WithHeaders(map[string]string{"User-Agent": "HttpRunnerPlus"}). + WithBody(map[string]interface{}{"foo1": "foo1", "foo2": "foo2"}). + Validate(). + AssertEqual("status_code", 200, "check status code"). + AssertLengthEqual("body.json.foo1", 4, "check args foo1"). + AssertEqual("body.json.foo2", "foo2", "check args foo2"), + hrp.NewStep("waiting for all users in the end"). + Rendezvous("rendezvous3"), + }, +} + +func TestRendezvous(t *testing.T) { + err := hrp.NewRunner(t).Run(rendezvousTestcase) + if err != nil { + t.Fatalf("run testcase error: %v", err) + } +} + +func TestRendezvousDump2JSON(t *testing.T) { + tCase, err := rendezvousTestcase.ToTCase() + if err != nil { + t.Fatalf("ToTCase error: %v", err) + } + err = tCase.Dump2JSON("rendezvous_test.json") + if err != nil { + t.Fatalf("dump to json error: %v", err) + } +} diff --git a/examples/rendezvous_test.json b/examples/rendezvous_test.json new file mode 100644 index 00000000..2a53e776 --- /dev/null +++ b/examples/rendezvous_test.json @@ -0,0 +1,100 @@ +{ + "config": { + "name": "run request with functions", + "base_url": "https://postman-echo.com", + "variables": { + "a": 12.3, + "b": 3.45, + "n": 5 + } + }, + "teststeps": [ + { + "name": "waiting for all users in the beginning", + "rendezvous": { + "name": "rendezvous0" + } + }, + { + "name": "rendezvous before get", + "rendezvous": { + "name": "rendezvous1", + "number": 50, + "timeout": 3000 + } + }, + { + "name": "get with params", + "request": { + "method": "GET", + "url": "/get", + "params": { + "foo1": "foo1", + "foo2": "foo2" + }, + "headers": { + "User-Agent": "HttpRunnerPlus" + } + }, + "extract": { + "varFoo1": "body.args.foo1" + }, + "validate": [ + { + "check": "status_code", + "assert": "equals", + "expect": 200, + "msg": "check status code" + } + ] + }, + { + "name": "rendezvous before post", + "rendezvous": { + "name": "rendezvous2", + "number": 20, + "timeout": 2000 + } + }, + { + "name": "post json data with functions", + "request": { + "method": "POST", + "url": "/post", + "headers": { + "User-Agent": "HttpRunnerPlus" + }, + "body": { + "foo1": "foo1", + "foo2": "foo2" + } + }, + "validate": [ + { + "check": "status_code", + "assert": "equals", + "expect": 200, + "msg": "check status code" + }, + { + "check": "body.json.foo1", + "assert": "length_equals", + "expect": 4, + "msg": "check args foo1" + }, + { + "check": "body.json.foo2", + "assert": "equals", + "expect": "foo2", + "msg": "check args foo2" + } + ] + }, + { + "name": "waiting for all users in the end", + "rendezvous": { + "name": "rendezvous3" + } + } + ] +} \ No newline at end of file diff --git a/internal/boomer/boomer.go b/internal/boomer/boomer.go index 0b22f6cb..563ed6f6 100644 --- a/internal/boomer/boomer.go +++ b/internal/boomer/boomer.go @@ -127,3 +127,11 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc func (b *Boomer) Quit() { b.localRunner.stop() } + +func (b *Boomer) IsSpawnDone() bool { + return b.localRunner.isSpawnDone +} + +func (b *Boomer) GetSpawnCount() int { + return b.localRunner.spawnCount +} diff --git a/internal/boomer/runner.go b/internal/boomer/runner.go index 9a7da130..90ed9615 100644 --- a/internal/boomer/runner.go +++ b/internal/boomer/runner.go @@ -63,6 +63,7 @@ type runner struct { spawnCount int // target clients to spawn spawnRate float64 loop *Loop // specify running cycles + isSpawnDone bool outputs []Output } @@ -193,6 +194,7 @@ func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan } } + r.isSpawnDone = true if spawnCompleteFunc != nil { spawnCompleteFunc() } diff --git a/models.go b/models.go index e5af8c05..c4b3f249 100644 --- a/models.go +++ b/models.go @@ -33,7 +33,7 @@ type TConfig struct { type TParamsConfig struct { Strategy interface{} `json:"strategy,omitempty" yaml:"strategy,omitempty"` Iteration int `json:"iteration,omitempty" yaml:"iteration,omitempty"` - Iterators []*Iterator `json:"parameterIterator,omitempty" yaml:"parameterIterator,omitempty"` //保存参数的迭代器 + Iterators []*Iterator `json:"parameterIterator,omitempty" yaml:"parameterIterator,omitempty"` // 保存参数的迭代器 } const ( @@ -142,11 +142,26 @@ type Transaction struct { Name string `json:"name" yaml:"name"` Type transactionType `json:"type" yaml:"type"` } + +const ( + defaultRendezvousTimeout int64 = 5000 + defaultRendezvousPercent float32 = 1.0 +) + type Rendezvous struct { - Name string `json:"name" yaml:"name"` // required - Percent float32 `json:"percent,omitempty" yaml:"percent,omitempty"` // default to 1(100%) - Number int64 `json:"number,omitempty" yaml:"number,omitempty"` - Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"` // milliseconds + Name string `json:"name" yaml:"name"` // required + Percent float32 `json:"percent,omitempty" yaml:"percent,omitempty"` // default to 1(100%) + Number int64 `json:"number,omitempty" yaml:"number,omitempty"` + Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"` // milliseconds + cnt int64 + releasedFlag uint32 + spawnDoneFlag uint32 + wg sync.WaitGroup + timerResetChan chan struct{} + activateChan chan struct{} + releaseChan chan struct{} + once *sync.Once + lock sync.Mutex } // TCase represents testcase data structure. diff --git a/runner.go b/runner.go index 0c1e01eb..dc05483c 100644 --- a/runner.go +++ b/runner.go @@ -13,6 +13,8 @@ import ( "os/signal" "strconv" "strings" + "sync" + "sync/atomic" "syscall" "testing" "time" @@ -334,21 +336,186 @@ func (r *caseRunner) runStepTransaction(transaction *Transaction) (stepResult *s return stepResult, nil } -func (r *caseRunner) runStepRendezvous(rend *Rendezvous) (stepResult *stepData, err error) { +func (r *caseRunner) runStepRendezvous(rendezvous *Rendezvous) (stepResult *stepData, err error) { log.Info(). - Str("name", rend.Name). - Float32("percent", rend.Percent). - Int64("number", rend.Number). - Int64("timeout", rend.Timeout). + Str("name", rendezvous.Name). + Float32("percent", rendezvous.Percent). + Int64("number", rendezvous.Number). + Int64("timeout", rendezvous.Timeout). Msg("rendezvous") stepResult = &stepData{ - name: rend.Name, + name: rendezvous.Name, stepType: stepTypeRendezvous, success: true, } + + // pass current rendezvous if already released, activate rendezvous sequentially after spawn done + if rendezvous.isReleased() || !r.isPreRendezvousAllReleased(rendezvous) || !rendezvous.isSpawnDone() { + return stepResult, nil + } + + // activate the rendezvous only once during each cycle + rendezvous.once.Do(func() { + close(rendezvous.activateChan) + }) + + // check current cnt using double check lock before updating to avoid negative WaitGroup counter + if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number { + rendezvous.lock.Lock() + if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number { + atomic.AddInt64(&rendezvous.cnt, 1) + rendezvous.wg.Done() + rendezvous.timerResetChan <- struct{}{} + } + rendezvous.lock.Unlock() + } + + // block until current rendezvous released + <-rendezvous.releaseChan return stepResult, nil } +func (r *caseRunner) isPreRendezvousAllReleased(rendezvous *Rendezvous) bool { + tCase, _ := r.ToTCase() + for _, step := range tCase.TestSteps { + preRendezvous := step.Rendezvous + if preRendezvous == nil { + continue + } + // meet current rendezvous, all previous rendezvous released, return true + if preRendezvous == rendezvous { + return true + } + if !preRendezvous.isReleased() { + return false + } + } + return true +} + +func (r *Rendezvous) reset() { + r.cnt = 0 + r.releasedFlag = 0 + r.wg.Add(int(r.Number)) + // timerResetChan channel will not be closed, thus init only once + if r.timerResetChan == nil { + r.timerResetChan = make(chan struct{}) + } + r.activateChan = make(chan struct{}) + r.releaseChan = make(chan struct{}) + r.once = new(sync.Once) +} + +func (r *Rendezvous) isSpawnDone() bool { + return atomic.LoadUint32(&r.spawnDoneFlag) == 1 +} + +func (r *Rendezvous) setSpawnDone() { + atomic.StoreUint32(&r.spawnDoneFlag, 1) +} + +func (r *Rendezvous) isReleased() bool { + return atomic.LoadUint32(&r.releasedFlag) == 1 +} + +func (r *Rendezvous) setReleased() { + atomic.StoreUint32(&r.releasedFlag, 1) +} + +func initRendezvous(testcase *TestCase, total int64) []*Rendezvous { + tCase, _ := testcase.ToTCase() + var rendezvousList []*Rendezvous + for _, step := range tCase.TestSteps { + if step.Rendezvous == nil { + continue + } + rendezvous := step.Rendezvous + + // either number or percent should be correctly put, otherwise set to default (total) + if rendezvous.Number == 0 && rendezvous.Percent > 0 && rendezvous.Percent <= defaultRendezvousPercent { + rendezvous.Number = int64(rendezvous.Percent * float32(total)) + } else if rendezvous.Number > 0 && rendezvous.Number <= total && rendezvous.Percent == 0 { + rendezvous.Percent = float32(rendezvous.Number) / float32(total) + } else { + log.Warn(). + Str("name", rendezvous.Name). + Int64("default number", total). + Float32("default percent", defaultRendezvousPercent). + Msg("rendezvous parameter not defined or error, set to default value") + rendezvous.Number = total + rendezvous.Percent = defaultRendezvousPercent + } + + if rendezvous.Timeout <= 0 { + rendezvous.Timeout = defaultRendezvousTimeout + } + + rendezvous.reset() + rendezvousList = append(rendezvousList, rendezvous) + } + return rendezvousList +} + +func waitRendezvous(rendezvousList []*Rendezvous) { + if rendezvousList != nil { + lastRendezvous := rendezvousList[len(rendezvousList)-1] + for _, rendezvous := range rendezvousList { + go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous) + } + } +} + +func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous) { + for { + // cycle start: block current checking until current rendezvous activated + <-rendezvous.activateChan + stop := make(chan struct{}) + timeout := time.Duration(rendezvous.Timeout) * time.Millisecond + timer := time.NewTimer(timeout) + go func() { + defer close(stop) + rendezvous.wg.Wait() + }() + for !rendezvous.isReleased() { + select { + case <-rendezvous.timerResetChan: + timer.Reset(timeout) + case <-stop: + rendezvous.setReleased() + close(rendezvous.releaseChan) + log.Info(). + Str("name", rendezvous.Name). + Float32("percent", rendezvous.Percent). + Int64("number", rendezvous.Number). + Int64("timeout(ms)", rendezvous.Timeout). + Int64("cnt", rendezvous.cnt). + Str("reason", "rendezvous release condition satisfied"). + Msg("rendezvous released") + case <-timer.C: + rendezvous.setReleased() + close(rendezvous.releaseChan) + log.Info(). + Str("name", rendezvous.Name). + Float32("percent", rendezvous.Percent). + Int64("number", rendezvous.Number). + Int64("timeout(ms)", rendezvous.Timeout). + Int64("cnt", rendezvous.cnt). + Str("reason", "time's up"). + Msg("rendezvous released") + } + } + // cycle end: reset all previous rendezvous after last rendezvous released + // otherwise, block current checker until the last rendezvous end + if rendezvous == lastRendezvous { + for _, r := range rendezvousList { + r.reset() + } + } else { + <-lastRendezvous.releaseChan + } + } +} + func (r *caseRunner) runStepRequest(step *TStep) (stepResult *stepData, err error) { stepResult = &stepData{ name: step.Name, diff --git a/runner_test.go b/runner_test.go index 2fb64fcf..8996a56c 100644 --- a/runner_test.go +++ b/runner_test.go @@ -1,6 +1,7 @@ package hrp import ( + "math" "os" "os/exec" "testing" @@ -54,3 +55,68 @@ func TestHttpRunner(t *testing.T) { t.Fatalf("run testcase error: %v", err) } } + +func TestInitRendezvous(t *testing.T) { + rendezvousBonudaryTestcase := &TestCase{ + Config: NewConfig("run request with functions"). + SetBaseURL("https://postman-echo.com"). + WithVariables(map[string]interface{}{ + "n": 5, + "a": 12.3, + "b": 3.45, + }), + TestSteps: []IStep{ + NewStep("test negative number"). + Rendezvous("test negative number"). + WithUserNumber(-1), + NewStep("test overflow number"). + Rendezvous("test overflow number"). + WithUserNumber(1000000), + NewStep("test negative percent"). + Rendezvous("test very low percent"). + WithUserPercent(-0.5), + NewStep("test very low percent"). + Rendezvous("test very low percent"). + WithUserPercent(0.00001), + NewStep("test overflow percent"). + Rendezvous("test overflow percent"). + WithUserPercent(1.5), + NewStep("test conflict params"). + Rendezvous("test conflict params"). + WithUserNumber(1). + WithUserPercent(0.123), + NewStep("test negative timeout"). + Rendezvous("test negative timeout"). + WithTimeout(-1000), + }, + } + + type rendezvousParam struct { + number int64 + percent float32 + timeout int64 + } + expectedRendezvousParams := []rendezvousParam{ + {number: 100, percent: 1, timeout: 5000}, + {number: 100, percent: 1, timeout: 5000}, + {number: 100, percent: 1, timeout: 5000}, + {number: 0, percent: 0.00001, timeout: 5000}, + {number: 100, percent: 1, timeout: 5000}, + {number: 100, percent: 1, timeout: 5000}, + {number: 100, percent: 1, timeout: 5000}, + } + + rendezvousList := initRendezvous(rendezvousBonudaryTestcase, 100) + + for i, r := range rendezvousList { + if r.Number != expectedRendezvousParams[i].number { + t.Fatalf("run rendezvous %v error: expected number: %v, real number: %v", r.Name, expectedRendezvousParams[i].number, r.Number) + } + if math.Abs(float64(r.Percent-expectedRendezvousParams[i].percent)) > 0.001 { + t.Fatalf("run rendezvous %v error: expected percent: %v, real percent: %v", r.Name, expectedRendezvousParams[i].percent, r.Percent) + } + if r.Timeout != expectedRendezvousParams[i].timeout { + t.Fatalf("run rendezvous %v error: expected timeout: %v, real timeout: %v", r.Name, expectedRendezvousParams[i].timeout, r.Timeout) + } + } +} diff --git a/step.go b/step.go index 12fe3217..2b248339 100644 --- a/step.go +++ b/step.go @@ -345,3 +345,31 @@ func (s *StepRendezvous) Type() string { func (s *StepRendezvous) ToStruct() *TStep { return s.step } + +// Rendezvous creates a new rendezvous +func (s *StepRequest) Rendezvous(name string) *StepRendezvous { + s.step.Rendezvous = &Rendezvous{ + Name: name, + } + return &StepRendezvous{ + step: s.step, + } +} + +// WithUserNumber sets the user number needed to release the current rendezvous +func (s *StepRendezvous) WithUserNumber(number int64) *StepRendezvous { + s.step.Rendezvous.Number = number + return s +} + +// WithUserPercent sets the user percent needed to release the current rendezvous +func (s *StepRendezvous) WithUserPercent(percent float32) *StepRendezvous { + s.step.Rendezvous.Percent = percent + return s +} + +// WithTimeout sets the timeout of duration between each user arriving at the current rendezvous +func (s *StepRendezvous) WithTimeout(timeout int64) *StepRendezvous { + s.step.Rendezvous.Timeout = timeout + return s +}