feat: support rendezvous after spawn done

Change-Id: I4b07a88b61da4dc1863b189db9eb831ffb14130a
This commit is contained in:
buyuxiang
2022-01-25 14:25:14 +08:00
parent 639480a6c5
commit d7ff433818
8 changed files with 475 additions and 11 deletions

View File

@@ -56,8 +56,10 @@ func (b *HRPBoomer) Run(testcases ...ITestCase) {
if err != nil {
panic(err)
}
rendezvousList := initRendezvous(testcase, int64(b.GetSpawnCount()))
task := b.convertBoomerTask(testcase)
taskSlice = append(taskSlice, task)
waitRendezvous(rendezvousList)
}
b.Boomer.Run(taskSlice...)
}
@@ -147,6 +149,10 @@ func (b *HRPBoomer) convertBoomerTask(testcase *TestCase) *boomer.Task {
} else if stepData.stepType == stepTypeRendezvous {
// rendezvous
// TODO: implement rendezvous in boomer
rendezvous := step.ToStruct().Rendezvous
if !rendezvous.isSpawnDone() && b.IsSpawnDone() {
rendezvous.setSpawnDone()
}
} else {
// request or testcase step
b.RecordSuccess(step.Type(), step.Name(), stepData.elapsed, stepData.contentSize)

View File

@@ -0,0 +1,91 @@
package examples
import (
"testing"
"github.com/httprunner/hrp"
)
var rendezvousTestcase = &hrp.TestCase{
Config: hrp.NewConfig("run request with functions").
SetBaseURL("https://postman-echo.com").
WithVariables(map[string]interface{}{
"n": 5,
"a": 12.3,
"b": 3.45,
}),
TestSteps: []hrp.IStep{
// rendezvous boundary test
hrp.NewStep("test negative number").
Rendezvous("test negative number").
WithUserNumber(-1),
hrp.NewStep("test overflow number").
Rendezvous("test overflow number").
WithUserNumber(1000000),
hrp.NewStep("test negative percent").
Rendezvous("test very low percent").
WithUserPercent(-0.5),
hrp.NewStep("test very low percent").
Rendezvous("test very low percent").
WithUserPercent(0.00001),
hrp.NewStep("test overflow percent").
Rendezvous("test overflow percent").
WithUserPercent(1.5),
hrp.NewStep("test conflict params").
Rendezvous("test conflict params").
WithUserNumber(1).
WithUserPercent(0.123),
hrp.NewStep("test negative timeout").
Rendezvous("test negative timeout").
WithTimeout(-1000),
// rendezvous normal test
hrp.NewStep("waiting for all users in the beginning").
Rendezvous("rendezvous0").
WithUserNumber(10).
WithTimeout(3000),
hrp.NewStep("rendezvous before get").
Rendezvous("rendezvous1").
WithUserNumber(10).
WithTimeout(3000),
hrp.NewStep("get with params").
GET("/get").
WithParams(map[string]interface{}{"foo1": "foo1", "foo2": "foo2"}).
WithHeaders(map[string]string{"User-Agent": "HttpRunnerPlus"}).
Extract().
WithJmesPath("body.args.foo1", "varFoo1").
Validate().
AssertEqual("status_code", 200, "check status code"),
hrp.NewStep("rendezvous before post").
Rendezvous("rendezvous2").
WithUserNumber(20).
WithTimeout(2000),
hrp.NewStep("post json data with functions").
POST("/post").
WithHeaders(map[string]string{"User-Agent": "HttpRunnerPlus"}).
WithBody(map[string]interface{}{"foo1": "foo1", "foo2": "foo2"}).
Validate().
AssertEqual("status_code", 200, "check status code").
AssertLengthEqual("body.json.foo1", 4, "check args foo1").
AssertEqual("body.json.foo2", "foo2", "check args foo2"),
hrp.NewStep("waiting for all users in the end").
Rendezvous("rendezvous3"),
},
}
func TestRendezvous(t *testing.T) {
err := hrp.NewRunner(t).Run(rendezvousTestcase)
if err != nil {
t.Fatalf("run testcase error: %v", err)
}
}
func TestRendezvousDump2JSON(t *testing.T) {
tCase, err := rendezvousTestcase.ToTCase()
if err != nil {
t.Fatalf("ToTCase error: %v", err)
}
err = tCase.Dump2JSON("rendezvous_test.json")
if err != nil {
t.Fatalf("dump to json error: %v", err)
}
}

View File

@@ -0,0 +1,152 @@
{
"config": {
"name": "run request with functions",
"base_url": "https://postman-echo.com",
"variables": {
"a": 12.3,
"b": 3.45,
"n": 5
}
},
"teststeps": [
{
"name": "test negative number",
"rendezvous": {
"name": "test negative number",
"number": -1
}
},
{
"name": "test overflow number",
"rendezvous": {
"name": "test overflow number",
"number": 1000000
}
},
{
"name": "test negative percent",
"rendezvous": {
"name": "test very low percent",
"percent": -0.5
}
},
{
"name": "test very low percent",
"rendezvous": {
"name": "test very low percent",
"percent": 0.00001
}
},
{
"name": "test overflow percent",
"rendezvous": {
"name": "test overflow percent",
"percent": 1.5
}
},
{
"name": "test conflict params",
"rendezvous": {
"name": "test conflict params",
"percent": 0.123,
"number": 1
}
},
{
"name": "test negative timeout",
"rendezvous": {
"name": "test negative timeout",
"timeout": -1000
}
},
{
"name": "waiting for all users in the beginning",
"rendezvous": {
"name": "rendezvous0",
"number": 10,
"timeout": 3000
}
},
{
"name": "rendezvous before get",
"rendezvous": {
"name": "rendezvous1",
"number": 10,
"timeout": 3000
}
},
{
"name": "get with params",
"request": {
"method": "GET",
"url": "/get",
"params": {
"foo1": "foo1",
"foo2": "foo2"
},
"headers": {
"User-Agent": "HttpRunnerPlus"
}
},
"extract": {
"varFoo1": "body.args.foo1"
},
"validate": [
{
"check": "status_code",
"assert": "equals",
"expect": 200,
"msg": "check status code"
}
]
},
{
"name": "rendezvous before post",
"rendezvous": {
"name": "rendezvous2",
"number": 20,
"timeout": 2000
}
},
{
"name": "post json data with functions",
"request": {
"method": "POST",
"url": "/post",
"headers": {
"User-Agent": "HttpRunnerPlus"
},
"body": {
"foo1": "foo1",
"foo2": "foo2"
}
},
"validate": [
{
"check": "status_code",
"assert": "equals",
"expect": 200,
"msg": "check status code"
},
{
"check": "body.json.foo1",
"assert": "length_equals",
"expect": 4,
"msg": "check args foo1"
},
{
"check": "body.json.foo2",
"assert": "equals",
"expect": "foo2",
"msg": "check args foo2"
}
]
},
{
"name": "waiting for all users in the end",
"rendezvous": {
"name": "rendezvous3"
}
}
]
}

View File

@@ -127,3 +127,11 @@ func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exc
func (b *Boomer) Quit() {
b.localRunner.stop()
}
func (b *Boomer) IsSpawnDone() bool {
return b.localRunner.isSpawnDone
}
func (b *Boomer) GetSpawnCount() int {
return b.localRunner.spawnCount
}

View File

@@ -63,6 +63,7 @@ type runner struct {
spawnCount int // target clients to spawn
spawnRate float64
loop *Loop // specify running cycles
isSpawnDone bool
outputs []Output
}
@@ -193,6 +194,7 @@ func (r *localRunner) spawnWorkers(spawnCount int, spawnRate float64, quit chan
}
}
r.isSpawnDone = true
if spawnCompleteFunc != nil {
spawnCompleteFunc()
}

