mirror of
https://github.com/httprunner/httprunner.git
synced 2026-05-24 17:59:58 +08:00
feat: support rendezvous after spawn done
Change-Id: I4b07a88b61da4dc1863b189db9eb831ffb14130a
This commit is contained in:
174
runner.go
174
runner.go
@@ -13,6 +13,8 @@ import (
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -334,21 +336,181 @@ func (r *caseRunner) runStepTransaction(transaction *Transaction) (stepResult *s
|
||||
return stepResult, nil
|
||||
}
|
||||
|
||||
func (r *caseRunner) runStepRendezvous(rend *Rendezvous) (stepResult *stepData, err error) {
|
||||
func (r *caseRunner) runStepRendezvous(rendezvous *Rendezvous) (stepResult *stepData, err error) {
|
||||
log.Info().
|
||||
Str("name", rend.Name).
|
||||
Float32("percent", rend.Percent).
|
||||
Int64("number", rend.Number).
|
||||
Int64("timeout", rend.Timeout).
|
||||
Str("name", rendezvous.Name).
|
||||
Float32("percent", rendezvous.Percent).
|
||||
Int64("number", rendezvous.Number).
|
||||
Int64("timeout", rendezvous.Timeout).
|
||||
Msg("rendezvous")
|
||||
stepResult = &stepData{
|
||||
name: rend.Name,
|
||||
name: rendezvous.Name,
|
||||
stepType: stepTypeRendezvous,
|
||||
success: true,
|
||||
}
|
||||
|
||||
// pass current rendezvous if already released, activate rendezvous sequentially after spawn done
|
||||
if rendezvous.isReleased() || !r.isPreRendezvousAllReleased(rendezvous) || !rendezvous.isSpawnDone() {
|
||||
return stepResult, nil
|
||||
}
|
||||
|
||||
// activate the rendezvous only once during each cycle
|
||||
rendezvous.once.Do(func() {
|
||||
close(rendezvous.activateChan)
|
||||
})
|
||||
|
||||
// check current cnt using double check lock before updating to avoid negative WaitGroup counter
|
||||
if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number {
|
||||
rendezvous.lock.Lock()
|
||||
if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number {
|
||||
atomic.AddInt64(&rendezvous.cnt, 1)
|
||||
rendezvous.wg.Done()
|
||||
rendezvous.msg <- struct{}{}
|
||||
}
|
||||
rendezvous.lock.Unlock()
|
||||
}
|
||||
|
||||
// block until current rendezvous released
|
||||
<-rendezvous.releaseChan
|
||||
return stepResult, nil
|
||||
}
|
||||
|
||||
func (r *caseRunner) isPreRendezvousAllReleased(rendezvous *Rendezvous) bool {
|
||||
tCase, _ := r.ToTCase()
|
||||
for _, step := range tCase.TestSteps {
|
||||
preRendezvous := step.Rendezvous
|
||||
if preRendezvous == nil {
|
||||
continue
|
||||
}
|
||||
// meet current rendezvous, all previous rendezvous released, return true
|
||||
if preRendezvous == rendezvous {
|
||||
return true
|
||||
}
|
||||
if !preRendezvous.isReleased() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *Rendezvous) reset() {
|
||||
r.cnt = 0
|
||||
r.releasedFlag = 0
|
||||
r.wg.Add(int(r.Number))
|
||||
// msg channel will not be closed, init only once
|
||||
if r.msg == nil {
|
||||
r.msg = make(chan struct{})
|
||||
}
|
||||
r.activateChan = make(chan struct{})
|
||||
r.releaseChan = make(chan struct{})
|
||||
r.once = new(sync.Once)
|
||||
}
|
||||
|
||||
func (r *Rendezvous) isSpawnDone() bool {
|
||||
return atomic.LoadUint32(&r.spawnDoneFlag) == 1
|
||||
}
|
||||
|
||||
func (r *Rendezvous) setSpawnDone() {
|
||||
atomic.StoreUint32(&r.spawnDoneFlag, 1)
|
||||
}
|
||||
|
||||
func (r *Rendezvous) isReleased() bool {
|
||||
return atomic.LoadUint32(&r.releasedFlag) == 1
|
||||
}
|
||||
|
||||
func (r *Rendezvous) setReleased() {
|
||||
atomic.StoreUint32(&r.releasedFlag, 1)
|
||||
}
|
||||
|
||||
func initRendezvous(testcase *TestCase, total int64) []*Rendezvous {
|
||||
tCase, _ := testcase.ToTCase()
|
||||
var rendezvousList []*Rendezvous
|
||||
for _, step := range tCase.TestSteps {
|
||||
if step.Rendezvous == nil {
|
||||
continue
|
||||
}
|
||||
rendezvous := step.Rendezvous
|
||||
|
||||
// either number or percent should be correctly put, otherwise set to default (total)
|
||||
if rendezvous.Number == 0 && rendezvous.Percent > 0 && rendezvous.Percent <= defaultRendezvousPercent {
|
||||
rendezvous.Number = int64(rendezvous.Percent * float32(total))
|
||||
} else if rendezvous.Number > 0 && rendezvous.Number <= total && rendezvous.Percent == 0 {
|
||||
rendezvous.Percent = float32(rendezvous.Number) / float32(total)
|
||||
} else {
|
||||
rendezvous.Number = total
|
||||
rendezvous.Percent = defaultRendezvousPercent
|
||||
}
|
||||
|
||||
if rendezvous.Timeout <= 0 {
|
||||
rendezvous.Timeout = defaultRendezvousTimeout
|
||||
}
|
||||
|
||||
rendezvous.reset()
|
||||
rendezvousList = append(rendezvousList, rendezvous)
|
||||
}
|
||||
return rendezvousList
|
||||
}
|
||||
|
||||
func waitRendezvous(rendezvousList []*Rendezvous) {
|
||||
if rendezvousList != nil {
|
||||
lastRendezvous := rendezvousList[len(rendezvousList)-1]
|
||||
for _, rendezvous := range rendezvousList {
|
||||
go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous) {
|
||||
for {
|
||||
// cycle start: block current checking until current rendezvous activated
|
||||
<-rendezvous.activateChan
|
||||
stop := make(chan struct{})
|
||||
timeout := time.Duration(rendezvous.Timeout) * time.Millisecond
|
||||
timer := time.NewTimer(timeout)
|
||||
go func() {
|
||||
defer close(stop)
|
||||
rendezvous.wg.Wait()
|
||||
}()
|
||||
for !rendezvous.isReleased() {
|
||||
select {
|
||||
case <-rendezvous.msg:
|
||||
timer.Reset(timeout)
|
||||
case <-stop:
|
||||
rendezvous.setReleased()
|
||||
close(rendezvous.releaseChan)
|
||||
log.Warn().
|
||||
Str("name", rendezvous.Name).
|
||||
Float32("percent", rendezvous.Percent).
|
||||
Int64("number", rendezvous.Number).
|
||||
Int64("timeout(ms)", rendezvous.Timeout).
|
||||
Int64("cnt", rendezvous.cnt).
|
||||
Str("reason", "rendezvous release condition satisfied").
|
||||
Msg("rendezvous released")
|
||||
case <-timer.C:
|
||||
rendezvous.setReleased()
|
||||
close(rendezvous.releaseChan)
|
||||
log.Warn().
|
||||
Str("name", rendezvous.Name).
|
||||
Float32("percent", rendezvous.Percent).
|
||||
Int64("number", rendezvous.Number).
|
||||
Int64("timeout(ms)", rendezvous.Timeout).
|
||||
Int64("cnt", rendezvous.cnt).
|
||||
Str("reason", "time's up").
|
||||
Msg("rendezvous released")
|
||||
}
|
||||
}
|
||||
// cycle end: reset all previous rendezvous after last rendezvous released
|
||||
// otherwise, block current checker until the last rendezvous end
|
||||
if rendezvous == lastRendezvous {
|
||||
for _, r := range rendezvousList {
|
||||
r.reset()
|
||||
}
|
||||
} else {
|
||||
<-lastRendezvous.releaseChan
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *caseRunner) runStepRequest(step *TStep) (stepResult *stepData, err error) {
|
||||
stepResult = &stepData{
|
||||
name: step.Name,
|
||||
|
||||
Reference in New Issue
Block a user