View File

@@ -33,7 +33,7 @@ type TConfig struct {
type TParamsConfig struct {
Strategy interface{} `json:"strategy,omitempty" yaml:"strategy,omitempty"`
Iteration int `json:"iteration,omitempty" yaml:"iteration,omitempty"`
Iterators []*Iterator `json:"parameterIterator,omitempty" yaml:"parameterIterator,omitempty"` //保存参数的迭代器
Iterators []*Iterator `json:"parameterIterator,omitempty" yaml:"parameterIterator,omitempty"` // 保存参数的迭代器
}
const (
@@ -142,11 +142,26 @@ type Transaction struct {
Name string `json:"name" yaml:"name"`
Type transactionType `json:"type" yaml:"type"`
}
const (
defaultRendezvousTimeout int64 = 5000
defaultRendezvousPercent float32 = 1.0
)
type Rendezvous struct {
Name string `json:"name" yaml:"name"` // required
Percent float32 `json:"percent,omitempty" yaml:"percent,omitempty"` // default to 1(100%)
Number int64 `json:"number,omitempty" yaml:"number,omitempty"`
Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"` // milliseconds
Name string `json:"name" yaml:"name"` // required
Percent float32 `json:"percent,omitempty" yaml:"percent,omitempty"` // default to 1(100%)
Number int64 `json:"number,omitempty" yaml:"number,omitempty"`
Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"` // milliseconds
cnt int64
releasedFlag uint32
spawnDoneFlag uint32
wg sync.WaitGroup
msg chan struct{}
activateChan chan struct{}
releaseChan chan struct{}
once *sync.Once
lock sync.Mutex
}
// TCase represents testcase data structure.

174
runner.go
View File

@@ -13,6 +13,8 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@@ -334,21 +336,181 @@ func (r *caseRunner) runStepTransaction(transaction *Transaction) (stepResult *s
return stepResult, nil
}
func (r *caseRunner) runStepRendezvous(rend *Rendezvous) (stepResult *stepData, err error) {
func (r *caseRunner) runStepRendezvous(rendezvous *Rendezvous) (stepResult *stepData, err error) {
log.Info().
Str("name", rend.Name).
Float32("percent", rend.Percent).
Int64("number", rend.Number).
Int64("timeout", rend.Timeout).
Str("name", rendezvous.Name).
Float32("percent", rendezvous.Percent).
Int64("number", rendezvous.Number).
Int64("timeout", rendezvous.Timeout).
Msg("rendezvous")
stepResult = &stepData{
name: rend.Name,
name: rendezvous.Name,
stepType: stepTypeRendezvous,
success: true,
}
// pass current rendezvous if already released, activate rendezvous sequentially after spawn done
if rendezvous.isReleased() || !r.isPreRendezvousAllReleased(rendezvous) || !rendezvous.isSpawnDone() {
return stepResult, nil
}
// activate the rendezvous only once during each cycle
rendezvous.once.Do(func() {
close(rendezvous.activateChan)
})
// check current cnt using double check lock before updating to avoid negative WaitGroup counter
if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number {
rendezvous.lock.Lock()
if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number {
atomic.AddInt64(&rendezvous.cnt, 1)
rendezvous.wg.Done()
rendezvous.msg <- struct{}{}
}
rendezvous.lock.Unlock()
}
// block until current rendezvous released
<-rendezvous.releaseChan
return stepResult, nil
}
func (r *caseRunner) isPreRendezvousAllReleased(rendezvous *Rendezvous) bool {
tCase, _ := r.ToTCase()
for _, step := range tCase.TestSteps {
preRendezvous := step.Rendezvous
if preRendezvous == nil {
continue
}
// meet current rendezvous, all previous rendezvous released, return true
if preRendezvous == rendezvous {
return true
}
if !preRendezvous.isReleased() {
return false
}
}
return true
}
func (r *Rendezvous) reset() {
r.cnt = 0
r.releasedFlag = 0
r.wg.Add(int(r.Number))
// msg channel will not be closed, init only once
if r.msg == nil {
r.msg = make(chan struct{})
}
r.activateChan = make(chan struct{})
r.releaseChan = make(chan struct{})
r.once = new(sync.Once)
}
func (r *Rendezvous) isSpawnDone() bool {
return atomic.LoadUint32(&r.spawnDoneFlag) == 1
}
func (r *Rendezvous) setSpawnDone() {
atomic.StoreUint32(&r.spawnDoneFlag, 1)
}
func (r *Rendezvous) isReleased() bool {
return atomic.LoadUint32(&r.releasedFlag) == 1
}
func (r *Rendezvous) setReleased() {
atomic.StoreUint32(&r.releasedFlag, 1)
}
func initRendezvous(testcase *TestCase, total int64) []*Rendezvous {
tCase, _ := testcase.ToTCase()
var rendezvousList []*Rendezvous
for _, step := range tCase.TestSteps {
if step.Rendezvous == nil {
continue
}
rendezvous := step.Rendezvous
// either number or percent should be correctly put, otherwise set to default (total)
if rendezvous.Number == 0 && rendezvous.Percent > 0 && rendezvous.Percent <= defaultRendezvousPercent {
rendezvous.Number = int64(rendezvous.Percent * float32(total))
} else if rendezvous.Number > 0 && rendezvous.Number <= total && rendezvous.Percent == 0 {
rendezvous.Percent = float32(rendezvous.Number) / float32(total)
} else {
rendezvous.Number = total
rendezvous.Percent = defaultRendezvousPercent
}
if rendezvous.Timeout <= 0 {
rendezvous.Timeout = defaultRendezvousTimeout
}
rendezvous.reset()
rendezvousList = append(rendezvousList, rendezvous)
}
return rendezvousList
}
func waitRendezvous(rendezvousList []*Rendezvous) {
if rendezvousList != nil {
lastRendezvous := rendezvousList[len(rendezvousList)-1]
for _, rendezvous := range rendezvousList {
go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous)
}
}
}
func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous) {
for {
// cycle start: block current checking until current rendezvous activated
<-rendezvous.activateChan
stop := make(chan struct{})
timeout := time.Duration(rendezvous.Timeout) * time.Millisecond
timer := time.NewTimer(timeout)
go func() {
defer close(stop)
rendezvous.wg.Wait()
}()
for !rendezvous.isReleased() {
select {
case <-rendezvous.msg:
timer.Reset(timeout)
case <-stop:
rendezvous.setReleased()
close(rendezvous.releaseChan)
log.Warn().
Str("name", rendezvous.Name).
Float32("percent", rendezvous.Percent).
Int64("number", rendezvous.Number).
Int64("timeout(ms)", rendezvous.Timeout).
Int64("cnt", rendezvous.cnt).
Str("reason", "rendezvous release condition satisfied").
Msg("rendezvous released")
case <-timer.C:
rendezvous.setReleased()
close(rendezvous.releaseChan)
log.Warn().
Str("name", rendezvous.Name).
Float32("percent", rendezvous.Percent).
Int64("number", rendezvous.Number).
Int64("timeout(ms)", rendezvous.Timeout).
Int64("cnt", rendezvous.cnt).
Str("reason", "time's up").
Msg("rendezvous released")
}
}
// cycle end: reset all previous rendezvous after last rendezvous released
// otherwise, block current checker until the last rendezvous end
if rendezvous == lastRendezvous {
for _, r := range rendezvousList {
r.reset()
}
} else {
<-lastRendezvous.releaseChan
}
}
}
func (r *caseRunner) runStepRequest(step *TStep) (stepResult *stepData, err error) {
stepResult = &stepData{
name: step.Name,

28
step.go
View File

@@ -345,3 +345,31 @@ func (s *StepRendezvous) Type() string {
func (s *StepRendezvous) ToStruct() *TStep {
return s.step
}
// Rendezvous creates a new rendezvous
func (s *StepRequest) Rendezvous(name string) *StepRendezvous {
s.step.Rendezvous = &Rendezvous{
Name: name,
}
return &StepRendezvous{
step: s.step,
}
}
// WithUserNumber sets the user number needed to release the current rendezvous
func (s *StepRendezvous) WithUserNumber(number int64) *StepRendezvous {
s.step.Rendezvous.Number = number
return s
}
// WithUserPercent sets the user percent needed to release the current rendezvous
func (s *StepRendezvous) WithUserPercent(percent float32) *StepRendezvous {
s.step.Rendezvous.Percent = percent
return s
}
// WithTimeout sets the timeout of duration between each user arriving at the current rendezvous
func (s *StepRendezvous) WithTimeout(timeout int64) *StepRendezvous {
s.step.Rendezvous.Timeout = timeout
return s
}